Add TaskTracker to AsyncLINQ

pull/61/head
neuecc 2020-05-19 04:13:46 +09:00
parent 997b0b3710
commit f99910d802
20 changed files with 86 additions and 6 deletions

View File

@ -65,6 +65,8 @@ namespace Cysharp.Threading.Tasks.Linq
this.element = element;
this.state = append ? State.RequireAppend : State.RequirePrepend;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@ -136,6 +138,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@ -76,6 +76,7 @@ namespace Cysharp.Threading.Tasks.Linq
{
this.source = source;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 4);
}
// abstract
@ -178,6 +179,7 @@ namespace Cysharp.Threading.Tasks.Linq
// if require additional resource to dispose, override and call base.DisposeAsync.
public virtual UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();
@ -204,6 +206,7 @@ namespace Cysharp.Threading.Tasks.Linq
{
this.source = source;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 4);
}
// abstract
@ -399,6 +402,7 @@ namespace Cysharp.Threading.Tasks.Linq
// if require additional resource to dispose, override and call base.DisposeAsync.
public virtual UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@ -61,6 +61,8 @@ namespace Cysharp.Threading.Tasks.Linq
this.source = source;
this.count = count;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public IList<TSource> Current { get; private set; }
@ -167,6 +169,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();
@ -217,6 +220,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.count = count;
this.skip = skip;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public IList<TSource> Current { get; private set; }
@ -329,6 +333,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@ -57,6 +57,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.second = second;
this.cancellationToken = cancellationToken;
this.iteratingState = IteratingState.IteratingFirst;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@ -150,6 +151,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@ -63,6 +63,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.cancellationToken = cancellationToken;
this.iteratingState = IteratingState.Empty;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@ -128,6 +129,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@ -124,6 +124,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.onError = onError;
this.onCompleted = onCompleted;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@ -244,6 +245,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@ -255,6 +255,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.elementSelector = elementSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public IGrouping<TKey, TElement> Current { get; private set; }
@ -313,6 +314,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (groupEnumerator != null)
{
groupEnumerator.Dispose();
@ -364,6 +366,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -423,6 +426,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (groupEnumerator != null)
{
groupEnumerator.Dispose();
@ -470,6 +474,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.elementSelector = elementSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public IGrouping<TKey, TElement> Current { get; private set; }
@ -528,6 +533,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (groupEnumerator != null)
{
groupEnumerator.Dispose();
@ -582,6 +588,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -661,6 +668,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (groupEnumerator != null)
{
groupEnumerator.Dispose();
@ -708,6 +716,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.elementSelector = elementSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public IGrouping<TKey, TElement> Current { get; private set; }
@ -766,6 +775,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (groupEnumerator != null)
{
groupEnumerator.Dispose();
@ -820,6 +830,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -899,6 +910,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (groupEnumerator != null)
{
groupEnumerator.Dispose();

View File

@ -129,6 +129,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -208,6 +209,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();
@ -273,6 +275,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -401,6 +404,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();
@ -466,6 +470,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -594,6 +599,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@ -131,6 +131,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -248,6 +249,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (valueEnumerator != null)
{
valueEnumerator.Dispose();
@ -321,6 +323,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -476,6 +479,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (valueEnumerator != null)
{
valueEnumerator.Dispose();
@ -549,6 +553,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -704,6 +709,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (valueEnumerator != null)
{
valueEnumerator.Dispose();

View File

@ -422,6 +422,7 @@ namespace Cysharp.Threading.Tasks.Linq
{
this.parent = parent;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TElement Current { get; private set; }
@ -477,6 +478,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
return default;
}
}

View File

@ -33,6 +33,7 @@ namespace Cysharp.Threading.Tasks.Linq
Channel<TSource> channel;
IUniTaskAsyncEnumerator<TSource> channelEnumerator;
IUniTaskAsyncEnumerator<TSource> sourceEnumerator;
bool channelClosed;
public _Queue(IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
{
@ -53,13 +54,13 @@ namespace Cysharp.Threading.Tasks.Linq
channelEnumerator = channel.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken);
ConsumeAll(sourceEnumerator, channel).Forget();
ConsumeAll(this, sourceEnumerator, channel).Forget();
}
return channelEnumerator.MoveNextAsync();
}
static async UniTaskVoid ConsumeAll(IUniTaskAsyncEnumerator<TSource> enumerator, ChannelWriter<TSource> writer)
static async UniTaskVoid ConsumeAll(_Queue self, IUniTaskAsyncEnumerator<TSource> enumerator, ChannelWriter<TSource> writer)
{
try
{
@ -75,6 +76,7 @@ namespace Cysharp.Threading.Tasks.Linq
}
finally
{
self.channelClosed = true;
await enumerator.DisposeAsync();
}
}
@ -89,6 +91,12 @@ namespace Cysharp.Threading.Tasks.Linq
{
await channelEnumerator.DisposeAsync();
}
if (!channelClosed)
{
channelClosed = true;
channel.Writer.TryComplete(new OperationCanceledException());
}
}
}
}

View File

@ -40,6 +40,7 @@ namespace Cysharp.Threading.Tasks.Linq
{
this.source = source;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@ -69,6 +70,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
return default;
}
}

View File

@ -160,6 +160,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.selector2 = selector2;
this.resultSelector = resultSelector;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -324,6 +325,7 @@ namespace Cysharp.Threading.Tasks.Linq
public async UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (selectedEnumerator != null)
{
await selectedEnumerator.DisposeAsync();
@ -398,6 +400,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.selector2 = selector2;
this.resultSelector = resultSelector;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -598,6 +601,7 @@ namespace Cysharp.Threading.Tasks.Linq
public async UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (selectedEnumerator != null)
{
await selectedEnumerator.DisposeAsync();
@ -672,6 +676,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.selector2 = selector2;
this.resultSelector = resultSelector;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -872,6 +877,7 @@ namespace Cysharp.Threading.Tasks.Linq
public async UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (selectedEnumerator != null)
{
await selectedEnumerator.DisposeAsync();

View File

@ -56,6 +56,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.source = source;
this.count = count;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@ -146,6 +147,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@ -48,6 +48,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.source = source;
this.cancellationToken1 = cancellationToken1;
this.cancellationToken2 = cancellationToken2;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@ -131,6 +132,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@ -47,6 +47,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.source = source;
this.count = count;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@ -111,6 +112,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@ -57,6 +57,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.source = source;
this.count = count;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@ -162,6 +163,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@ -61,6 +61,7 @@ namespace Cysharp.Threading.Tasks.Linq
{
this.cancellationTokenRegistration2 = cancellationToken2.RegisterWithoutCaptureExecutionContext(CancelDelegate2, this);
}
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@ -149,6 +150,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
cancellationTokenRegistration1.Dispose();
cancellationTokenRegistration2.Dispose();
if (enumerator != null)

View File

@ -84,6 +84,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.second = second;
this.resultSelector = resultSelector;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -181,6 +182,7 @@ namespace Cysharp.Threading.Tasks.Linq
public async UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (firstEnumerator != null)
{
await firstEnumerator.DisposeAsync();
@ -236,6 +238,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.second = second;
this.resultSelector = resultSelector;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -351,6 +354,7 @@ namespace Cysharp.Threading.Tasks.Linq
public async UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (firstEnumerator != null)
{
await firstEnumerator.DisposeAsync();
@ -406,6 +410,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.second = second;
this.resultSelector = resultSelector;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@ -521,6 +526,7 @@ namespace Cysharp.Threading.Tasks.Linq
public async UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (firstEnumerator != null)
{
await firstEnumerator.DisposeAsync();

View File

@ -190,7 +190,7 @@ public class SandboxMain : MonoBehaviour
Debug.Log("Done");
}
async UniTaskVoid Start()
void Start()
{
//var rp = new AsyncReactiveProperty<int>(10);
@ -200,11 +200,15 @@ public class SandboxMain : MonoBehaviour
//rp.Dispose();
var channel = Channel.CreateSingleConsumerUnbounded<int>();
Debug.Log("wait channel");
await channel.Reader.ReadAllAsync(this.GetCancellationTokenOnDestroy()).ForEachAsync(_ => { });
//var channel = Channel.CreateSingleConsumerUnbounded<int>();
//Debug.Log("wait channel");
//await channel.Reader.ReadAllAsync(this.GetCancellationTokenOnDestroy()).ForEachAsync(_ => { });
var rp = new AsyncReactiveProperty<int>(10);
rp.Append(10).Select(x => x * 100).Take(30).Prepend(99).SkipLast(9).Where(x => x % 2 == 0).ForEachAsync(_ => { }).Forget();
}
async UniTaskVoid Running(CancellationToken ct)