Skip to content

Commit

Permalink
Adapt test to new behavior where replication snapshots happen only wh…
Browse files Browse the repository at this point in the history
…en there are new messages
  • Loading branch information
lhotari committed Sep 27, 2022
1 parent fbe8ddd commit b58145d
Showing 1 changed file with 32 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -546,22 +547,35 @@ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
.statsInterval(0, TimeUnit.SECONDS)
.build();

// create consumer in r1
@Cleanup
Consumer<byte[]> consumer1 = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(true)
.subscribe();
{
// create consumer in r1
@Cleanup
Consumer<byte[]> consumer = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(true)
.subscribe();

// waiting to replicate topic/subscription to r1->r2
Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
// send one message to trigger replication
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("message".getBytes(StandardCharsets.UTF_8));

assertEquals(readMessages(consumer, new HashSet<>(), 1, false), 1);

// waiting to replicate topic/subscription to r1->r2
Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
}

// unsubscribe replicated subscription in r2
admin2.topics().deleteSubscription(topicName, subscriptionName);
final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
assertNull(topic2.getSubscription(subscriptionName));

// close replicator producer in r2
Expand All @@ -586,6 +600,12 @@ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex

// consume 6 messages in r1
Set<String> receivedMessages = new LinkedHashSet<>();
@Cleanup
Consumer<byte[]> consumer1 = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(true)
.subscribe();
assertEquals(readMessages(consumer1, receivedMessages, numMessages, false), numMessages);

// wait for subscription to be replicated
Expand Down

0 comments on commit b58145d

Please sign in to comment.