Skip to content

Commit

Permalink
[42-74] GraphBroadcastSpec (#6589)
Browse files Browse the repository at this point in the history
* [42-74] `GraphBroadcastSpec`

* Changes to `async` TestKit
  • Loading branch information
eaba authored Apr 4, 2023
1 parent 48c8077 commit 81a710a
Showing 1 changed file with 72 additions and 80 deletions.
152 changes: 72 additions & 80 deletions src/core/Akka.Streams.Tests/Dsl/GraphBroadcastSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ public GraphBroadcastSpec(ITestOutputHelper helper) : base(helper)
}

[Fact]
public void A_Broadcast_must_broadcast_to_other_subscriber()
public async Task A_Broadcast_must_broadcast_to_other_subscriber()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<int>();
RunnableGraph.FromGraph(GraphDsl.Create (b =>
RunnableGraph.FromGraph(GraphDsl.Create(b =>
{
var broadcast = b.Add(new Broadcast<int>(2));
var source = Source.From(Enumerable.Range(1, 3));
Expand All @@ -51,50 +50,49 @@ public void A_Broadcast_must_broadcast_to_other_subscriber()
return ClosedShape.Instance;
})).Run(Materializer);

var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();

sub1.Request(1);
sub2.Request(2);

c1.ExpectNext(1).ExpectNoMsg(TimeSpan.FromMilliseconds(100));
c2.ExpectNext( 1, 2).ExpectNoMsg(TimeSpan.FromMilliseconds(100));
await c1.ExpectNext(1).ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
await c2.ExpectNext(1, 2).ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
sub1.Request(3);
c1.ExpectNext( 2, 3).ExpectComplete();
await c1.ExpectNext(2, 3).ExpectCompleteAsync();
sub2.Request(3);
c2.ExpectNext(3).ExpectComplete();
await c2.ExpectNext(3).ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Broadcast_must_work_with_one_way_broadcast()
public async Task A_Broadcast_must_work_with_one_way_broadcast()
{
this.AssertAllStagesStopped(() =>
{
var t = Source.FromGraph(GraphDsl.Create(b =>
{
var broadcast = b.Add(new Broadcast<int>(1));
await this.AssertAllStagesStoppedAsync(() => {
var t = Source.FromGraph(GraphDsl.Create(b =>
{
var broadcast = b.Add(new Broadcast<int>(1));
var source = b.Add(Source.From(Enumerable.Range(1, 3)));

b.From(source).To(broadcast.In);

return new SourceShape<int>(broadcast.Out(0));
})).RunAggregate(new List<int>(), (list, i) =>
{
list.Add(i);
return list;
return new SourceShape<int>(broadcast.Out(0));
})).RunAggregate(new List<int>(), (list, i) =>
{
list.Add(i);
return list;
}, Materializer);

t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
t.Result.Should().BeEquivalentTo(new[] {1, 2, 3});
t.Result.Should().BeEquivalentTo(new[] { 1, 2, 3 });
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Broadcast_must_work_with_n_way_broadcast()
public async Task A_Broadcast_must_work_with_n_way_broadcast()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var headSink = Sink.First<IEnumerable<int>>();

var t = RunnableGraph.FromGraph(GraphDsl.Create(headSink, headSink, headSink, headSink, headSink, ValueTuple.Create,
Expand All @@ -116,14 +114,14 @@ public void A_Broadcast_must_work_with_n_way_broadcast()
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
foreach (var list in task.Result)
list.Should().BeEquivalentTo(new[] { 1, 2, 3 });
return Task.CompletedTask;
}, Materializer);
}

[Fact(Skip="We don't have enough overloads for GraphDsl.Create")]
public void A_Broadcast_must_with_22_way_broadcast()
public async Task A_Broadcast_must_with_22_way_broadcast()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
//var headSink = Sink.First<IEnumerable<int>>();

//var t = RunnableGraph.FromGraph(GraphDsl.Create(headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, ValueTuple.Create,
Expand Down Expand Up @@ -162,15 +160,14 @@ public void A_Broadcast_must_with_22_way_broadcast()
//task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
//foreach (var list in task.Result)
// list.Should().BeEquivalentTo(new[] { 1, 2, 3 });

return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Broadcast_must_produce_to_other_even_though_downstream_cancels()
public async Task A_Broadcast_must_produce_to_other_even_though_downstream_cancels()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<int>();
RunnableGraph.FromGraph(GraphDsl.Create(b =>
Expand All @@ -187,20 +184,19 @@ public void A_Broadcast_must_produce_to_other_even_though_downstream_cancels()
return ClosedShape.Instance;
})).Run(Materializer);

var sub1 = c1.ExpectSubscription();
var sub1 = await c1.ExpectSubscriptionAsync();
sub1.Cancel();
var sub2 = c2.ExpectSubscription();
var sub2 = await c2.ExpectSubscriptionAsync();
sub2.Request(3);
c2.ExpectNext( 1, 2, 3);
c2.ExpectComplete();
c2.ExpectNext(1, 2, 3);
await c2.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Broadcast_must_produce_to_downstream_even_though_other_cancels()
public async Task A_Broadcast_must_produce_to_downstream_even_though_other_cancels()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<int>();
RunnableGraph.FromGraph(GraphDsl.Create(b =>
Expand All @@ -217,20 +213,19 @@ public void A_Broadcast_must_produce_to_downstream_even_though_other_cancels()
return ClosedShape.Instance;
})).Run(Materializer);

var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();
sub2.Cancel();
sub1.Request(3);
c1.ExpectNext( 1, 2, 3);
c1.ExpectComplete();
c1.ExpectNext(1, 2, 3);
await c1.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Broadcast_must_cancel_upstream_when_downstreams_cancel()
public async Task A_Broadcast_must_cancel_upstream_when_downstreams_cancel()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var p1 = this.CreateManualPublisherProbe<int>();
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<int>();
Expand All @@ -248,30 +243,29 @@ public void A_Broadcast_must_cancel_upstream_when_downstreams_cancel()
return ClosedShape.Instance;
})).Run(Materializer);

var bSub = p1.ExpectSubscription();
var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var bSub = await p1.ExpectSubscriptionAsync();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();

sub1.Request(3);
sub2.Request(3);
p1.ExpectRequest(bSub, 16);
await p1.ExpectRequestAsync(bSub, 16);
bSub.SendNext(1);
c1.ExpectNext(1);
c2.ExpectNext(1);
await c1.ExpectNextAsync(1);
await c2.ExpectNextAsync(1);
bSub.SendNext(2);
c1.ExpectNext(2);
c2.ExpectNext(2);
await c1.ExpectNextAsync(2);
await c2.ExpectNextAsync(2);
sub1.Cancel();
sub2.Cancel();
bSub.ExpectCancellation();
await bSub.ExpectCancellationAsync();
}, Materializer);
}

