Skip to content

Commit

Permalink
[improve][broker]Improve PersistentMessageExpiryMonitor expire speed …
Browse files Browse the repository at this point in the history
…when ledger not existed (apache#17842)
  • Loading branch information
AnonHxy authored Oct 3, 2022
1 parent 1148204 commit af11c32
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.buffer.ByteBuf;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
Expand Down Expand Up @@ -649,6 +650,12 @@ void asyncSetProperties(Map<String, String> properties, AsyncCallbacks.UpdatePro
*/
CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);

/**
* Get basic ledger summary.
* will get {@link Optional#empty()} if corresponding ledger not exists.
*/
Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId);

/**
* Truncate ledgers
* The truncate operation will move all cursors to the end of the topic and delete all inactive ledgers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ public NonRecoverableLedgerException(String msg) {
}
}

public static class LedgerNotExistException extends NonRecoverableLedgerException {
public LedgerNotExistException(String msg) {
super(msg);
}
}

public static class InvalidReplayPositionException extends ManagedLedgerException {
public InvalidReplayPositionException(String msg) {
super(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerInterceptException;
Expand Down Expand Up @@ -1835,6 +1836,11 @@ public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
return result;
}

@Override
public Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId) {
return Optional.ofNullable(ledgers.get(ledgerId));
}

CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
if (ledgerHandle != null) {
Expand Down Expand Up @@ -1941,7 +1947,7 @@ public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Ob
} else {
log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
+ "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
callback.readEntryFailed(new LedgerNotExistException("Message not found, "
+ "the ledgerId does not belong to this topic or has been deleted"), ctx);
}

Expand Down Expand Up @@ -3754,11 +3760,26 @@ private static boolean isBkErrorNotRecoverable(int rc) {
}
}

private static boolean isLedgerNotExistException(int rc) {
switch (rc) {
case Code.NoSuchLedgerExistsException:
case Code.NoSuchLedgerExistsOnMetadataServerException:
return true;

default:
return false;
}
}

public static ManagedLedgerException createManagedLedgerException(int bkErrorCode) {
if (bkErrorCode == BKException.Code.TooManyRequestsException) {
return new TooManyRequestsException("Too many request error from bookies");
} else if (isBkErrorNotRecoverable(bkErrorCode)) {
return new NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
if (isLedgerNotExistException(bkErrorCode)) {
return new LedgerNotExistException(BKException.getMessage(bkErrorCode));
} else {
return new NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
}
} else {
return new ManagedLedgerException(BKException.getMessage(bkErrorCode));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
Expand Down Expand Up @@ -191,7 +193,28 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
&& (exception instanceof NonRecoverableLedgerException)) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}", topicName, subName, failedReadPosition,
exception.getMessage());
findEntryComplete(failedReadPosition.get(), ctx);
if (exception instanceof LedgerNotExistException) {
long failedLedgerId = failedReadPosition.get().getLedgerId();
ManagedLedgerImpl ledger = ((ManagedLedgerImpl) cursor.getManagedLedger());
Position lastPositionInLedger = ledger.getOptionalLedgerInfo(failedLedgerId)
.map(ledgerInfo -> PositionImpl.get(failedLedgerId, ledgerInfo.getEntries() - 1))
.orElseGet(() -> {
Long nextExistingLedger = ledger.getNextValidLedger(failedReadPosition.get().getLedgerId());
if (nextExistingLedger == null) {
log.info("[{}] [{}] Couldn't find next next valid ledger for expiry monitor when find "
+ "entry failed {}", ledger.getName(), ledger.getName(),
failedReadPosition);
return (PositionImpl) failedReadPosition.get();
} else {
return PositionImpl.get(nextExistingLedger, -1);
}
});
log.info("[{}][{}] ledger not existed, will complete the last position of the non-existed"
+ " ledger:{}", topicName, subName, lastPositionInLedger);
findEntryComplete(lastPositionInLedger, ctx);
} else {
findEntryComplete(failedReadPosition.get(), ctx);
}
}
expirationCheckInProgress = FALSE;
updateRates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.buffer.ByteBuf;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -354,6 +355,12 @@ public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
return CompletableFuture.completedFuture(build);
}

@Override
public Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId) {
final LedgerInfo build = LedgerInfo.newBuilder().setLedgerId(ledgerId).setSize(100).setEntries(20).build();
return Optional.of(build);
}

@Override
public CompletableFuture<Void> asyncTruncate() {
return CompletableFuture.completedFuture(null);
Expand Down

0 comments on commit af11c32

Please sign in to comment.