diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs index 31bb86b..9643b06 100644 --- a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs @@ -29,6 +29,16 @@ namespace Cysharp.Threading.Tasks.Linq return cts; } + public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, Func action) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(action, nameof(action)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + public static void Subscribe(this IUniTaskAsyncEnumerable source, Action action, CancellationToken cancellationToken) { Error.ThrowArgumentNullException(source, nameof(source)); @@ -45,6 +55,14 @@ namespace Cysharp.Threading.Tasks.Linq Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget(); } + public static void Subscribe(this IUniTaskAsyncEnumerable source, Func action, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(action, nameof(action)); + + Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + // OnNext, OnError public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, Action onNext, Action onError) @@ -234,6 +252,38 @@ namespace Cysharp.Threading.Tasks.Linq } } + public static async UniTaskVoid SubscribeCore(IUniTaskAsyncEnumerable source, Func onNext, Action onError, Action onCompleted, CancellationToken cancellationToken) + { + var e = source.GetAsyncEnumerator(cancellationToken); + try + { + while (await e.MoveNextAsync()) + { + onNext(e.Current, cancellationToken).Forget(); + } + onCompleted(); + } + catch (Exception ex) + { + if (onError == NopError) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + return; + } + + if (ex is OperationCanceledException) return; + + onError(ex); + } + finally + { + if (e != null) + { + await e.DisposeAsync(); + } + } + } + public static async UniTaskVoid SubscribeCore(IUniTaskAsyncEnumerable source, IObserver observer, CancellationToken cancellationToken) { var e = source.GetAsyncEnumerator(cancellationToken); diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs.meta b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs.meta new file mode 100644 index 0000000..ea83567 --- /dev/null +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 263479eb04c189741931fc0e2f615c2d +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: