Skip to content

Commit

Permalink
Skip creating a subscription replication snapshot if no messages have…
Browse files Browse the repository at this point in the history
… been published after the topic gets activated on a broker (#16618)

* Skip creating a replication snapshot if no messages have been published

* Adapt test to new behavior where replication snapshots happen only when there are new messages

(cherry picked from commit 43ad6f9)
  • Loading branch information
lhotari authored and congbobo184 committed Dec 1, 2022
1 parent 0245a2f commit 594ee5b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
private void startNewSnapshot() {
cleanupTimedOutSnapshots();

if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) {
if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime
|| topic.getLastDataMessagePublishedTimestamp() == 0) {
// There was no message written since the last snapshot, we can skip creating a new snapshot
if (log.isDebugEnabled()) {
log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -173,6 +174,14 @@ public void testReplicationSnapshotStopWhenNoTraffic() throws Exception {
// create subscription in r1
createReplicatedSubscription(client1, topicName, subscriptionName, true);

// Validate that no snapshots are created before messages are published
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
.getTopic(topicName, false).get().get();
ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
// no snapshot should have been created before any messages are published
assertTrue(rsc1.getLastCompletedSnapshotId().isEmpty());

@Cleanup
PulsarClient client2 = PulsarClient.builder()
.serviceUrl(url2.toString())
Expand All @@ -196,9 +205,6 @@ public void testReplicationSnapshotStopWhenNoTraffic() throws Exception {
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

// In R1
PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
.getTopic(topicName, false).get().get();
ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
Position p1 = t1.getLastPosition();
String snapshot1 = rsc1.getLastCompletedSnapshotId().get();

Expand Down Expand Up @@ -479,22 +485,35 @@ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
.statsInterval(0, TimeUnit.SECONDS)
.build();

// create consumer in r1
@Cleanup
Consumer<byte[]> consumer1 = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(true)
.subscribe();
{
// create consumer in r1
@Cleanup
Consumer<byte[]> consumer = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(true)
.subscribe();

// waiting to replicate topic/subscription to r1->r2
Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
// send one message to trigger replication
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("message".getBytes(StandardCharsets.UTF_8));

assertEquals(readMessages(consumer, new HashSet<>(), 1, false), 1);

// waiting to replicate topic/subscription to r1->r2
Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName));
final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected()));
Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
}

// unsubscribe replicated subscription in r2
admin2.topics().deleteSubscription(topicName, subscriptionName);
final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
assertNull(topic2.getSubscription(subscriptionName));

// close replicator producer in r2
Expand All @@ -519,6 +538,12 @@ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex

// consume 6 messages in r1
Set<String> receivedMessages = new LinkedHashSet<>();
@Cleanup
Consumer<byte[]> consumer1 = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(true)
.subscribe();
assertEquals(readMessages(consumer1, receivedMessages, numMessages, false), numMessages);

// wait for subscription to be replicated
Expand Down

0 comments on commit 594ee5b

Please sign in to comment.