[fix][broker] Skip reading more entries for a pending read with no more entries (apache#16400)

### Motivation

Related issue: streamnative/kop#1379

KoP uses a reader on a single partition of a compacted topic, and we
observed many logs like:

> Error reading entries at 928511:1 : We can only have a single waiting callback

This happens on a `ManagedCursorImpl` when `hasMoreEntries` returns
false and `asyncReadEntriesOrWait` is called multiple times before
`cancelPendingReadRequest` is called or new messages arrive.
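
The failure mode described above can be reproduced with a minimal sketch. It assumes the cursor keeps one atomically guarded slot for the waiting read, as the `WAITING_READ_OP_UPDATER.compareAndSet` call in the diff suggests. The class `SingleWaiterSketch` and its methods are hypothetical stand-ins, not the real `ManagedCursorImpl`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical stand-in for a cursor with a single "waiting read" slot.
class SingleWaiterSketch {
    private static final AtomicReferenceFieldUpdater<SingleWaiterSketch, Runnable> WAITING_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(SingleWaiterSketch.class, Runnable.class, "waitingRead");

    // Must be volatile for AtomicReferenceFieldUpdater to accept it.
    private volatile Runnable waitingRead;

    // Records each rejected call, standing in for the error log line.
    final List<String> failures = new ArrayList<>();

    // Called when no entries are available: park the read, or reject it
    // if another read is already parked.
    boolean readOrWait(Runnable onEntriesAvailable) {
        if (!WAITING_UPDATER.compareAndSet(this, null, onEntriesAvailable)) {
            failures.add("We can only have a single waiting callback");
            return false;
        }
        return true;
    }

    // Frees the slot, as a cancel or the arrival of new messages would.
    boolean cancelPendingRead() {
        return WAITING_UPDATER.getAndSet(this, null) != null;
    }

    public static void main(String[] args) {
        SingleWaiterSketch cursor = new SingleWaiterSketch();
        System.out.println("first read parked: " + cursor.readOrWait(() -> { }));
        System.out.println("second read parked: " + cursor.readOrWait(() -> { }));
        cursor.cancelPendingRead();
        System.out.println("read after cancel parked: " + cursor.readOrWait(() -> { }));
    }
}
```

In this sketch every call made while the slot is occupied is rejected, which matches the repeated error log when a caller keeps retrying without waiting for the parked read.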

### Modifications

Throw a `ConcurrentWaitCallbackException` instead of a raw
`ManagedLedgerException` when a wait callback is already registered.
Then check for this exception type in
`PersistentDispatcherSingleActiveConsumer#internalReadEntriesFailed` and
skip the remaining steps, so that no further read is scheduled.
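
As a rough illustration of the approach, the sketch below uses a dedicated exception subtype so that the failure handler can tell "a read is already pending" apart from other errors and return early. Only the exception names and the message come from this commit. `DispatcherSketch`, its counters, and the simplified `readEntriesFailed` signature are hypothetical.

```java
// Hypothetical stand-in for the dispatcher's failure handling.
class DispatcherSketch {
    // Simplified local copies of the exception types, for illustration only.
    static class ManagedLedgerException extends Exception {
        ManagedLedgerException(String msg) {
            super(msg);
        }
    }

    static class ConcurrentWaitCallbackException extends ManagedLedgerException {
        ConcurrentWaitCallbackException() {
            super("We can only have a single waiting callback");
        }
    }

    // Counters standing in for logging and retry scheduling.
    int loggedErrors = 0;
    int scheduledRetries = 0;

    void readEntriesFailed(ManagedLedgerException exception) {
        if (exception instanceof ConcurrentWaitCallbackException) {
            // A read is already pending: do not log or schedule another one.
            return;
        }
        loggedErrors++;
        scheduledRetries++;
    }

    public static void main(String[] args) {
        DispatcherSketch dispatcher = new DispatcherSketch();
        dispatcher.readEntriesFailed(new ConcurrentWaitCallbackException());
        dispatcher.readEntriesFailed(new ManagedLedgerException("some other failure"));
        System.out.println("logged errors: " + dispatcher.loggedErrors);
        System.out.println("scheduled retries: " + dispatcher.scheduledRetries);
    }
}
```

Matching on the type rather than on the message string keeps the check stable if the message wording changes later.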

(cherry picked from commit 5ec4e3d)
BewareMyPower authored and nicoloboschi committed Jul 13, 2022
1 parent 7c002e6 commit d62fb98
Showing 3 changed files with 16 additions and 2 deletions.
@@ -182,6 +182,13 @@ public ManagedLedgerFactoryClosedException(Throwable e) {
         }
     }

+    public static class ConcurrentWaitCallbackException extends ManagedLedgerException {
+
+        public ConcurrentWaitCallbackException() {
+            super("We can only have a single waiting callback");
+        }
+    }
+
     @Override
     public synchronized Throwable fillInStackTrace() {
         // Disable stack traces to be filled in
@@ -859,8 +859,7 @@ public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntrie

             if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
                 op.recycle();
-                callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"),
-                        ctx);
+                callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
                 return;
             }

@@ -32,6 +32,7 @@
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallbackException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -352,6 +353,7 @@ protected void readMoreEntries(Consumer consumer) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
             }
+
             synchronized (this) {
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
@@ -481,6 +483,12 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep
         Consumer c = readEntriesCtx.getConsumer();
         readEntriesCtx.recycle();

+        if (exception instanceof ConcurrentWaitCallbackException) {
+            // At most one pending read request is allowed when there are no more entries, we should not trigger more
+            // read operations in this case and just wait the existing read operation completes.
+            return;
+        }
+
         long waitTimeMillis = readFailureBackoff.next();

         if (exception instanceof NoMoreEntriesToReadException) {