pull/498/head
hadashiA 2023-09-09 14:27:06 +09:00
parent 6db872236e
commit 730d68132d
1 changed files with 11 additions and 25 deletions

View File

@ -66,7 +66,7 @@ namespace Cysharp.Threading.Tasks.Linq
readonly int length; readonly int length;
readonly IUniTaskAsyncEnumerator<T>[] enumerators; readonly IUniTaskAsyncEnumerator<T>[] enumerators;
readonly MergeSourceState[] states; readonly MergeSourceState[] states;
readonly Queue<(T, Exception)> resultQueue = new Queue<(T, Exception)>(); readonly Queue<(T, Exception)> queuedResult = new Queue<(T, Exception)>();
readonly CancellationToken cancellationToken; readonly CancellationToken cancellationToken;
public T Current { get; private set; } public T Current { get; private set; }
@ -89,10 +89,14 @@ namespace Cysharp.Threading.Tasks.Linq
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
completionSource.Reset(); completionSource.Reset();
lock (states) lock (queuedResult)
{ {
if (TryDequeue(out var queuedValue, out var queuedException)) if (queuedResult.Count > 0)
{ {
var result = queuedResult.Dequeue();
var queuedValue = result.Item1;
var queuedException = result.Item2;
if (queuedException != null) if (queuedException != null)
{ {
completionSource.TrySetException(queuedException); completionSource.TrySetException(queuedException);
@ -162,10 +166,9 @@ namespace Cysharp.Threading.Tasks.Linq
{ {
if (!completionSource.TrySetException(ex)) if (!completionSource.TrySetException(ex))
{ {
// lock (queuedResult)
lock (states)
{ {
resultQueue.Enqueue((default, ex)); queuedResult.Enqueue((default, ex));
} }
} }
return; return;
@ -174,11 +177,11 @@ namespace Cysharp.Threading.Tasks.Linq
var completedAll = IsCompletedAll(); var completedAll = IsCompletedAll();
if (hasNext || completedAll) if (hasNext || completedAll)
{ {
lock (states) lock (queuedResult)
{ {
if (completionSource.GetStatus(completionSource.Version).IsCompleted()) if (completionSource.GetStatus(completionSource.Version).IsCompleted())
{ {
resultQueue.Enqueue((enumerators[index].Current, null)); queuedResult.Enqueue((enumerators[index].Current, null));
} }
else else
{ {
@ -189,23 +192,6 @@ namespace Cysharp.Threading.Tasks.Linq
} }
} }
bool TryDequeue(out T value, out Exception ex)
{
lock (states)
{
if (resultQueue.Count > 0)
{
var result = resultQueue.Dequeue();
value = result.Item1;
ex = result.Item2;
return true;
}
}
value = default;
ex = default;
return false;
}
bool IsCompletedAll() bool IsCompletedAll()
{ {
lock (states) lock (states)