Changed AsyncReactiveProperty produce current value at first, Add AsyncReactiveProperty.WithoutCurrent

pull/61/head
neuecc 2020-05-18 11:30:49 +09:00
parent ec0a8f5a8b
commit d003597662
3 changed files with 93 additions and 24 deletions

View File

@ -86,36 +86,18 @@ namespace NetCoreSandbox
await Task.Delay(10, cancellationToken);
}
private async UniTaskVoid HogeAsync()
static void Main(string[] args)
{
}
static async Task Main(string[] args)
{
await foreach (var item in UniTaskAsyncEnumerable.Range(1, 10)
.SelectAwait(x => UniTask.Run(() => x))
.TakeLast(6)
var channel = Channel.CreateSingleConsumerUnbounded<int>();
)
{
Console.WriteLine(item);
}
// AsyncEnumerable.Range(1,10).FirstAsync(
// AsyncEnumerable.Range(1, 10).GroupBy(x=>x).Select(x=>x.first
// AsyncEnumerable.Range(1,10).WithCancellation(CancellationToken.None).WithCancellation
//Enumerable.Range(1,10).ToHashSet(
}

View File

@ -0,0 +1,57 @@
using Cysharp.Threading.Tasks;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using Cysharp.Threading.Tasks.Linq;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests
{
public class AsyncReactivePropertyTest
{
[Fact]
public async Task Iteration()
{
var rp = new AsyncReactiveProperty<int>(99);
var f = await rp.FirstAsync();
f.Should().Be(99);
var array = rp.Take(5).ToArrayAsync();
rp.Value = 100;
rp.Value = 100;
rp.Value = 100;
rp.Value = 131;
var ar = await array;
ar.Should().BeEquivalentTo(new[] { 99, 100, 100, 100, 131 });
}
[Fact]
public async Task WithoutCurrent()
{
var rp = new AsyncReactiveProperty<int>(99);
var array = rp.WithoutCurrent().Take(5).ToArrayAsync();
rp.Value = 100;
rp.Value = 100;
rp.Value = 100;
rp.Value = 131;
rp.Value = 191;
var ar = await array;
ar.Should().BeEquivalentTo(new[] { 100, 100, 100, 131, 191 });
}
}
}

View File

@ -43,9 +43,14 @@ namespace Cysharp.Threading.Tasks
this.triggerEvent = default;
}
public IUniTaskAsyncEnumerable<T> WithoutCurrent()
{
return new WithoutCurrentEnumerable(this);
}
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
{
return new Enumerator(this, cancellationToken);
return new Enumerator(this, cancellationToken, true);
}
public void Dispose()
@ -53,6 +58,21 @@ namespace Cysharp.Threading.Tasks
triggerEvent.SetCanceled(CancellationToken.None);
}
class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
{
readonly AsyncReactiveProperty<T> parent;
public WithoutCurrentEnumerable(AsyncReactiveProperty<T> parent)
{
this.parent = parent;
}
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new Enumerator(parent, cancellationToken, false);
}
}
sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, IResolveCancelPromise<T>
{
static Action<object> cancellationCallback = CancellationCallback;
@ -62,11 +82,13 @@ namespace Cysharp.Threading.Tasks
readonly CancellationTokenRegistration cancellationTokenRegistration;
T value;
bool isDisposed;
bool firstCall;
public Enumerator(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken)
public Enumerator(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
{
this.parent = parent;
this.cancellationToken = cancellationToken;
this.firstCall = publishCurrentValue;
parent.triggerEvent.Add(this);
TaskTracker.TrackActiveTask(this, 3);
@ -81,6 +103,14 @@ namespace Cysharp.Threading.Tasks
public UniTask<bool> MoveNextAsync()
{
// raise latest value on first call.
if (firstCall)
{
firstCall = false;
value = parent.Value;
return CompletedTasks.True;
}
completionSource.Reset();
return new UniTask<bool>(this, completionSource.Version);
}