Merge remote-tracking branch 'origin/master'

pull/142/head 2.0.31
neuecc 2020-08-25 07:16:16 +09:00
commit 53907a3719
3 changed files with 291 additions and 6 deletions

View File

@ -30,10 +30,10 @@ namespace Cysharp.Threading.Tasks
return ToCancellationToken(task); return ToCancellationToken(task);
} }
var cts = new CancellationTokenSource(); var cts = CancellationTokenSource.CreateLinkedTokenSource(linkToken);
ToCancellationTokenCore(task, cts).Forget(); ToCancellationTokenCore(task, cts).Forget();
return CancellationTokenSource.CreateLinkedTokenSource(linkToken).Token; return cts.Token;
} }
public static CancellationToken ToCancellationToken<T>(this UniTask<T> task) public static CancellationToken ToCancellationToken<T>(this UniTask<T> task)

View File

@ -9,6 +9,7 @@ namespace Cysharp.Threading.Tasks
{ {
public static partial class TextMeshProAsyncExtensions public static partial class TextMeshProAsyncExtensions
{ {
// <string> -> Text
public static void BindTo(this IUniTaskAsyncEnumerable<string> source, TMP_Text text, bool rebindOnError = true) public static void BindTo(this IUniTaskAsyncEnumerable<string> source, TMP_Text text, bool rebindOnError = true)
{ {
BindToCore(source, text, text.GetCancellationTokenOnDestroy(), rebindOnError).Forget(); BindToCore(source, text, text.GetCancellationTokenOnDestroy(), rebindOnError).Forget();
@ -62,6 +63,67 @@ namespace Cysharp.Threading.Tasks
} }
} }
} }
// <T> -> Text
public static void BindTo<T>(this IUniTaskAsyncEnumerable<T> source, TMP_Text text, bool rebindOnError = true)
{
BindToCore(source, text, text.GetCancellationTokenOnDestroy(), rebindOnError).Forget();
}
public static void BindTo<T>(this IUniTaskAsyncEnumerable<T> source, TMP_Text text, CancellationToken cancellationToken, bool rebindOnError = true)
{
BindToCore(source, text, cancellationToken, rebindOnError).Forget();
}
public static void BindTo<T>(this AsyncReactiveProperty<T> source, TMP_Text text, bool rebindOnError = true)
{
BindToCore(source, text, text.GetCancellationTokenOnDestroy(), rebindOnError).Forget();
}
static async UniTaskVoid BindToCore<T>(IUniTaskAsyncEnumerable<T> source, TMP_Text text, CancellationToken cancellationToken, bool rebindOnError)
{
var repeat = false;
BIND_AGAIN:
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
while (true)
{
bool moveNext;
try
{
moveNext = await e.MoveNextAsync();
repeat = false;
}
catch (Exception ex)
{
if (ex is OperationCanceledException) return;
if (rebindOnError && !repeat)
{
repeat = true;
goto BIND_AGAIN;
}
else
{
throw;
}
}
if (!moveNext) return;
text.text = e.Current.ToString();
}
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
} }
} }

View File

@ -63,6 +63,42 @@ namespace Cysharp.Threading.Tasks.Linq
Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget(); Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
} }
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
}
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
}
// OnNext, OnError // OnNext, OnError
public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError) public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
@ -105,6 +141,46 @@ namespace Cysharp.Threading.Tasks.Linq
Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget(); Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
} }
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onError, nameof(onError));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onError, nameof(onError));
Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
}
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onError, nameof(onError));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onError, nameof(onError));
Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
}
// OnNext, OnCompleted // OnNext, OnCompleted
public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action onCompleted) public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action onCompleted)
@ -147,6 +223,46 @@ namespace Cysharp.Threading.Tasks.Linq
Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget(); Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
} }
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action onCompleted)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action onCompleted, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
}
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action onCompleted)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action onCompleted, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
}
// IObserver // IObserver
public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, IObserver<TSource> observer) public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, IObserver<TSource> observer)
@ -194,9 +310,16 @@ namespace Cysharp.Threading.Tasks.Linq
try try
{ {
while (await e.MoveNextAsync()) while (await e.MoveNextAsync())
{
try
{ {
onNext(e.Current); onNext(e.Current);
} }
catch (Exception ex)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
}
}
onCompleted(); onCompleted();
} }
catch (Exception ex) catch (Exception ex)
@ -226,9 +349,16 @@ namespace Cysharp.Threading.Tasks.Linq
try try
{ {
while (await e.MoveNextAsync()) while (await e.MoveNextAsync())
{
try
{ {
onNext(e.Current).Forget(); onNext(e.Current).Forget();
} }
catch (Exception ex)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
}
}
onCompleted(); onCompleted();
} }
catch (Exception ex) catch (Exception ex)
@ -258,9 +388,16 @@ namespace Cysharp.Threading.Tasks.Linq
try try
{ {
while (await e.MoveNextAsync()) while (await e.MoveNextAsync())
{
try
{ {
onNext(e.Current, cancellationToken).Forget(); onNext(e.Current, cancellationToken).Forget();
} }
catch (Exception ex)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
}
}
onCompleted(); onCompleted();
} }
catch (Exception ex) catch (Exception ex)
@ -290,9 +427,16 @@ namespace Cysharp.Threading.Tasks.Linq
try try
{ {
while (await e.MoveNextAsync()) while (await e.MoveNextAsync())
{
try
{ {
observer.OnNext(e.Current); observer.OnNext(e.Current);
} }
catch (Exception ex)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
}
}
observer.OnCompleted(); observer.OnCompleted();
} }
catch (Exception ex) catch (Exception ex)
@ -309,5 +453,84 @@ namespace Cysharp.Threading.Tasks.Linq
} }
} }
} }
public static async UniTaskVoid SubscribeAwaitCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
while (await e.MoveNextAsync())
{
try
{
await onNext(e.Current);
}
catch (Exception ex)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
}
}
onCompleted();
}
catch (Exception ex)
{
if (onError == NopError)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
return;
}
if (ex is OperationCanceledException) return;
onError(ex);
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
public static async UniTaskVoid SubscribeAwaitCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
while (await e.MoveNextAsync())
{
try
{
await onNext(e.Current, cancellationToken);
}
catch (Exception ex)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
}
}
onCompleted();
}
catch (Exception ex)
{
if (onError == NopError)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
return;
}
if (ex is OperationCanceledException) return;
onError(ex);
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
} }
} }