Fix ChannelReader.Completion throws UnobservedException when not touched

pull/61/head
neuecc 2020-05-18 23:33:13 +09:00
parent e33d572104
commit 2e4fe90956
3 changed files with 134 additions and 145 deletions
src/UniTask/Assets

View File

@ -91,7 +91,8 @@ namespace Cysharp.Threading.Tasks
{
readonly Queue<T> items;
readonly SingleConsumerUnboundedChannelReader readerSource;
readonly UniTaskCompletionSource completedTask;
UniTaskCompletionSource completedTaskSource;
UniTask completedTask;
Exception completionError;
bool closed;
@ -99,7 +100,6 @@ namespace Cysharp.Threading.Tasks
public SingleConsumerUnboundedChannel()
{
items = new Queue<T>();
completedTask = new UniTaskCompletionSource();
Writer = new SingleConsumerUnboundedChannelWriter(this);
readerSource = new SingleConsumerUnboundedChannelReader(this);
Reader = readerSource;
@ -146,11 +146,25 @@ namespace Cysharp.Threading.Tasks
{
if (error == null)
{
parent.completedTask.TrySetResult();
if (parent.completedTaskSource != null)
{
parent.completedTaskSource.TrySetResult();
}
else
{
parent.completedTask.TrySetException(error);
parent.completedTask = UniTask.CompletedTask;
}
}
else
{
if (parent.completedTaskSource != null)
{
parent.completedTaskSource.TrySetException(error);
}
else
{
parent.completedTask = UniTask.FromException(error);
}
}
if (waiting)
@ -181,7 +195,21 @@ namespace Cysharp.Threading.Tasks
this.parent = parent;
}
public override UniTask Completion => parent.completedTask.Task;
public override UniTask Completion
{
get
{
if (parent.completedTaskSource != null) return parent.completedTaskSource.Task;
if (parent.closed)
{
return parent.completedTask;
}
parent.completedTaskSource = new UniTaskCompletionSource();
return parent.completedTaskSource.Task;
}
}
public override bool TryRead(out T item)
{
@ -196,11 +224,25 @@ namespace Cysharp.Threading.Tasks
{
if (parent.completionError != null)
{
parent.completedTask.TrySetException(parent.completionError);
if (parent.completedTaskSource != null)
{
parent.completedTaskSource.TrySetException(parent.completionError);
}
else
{
parent.completedTask.TrySetResult();
parent.completedTask = UniTask.FromException(parent.completionError);
}
}
else
{
if (parent.completedTaskSource != null)
{
parent.completedTaskSource.TrySetResult();
}
else
{
parent.completedTask = UniTask.CompletedTask;
}
}
}
}

View File

@ -97,6 +97,11 @@ namespace Cysharp.Threading.Tasks
}
}
internal void MarkHandled()
{
hasUnhandledError = false;
}
/// <summary>Completes with a successful result.</summary>
/// <param name="result">The result.</param>
[DebuggerHidden]
@ -293,12 +298,12 @@ namespace Cysharp.Threading.Tasks
}
[DebuggerHidden]
[Conditional("UNITY_EDITOR")]
internal void MarkHandled()
{
if (!handled)
{
handled = true;
core.MarkHandled();
TaskTracker.RemoveTracking(this);
}
}
@ -504,12 +509,12 @@ namespace Cysharp.Threading.Tasks
}
[DebuggerHidden]
[Conditional("UNITY_EDITOR")]
internal void MarkHandled()
{
if (!handled)
{
handled = true;
core.MarkHandled();
TaskTracker.RemoveTracking(this);
}
}

View File

