diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 15704d49ad325..1aac0ba5e7c70 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1836,4 +1836,9 @@ public void shutdownNow() { protected BrokerService newBrokerService(PulsarService pulsar) throws Exception { return new BrokerService(pulsar, ioEventLoopGroup); } + + @VisibleForTesting + public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) { + this.transactionBufferProvider = transactionBufferProvider; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 090d5ce0b540d..e8e94cbee56c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -988,6 +988,17 @@ public CompletableFuture> getTopic(final TopicName topicName, bo try { CompletableFuture> topicFuture = topics.get(topicName.toString()); if (topicFuture != null) { + if (topicFuture.isCompletedExceptionally()) { + try { + topicFuture.join(); + } catch (Exception ex) { + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + if (actEx == FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION) { + return CompletableFuture.failedFuture(new TimeoutException("The previous loading task" + + " has not finished yet even through it has timeout, please retry again.")); + } + } + } if (topicFuture.isCompletedExceptionally() || (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) { // Exceptional topics should be recreated. @@ -1608,6 +1619,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + " topic", topic, FutureUtil.getException(topicFuture)); executor().submit(() -> { persistentTopic.close().whenComplete((ignore, ex) -> { + topics.remove(topic, topicFuture); if (ex != null) { log.warn("[{}] Get an error when closing topic.", topic, ex); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 4f5ab783374d0..76e4be48b2fee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.api; +import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME; +import static org.junit.Assert.assertNotNull; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import java.util.List; @@ -25,13 +27,18 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.TopicPolicyListener; +import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; +import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.awaitility.Awaitility; @@ -114,4 +121,68 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception { admin.topics().delete(tpName, false); pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds); } + + @Test + public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception { + // Make the topic loading timeout faster. + long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds(); + int topicLoadTimeoutSeconds = 1; + pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); + pulsar.getConfig().setBrokerDeduplicationEnabled(true); + pulsar.getConfig().setTransactionCoordinatorEnabled(true); + String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2"); + + // Mock message deduplication recovery speed topicLoadTimeoutSeconds + AtomicBoolean stopDelay = new AtomicBoolean(); + String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + + TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME; + mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> { + if (mlPath.equals(path) && !stopDelay.get()) { + log.info("Topic load timeout: " + path); + return true; + } + return false; + }); + + // First load topic will trigger timeout + // The first topic load will trigger a timeout. When the topic closes, it will call transactionBuffer.close. + // Here, we simulate a sleep to ensure that the ledger is not immediately closed. + TransactionBufferProvider mockTransactionBufferProvider = new TransactionBufferProvider() { + @Override + public TransactionBuffer newTransactionBuffer(Topic originTopic) { + return new TransactionBufferDisable(originTopic) { + @SneakyThrows + @Override + public CompletableFuture closeAsync() { + Thread.sleep(500); + return super.closeAsync(); + } + }; + } + }; + TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider(); + pulsar.setTransactionExecutorProvider(mockTransactionBufferProvider); + CompletableFuture> firstLoad = pulsar.getBrokerService().getTopic(tpName, true); + Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + // assert first create topic timeout + .untilAsserted(() -> { + assertTrue(firstLoad.isCompletedExceptionally()); + }); + + // Once the first load topic times out, immediately to load the topic again. + stopDelay.set(true); + Producer producer = pulsarClient.newProducer().topic(tpName).create(); + for (int i = 0; i < 10; i++) { + MessageId send = producer.send("msg".getBytes()); + Thread.sleep(100); + assertNotNull(send); + } + + // set to back + pulsar.setTransactionExecutorProvider(originalTransactionBufferProvider); + pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds); + pulsar.getConfig().setBrokerDeduplicationEnabled(false); + pulsar.getConfig().setTransactionCoordinatorEnabled(false); + } }