Skip to content

Commit

Permalink
Adapt test to new behavior where replication snapshots happen only wh…
Browse files Browse the repository at this point in the history
…en there are new messages
  • Loading branch information
lhotari committed Jul 18, 2022
1 parent 5fa5be3 commit 9091794
Showing 1 changed file with 32 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -546,22 +547,35 @@ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
.statsInterval(0, TimeUnit.SECONDS)
.build();

// create consumer in r1
@Cleanup
Consumer<byte[]> consumer1 = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(true)
.subscribe();
{
// create consumer in r1
@Cleanup
Consumer<byte[]> consumer = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(true)
.subscribe();

// waiting to replicate topic/subscription to r1->r2
Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
// send one message to trigger replication
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("message".getBytes(StandardCharsets.UTF_8));

assertEquals(readMessages(consumer, new HashSet<>(), 1, false), 1);

// waiting to replicate topic/subscription to r1->r2
Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
}

// unsubscribe replicated subscription in r2
admin2.topics().deleteSubscription(topicName, subscriptionName);
final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
assertNull(topic2.getSubscription(subscriptionName));

// close replicator producer in r2
Expand All @@ -586,6 +600,12 @@ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex

// consume 6 messages in r1
Set<String> receivedMessages = new LinkedHashSet<>();
@Cleanup
Consumer<byte[]> consumer1 = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(true)
.subscribe();
assertEquals(readMessages(consumer1, receivedMessages, numMessages, false), numMessages);

// wait for subscription to be replicated
Expand Down

0 comments on commit 9091794

Please sign in to comment.