Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Subscription stuck due to called Admin API analyzeSubscriptionBacklog #22019

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,8 @@ public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeByte
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
OpReadEntry op =
OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);
OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition,
skipCondition, true);
ledger.asyncReadEntries(op);
}

Expand Down Expand Up @@ -955,7 +956,7 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
ctx, maxPosition, skipCondition);
ctx, maxPosition, skipCondition, true);

if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
op.recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ class OpReadEntry implements ReadEntriesCallback {

Predicate<PositionImpl> skipCondition;

private boolean moveCursorReadPos;

public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition,
boolean moveCursorReadPos) {
OpReadEntry op = RECYCLER.get();
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
op.cursor = cursor;
Expand All @@ -63,6 +66,7 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi
op.skipCondition = skipCondition;
op.ctx = ctx;
op.nextReadPosition = PositionImpl.get(op.readPosition);
op.moveCursorReadPos = moveCursorReadPos;
return op;
}

Expand Down Expand Up @@ -92,7 +96,9 @@ void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, Positi
// if entries have been filtered out then try to skip reading of already deletedMessages in that range
final Position nexReadPosition = entriesCount != filteredEntries.size()
? cursor.getNextAvailablePosition(lastPosition) : lastPosition.getNext();
updateReadPosition(nexReadPosition);
if (moveCursorReadPos) {
updateReadPosition(nexReadPosition);
}
checkReadCompletion();
}

Expand Down Expand Up @@ -161,9 +167,7 @@ void updateReadPosition(Position newReadPosition) {

void checkReadCompletion() {
// op readPosition is smaller or equals maxPosition then can read again
if (entries.size() < count && cursor.hasMoreEntries()
&& maxPosition.compareTo(readPosition) > 0) {

if (needAccumulateMoreEntries()) {
// We still have more entries to read from the next ledger, schedule a new async operation
cursor.ledger.getExecutor().execute(() -> {
readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
Expand All @@ -183,6 +187,16 @@ void checkReadCompletion() {
}
}

private boolean needAccumulateMoreEntries() {
if (!moveCursorReadPos) {
// Just responds the entries of current reads, because "cursor.readPosition" has not moved, so cannot
// accumulate more entries.
return false;
}
boolean hasMoreEntries = cursor.hasMoreEntries() && maxPosition.compareTo(readPosition) > 0;
return entries.size() < count && hasMoreEntries;
}

public int getNumberOfEntriesToRead() {
return count - entries.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void find() {
}
if (cursor.hasMoreEntries(searchPosition)) {
OpReadEntry opReadEntry = OpReadEntry.create(cursor, searchPosition, batchSize,
this, OpScan.this.ctx, null, null);
this, OpScan.this.ctx, null, null, false);
dao-jun marked this conversation as resolved.
Show resolved Hide resolved
ledger.asyncReadEntries(opReadEntry);
} else {
callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -2117,7 +2118,7 @@
assertEquals(ScanOutcome.COMPLETED, c1.scan(Optional.empty(), (entry -> {
positionsAfterDelete.add(entry.getPosition());
return true;
}), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get());

Check failure on line 2121 in managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

View workflow job for this annotation

GitHub Actions / CI - Unit - Other

ManagedCursorTest.testScan

org.apache.bookkeeper.mledger.ManagedLedgerException: java.lang.IndexOutOfBoundsException: Index -1 out of bounds for length 0
assertEquals(numEntries - 1, positionsAfterDelete.size());

// delete all the entries
Expand Down Expand Up @@ -4249,7 +4250,7 @@

// op readPosition is bigger than maxReadPosition
OpReadEntry opReadEntry = OpReadEntry.create(cursor, ledger.lastConfirmedEntry, 10, callback,
null, PositionImpl.get(lastPosition.getLedgerId(), -1), null);
null, PositionImpl.get(lastPosition.getLedgerId(), -1), null, true);
Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
field.setAccessible(true);
field.set(cursor, PositionImpl.EARLIEST);
Expand All @@ -4271,7 +4272,8 @@
};

@Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry = Mockito.mockStatic(OpReadEntry.class);
mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), anyInt(), any(), any(), any(), any()))
mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), anyInt(), any(), any(), any(), any(),
anyBoolean()))
.thenAnswer(__ -> createOpReadEntry.get());

final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

}
}, null, maxPosition, null);
}, null, maxPosition, null, true);
Assert.assertEquals(opReadEntry.readPosition, position);
}

Expand Down Expand Up @@ -3157,7 +3157,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
responseException2.set(exception);
}

}, null, PositionImpl.LATEST, null);
}, null, PositionImpl.LATEST, null, true);
ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(),
opReadEntry, ctxStr);
retryStrategically((test) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3389,4 +3389,33 @@ private void testSetBacklogQuotasNamespaceLevelIfRetentionExists() throws Except
// cleanup.
admin.namespaces().deleteNamespace(ns);
}

@Test
private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws Exception {
final String topic = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(topic);
// Send 10 messages.
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription)
.receiverQueueSize(0).subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
for (int i = 0; i < 10; i++) {
producer.send(i + "");
}

// Verify consumer can receive all messages after calling "analyzeSubscriptionBacklog".
admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.of(MessageIdImpl.earliest));
for (int i = 0; i < 10; i++) {
Awaitility.await().untilAsserted(() -> {
Message m = consumer.receive();
assertNotNull(m);
consumer.acknowledge(m);
});
}

// cleanup.
consumer.close();
producer.close();
admin.topics().delete(topic);
}
}
Loading