TriggerAsyncEnumerable

pull/61/head
neuecc 2020-05-12 13:15:26 +09:00
parent bd6906792d
commit d3538bdc8f
2 changed files with 107 additions and 14 deletions

View File

@ -4,6 +4,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading; using System.Threading;
using Cysharp.Threading.Tasks.Internal; using Cysharp.Threading.Tasks.Internal;
using Cysharp.Threading.Tasks.Linq;
using UnityEngine; using UnityEngine;
namespace Cysharp.Threading.Tasks.Triggers namespace Cysharp.Threading.Tasks.Triggers
@ -87,7 +88,7 @@ namespace Cysharp.Threading.Tasks.Triggers
} }
} }
public sealed partial class AsyncTriggerHandler<T> : IUniTaskSource<T>, IResolvePromise<T>, ICancelPromise, IDisposable public sealed partial class AsyncTriggerHandler<T> : IUniTaskSource<T>, IResolveCancelPromise<T>, IDisposable
{ {
static Action<object> cancellationCallback = CancellationCallback; static Action<object> cancellationCallback = CancellationCallback;
@ -207,17 +208,101 @@ namespace Cysharp.Threading.Tasks.Triggers
} }
} }
public sealed class TriggerEvent<T> : IResolvePromise<T>, ICancelPromise public sealed class TriggerAsyncEnumerable<T> : IUniTaskAsyncEnumerable<T>
{
readonly TriggerEvent<T> triggerEvent;
public TriggerAsyncEnumerable(TriggerEvent<T> triggerEvent)
{
this.triggerEvent = triggerEvent;
}
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new Enumerator(triggerEvent, cancellationToken);
}
sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, IResolveCancelPromise<T>
{
static Action<object> cancellationCallback = CancellationCallback;
readonly TriggerEvent<T> triggerEvent;
CancellationToken cancellationToken;
CancellationTokenRegistration registration;
bool called;
bool isDisposed;
public Enumerator(TriggerEvent<T> triggerEvent, CancellationToken cancellationToken)
{
this.triggerEvent = triggerEvent;
this.cancellationToken = cancellationToken;
}
public bool TrySetCanceled(CancellationToken cancellationToken = default)
{
return completionSource.TrySetCanceled(cancellationToken);
}
public bool TrySetResult(T value)
{
Current = value;
return completionSource.TrySetResult(true);
}
static void CancellationCallback(object state)
{
var self = (Enumerator)state;
self.DisposeAsync().Forget(); // sync
self.completionSource.TrySetCanceled(self.cancellationToken);
}
public T Current { get; private set; }
public UniTask<bool> MoveNextAsync()
{
cancellationToken.ThrowIfCancellationRequested();
if (!called)
{
TaskTracker.TrackActiveTask(this, 3);
triggerEvent.Add(this);
if (cancellationToken.CanBeCanceled)
{
registration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
}
}
completionSource.Reset();
return new UniTask<bool>(this, completionSource.Version);
}
public UniTask DisposeAsync()
{
if (!isDisposed)
{
isDisposed = true;
TaskTracker.RemoveTracking(this);
registration.Dispose();
triggerEvent.Remove(this);
}
return default;
}
}
}
public sealed class TriggerEvent<T> : IResolveCancelPromise<T>
{ {
// optimize: many cases, handler is single. // optimize: many cases, handler is single.
AsyncTriggerHandler<T> singleHandler; IResolveCancelPromise<T> singleHandler;
AsyncTriggerHandler<T>[] handlers; IResolveCancelPromise<T>[] handlers;
// when running(in TrySetResult), does not add immediately. // when running(in TrySetResult), does not add immediately.
bool isRunning; bool isRunning;
AsyncTriggerHandler<T> waitHandler; IResolveCancelPromise<T> waitHandler;
MinimumQueue<AsyncTriggerHandler<T>> waitQueue; MinimumQueue<IResolveCancelPromise<T>> waitQueue;
public bool TrySetResult(T value) public bool TrySetResult(T value)
{ {
@ -227,7 +312,7 @@ namespace Cysharp.Threading.Tasks.Triggers
{ {
try try
{ {
((IResolvePromise<T>)singleHandler).TrySetResult(value); singleHandler.TrySetResult(value);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -243,7 +328,7 @@ namespace Cysharp.Threading.Tasks.Triggers
{ {
try try
{ {
((IResolvePromise<T>)handlers[i]).TrySetResult(value); handlers[i].TrySetResult(value);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -329,7 +414,7 @@ namespace Cysharp.Threading.Tasks.Triggers
return true; return true;
} }
public void Add(AsyncTriggerHandler<T> handler) public void Add(IResolveCancelPromise<T> handler)
{ {
if (isRunning) if (isRunning)
{ {
@ -341,7 +426,7 @@ namespace Cysharp.Threading.Tasks.Triggers
if (waitQueue == null) if (waitQueue == null)
{ {
waitQueue = new MinimumQueue<AsyncTriggerHandler<T>>(4); waitQueue = new MinimumQueue<IResolveCancelPromise<T>>(4);
} }
waitQueue.Enqueue(handler); waitQueue.Enqueue(handler);
return; return;
@ -355,7 +440,7 @@ namespace Cysharp.Threading.Tasks.Triggers
{ {
if (handlers == null) if (handlers == null)
{ {
handlers = new AsyncTriggerHandler<T>[4]; handlers = new IResolveCancelPromise<T>[4];
} }
// check empty // check empty
@ -377,15 +462,15 @@ namespace Cysharp.Threading.Tasks.Triggers
} }
} }
static void EnsureCapacity(ref AsyncTriggerHandler<T>[] array) static void EnsureCapacity(ref IResolveCancelPromise<T>[] array)
{ {
var newSize = array.Length * 2; var newSize = array.Length * 2;
var newArray = new AsyncTriggerHandler<T>[newSize]; var newArray = new IResolveCancelPromise<T>[newSize];
Array.Copy(array, 0, newArray, 0, array.Length); Array.Copy(array, 0, newArray, 0, array.Length);
array = newArray; array = newArray;
} }
public void Remove(AsyncTriggerHandler<T> handler) public void Remove(IResolveCancelPromise<T> handler)
{ {
if (singleHandler == handler) if (singleHandler == handler)
{ {

View File

@ -38,6 +38,14 @@ namespace Cysharp.Threading.Tasks
{ {
} }
public interface IResolveCancelPromise : IResolvePromise, ICancelPromise
{
}
public interface IResolveCancelPromise<T> : IResolvePromise<T>, ICancelPromise
{
}
[StructLayout(LayoutKind.Auto)] [StructLayout(LayoutKind.Auto)]
public struct UniTaskCompletionSourceCore<TResult> public struct UniTaskCompletionSourceCore<TResult>
{ {