diff --git a/src/UniTask.NetCoreTests/Linq/QueueTest.cs b/src/UniTask.NetCoreTests/Linq/QueueTest.cs new file mode 100644 index 0000000..d2f5fc5 --- /dev/null +++ b/src/UniTask.NetCoreTests/Linq/QueueTest.cs @@ -0,0 +1,29 @@ +using Cysharp.Threading.Tasks; +using Cysharp.Threading.Tasks.Linq; +using FluentAssertions; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Xunit; + +namespace NetCoreTests.Linq +{ + public class QueueTest + { + [Fact] + public async Task Q() + { + var rp = new AsyncReactiveProperty<int>(100); + + var l = new List<int>(); + await rp.Take(10).Queue().ForEachAsync(x => + { + rp.Value += 10; + l.Add(x); + }); + + l.Should().BeEquivalentTo(100, 110, 120, 130, 140, 150, 160, 170, 180, 190); + } + } +} diff --git a/src/UniTask.NetCoreTests/Linq/TakeInfinityTest.cs b/src/UniTask.NetCoreTests/Linq/TakeInfinityTest.cs new file mode 100644 index 0000000..1c1ea48 --- /dev/null +++ b/src/UniTask.NetCoreTests/Linq/TakeInfinityTest.cs @@ -0,0 +1,44 @@ +using Cysharp.Threading.Tasks; +using Cysharp.Threading.Tasks.Linq; +using FluentAssertions; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Xunit; + +namespace NetCoreTests.Linq +{ + public class TakeInfinityTest + { + [Fact] + public async Task Take() + { + var rp = new AsyncReactiveProperty<int>(1); + + var xs = rp.Take(5).ToArrayAsync(); + + rp.Value = 2; + rp.Value = 3; + rp.Value = 4; + rp.Value = 5; + + (await xs).Should().BeEquivalentTo(1, 2, 3, 4, 5); + } + + [Fact] + public async Task TakeWhile() + { + var rp = new AsyncReactiveProperty<int>(1); + + var xs = rp.TakeWhile(x => x != 5).ToArrayAsync(); + + rp.Value = 2; + rp.Value = 3; + rp.Value = 4; + rp.Value = 5; + + (await xs).Should().BeEquivalentTo(1, 2, 3, 4); + } + } +} diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Queue.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Queue.cs new file mode 100644 index 0000000..032c3ce --- /dev/null +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Queue.cs @@ -0,0 +1,95 @@ +using System; +using System.Threading; + +namespace Cysharp.Threading.Tasks.Linq +{ + public static partial class UniTaskAsyncEnumerable + { + public static IUniTaskAsyncEnumerable<TSource> Queue<TSource>(this IUniTaskAsyncEnumerable<TSource> source) + { + return new QueueOperator<TSource>(source); + } + } + + internal sealed class QueueOperator<TSource> : IUniTaskAsyncEnumerable<TSource> + { + readonly IUniTaskAsyncEnumerable<TSource> source; + + public QueueOperator(IUniTaskAsyncEnumerable<TSource> source) + { + this.source = source; + } + + public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + return new _Queue(source, cancellationToken); + } + + sealed class _Queue : IUniTaskAsyncEnumerator<TSource> + { + readonly IUniTaskAsyncEnumerable<TSource> source; + CancellationToken cancellationToken; + + Channel<TSource> channel; + IUniTaskAsyncEnumerator<TSource> channelEnumerator; + IUniTaskAsyncEnumerator<TSource> sourceEnumerator; + + public _Queue(IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken) + { + this.source = source; + this.cancellationToken = cancellationToken; + } + + public TSource Current => channelEnumerator.Current; + + public UniTask<bool> MoveNextAsync() + { + cancellationToken.ThrowIfCancellationRequested(); + + if (sourceEnumerator == null) + { + sourceEnumerator = source.GetAsyncEnumerator(cancellationToken); + channel = Channel.CreateSingleConsumerUnbounded<TSource>(); + + channelEnumerator = channel.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken); + + ConsumeAll(sourceEnumerator, channel).Forget(); + } + + return channelEnumerator.MoveNextAsync(); + } + + static async UniTaskVoid ConsumeAll(IUniTaskAsyncEnumerator<TSource> enumerator, ChannelWriter<TSource> writer) + { + try + { + while (await enumerator.MoveNextAsync()) + { + writer.TryWrite(enumerator.Current); + } + writer.TryComplete(); + } + catch (Exception ex) + { + writer.TryComplete(ex); + } + finally + { + await enumerator.DisposeAsync(); + } + } + + public async UniTask DisposeAsync() + { + if (sourceEnumerator != null) + { + await sourceEnumerator.DisposeAsync(); + } + if (channelEnumerator != null) + { + await channelEnumerator.DisposeAsync(); + } + } + } + } +} \ No newline at end of file