Add UniTaskAsyncEnumerable.SubscribeAwait

pull/139/head
Mayuki Sawatari 2020-08-24 14:35:20 +09:00
parent 7718d345c8
commit 9e45c0a4d1
1 changed files with 195 additions and 0 deletions

View File

@ -63,6 +63,42 @@ namespace Cysharp.Threading.Tasks.Linq
Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
}
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
}
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
}
// OnNext, OnError
public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
@ -105,6 +141,46 @@ namespace Cysharp.Threading.Tasks.Linq
Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
}
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onError, nameof(onError));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onError, nameof(onError));
Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
}
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onError, nameof(onError));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onError, nameof(onError));
Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
}
// OnNext, OnCompleted
public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action onCompleted)
@ -147,6 +223,46 @@ namespace Cysharp.Threading.Tasks.Linq
Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
}
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action onCompleted)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action onCompleted, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
}
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action onCompleted)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action onCompleted, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
}
// IObserver
public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, IObserver<TSource> observer)
@ -309,5 +425,84 @@ namespace Cysharp.Threading.Tasks.Linq
}
}
}
public static async UniTaskVoid SubscribeAwaitCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
while (await e.MoveNextAsync())
{
try
{
await onNext(e.Current);
}
catch (Exception ex)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
}
}
onCompleted();
}
catch (Exception ex)
{
if (onError == NopError)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
return;
}
if (ex is OperationCanceledException) return;
onError(ex);
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
public static async UniTaskVoid SubscribeAwaitCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
while (await e.MoveNextAsync())
{
try
{
await onNext(e.Current, cancellationToken);
}
catch (Exception ex)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
}
}
onCompleted();
}
catch (Exception ex)
{
if (onError == NopError)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
return;
}
if (ex is OperationCanceledException) return;
onError(ex);
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
}
}