diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index f34144deb0ab0..394fad21ae6dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -248,7 +248,7 @@ protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) { } startProducer(); }).exceptionally(ex -> { - log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and" + log.error("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and" + " trigger a terminate. Replicator state: {}", localTopicName, replicatorId, STATE_UPDATER.get(this), ex); terminate(); @@ -377,9 +377,13 @@ public CompletableFuture terminate() { this.producer = null; // set the cursor as inactive. disableReplicatorRead(); + // release resources. + doReleaseResources(); }); } + protected void doReleaseResources() {} + protected boolean tryChangeStatusToTerminating() { if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)){ return true; @@ -468,4 +472,8 @@ protected ImmutablePair compareSetAndGetState(State expect, Stat } return compareSetAndGetState(expect, update); } + + public boolean isTerminated() { + return state == State.Terminating || state == State.Terminated; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index 8130b855b4e4a..5c314397da80e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -51,4 +51,6 @@ default Optional getRateLimiter() { boolean isConnected(); long getNumberOfEntriesInBacklog(); + + boolean isTerminated(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index c6153e81a7a6d..9f21ee38e04b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -450,7 +450,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { long waitTimeMillis = readFailureBackoff.next(); if (exception instanceof CursorAlreadyClosedException) { - log.error("[{}] Error reading entries because replicator is" + log.warn("[{}] Error reading entries because replicator is" + " already deleted and cursor is already closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); // replicator is already deleted and cursor is already closed so, producer should also be disconnected. @@ -570,7 +570,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx, exception.getMessage(), exception); if (exception instanceof CursorAlreadyClosedException) { - log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already" + log.warn("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already" + " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); // replicator is already deleted and cursor is already closed so, producer should also be disconnected. terminate(); @@ -698,6 +698,11 @@ public boolean isConnected() { return producer != null && producer.isConnected(); } + @Override + protected void doReleaseResources() { + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + } + private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class); @VisibleForTesting diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7d3ca226fe4b5..fa049e1a5bcc3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1697,6 +1697,7 @@ public CompletableFuture checkReplication() { return deleteForcefully(); } + removeTerminatedReplicators(replicators); List> futures = new ArrayList<>(); // Check for missing replicators @@ -1735,6 +1736,8 @@ private CompletableFuture checkShadowReplication() { if (log.isDebugEnabled()) { log.debug("[{}] Checking shadow replication status, shadowTopics={}", topic, configuredShadowTopics); } + + removeTerminatedReplicators(shadowReplicators); List> futures = new ArrayList<>(); // Check for missing replicators @@ -1885,19 +1888,30 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma if (replicationClient == null) { return; } - Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { - try { - return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster, - remoteCluster, brokerService, (PulsarClientImpl) replicationClient); - } catch (PulsarServerException e) { - log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e); + lock.readLock().lock(); + try { + if (isClosingOrDeleting) { + // Whether is "transferring" or not, do not create new replicator. + log.info("[{}] Skip to create replicator because this topic is closing." + + " remote cluster: {}. State of transferring : {}", + topic, remoteCluster, transferring); + return; } - return null; - }); - - // clean up replicator if startup is failed - if (replicator == null) { - replicators.removeNullValue(remoteCluster); + Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { + try { + return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster, + remoteCluster, brokerService, (PulsarClientImpl) replicationClient); + } catch (PulsarServerException e) { + log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e); + } + return null; + }); + // clean up replicator if startup is failed + if (replicator == null) { + replicators.removeNullValue(remoteCluster); + } + } finally { + lock.readLock().unlock(); } }); } @@ -3709,9 +3723,27 @@ private void fenceTopicToCloseOrDelete() { } private void unfenceTopicToResume() { - subscriptions.values().forEach(sub -> sub.resumeAfterFence()); isFenced = false; isClosingOrDeleting = false; + subscriptions.values().forEach(sub -> sub.resumeAfterFence()); + unfenceReplicatorsToResume(); + } + + private void unfenceReplicatorsToResume() { + checkReplication(); + checkShadowReplication(); + } + + private void removeTerminatedReplicators(ConcurrentOpenHashMap replicators) { + Map terminatedReplicators = new HashMap<>(); + replicators.forEach((cluster, replicator) -> { + if (replicator.isTerminated()) { + terminatedReplicators.put(cluster, replicator); + } + }); + terminatedReplicators.entrySet().forEach(entry -> { + replicators.remove(entry.getKey(), entry.getValue()); + }); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 35073575f34ed..9b8b567af081b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -20,18 +20,21 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Sets; import io.netty.util.concurrent.FastThreadLocalThread; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; import java.util.Arrays; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -48,6 +51,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; @@ -486,4 +490,166 @@ public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throw admin1.topics().deletePartitionedTopic(topicName); admin2.topics().deletePartitionedTopic(topicName); } + + /** + * See the description and execution flow: https://github.com/apache/pulsar/pull/21948. + * Steps: + * 1.Create topic, does not enable replication now. + * - The topic will be loaded in the memory. + * 2.Enable namespace level replication. + * - Broker creates a replicator, and the internal producer of replicator is starting. + * - We inject an error to make the internal producer fail to connect,after few seconds, it will retry to start. + * 3.Unload bundle. + * - Starting to close the topic. + * - The replicator will be closed, but it will not close the internal producer, because the producer has not + * been created successfully. + * - We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck. So the topic is still + * in the process of being closed now. + * 4.Internal producer retry to connect. + * - At the next retry, it connected successful. Since the state of "repl.cursor" is not "Closed", this producer + * will not be closed now. + * 5.Topic closed. + * - Cancel the stuck of closing the "repl.cursor". + * - The topic is wholly closed. + * 6.Verify: the delayed created internal producer will be closed. In other words, there is no producer is connected + * to the remote cluster. + */ + @Test + public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception { + final String namespaceName = defaultTenant + "/" + UUID.randomUUID().toString().replaceAll("-", ""); + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp_"); + // 1.Create topic, does not enable replication now. + admin1.namespaces().createNamespace(namespaceName); + admin2.namespaces().createNamespace(namespaceName); + admin1.topics().createNonPartitionedTopic(topicName); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + + // We inject an error to make the internal producer fail to connect. + // The delay time of next retry to create producer is below: + // 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s... + // If the retry counter is larger than 6, the next creation will be slow enough to close Replicator. + final AtomicInteger createProducerCounter = new AtomicInteger(); + final int failTimes = 6; + injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { + if (topicName.equals(producerCnf.getTopicName())) { + // There is a switch to determine create producer successfully or not. + if (createProducerCounter.incrementAndGet() > failTimes) { + return originalProducer; + } + log.info("Retry create replicator.producer count: {}", createProducerCounter); + // Release producer and fail callback. + originalProducer.closeAsync(); + throw new RuntimeException("mock error"); + } + return originalProducer; + }); + + // 2.Enable namespace level replication. + admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1, cluster2)); + AtomicReference replicator = new AtomicReference(); + Awaitility.await().untilAsserted(() -> { + assertFalse(persistentTopic.getReplicators().isEmpty()); + replicator.set( + (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next()); + // Since we inject a producer creation error, the replicator can not start successfully. + assertFalse(replicator.get().isConnected()); + }); + + // We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck, until the internal + // producer of the replicator started. + SpyCursor spyCursor = + spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName()); + CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor); + + // 3.Unload bundle: call "topic.close(false)". + // Stuck start new producer, until the state of replicator change to Stopped. + // The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully. + Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> { + assertTrue(createProducerCounter.get() >= failTimes); + }); + CompletableFuture topicCloseFuture = persistentTopic.close(true); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + String state = String.valueOf(replicator.get().getState()); + log.error("replicator state: {}", state); + assertTrue(state.equals("Disconnected") || state.equals("Terminated")); + }); + + // 5.Delay close cursor, until "replicator.producer" create successfully. + // The next once retry time of create "replicator.producer" will be 3.2s. + Thread.sleep(4 * 1000); + log.info("Replicator.state: {}", replicator.get().getState()); + cursorCloseSignal.startClose(); + cursorCloseSignal.startCallback(); + // Wait for topic close successfully. + topicCloseFuture.join(); + + // 6. Verify there is no orphan producer on the remote cluster. + Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + assertEquals(persistentTopic2.getProducers().size(), 0); + Assert.assertFalse(replicator.get().isConnected()); + }); + + // cleanup. + cleanupTopics(namespaceName, () -> { + admin1.topics().delete(topicName); + admin2.topics().delete(topicName); + }); + admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1)); + admin1.namespaces().deleteNamespace(namespaceName); + admin2.namespaces().deleteNamespace(namespaceName); + } + + @Test + public void testUnFenceTopicToReuse() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp"); + // Wait for replicator started. + Producer producer1 = client1.newProducer(Schema.STRING).topic(topicName).create(); + waitReplicatorStarted(topicName); + + // Inject an error to make topic close fails. + final String mockProducerName = UUID.randomUUID().toString(); + final org.apache.pulsar.broker.service.Producer mockProducer = + mock(org.apache.pulsar.broker.service.Producer.class); + doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error"))) + .when(mockProducer).disconnect(any()); + doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error"))) + .when(mockProducer).disconnect(); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + persistentTopic.getProducers().put(mockProducerName, mockProducer); + + // Do close. + GeoPersistentReplicator replicator1 = + (GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next(); + try { + persistentTopic.close(true, false).join(); + fail("Expected close fails due to a producer close fails"); + } catch (Exception ex) { + log.info("Expected error: {}", ex.getMessage()); + } + + // Broker will call `topic.unfenceTopicToResume` if close clients fails. + // Verify: the replicator will be re-created. + Awaitility.await().untilAsserted(() -> { + assertTrue(producer1.isConnected()); + GeoPersistentReplicator replicator2 = + (GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next(); + assertNotEquals(replicator1, replicator2); + assertFalse(replicator1.isConnected()); + assertFalse(replicator1.producer != null && replicator1.producer.isConnected()); + assertTrue(replicator2.isConnected()); + assertTrue(replicator2.producer != null && replicator2.producer.isConnected()); + }); + + // cleanup. + persistentTopic.getProducers().remove(mockProducerName, mockProducer); + producer1.close(); + cleanupTopics(() -> { + admin1.topics().delete(topicName); + admin2.topics().delete(topicName); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 9e4214897fb90..b4eed00c4470f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -150,12 +150,16 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception { } protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception { - waitChangeEventsInit(replicatedNamespace); - admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Collections.singleton(cluster1)); - admin1.namespaces().unload(replicatedNamespace); + cleanupTopics(replicatedNamespace, cleanupTopicAction); + } + + protected void cleanupTopics(String namespace, CleanupTopicAction cleanupTopicAction) throws Exception { + waitChangeEventsInit(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1)); + admin1.namespaces().unload(namespace); cleanupTopicAction.run(); - admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1, cluster2)); - waitChangeEventsInit(replicatedNamespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + waitChangeEventsInit(namespace); } protected void waitChangeEventsInit(String namespace) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index c809a2e371fc5..fa12eba1c6611 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -154,7 +154,7 @@ public Object[][] partitionedTopicProvider() { return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; } - @Test + @Test(priority = Integer.MAX_VALUE) public void testConfigChange() throws Exception { log.info("--- Starting ReplicatorTest::testConfigChange ---"); // This test is to verify that the config change on global namespace is successfully applied in broker during