Skip to content

Commit

Permalink
[fix][broker][branch-2.11] The topic might reference a closed ledger (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Jul 19, 2024
1 parent 12af1f0 commit 69ff0f7
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1836,4 +1836,9 @@ public void shutdownNow() {
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
return new BrokerService(pulsar, ioEventLoopGroup);
}

@VisibleForTesting
public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) {
this.transactionBufferProvider = transactionBufferProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,17 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
try {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
if (topicFuture != null) {
if (topicFuture.isCompletedExceptionally()) {
try {
topicFuture.join();
} catch (Exception ex) {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx == FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION) {
return CompletableFuture.failedFuture(new TimeoutException("The previous loading task"
+ " has not finished yet even through it has timeout, please retry again."));
}
}
}
if (topicFuture.isCompletedExceptionally()
|| (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) {
// Exceptional topics should be recreated.
Expand Down Expand Up @@ -1608,6 +1619,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " topic", topic, FutureUtil.getException(topicFuture));
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic, topicFuture);
if (ex != null) {
log.warn("[{}] Get an error when closing topic.",
topic, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,27 @@
*/
package org.apache.pulsar.client.api;

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
import static org.junit.Assert.assertNotNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -114,4 +121,68 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception {
admin.topics().delete(tpName, false);
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
}

@Test
public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception {
// Make the topic loading timeout faster.
long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds();
int topicLoadTimeoutSeconds = 1;
pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
pulsar.getConfig().setBrokerDeduplicationEnabled(true);
pulsar.getConfig().setTransactionCoordinatorEnabled(true);
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2");

// Mock message deduplication recovery speed topicLoadTimeoutSeconds
AtomicBoolean stopDelay = new AtomicBoolean();
String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" +
TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME;
mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> {
if (mlPath.equals(path) && !stopDelay.get()) {
log.info("Topic load timeout: " + path);
return true;
}
return false;
});

// First load topic will trigger timeout
// The first topic load will trigger a timeout. When the topic closes, it will call transactionBuffer.close.
// Here, we simulate a sleep to ensure that the ledger is not immediately closed.
TransactionBufferProvider mockTransactionBufferProvider = new TransactionBufferProvider() {
@Override
public TransactionBuffer newTransactionBuffer(Topic originTopic) {
return new TransactionBufferDisable(originTopic) {
@SneakyThrows
@Override
public CompletableFuture<Void> closeAsync() {
Thread.sleep(500);
return super.closeAsync();
}
};
}
};
TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider();
pulsar.setTransactionExecutorProvider(mockTransactionBufferProvider);
CompletableFuture<Optional<Topic>> firstLoad = pulsar.getBrokerService().getTopic(tpName, true);
Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
// assert first create topic timeout
.untilAsserted(() -> {
assertTrue(firstLoad.isCompletedExceptionally());
});

// Once the first load topic times out, immediately to load the topic again.
stopDelay.set(true);
Producer<byte[]> producer = pulsarClient.newProducer().topic(tpName).create();
for (int i = 0; i < 10; i++) {
MessageId send = producer.send("msg".getBytes());
Thread.sleep(100);
assertNotNull(send);
}

// set to back
pulsar.setTransactionExecutorProvider(originalTransactionBufferProvider);
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
pulsar.getConfig().setBrokerDeduplicationEnabled(false);
pulsar.getConfig().setTransactionCoordinatorEnabled(false);
}
}

0 comments on commit 69ff0f7

Please sign in to comment.