pull/101/head
neuecc 2020-06-15 12:29:46 +09:00
parent bbfb8354bb
commit a2783d3c8a
2 changed files with 61 additions and 0 deletions

View File

@ -29,6 +29,16 @@ namespace Cysharp.Threading.Tasks.Linq
return cts;
}
public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTaskVoid> action)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(action, nameof(action));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> action, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
@ -45,6 +55,14 @@ namespace Cysharp.Threading.Tasks.Linq
Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
}
public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTaskVoid> action, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(action, nameof(action));
Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
}
// OnNext, OnError
public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
@ -234,6 +252,38 @@ namespace Cysharp.Threading.Tasks.Linq
}
}
public static async UniTaskVoid SubscribeCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTaskVoid> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
while (await e.MoveNextAsync())
{
onNext(e.Current, cancellationToken).Forget();
}
onCompleted();
}
catch (Exception ex)
{
if (onError == NopError)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
return;
}
if (ex is OperationCanceledException) return;
onError(ex);
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
public static async UniTaskVoid SubscribeCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, IObserver<TSource> observer, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);

View File

@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 263479eb04c189741931fc0e2f615c2d
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant: