From af3255c00905490de99c0d4fa9d6b4f49287e380 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Apr 2024 12:35:59 +0800 Subject: [PATCH 1/8] [fix] [broker] Deduplication cursor has infinite backlog causing topic loading timeout --- .../pulsar/broker/service/BrokerService.java | 7 +- .../DeduplicationDisabledBrokerLevelTest.java | 74 +++++++++++++++++++ ...onfigurationCheckOnBrokerStartingTest.java | 58 +++++++++++++++ 3 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationCheckOnBrokerStartingTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 549dfef896cd0..8bc1e0180c975 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -631,8 +631,10 @@ protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpd } protected void startDeduplicationSnapshotMonitor() { + // We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this + // scheduled task runs. 
int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds(); - if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) { + if (interval > 0) { this.deduplicationSnapshotMonitor = OrderedScheduler.newSchedulerBuilder() .name("deduplication-snapshot-monitor") .numThreads(1) @@ -640,6 +642,9 @@ protected void startDeduplicationSnapshotMonitor() { deduplicationSnapshotMonitor.scheduleAtFixedRate(() -> forEachTopic( Topic::checkDeduplicationSnapshot) , interval, interval, TimeUnit.SECONDS); + } else { + throw new IllegalArgumentException("The config brokerDeduplicationSnapshotFrequencyInSeconds should be" + + " greater than 0"); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java new file mode 100644 index 0000000000000..99c1a73d3a6de --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java @@ -0,0 +1,74 @@ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.time.Duration; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class DeduplicationDisabledBrokerLevelTest extends ProducerConsumerBase { + + 
private final int deduplicationSnapshotFrequency = 5; + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + protected void doInitConf() throws Exception { + this.conf.setBrokerDeduplicationEnabled(false); + this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(deduplicationSnapshotFrequency); + } + + @Test + public void testNoBacklogOnDeduplication() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(topic); + final PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + final ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + // deduplication enabled: + // broker level: "false" + // topic level: "true". + // So it is enabled. + admin.topicPolicies().setDeduplicationStatus(topic, true); + Awaitility.await().untilAsserted(() -> { + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); + assertNotNull(cursor); + }); + + // Verify: regarding deduplication cursor, messages will be acknowledged automatically. + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + producer.send("1"); + producer.send("2"); + producer.send("3"); + producer.close(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); + Awaitility.await().atMost(Duration.ofSeconds(deduplicationSnapshotFrequency * 3)).untilAsserted(() -> { + PositionImpl LAC = (PositionImpl) ml.getLastConfirmedEntry(); + PositionImpl cursorMD = (PositionImpl) cursor.getMarkDeletedPosition(); + assertTrue(LAC.compareTo(cursorMD) <= 0); + }); + + // cleanup. 
+ admin.topics().delete(topic); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationCheckOnBrokerStartingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationCheckOnBrokerStartingTest.java new file mode 100644 index 0000000000000..451f6d2de76d4 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationCheckOnBrokerStartingTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.common.naming; + +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.junit.Assert; +import org.testng.annotations.Test; + +@Test(groups = "broker-naming") +public class ServiceConfigurationCheckOnBrokerStartingTest extends MockedPulsarServiceBaseTest { + + private int deduplicationSnapshotFrequency = 120; + + @Override + protected void setup() throws Exception {} + + protected void setupInternal() throws Exception { + super.internalSetup(); + } + + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + protected void doInitConf() throws Exception { + this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(deduplicationSnapshotFrequency); + } + + @Test + public void testInvalidDeduplicationSnapshotFrequency() throws Exception { + cleanup(); + // set a invalidate value. + deduplicationSnapshotFrequency = 0; + try { + setupInternal(); + Assert.fail("the config brokerDeduplicationSnapshotFrequencyInSeconds is 0, the check should not pass"); + } catch (Exception ex) { + Assert.assertTrue(ex.getMessage().contains("greater than 0")); + } + } +} From 63e6f6da4b4f9a42e438551585eb3574593e6177 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Apr 2024 12:52:08 +0800 Subject: [PATCH 2/8] - --- .../DeduplicationDisabledBrokerLevelTest.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java index 99c1a73d3a6de..67bccf56b2ad8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pulsar.broker.service; import static org.testng.Assert.assertNotNull; @@ -18,7 +36,7 @@ public class DeduplicationDisabledBrokerLevelTest extends ProducerConsumerBase { - private final int deduplicationSnapshotFrequency = 5; + private int deduplicationSnapshotFrequency = 5; @BeforeClass @Override From b08df7ddedb109af9e6ff40bb5c00f7627913e36 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Apr 2024 12:55:10 +0800 Subject: [PATCH 3/8] - --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 2 +- .../pulsar/broker/service/persistent/PersistentTopic.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a7deda752fdde..7cbeb5cea41bd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -774,7 +774,7 @@ The max allowed delay for delayed delivery (in milliseconds). 
If the broker rece @FieldContext( category = CATEGORY_POLICIES, doc = "How often is the thread pool scheduled to check whether a snapshot needs to be taken." - + "(disable with value 0)" + + "(must be greater than 0)" ) private int brokerDeduplicationSnapshotFrequencyInSeconds = 120; @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index b21cd165402e4..a6b04b99c29df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -208,7 +208,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private volatile List shadowTopics; private final TopicName shadowSourceTopic; - static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup"; + public static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup"; public static boolean isDedupCursorName(String name) { return DEDUPLICATION_CURSOR_NAME.equals(name); From 08d2f0eaa791279463a50e853164526388b9dee3 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Apr 2024 15:00:11 +0800 Subject: [PATCH 4/8] address comments --- .../pulsar/broker/ServiceConfiguration.java | 2 +- .../pulsar/broker/service/BrokerService.java | 3 - ...onfigurationCheckOnBrokerStartingTest.java | 58 ------------------- 3 files changed, 1 insertion(+), 62 deletions(-) delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationCheckOnBrokerStartingTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 7cbeb5cea41bd..a7deda752fdde 100644 --- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -774,7 +774,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( category = CATEGORY_POLICIES, doc = "How often is the thread pool scheduled to check whether a snapshot needs to be taken." - + "(must be greater than 0)" + + "(disable with value 0)" ) private int brokerDeduplicationSnapshotFrequencyInSeconds = 120; @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8bc1e0180c975..e3bbf11287f57 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -642,9 +642,6 @@ protected void startDeduplicationSnapshotMonitor() { deduplicationSnapshotMonitor.scheduleAtFixedRate(() -> forEachTopic( Topic::checkDeduplicationSnapshot) , interval, interval, TimeUnit.SECONDS); - } else { - throw new IllegalArgumentException("The config brokerDeduplicationSnapshotFrequencyInSeconds should be" - + " greater than 0"); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationCheckOnBrokerStartingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationCheckOnBrokerStartingTest.java deleted file mode 100644 index 451f6d2de76d4..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationCheckOnBrokerStartingTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.naming; - -import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.junit.Assert; -import org.testng.annotations.Test; - -@Test(groups = "broker-naming") -public class ServiceConfigurationCheckOnBrokerStartingTest extends MockedPulsarServiceBaseTest { - - private int deduplicationSnapshotFrequency = 120; - - @Override - protected void setup() throws Exception {} - - protected void setupInternal() throws Exception { - super.internalSetup(); - } - - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } - - protected void doInitConf() throws Exception { - this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(deduplicationSnapshotFrequency); - } - - @Test - public void testInvalidDeduplicationSnapshotFrequency() throws Exception { - cleanup(); - // set a invalidate value. 
- deduplicationSnapshotFrequency = 0; - try { - setupInternal(); - Assert.fail("the config brokerDeduplicationSnapshotFrequencyInSeconds is 0, the check should not pass"); - } catch (Exception ex) { - Assert.assertTrue(ex.getMessage().contains("greater than 0")); - } - } -} From 2745648e17b28e21bdde3d499c443e3a61d6d05d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Apr 2024 16:12:17 +0800 Subject: [PATCH 5/8] fix import --- .../broker/service/DeduplicationDisabledBrokerLevelTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java index 67bccf56b2ad8..d783bc816ea72 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java @@ -29,7 +29,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; -import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; From af56a8bff89ecaab74a626df8b8caad791ea6daa Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Apr 2024 17:16:04 +0800 Subject: [PATCH 6/8] add a new fix --- .../persistent/MessageDeduplication.java | 2 +- .../DeduplicationDisabledBrokerLevelTest.java | 70 +++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 802dd91796127..7a42d2b58b28f 
100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -182,7 +182,7 @@ public void readEntriesComplete(List entries, Object ctx) { highestSequencedPushed.put(producerName, sequenceId); highestSequencedPersisted.put(producerName, sequenceId); producerRemoved(producerName); - + snapshotCounter += md.getNumMessagesInBatch(); entry.release(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java index d783bc816ea72..fe6e6c8c81f69 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import java.time.Duration; @@ -25,11 +26,13 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.MessageDeduplication; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -37,6 +40,7 @@ public class DeduplicationDisabledBrokerLevelTest extends ProducerConsumerBase { private 
int deduplicationSnapshotFrequency = 5; + private int brokerDeduplicationEntriesInterval = 1000; @BeforeClass @Override @@ -54,6 +58,7 @@ protected void cleanup() throws Exception { protected void doInitConf() throws Exception { this.conf.setBrokerDeduplicationEnabled(false); this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(deduplicationSnapshotFrequency); + this.conf.setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval); } @Test @@ -89,4 +94,69 @@ public void testNoBacklogOnDeduplication() throws Exception { // cleanup. admin.topics().delete(topic); } + + @Test + public void testSnapshotCounterAfterUnload() throws Exception { + final int originalDeduplicationSnapshotFrequency = deduplicationSnapshotFrequency; + deduplicationSnapshotFrequency = 3600; + cleanup(); + setup(); + + // Create a topic and wait deduplication is started. + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(topic); + final PersistentTopic persistentTopic1 = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger(); + admin.topicPolicies().setDeduplicationStatus(topic, true); + Awaitility.await().untilAsserted(() -> { + ManagedCursorImpl cursor1 = (ManagedCursorImpl) ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); + assertNotNull(cursor1); + }); + final MessageDeduplication deduplication1 = persistentTopic1.getMessageDeduplication(); + + // 1. Send 999 messages, which is fewer than "brokerDeduplicationEntriesInterval". + // 2. Unload topic. + // 3. Send 1 message; now 1000 messages have not been covered by a snapshot. + // 4. Verify the snapshot has been taken. + // step 1. 
+ final Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) { + producer.send(i + ""); + } + int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter"); + assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1); + admin.topics().unload(topic); + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + ManagedLedgerImpl ml2 = (ManagedLedgerImpl) persistentTopic2.getManagedLedger(); + MessageDeduplication deduplication2 = persistentTopic2.getMessageDeduplication(); + admin.topicPolicies().setDeduplicationStatus(topic, true); + Awaitility.await().untilAsserted(() -> { + ManagedCursorImpl cursor = (ManagedCursorImpl) ml2.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); + assertNotNull(cursor); + }); + // step 2. + // Note: repeat read will make the counter is larger than expected. + int snapshotCounter2 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter"); + assertTrue(snapshotCounter2 >= brokerDeduplicationEntriesInterval - 1); + // step 3. + producer.send((brokerDeduplicationEntriesInterval - 1) + ""); + // step 4. + final ManagedCursorImpl cursor2 = (ManagedCursorImpl) ml2.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); + Awaitility.await().untilAsserted(() -> { + int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter"); + assertTrue(snapshotCounter3 < snapshotCounter2); + PositionImpl LAC = (PositionImpl) ml2.getLastConfirmedEntry(); + PositionImpl cursorMD = (PositionImpl) cursor2.getMarkDeletedPosition(); + assertTrue(LAC.compareTo(cursorMD) <= 0); + }); + + // cleanup. 
+ producer.close(); + admin.topics().delete(topic); + deduplicationSnapshotFrequency = originalDeduplicationSnapshotFrequency; + cleanup(); + setup(); + } } From 0883a21b3131866615f7b83c534377a6ad8a4fca Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Apr 2024 17:22:53 +0800 Subject: [PATCH 7/8] add a new fix --- .../pulsar/broker/service/persistent/MessageDeduplication.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 7a42d2b58b28f..b14ab79e4fd33 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -182,7 +182,7 @@ public void readEntriesComplete(List entries, Object ctx) { highestSequencedPushed.put(producerName, sequenceId); highestSequencedPersisted.put(producerName, sequenceId); producerRemoved(producerName); - snapshotCounter += md.getNumMessagesInBatch(); + snapshotCounter++; entry.release(); } From 4a438044a98a9d122196540a8ade336d35273236 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 11 Apr 2024 17:45:51 +0800 Subject: [PATCH 8/8] add a new fix --- .../persistent/MessageDeduplication.java | 16 ++++++++---- .../DeduplicationDisabledBrokerLevelTest.java | 25 +++++++++---------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index b14ab79e4fd33..e508661364d74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -157,9 +157,14 @@ private CompletableFuture recoverSequenceIdsMap() { // Replay all the entries and apply all the sequence ids updates log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries()); - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); replayCursor(future); - return future; + return future.thenAccept(lastPosition -> { + if (lastPosition != null && snapshotCounter >= snapshotInterval) { + snapshotCounter = 0; + takeSnapshot(lastPosition); + } + }); } /** @@ -168,11 +173,11 @@ private CompletableFuture recoverSequenceIdsMap() { * * @param future future to trigger when the replay is complete */ - private void replayCursor(CompletableFuture future) { + private void replayCursor(CompletableFuture future) { managedCursor.asyncReadEntries(100, new ReadEntriesCallback() { @Override public void readEntriesComplete(List entries, Object ctx) { - + Position lastPosition = null; for (Entry entry : entries) { ByteBuf messageMetadataAndPayload = entry.getDataBuffer(); MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload); @@ -183,6 +188,7 @@ public void readEntriesComplete(List entries, Object ctx) { highestSequencedPersisted.put(producerName, sequenceId); producerRemoved(producerName); snapshotCounter++; + lastPosition = entry.getPosition(); entry.release(); } @@ -191,7 +197,7 @@ public void readEntriesComplete(List entries, Object ctx) { pulsar.getExecutor().execute(() -> replayCursor(future)); } else { // Done replaying - future.complete(null); + future.complete(lastPosition); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java index fe6e6c8c81f69..2ce4ea9b00b2e 
100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import java.time.Duration; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -74,7 +75,8 @@ public void testNoBacklogOnDeduplication() throws Exception { // So it is enabled. admin.topicPolicies().setDeduplicationStatus(topic, true); Awaitility.await().untilAsserted(() -> { - ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); + ManagedCursorImpl cursor = + (ManagedCursorImpl) ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); assertNotNull(cursor); }); @@ -110,7 +112,8 @@ public void testSnapshotCounterAfterUnload() throws Exception { final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger(); admin.topicPolicies().setDeduplicationStatus(topic, true); Awaitility.await().untilAsserted(() -> { - ManagedCursorImpl cursor1 = (ManagedCursorImpl) ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); + ManagedCursorImpl cursor1 = + (ManagedCursorImpl) ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); assertNotNull(cursor1); }); final MessageDeduplication deduplication1 = persistentTopic1.getMessageDeduplication(); @@ -133,23 +136,19 @@ public void testSnapshotCounterAfterUnload() throws Exception { MessageDeduplication deduplication2 = persistentTopic2.getMessageDeduplication(); admin.topicPolicies().setDeduplicationStatus(topic, true); Awaitility.await().untilAsserted(() -> { - ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml2.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); + ManagedCursorImpl cursor = + (ManagedCursorImpl) ml2.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); assertNotNull(cursor); }); - // step 2. - // Note: repeat read will make the counter is larger than expected. - int snapshotCounter2 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter"); - assertTrue(snapshotCounter2 >= brokerDeduplicationEntriesInterval - 1); // step 3. - producer.send((brokerDeduplicationEntriesInterval - 1) + ""); + producer.send("last message"); + ml2.trimConsumedLedgersInBackground(new CompletableFuture<>()); // step 4. - final ManagedCursorImpl cursor2 = (ManagedCursorImpl) ml2.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); Awaitility.await().untilAsserted(() -> { int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter"); - assertTrue(snapshotCounter3 < snapshotCounter2); - PositionImpl LAC = (PositionImpl) ml2.getLastConfirmedEntry(); - PositionImpl cursorMD = (PositionImpl) cursor2.getMarkDeletedPosition(); - assertTrue(LAC.compareTo(cursorMD) <= 0); + assertTrue(snapshotCounter3 < brokerDeduplicationEntriesInterval); + // Verify: the previous ledger will be removed because all messages have been acked. + assertEquals(ml2.getLedgersInfo().size(), 1); }); // cleanup.