Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[44-74] GraphMergeSpec #6591

Merged
merged 3 commits into from
Apr 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 37 additions & 42 deletions src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
Expand Down Expand Up @@ -46,10 +47,9 @@ public MergeFixture(GraphDsl.Builder<NotUsed> builder) : base(builder)
}

[Fact]
public void A_Merge_must_work_in_the_happy_case()
public async Task A_Merge_must_work_in_the_happy_case()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
// Different input sizes(4 and 6)
var source1 = Source.From(Enumerable.Range(0, 4));
var source2 = Source.From(Enumerable.Range(4, 6));
Expand All @@ -63,27 +63,27 @@ public void A_Merge_must_work_in_the_happy_case()
var sink = Sink.FromSubscriber(probe);

b.From(source1).To(m1.In(0));
b.From(m1.Out).Via(Flow.Create<int>().Select(x => x*2)).To(m2.In(0));
b.From(m2.Out).Via(Flow.Create<int>().Select(x => x / 2).Select(x=>x+1)).To(sink);
b.From(m1.Out).Via(Flow.Create<int>().Select(x => x * 2)).To(m2.In(0));
b.From(m2.Out).Via(Flow.Create<int>().Select(x => x / 2).Select(x => x + 1)).To(sink);
b.From(source2).To(m1.In(1));
b.From(source3).To(m2.In(1));

return ClosedShape.Instance;
})).Run(Materializer);

var subscription = probe.ExpectSubscription();
var subscription = await probe.ExpectSubscriptionAsync();
var collected = new List<int>();
for (var i = 1; i <= 10; i++)
{
subscription.Request(1);
collected.Add(probe.ExpectNext());
collected.Add(await probe.ExpectNextAsync());
}

collected.Where(i => i <= 4).ShouldOnlyContainInOrder(1, 2, 3, 4);
collected.Where(i => i >= 5).ShouldOnlyContainInOrder(5, 6, 7, 8, 9, 10);

collected.Should().BeEquivalentTo(Enumerable.Range(1, 10).ToArray());
probe.ExpectComplete();
await probe.ExpectCompleteAsync();
}, Materializer);
}

Expand All @@ -109,7 +109,7 @@ public void A_Merge_must_work_with_one_way_merge()
}

[Fact]
public void A_Merge_must_work_with_n_way_merge()
public async Task A_Merge_must_work_with_n_way_merge()
{
var source1 = Source.Single(1);
var source2 = Source.Single(2);
Expand All @@ -135,76 +135,71 @@ public void A_Merge_must_work_with_n_way_merge()
return ClosedShape.Instance;
})).Run(Materializer);

var subscription = probe.ExpectSubscription();
var subscription = await probe.ExpectSubscriptionAsync();

var collected = new List<int>();
for (var i = 1; i <= 5; i++)
{
subscription.Request(1);
collected.Add(probe.ExpectNext());
collected.Add(await probe.ExpectNextAsync());
}

collected.Should().BeEquivalentTo(Enumerable.Range(1, 5));
probe.ExpectComplete();
await probe.ExpectCompleteAsync();
}

[Fact]
public void A_Merge_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
public async Task A_Merge_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var subscriber1 = Setup(CompletedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
var subscription1 = subscriber1.ExpectSubscription();
var subscription1 = await subscriber1.ExpectSubscriptionAsync();
subscription1.Request(4);
subscriber1.ExpectNext( 1, 2, 3, 4).ExpectComplete();
await subscriber1.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();

var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), CompletedPublisher<int>());
var subscription2 = subscriber2.ExpectSubscription();
var subscription2 = await subscriber2.ExpectSubscriptionAsync();
subscription2.Request(4);
subscriber2.ExpectNext( 1, 2, 3, 4).ExpectComplete();
await subscriber2.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Merge_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
public async Task A_Merge_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var subscriber1 = Setup(SoonToCompletePublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
var subscription1 = subscriber1.ExpectSubscription();
var subscription1 = await subscriber1.ExpectSubscriptionAsync();
subscription1.Request(4);
subscriber1.ExpectNext( 1, 2, 3, 4).ExpectComplete();
await subscriber1.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();

var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), SoonToCompletePublisher<int>());
var subscription2 = subscriber2.ExpectSubscription();
var subscription2 = await subscriber2.ExpectSubscriptionAsync();
subscription2.Request(4);
subscriber2.ExpectNext( 1, 2, 3, 4).ExpectComplete();
await subscriber2.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();
}, Materializer);
}

[Fact(Skip = "This is nondeterministic, multiple scenarios can happen")]
public void A_Merge_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
public async Task A_Merge_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{

await this.AssertAllStagesStoppedAsync(() => {
return Task.CompletedTask;
}, Materializer);
}

[Fact(Skip = "This is nondeterministic, multiple scenarios can happen")]
public void A_Merge_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
public async Task A_Merge_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{

await this.AssertAllStagesStoppedAsync(() => {
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Merge_must_pass_along_early_cancellation()
public async Task A_Merge_must_pass_along_early_cancellation()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var up1 = this.CreateManualPublisherProbe<int>();
var up2 = this.CreateManualPublisherProbe<int>();
var down = this.CreateManualSubscriberProbe<int>();
Expand All @@ -224,14 +219,14 @@ public void A_Merge_must_pass_along_early_cancellation()
return ClosedShape.Instance;
})).Run(Materializer);

var downstream = down.ExpectSubscription();
var downstream = await down.ExpectSubscriptionAsync();
downstream.Cancel();
up1.Subscribe(t.Item1);
up2.Subscribe(t.Item2);
var upSub1 = up1.ExpectSubscription();
upSub1.ExpectCancellation();
var upSub2 = up2.ExpectSubscription();
upSub2.ExpectCancellation();
var upSub1 = await up1.ExpectSubscriptionAsync();
await upSub1.ExpectCancellationAsync();
var upSub2 = await up2.ExpectSubscriptionAsync();
await upSub2.ExpectCancellationAsync();
}, Materializer);
}
}
Expand Down