@ -50,7 +50,62 @@ public static partial class UnityUIComponentExtensions
public class AsyncMessageBroker<T> : IDisposable
{
Channel<T> channel;
List<Func<T, UniTask>> asyncEvents;
public AsyncMessageBroker()
{
channel = Channel.CreateSingleConsumerUnbounded<T>();
asyncEvents = new List<Func<T, UniTask>>();
}
async UniTaskVoid PublishAll()
{
await channel.Reader.ReadAllAsync().ForEachAwaitAsync(async x =>
{
foreach (var item in asyncEvents)
{
await item.Invoke(x);
}
});
}
public void Publish(T value)
{
channel.Writer.TryWrite(value);
}
public Subscription Subscribe(Func<T, UniTask> func)
{
asyncEvents.Add(func);
return new Subscription(this, func);
}
public void Dispose()
{
channel.Writer.TryComplete();
asyncEvents.Clear();
}
public readonly struct Subscription : IDisposable
{
readonly AsyncMessageBroker<T> broker;
readonly Func<T, UniTask> func;
public Subscription(AsyncMessageBroker<T> broker, Func<T, UniTask> func)
{
this.broker = broker;
this.func = func;
}
public void Dispose()
{
broker.asyncEvents.Remove(func);
}
}
}
public class SandboxMain : MonoBehaviour
@ -141,150 +196,37 @@ public class SandboxMain : MonoBehaviour
void Start()
{
Application.SetStackTraceLogType(LogType.Error, StackTraceLogType.Full);
Application.SetStackTraceLogType(LogType.Exception, StackTraceLogType.Full);
var channel = Channel.CreateSingleConsumerUnbounded<int>();
var playerLoop = UnityEngine.LowLevel.PlayerLoop.GetCurrentPlayerLoop();
//ShowPlayerLoop.DumpPlayerLoop("Current", playerLoop);
var reader = channel.Reader;
WaitForChannelAsync(reader, this.GetCancellationTokenOnDestroy()).Forget();
RP1 = new AsyncReactiveProperty<int>(999);
var writer = channel.Writer;
HogeAsync().Forget();
writer.TryWrite(1);
writer.TryWrite(2);
writer.TryWrite(3);
writer.Complete(new Exception());
RP1.Select(x => x * x).BindTo(text);
//Update2().Forget();
//RunStandardDelayAsync().Forget();
//for (int i = 0; i < 14; i++)
//{
// TimingDump((PlayerLoopTiming)i).Forget();
//}
//StartCoroutine(CoroutineDump("yield WaitForEndOfFrame", new WaitForEndOfFrame()));
//StartCoroutine(CoroutineDump("yield WaitForFixedUpdate", new WaitForFixedUpdate()));
//StartCoroutine(CoroutineDump("yield null", null));
// -----
// RunJobAsync().Forget();
//ClickOnce().Forget();
//ClickForever().Forget();
//var cor = UniTask.ToCoroutine(async () =>
// {
// var job = new MyJob() { loopCount = 999, inOut = new NativeArray<int>(1, Allocator.TempJob) };
// JobHandle.ScheduleBatchedJobs();
// await job.Schedule().WaitAsync(PlayerLoopTiming.Update);
// job.inOut.Dispose();
// });
//StartCoroutine(cor);
// UniTaskAsyncEnumerable.EveryUpdate(PlayerLoopTiming.FixedUpdate)
//await UniTask.Yield(PlayerLoopTiming.Update);
//Debug.Log("Start:" + Time.frameCount);
//await UniTaskAsyncEnumerable.TimerFrame(3, 5, PlayerLoopTiming.PostLateUpdate)
// .Select(x => x)
// .Do(x => Debug.Log("DODODO"))
// .ForEachAsync(_ =>
// {
// Debug.Log("Call:" + Time.frameCount);
// }, cancellationToken: this.GetCancellationTokenOnDestroy());
//try
//{
// await this.GetAsyncUpdateTrigger().ForEachAsync(_ =>
// {
// UnityEngine.Debug.Log("EveryUpdate:" + Time.frameCount);
// });
//}
//catch (OperationCanceledException ex)
//{
// UnityEngine.Debug.Log("END");
//}
CancellationTokenSource cts = new CancellationTokenSource();
//var trigger = this.GetAsyncUpdateTrigger();
//Go(trigger, 1, cts.Token).Forget();
//Go(trigger, 2, cts.Token).Forget();
//Go(trigger, 3, cts.Token).Forget();
//Go(trigger, 4, cts.Token).Forget();
//Go(trigger, 5, cts.Token).Forget();
Application.logMessageReceived += Application_logMessageReceived;
// foo.Status.IsCanceled
// 5回クリックされるまで待つ、とか。
//Debug.Log("Await start.");
//await okButton.GetAsyncClickEventHandler().DisableAutoClose()
// .Select((_, clickCount) => clickCount + 1)
// .FirstAsync(x => x == 5);
//Debug.Log("Click 5 times.");
// await this.GetAsyncUpdateTrigger().UpdateAsAsyncEnumerable()
//ucs = new UniTaskCompletionSource();
//okButton.onClick.AddListener(async () =>
//{
// await InnerAsync(false);
//});
okButton.onClick.AddListener(() =>
{
// FooAsync().Forget();
RP1.Value += 3;
});
cancelButton.onClick.AddListener(() =>
{
text.text = "";
// ucs.TrySetResult();
cts.Cancel();
});
}
static void Foo(UniTask t)
async UniTaskVoid WaitForChannelAsync(ChannelReader<int> reader, CancellationToken token)
{
try
{
//var result1 = await reader.ReadAsync(token);
//Debug.Log(result1);
await reader.ReadAllAsync().ForEachAsync(x => Debug.Log(x)/*, token*/);
Debug.Log("done");
}
catch (Exception ex)
{
Debug.Log("here");
Debug.LogException(ex);
}
}
async UniTaskVoid Go(AsyncUpdateTrigger trigger, int i, CancellationToken ct)