diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowAggregateAsyncSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowAggregateAsyncSpec.cs index 4ec9d9bfcf1..56f935d3747 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowAggregateAsyncSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowAggregateAsyncSpec.cs @@ -67,9 +67,9 @@ await this.AssertAllStagesStoppedAsync(async() => } [Fact] - public void A_AggregateAsync_must_work_when_using_Sink_AggregateAsync() + public async Task A_AggregateAsync_must_work_when_using_Sink_AggregateAsync() { - this.AssertAllStagesStopped(async() => + await this.AssertAllStagesStoppedAsync(async() => { var task = InputSource.RunWith(AggregateSink, Materializer); var complete = await task.ShouldCompleteWithin(3.Seconds()); @@ -78,10 +78,10 @@ public void A_AggregateAsync_must_work_when_using_Sink_AggregateAsync() } [LocalFact(SkipLocal = "Racy on Azure DevOps")] - public void A_AggregateAsync_must_work_when_using_Flow_AggregateAsync() + public async Task A_AggregateAsync_must_work_when_using_Flow_AggregateAsync() { var flowTimeout = TimeSpan.FromMilliseconds(FlowDelayInMs*Input.Count()) + TimeSpan.FromSeconds(3); - this.AssertAllStagesStopped(async() => + await this.AssertAllStagesStoppedAsync(async() => { var task = InputSource.Via(AggregateFlow).RunWith(Sink.First(), Materializer); var complete = await task.ShouldCompleteWithin(flowTimeout); @@ -90,9 +90,9 @@ public void A_AggregateAsync_must_work_when_using_Flow_AggregateAsync() } [Fact] - public void A_AggregateAsync_must_work_when_using_Source_AggregateAsync_and_Flow_AggregateAsync_and_Sink_AggregateAsync() + public async Task A_AggregateAsync_must_work_when_using_Source_AggregateAsync_and_Flow_AggregateAsync_and_Sink_AggregateAsync() { - this.AssertAllStagesStopped(async() => + await this.AssertAllStagesStoppedAsync(async() => { var task = AggregateSource.Via(AggregateFlow).RunWith(AggregateSink, Materializer); var complete = await task.ShouldCompleteWithin(3.Seconds()); @@ -101,10 +101,9 @@ public void A_AggregateAsync_must_work_when_using_Source_AggregateAsync_and_Flow } [Fact] - public void A_AggregateAsync_must_propagate_an_error() + public async Task A_AggregateAsync_must_propagate_an_error() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var error = new TestException("buh"); var future = InputSource.Select(x => { @@ -116,14 +115,14 @@ public void A_AggregateAsync_must_propagate_an_error() future.Invoking(f => f.Wait(TimeSpan.FromSeconds(3))) .Should().Throw() .And.Should().Be(error); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_AggregateAsync_must_complete_task_with_failure_when_Aggregating_functions_throws() + public async Task A_AggregateAsync_must_complete_task_with_failure_when_Aggregating_functions_throws() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var error = new TestException("buh"); var future = InputSource.RunAggregateAsync(0, (x, y) => { @@ -139,6 +138,7 @@ public void A_AggregateAsync_must_complete_task_with_failure_when_Aggregating_fu future.Invoking(f => f.Wait(TimeSpan.FromSeconds(3))) .Should().Throw() .And.Should().Be(error); + return Task.CompletedTask; }, Materializer); } @@ -159,10 +159,9 @@ public void A_AggregateAsync_must_not_blow_up_with_high_request_count() } [Fact] - public void A_AggregateAsync_must_signal_task_failure() + public async Task A_AggregateAsync_must_signal_task_failure() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var probe = this.CreateSubscriberProbe(); Source.From(Enumerable.Range(1, 5)).AggregateAsync(0, (_, n) => Task.Run(() => { @@ -174,14 +173,14 @@ public void A_AggregateAsync_must_signal_task_failure() var subscription = probe.ExpectSubscription(); subscription.Request(100); probe.ExpectError().InnerException.Message.Should().Be("err1"); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_AggregateAsync_must_signal_error_from_AggregateAsync() + public async Task A_AggregateAsync_must_signal_error_from_AggregateAsync() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var c = this.CreateManualSubscriberProbe(); Source.From(Enumerable.Range(1, 5)).AggregateAsync(0, (_, n) => @@ -195,14 +194,14 @@ public void A_AggregateAsync_must_signal_error_from_AggregateAsync() var subscription = c.ExpectSubscription(); subscription.Request(10); c.ExpectError().Message.Should().Be("err2"); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_AggregateAsync_must_resume_after_task_failure() + public async Task A_AggregateAsync_must_resume_after_task_failure() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var probe = this.CreateSubscriberProbe<(int, int)>(); Source.From(Enumerable.Range(1, 5)).AggregateAsync((0, 1), (t, n) => { @@ -213,7 +212,7 @@ public void A_AggregateAsync_must_resume_after_task_failure() if (n == 3) throw new Exception("err3"); - return (n, i + res*n); + return (n, i + res * n); }); }) .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) @@ -224,14 +223,14 @@ public void A_AggregateAsync_must_resume_after_task_failure() subscription.Request(10); probe.ExpectNext((5, 74)); probe.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_AggregateAsync_must_restart_after_task_failure() + public async Task A_AggregateAsync_must_restart_after_task_failure() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var probe = this.CreateSubscriberProbe<(int, int)>(); Source.From(Enumerable.Range(1, 5)).AggregateAsync((0, 1), (t, n) => { @@ -253,6 +252,7 @@ public void A_AggregateAsync_must_restart_after_task_failure() subscription.Request(10); probe.ExpectNext((5, 24)); probe.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } @@ -399,10 +399,9 @@ public void A_AggregateAsync_must_restart_when_task_is_completed_with_null() } [Fact] - public void A_AggregateAsync_must_handle_cancel_properly() + public async Task A_AggregateAsync_must_handle_cancel_properly() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var pub = this.CreateManualPublisherProbe(); var sub = this.CreateSubscriberProbe(); @@ -414,15 +413,16 @@ public void A_AggregateAsync_must_handle_cancel_properly() upstream.ExpectRequest(); sub.ExpectSubscription().Cancel(); - + upstream.ExpectCancellation(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_AggregateAsync_must_complete_task_and_return_zero_given_an_empty_stream() + public async Task A_AggregateAsync_must_complete_task_and_return_zero_given_an_empty_stream() { - this.AssertAllStagesStopped(async() => + await this.AssertAllStagesStoppedAsync(async() => { var task = Source.From(Enumerable.Empty()) .RunAggregateAsync(0, (acc, element) => Task.FromResult(acc + element), Materializer); @@ -432,9 +432,9 @@ public void A_AggregateAsync_must_complete_task_and_return_zero_given_an_empty_s } [Fact] - public void A_AggregateAsync_must_complete_task_and_return_zero_and_item_given_a_stream_of_one_item() + public async Task A_AggregateAsync_must_complete_task_and_return_zero_and_item_given_a_stream_of_one_item() { - this.AssertAllStagesStopped(async() => + await this.AssertAllStagesStoppedAsync(async() => { var task = Source.Single(100) .RunAggregateAsync(5, (acc, element) => Task.FromResult(acc + element), Materializer);