From a9e5fd4589551121854744b9cae1b4c1d5fad861 Mon Sep 17 00:00:00 2001 From: neuecc Date: Sat, 13 Jun 2020 18:58:43 +0900 Subject: [PATCH] Add UniTaskAsyncEnumerable.Subscribe --- .../Plugins/UniTask/Runtime/Linq/Subscribe.cs | 263 ++++++++++++++++++ 1 file changed, 263 insertions(+) create mode 100644 src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs new file mode 100644 index 0000000..31bb86b --- /dev/null +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs @@ -0,0 +1,263 @@ +using Cysharp.Threading.Tasks.Internal; +using System; +using System.Threading; +using Subscribes = Cysharp.Threading.Tasks.Linq.Subscribe; + +namespace Cysharp.Threading.Tasks.Linq +{ + public static partial class UniTaskAsyncEnumerable + { + // OnNext + + public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, Action action) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(action, nameof(action)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + + public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, Func action) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(action, nameof(action)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + + public static void Subscribe(this IUniTaskAsyncEnumerable source, Action action, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(action, nameof(action)); + + Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + + public static void Subscribe(this IUniTaskAsyncEnumerable source, Func action, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(action, nameof(action)); + + Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + + // OnNext, OnError + + public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, Action onNext, Action onError) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onError, nameof(onError)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + + public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, Func onNext, Action onError) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onError, nameof(onError)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + + public static void Subscribe(this IUniTaskAsyncEnumerable source, Action onNext, Action onError, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onError, nameof(onError)); + + Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + + public static void Subscribe(this IUniTaskAsyncEnumerable source, Func onNext, Action onError, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onError, nameof(onError)); + + Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + + // OnNext, OnCompleted + + public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, Action onNext, Action onCompleted) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget(); + return cts; + } + + public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, Func onNext, Action onCompleted) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget(); + return cts; + } + + public static void Subscribe(this IUniTaskAsyncEnumerable source, Action onNext, Action onCompleted, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted)); + + Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget(); + } + + public static void Subscribe(this IUniTaskAsyncEnumerable source, Func onNext, Action onCompleted, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted)); + + Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget(); + } + + // IObserver + + public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, IObserver observer) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(observer, nameof(observer)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeCore(source, observer, cts.Token).Forget(); + return cts; + } + + public static void Subscribe(this IUniTaskAsyncEnumerable source, IObserver observer, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(observer, nameof(observer)); + + Subscribes.SubscribeCore(source, observer, cancellationToken).Forget(); + } + } + + internal sealed class CancellationTokenDisposable : IDisposable + { + readonly CancellationTokenSource cts = new CancellationTokenSource(); + + public CancellationToken Token => cts.Token; + + public void Dispose() + { + if (!cts.IsCancellationRequested) + { + cts.Cancel(); + } + } + } + + internal static class Subscribe + { + public static readonly Action NopError = _ => { }; + public static readonly Action NopCompleted = () => { }; + + public static async UniTaskVoid SubscribeCore(IUniTaskAsyncEnumerable source, Action onNext, Action onError, Action onCompleted, CancellationToken cancellationToken) + { + var e = source.GetAsyncEnumerator(cancellationToken); + try + { + while (await e.MoveNextAsync()) + { + onNext(e.Current); + } + onCompleted(); + } + catch (Exception ex) + { + if (onError == NopError) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + return; + } + + if (ex is OperationCanceledException) return; + + onError(ex); + } + finally + { + if (e != null) + { + await e.DisposeAsync(); + } + } + } + + public static async UniTaskVoid SubscribeCore(IUniTaskAsyncEnumerable source, Func onNext, Action onError, Action onCompleted, CancellationToken cancellationToken) + { + var e = source.GetAsyncEnumerator(cancellationToken); + try + { + while (await e.MoveNextAsync()) + { + onNext(e.Current).Forget(); + } + onCompleted(); + } + catch (Exception ex) + { + if (onError == NopError) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + return; + } + + if (ex is OperationCanceledException) return; + + onError(ex); + } + finally + { + if (e != null) + { + await e.DisposeAsync(); + } + } + } + + public static async UniTaskVoid SubscribeCore(IUniTaskAsyncEnumerable source, IObserver observer, CancellationToken cancellationToken) + { + var e = source.GetAsyncEnumerator(cancellationToken); + try + { + while (await e.MoveNextAsync()) + { + observer.OnNext(e.Current); + } + observer.OnCompleted(); + } + catch (Exception ex) + { + if (ex is OperationCanceledException) return; + + observer.OnError(ex); + } + finally + { + if (e != null) + { + await e.DisposeAsync(); + } + } + } + } +} \ No newline at end of file