Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Subscription stuck after calling Admin API analyzeSubscriptionBacklog #22019

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ public class ManagedCursorImpl implements ManagedCursor {
position.ackSet = null;
return position;
};
private final RangeSetWrapper<PositionImpl> individualDeletedMessages;
protected final RangeSetWrapper<PositionImpl> individualDeletedMessages;

// Maintain the deletion status for batch messages
// (ledgerId, entryId) -> deletion indexes
private final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> batchDeletedIndexes;
protected final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> batchDeletedIndexes;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private RateLimiter markDeleteLimiter;
Expand Down Expand Up @@ -3622,4 +3622,35 @@ public boolean isCacheReadEntry() {
public ManagedLedgerConfig getConfig() {
return config;
}

/***
* Create a non-durable cursor and copy the ack stats.
*/
public CompletableFuture<ManagedCursor> duplicateToNonDurableCursor(String nonDurableCursorName) {
gaoran10 marked this conversation as resolved.
Show resolved Hide resolved
CompletableFuture<ManagedCursor> res = new CompletableFuture<>();
try {
NonDurableCursorImpl newNonDurableCursor =
(NonDurableCursorImpl) ledger.newNonDurableCursor(getMarkDeletedPosition(), nonDurableCursorName);
if (individualDeletedMessages != null) {
this.individualDeletedMessages.forEach(range -> {
newNonDurableCursor.individualDeletedMessages.addOpenClosed(
range.lowerEndpoint().getLedgerId(),
range.lowerEndpoint().getEntryId(),
range.upperEndpoint().getLedgerId(),
range.upperEndpoint().getEntryId());
return true;
});
}
if (batchDeletedIndexes != null) {
for (Map.Entry<PositionImpl, BitSetRecyclable> entry : this.batchDeletedIndexes.entrySet()) {
BitSetRecyclable copiedBitSet = BitSetRecyclable.valueOf(entry.getValue());
newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), copiedBitSet);
}
}
res.complete(newNonDurableCursor);
} catch (ManagedLedgerException ex) {
res.completeExceptionally(ex);
}
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -530,111 +531,129 @@ public String getTypeString() {
return "Null";
}

