Skip to content

Commit

Permalink
Remove test intermittency by waiting on egress publishers to be conne…
Browse files Browse the repository at this point in the history
…cted
  • Loading branch information
eliquinox committed Apr 9, 2022
1 parent 83571ac commit 62802d3
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public <T> T getPublisherFor(final Class<T> clazz)
return registry.getPublisherFor(clazz);
}

public boolean egressConnected()
{
return registry.egressConnected();
}

public static class Configuration
{
private ClusteredService clusteredService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public void registerIngressSubscriberInvoker(final AbstractSubscriberInvoker<?>
.forEach(subscriberInterface -> invokerByName.put(subscriberInterface + "__IngressPublisher", subscriberInvoker));
}

public boolean egressConnected()
{
return clientSessionPublicationByName.values().stream().allMatch(ClientSessionPublication::isConnected);
}

public void onSessionOpen(final ClientSession session)
{
final byte[] encodedPrincipal = session.encodedPrincipal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ public void clusterIngress()
.clusteredService(clusteredService)
.registerIngressSubscriber(SimpleEvents.class, simpleEvents)
.registerIngressSubscriber(SampleEvents.class, sampleEvents)
.registerEgressPublisher(SimpleEvents.class)
.registerEgressPublisher(SampleEvents.class)
.create();

clusterNode = new TestClusterNode(aeronicClusteredService,true);
clusterNode = new TestClusterNode(aeronicClusteredService, true);

final SimpleEvents simpleEventsPublisher = aeronic.createClusterIngressPublisher(SimpleEvents.class, INGRESS_CHANNEL);
final SampleEvents sampleEventsPublisher = aeronic.createClusterIngressPublisher(SampleEvents.class, INGRESS_CHANNEL);

aeronic.awaitUntilPubsAndSubsConnect();

simpleEventsPublisher.onEvent(101L);
sampleEventsPublisher.onEvent(201L);

Expand Down Expand Up @@ -108,6 +108,8 @@ public void clusterEgress()
aeronic.registerClusterEgressSubscriber(SimpleEvents.class, simpleEvents, INGRESS_CHANNEL);
aeronic.registerClusterEgressSubscriber(SampleEvents.class, sampleEvents, INGRESS_CHANNEL);
aeronic.start();
aeronic.awaitUntilPubsAndSubsConnect();
await().timeout(Duration.ofSeconds(1)).until(aeronicClusteredService::egressConnected);

simpleEventsPublisher.onEvent(101L);
sampleEventsPublisher.onEvent(202L);
Expand Down Expand Up @@ -144,6 +146,8 @@ public void clusterIngressAndEgress()
aeronic.registerClusterEgressSubscriber(SimpleEvents.class, simpleEvents, INGRESS_CHANNEL);
aeronic.registerClusterEgressSubscriber(SampleEvents.class, sampleEvents, INGRESS_CHANNEL);
aeronic.start();
aeronic.awaitUntilPubsAndSubsConnect();
await().timeout(Duration.ofSeconds(1)).until(aeronicClusteredService::egressConnected);

// cluster -> client
clusterEgressSimpleEventsPublisher.onEvent(101L);
Expand Down

0 comments on commit 62802d3

Please sign in to comment.