[Fact]
public void A_Broadcast_must_pass_along_early_cancellation()
public async Task A_Broadcast_must_pass_along_early_cancellation()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<int>();

Expand All @@ -287,22 +281,21 @@ public void A_Broadcast_must_pass_along_early_cancellation()

var up = this.CreateManualPublisherProbe<int>();

var downSub1 = c1.ExpectSubscription();
var downSub2 = c2.ExpectSubscription();
var downSub1 = await c1.ExpectSubscriptionAsync();
var downSub2 = await c2.ExpectSubscriptionAsync();
downSub1.Cancel();
downSub2.Cancel();

up.Subscribe(s);
var upSub = up.ExpectSubscription();
upSub.ExpectCancellation();
var upSub = await up.ExpectSubscriptionAsync();
await upSub.ExpectCancellationAsync();
}, Materializer);
}

[Fact]
public void A_Broadcast_must_AltoTo_must_broadcast()
public async Task A_Broadcast_must_AltoTo_must_broadcast()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var p = this.SinkProbe<int>();
var p2 = this.SinkProbe<int>();

Expand All @@ -315,20 +308,19 @@ public void A_Broadcast_must_AltoTo_must_broadcast()
var ps1 = t.Item1;
var ps2 = t.Item2;

ps1.Request(6);
ps2.Request(6);
ps1.ExpectNext( 1, 2, 3, 4, 5, 6);
ps2.ExpectNext( 1, 2, 3, 4, 5, 6);
ps1.ExpectComplete();
ps2.ExpectComplete();
await ps1.RequestAsync(6);
await ps2.RequestAsync(6);
ps1.ExpectNext(1, 2, 3, 4, 5, 6);
ps2.ExpectNext(1, 2, 3, 4, 5, 6);
await ps1.ExpectCompleteAsync();
await ps2.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Broadcast_must_AlsoTo_must_continue_if_sink_cancels()
public async Task A_Broadcast_must_AlsoTo_must_continue_if_sink_cancels()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var p = this.SinkProbe<int>();
var p2 = this.SinkProbe<int>();

Expand All @@ -340,11 +332,11 @@ public void A_Broadcast_must_AlsoTo_must_continue_if_sink_cancels()

var ps1 = t.Item1;
var ps2 = t.Item2;
ps2.Request(6);

await ps2.RequestAsync(6);
ps1.Cancel();
ps2.ExpectNext( 1, 2, 3, 4, 5, 6);
ps2.ExpectComplete();
ps2.ExpectNext(1, 2, 3, 4, 5, 6);
await ps2.ExpectCompleteAsync();
}, Materializer);
}
}
Expand Down

0 comments on commit 81a710a

Please sign in to comment.