Skip to content

Commit

Permalink
[4-74]FlowAggregateAsyncSpec: Change AssertAllStagesStopped to `A…
Browse files Browse the repository at this point in the history
…ssertAllStagesStoppedAsync` (#6543)

Co-authored-by: Aaron Stannard <[email protected]>
  • Loading branch information
eaba and Aaronontheweb authored Mar 23, 2023
1 parent 996cac7 commit 6784296
Showing 1 changed file with 33 additions and 33 deletions.
66 changes: 33 additions & 33 deletions src/core/Akka.Streams.Tests/Dsl/FlowAggregateAsyncSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ await this.AssertAllStagesStoppedAsync(async() =>
}

[Fact]
public void A_AggregateAsync_must_work_when_using_Sink_AggregateAsync()
public async Task A_AggregateAsync_must_work_when_using_Sink_AggregateAsync()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var task = InputSource.RunWith(AggregateSink, Materializer);
var complete = await task.ShouldCompleteWithin(3.Seconds());
Expand All @@ -78,10 +78,10 @@ public void A_AggregateAsync_must_work_when_using_Sink_AggregateAsync()
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void A_AggregateAsync_must_work_when_using_Flow_AggregateAsync()
public async Task A_AggregateAsync_must_work_when_using_Flow_AggregateAsync()
{
var flowTimeout = TimeSpan.FromMilliseconds(FlowDelayInMs*Input.Count()) + TimeSpan.FromSeconds(3);
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var task = InputSource.Via(AggregateFlow).RunWith(Sink.First<int>(), Materializer);
var complete = await task.ShouldCompleteWithin(flowTimeout);
Expand All @@ -90,9 +90,9 @@ public void A_AggregateAsync_must_work_when_using_Flow_AggregateAsync()
}

[Fact]
public void A_AggregateAsync_must_work_when_using_Source_AggregateAsync_and_Flow_AggregateAsync_and_Sink_AggregateAsync()
public async Task A_AggregateAsync_must_work_when_using_Source_AggregateAsync_and_Flow_AggregateAsync_and_Sink_AggregateAsync()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var task = AggregateSource.Via(AggregateFlow).RunWith(AggregateSink, Materializer);
var complete = await task.ShouldCompleteWithin(3.Seconds());
Expand All @@ -101,10 +101,9 @@ public void A_AggregateAsync_must_work_when_using_Source_AggregateAsync_and_Flow
}

[Fact]
public void A_AggregateAsync_must_propagate_an_error()
public async Task A_AggregateAsync_must_propagate_an_error()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var error = new TestException("buh");
var future = InputSource.Select(x =>
{
Expand All @@ -116,14 +115,14 @@ public void A_AggregateAsync_must_propagate_an_error()
future.Invoking(f => f.Wait(TimeSpan.FromSeconds(3)))
.Should().Throw<TestException>()
.And.Should().Be(error);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_AggregateAsync_must_complete_task_with_failure_when_Aggregating_functions_throws()
public async Task A_AggregateAsync_must_complete_task_with_failure_when_Aggregating_functions_throws()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var error = new TestException("buh");
var future = InputSource.RunAggregateAsync(0, (x, y) =>
{
Expand All @@ -139,6 +138,7 @@ public void A_AggregateAsync_must_complete_task_with_failure_when_Aggregating_fu
future.Invoking(f => f.Wait(TimeSpan.FromSeconds(3)))
.Should().Throw<TestException>()
.And.Should().Be(error);
return Task.CompletedTask;
}, Materializer);
}

Expand All @@ -159,10 +159,9 @@ public void A_AggregateAsync_must_not_blow_up_with_high_request_count()
}

[Fact]
public void A_AggregateAsync_must_signal_task_failure()
public async Task A_AggregateAsync_must_signal_task_failure()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var probe = this.CreateSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 5)).AggregateAsync(0, (_, n) => Task.Run(() =>
{
Expand All @@ -174,14 +173,14 @@ public void A_AggregateAsync_must_signal_task_failure()
var subscription = probe.ExpectSubscription();
subscription.Request(100);
probe.ExpectError().InnerException.Message.Should().Be("err1");
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_AggregateAsync_must_signal_error_from_AggregateAsync()
public async Task A_AggregateAsync_must_signal_error_from_AggregateAsync()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var c = this.CreateManualSubscriberProbe<int>();

Source.From(Enumerable.Range(1, 5)).AggregateAsync(0, (_, n) =>
Expand All @@ -195,14 +194,14 @@ public void A_AggregateAsync_must_signal_error_from_AggregateAsync()
var subscription = c.ExpectSubscription();
subscription.Request(10);
c.ExpectError().Message.Should().Be("err2");
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_AggregateAsync_must_resume_after_task_failure()
public async Task A_AggregateAsync_must_resume_after_task_failure()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var probe = this.CreateSubscriberProbe<(int, int)>();
Source.From(Enumerable.Range(1, 5)).AggregateAsync((0, 1), (t, n) =>
{
Expand All @@ -213,7 +212,7 @@ public void A_AggregateAsync_must_resume_after_task_failure()
if (n == 3)
throw new Exception("err3");

return (n, i + res*n);
return (n, i + res * n);
});
})
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
Expand All @@ -224,14 +223,14 @@ public void A_AggregateAsync_must_resume_after_task_failure()
subscription.Request(10);
probe.ExpectNext((5, 74));
probe.ExpectComplete();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_AggregateAsync_must_restart_after_task_failure()
public async Task A_AggregateAsync_must_restart_after_task_failure()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var probe = this.CreateSubscriberProbe<(int, int)>();
Source.From(Enumerable.Range(1, 5)).AggregateAsync((0, 1), (t, n) =>
{
Expand All @@ -253,6 +252,7 @@ public void A_AggregateAsync_must_restart_after_task_failure()
subscription.Request(10);
probe.ExpectNext((5, 24));
probe.ExpectComplete();
return Task.CompletedTask;
}, Materializer);
}

Expand Down Expand Up @@ -399,10 +399,9 @@ public void A_AggregateAsync_must_restart_when_task_is_completed_with_null()
}

[Fact]
public void A_AggregateAsync_must_handle_cancel_properly()
public async Task A_AggregateAsync_must_handle_cancel_properly()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var pub = this.CreateManualPublisherProbe<int>();
var sub = this.CreateSubscriberProbe<int>();

Expand All @@ -414,15 +413,16 @@ public void A_AggregateAsync_must_handle_cancel_properly()
upstream.ExpectRequest();

sub.ExpectSubscription().Cancel();

upstream.ExpectCancellation();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_AggregateAsync_must_complete_task_and_return_zero_given_an_empty_stream()
public async Task A_AggregateAsync_must_complete_task_and_return_zero_given_an_empty_stream()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var task = Source.From(Enumerable.Empty<int>())
.RunAggregateAsync(0, (acc, element) => Task.FromResult(acc + element), Materializer);
Expand All @@ -432,9 +432,9 @@ public void A_AggregateAsync_must_complete_task_and_return_zero_given_an_empty_s
}

[Fact]
public void A_AggregateAsync_must_complete_task_and_return_zero_and_item_given_a_stream_of_one_item()
public async Task A_AggregateAsync_must_complete_task_and_return_zero_and_item_given_a_stream_of_one_item()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var task = Source.Single(100)
.RunAggregateAsync(5, (acc, element) => Task.FromResult(acc + element), Materializer);
Expand Down

0 comments on commit 6784296

Please sign in to comment.