@Override
public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position) {
final ManagedLedger managedLedger = topic.getManagedLedger();
CompletableFuture<ManagedCursor> nonDurableCursorFuture = ((ManagedCursorImpl) cursor)
.duplicateToNonDurableCursor("analyze-backlog-" + UUID.randomUUID());
return nonDurableCursorFuture.thenCompose(newNonDurableCursor -> {
long start = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Starting to analyze backlog", topicName, subName);
}

long start = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Starting to analyze backlog", topicName, subName);
}

AtomicLong entries = new AtomicLong();
AtomicLong accepted = new AtomicLong();
AtomicLong rejected = new AtomicLong();
AtomicLong rescheduled = new AtomicLong();
AtomicLong messages = new AtomicLong();
AtomicLong acceptedMessages = new AtomicLong();
AtomicLong rejectedMessages = new AtomicLong();
AtomicLong rescheduledMessages = new AtomicLong();
AtomicLong entries = new AtomicLong();
AtomicLong accepted = new AtomicLong();
AtomicLong rejected = new AtomicLong();
AtomicLong rescheduled = new AtomicLong();
AtomicLong messages = new AtomicLong();
AtomicLong acceptedMessages = new AtomicLong();
AtomicLong rejectedMessages = new AtomicLong();
AtomicLong rescheduledMessages = new AtomicLong();

Position currentPosition = cursor.getMarkDeletedPosition();
Position currentPosition = newNonDurableCursor.getMarkDeletedPosition();

if (log.isDebugEnabled()) {
log.debug("[{}][{}] currentPosition {}",
topicName, subName, currentPosition);
}
final EntryFilterSupport entryFilterSupport = dispatcher != null
? (EntryFilterSupport) dispatcher : new EntryFilterSupport(this);
// we put some hard limits on the scan, in order to prevent denial of services
ServiceConfiguration configuration = topic.getBrokerService().getPulsar().getConfiguration();
long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries();
long timeOutMs = configuration.getSubscriptionBacklogScanMaxTimeMs();
int batchSize = configuration.getDispatcherMaxReadBatchSize();
AtomicReference<Position> firstPosition = new AtomicReference<>();
AtomicReference<Position> lastPosition = new AtomicReference<>();
final Predicate<Entry> condition = entry -> {
if (log.isDebugEnabled()) {
log.debug("found {}", entry);
}
Position entryPosition = entry.getPosition();
firstPosition.compareAndSet(null, entryPosition);
lastPosition.set(entryPosition);
ByteBuf metadataAndPayload = entry.getDataBuffer();
MessageMetadata messageMetadata = Commands.peekMessageMetadata(metadataAndPayload, "", -1);
int numMessages = 1;
if (messageMetadata.hasNumMessagesInBatch()) {
numMessages = messageMetadata.getNumMessagesInBatch();
log.debug("[{}][{}] currentPosition {}",
topicName, subName, currentPosition);
}
EntryFilter.FilterResult filterResult = entryFilterSupport
.runFiltersForEntry(entry, messageMetadata, null);
final EntryFilterSupport entryFilterSupport = dispatcher != null
? (EntryFilterSupport) dispatcher : new EntryFilterSupport(this);
// we put some hard limits on the scan, in order to prevent denial of services
ServiceConfiguration configuration = topic.getBrokerService().getPulsar().getConfiguration();
long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries();
long timeOutMs = configuration.getSubscriptionBacklogScanMaxTimeMs();
int batchSize = configuration.getDispatcherMaxReadBatchSize();
AtomicReference<Position> firstPosition = new AtomicReference<>();
AtomicReference<Position> lastPosition = new AtomicReference<>();
final Predicate<Entry> condition = entry -> {
if (log.isDebugEnabled()) {
log.debug("found {}", entry);
}
Position entryPosition = entry.getPosition();
firstPosition.compareAndSet(null, entryPosition);
lastPosition.set(entryPosition);
ByteBuf metadataAndPayload = entry.getDataBuffer();
MessageMetadata messageMetadata = Commands.peekMessageMetadata(metadataAndPayload, "", -1);
int numMessages = 1;
if (messageMetadata.hasNumMessagesInBatch()) {
numMessages = messageMetadata.getNumMessagesInBatch();
}
EntryFilter.FilterResult filterResult = entryFilterSupport
.runFiltersForEntry(entry, messageMetadata, null);

if (filterResult == null) {
filterResult = EntryFilter.FilterResult.ACCEPT;
}
switch (filterResult) {
case REJECT:
rejected.incrementAndGet();
rejectedMessages.addAndGet(numMessages);
break;
case RESCHEDULE:
rescheduled.incrementAndGet();
rescheduledMessages.addAndGet(numMessages);
break;
default:
accepted.incrementAndGet();
acceptedMessages.addAndGet(numMessages);
break;
}
long num = entries.incrementAndGet();
messages.addAndGet(numMessages);
if (filterResult == null) {
filterResult = EntryFilter.FilterResult.ACCEPT;
}
switch (filterResult) {
case REJECT:
rejected.incrementAndGet();
rejectedMessages.addAndGet(numMessages);
break;
case RESCHEDULE:
rescheduled.incrementAndGet();
rescheduledMessages.addAndGet(numMessages);
break;
default:
accepted.incrementAndGet();
acceptedMessages.addAndGet(numMessages);
break;
}
long num = entries.incrementAndGet();
messages.addAndGet(numMessages);

if (num % 1000 == 0) {
long end = System.currentTimeMillis();
log.info(
"[{}][{}] scan running since {} ms - scanned {} entries",
topicName, subName, end - start, num);
}

if (num % 1000 == 0) {
return true;
};
CompletableFuture<AnalyzeBacklogResult> res = newNonDurableCursor.scan(
position,
condition,
batchSize,
maxEntries,
timeOutMs
).thenApply((ScanOutcome outcome) -> {
long end = System.currentTimeMillis();
AnalyzeBacklogResult result = new AnalyzeBacklogResult();
result.setFirstPosition(firstPosition.get());
result.setLastPosition(lastPosition.get());
result.setEntries(entries.get());
result.setMessages(messages.get());
result.setFilterAcceptedEntries(accepted.get());
result.setFilterAcceptedMessages(acceptedMessages.get());
result.setFilterRejectedEntries(rejected.get());
result.setFilterRejectedMessages(rejectedMessages.get());
result.setFilterRescheduledEntries(rescheduled.get());
result.setFilterRescheduledMessages(rescheduledMessages.get());
// sometimes we abort the execution due to a timeout or
// when we reach a maximum number of entries
result.setScanOutcome(outcome);
log.info(
"[{}][{}] scan running since {} ms - scanned {} entries",
topicName, subName, end - start, num);
}
"[{}][{}] scan took {} ms - {}",
topicName, subName, end - start, result);
return result;
});
res.whenComplete((__, ex) -> {
managedLedger.asyncDeleteCursor(newNonDurableCursor.getName(),
new AsyncCallbacks.DeleteCursorCallback(){
@Override
public void deleteCursorComplete(Object ctx) {
// Nothing to do.
}

return true;
};
return cursor.scan(
position,
condition,
batchSize,
maxEntries,
timeOutMs
).thenApply((ScanOutcome outcome) -> {
long end = System.currentTimeMillis();
AnalyzeBacklogResult result = new AnalyzeBacklogResult();
result.setFirstPosition(firstPosition.get());
result.setLastPosition(lastPosition.get());
result.setEntries(entries.get());
result.setMessages(messages.get());
result.setFilterAcceptedEntries(accepted.get());
result.setFilterAcceptedMessages(acceptedMessages.get());
result.setFilterRejectedEntries(rejected.get());
result.setFilterRejectedMessages(rejectedMessages.get());
result.setFilterRescheduledEntries(rescheduled.get());
result.setFilterRescheduledMessages(rescheduledMessages.get());
// sometimes we abort the execution due to a timeout or
// when we reach a maximum number of entries
result.setScanOutcome(outcome);
log.info(
"[{}][{}] scan took {} ms - {}",
topicName, subName, end - start, result);
return result;
@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.info("[{}][{}] Delete non-durable cursor[{}] failed when analyze backlog.",
topicName, subName, newNonDurableCursor.getName());
}
}, null);
});
return res;
});

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3389,4 +3389,33 @@ private void testSetBacklogQuotasNamespaceLevelIfRetentionExists() throws Except
// cleanup.
admin.namespaces().deleteNamespace(ns);
}

@Test
private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws Exception {
final String topic = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(topic);
// Send 10 messages.
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription)
.receiverQueueSize(0).subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
for (int i = 0; i < 10; i++) {
producer.send(i + "");
}

// Verify consumer can receive all messages after calling "analyzeSubscriptionBacklog".
admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.of(MessageIdImpl.earliest));
for (int i = 0; i < 10; i++) {
Awaitility.await().untilAsserted(() -> {
Message m = consumer.receive();
assertNotNull(m);
consumer.acknowledge(m);
});
}

// cleanup.
consumer.close();
producer.close();
admin.topics().delete(topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,17 @@ private void verifyBacklog(String topic, String subscription, int numEntries, in
AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklogResult
= admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.empty());

assertEquals(numEntries, analyzeSubscriptionBacklogResult.getEntries());
assertEquals(numEntries, analyzeSubscriptionBacklogResult.getFilterAcceptedEntries());
assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRejectedEntries());
assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledEntries());
assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledEntries());
assertEquals(analyzeSubscriptionBacklogResult.getEntries(), numEntries);
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedEntries(), numEntries);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedEntries(), 0);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);

assertEquals(numMessages, analyzeSubscriptionBacklogResult.getMessages());
assertEquals(numMessages, analyzeSubscriptionBacklogResult.getFilterAcceptedMessages());
assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRejectedMessages());
assertEquals(analyzeSubscriptionBacklogResult.getMessages(), numMessages);
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(), numMessages);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0);

assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledMessages());
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledMessages(), 0);
assertFalse(analyzeSubscriptionBacklogResult.isAborted());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ public static BitSetRecyclable valueOf(byte[] bytes) {
return BitSetRecyclable.valueOf(ByteBuffer.wrap(bytes));
}

    /**
     * Returns a new {@code BitSetRecyclable} that is a copy of the given one.
     *
     * <p>The copy is independent of {@code src}: the delegated {@code valueOf} overload
     * copies the backing word array, so subsequent mutations of either bit set do not
     * affect the other.
     *
     * @param src the bit set to copy (must not be null)
     * @return a new bit set with the same bits set as {@code src}
     */
    public static BitSetRecyclable valueOf(BitSetRecyclable src) {
        // The internal implementation will do the array-copy.
        return valueOf(src.words);
    }

/**
* Returns a new bit set containing all the bits in the given byte
* buffer between its position and limit.
Expand Down
Loading