Skip to content

Commit

Permalink
getOptionalLedgerInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy committed Sep 29, 2022
1 parent 36ee4d7 commit f074db0
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.buffer.ByteBuf;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
Expand Down Expand Up @@ -649,6 +650,12 @@ void asyncSetProperties(Map<String, String> properties, AsyncCallbacks.UpdatePro
*/
CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);

/**
* Get basic ledger summary.
* will get {@link Optional#empty()} if corresponding ledger not exists.
*/
Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId);

/**
* Truncate ledgers
* The truncate operation will move all cursors to the end of the topic and delete all inactive ledgers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,27 +143,14 @@ public TooManyRequestsException(String msg) {
}

public static class NonRecoverableLedgerException extends ManagedLedgerException {
private Integer bkErrorCode; // null means ledger not exists or deleted
public NonRecoverableLedgerException(String msg) {
super(msg);
}
}

public NonRecoverableLedgerException(String msg, Integer bkErrorCode) {
public static class LedgerNotExistException extends NonRecoverableLedgerException {
public LedgerNotExistException(String msg) {
super(msg);
this.bkErrorCode = bkErrorCode;
}

public boolean isLedgerNotExistException() {
if (bkErrorCode == null) {
return true;
}
switch (bkErrorCode) {
case BKException.Code.NoSuchLedgerExistsException:
case BKException.Code.NoSuchLedgerExistsOnMetadataServerException:
return true;
default:
return false;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerInterceptException;
Expand Down Expand Up @@ -1837,6 +1838,11 @@ public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
return result;
}

@Override
public Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId) {
return Optional.ofNullable(ledgers.get(ledgerId));
}

CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
if (ledgerHandle != null) {
Expand Down Expand Up @@ -1943,7 +1949,7 @@ public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Ob
} else {
log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
+ "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
callback.readEntryFailed(new LedgerNotExistException("Message not found, "
+ "the ledgerId does not belong to this topic or has been deleted"), ctx);
}

Expand Down Expand Up @@ -3756,11 +3762,26 @@ private static boolean isBkErrorNotRecoverable(int rc) {
}
}

private static boolean isLedgerNotExistException(int rc) {
switch (rc) {
case Code.NoSuchLedgerExistsException:
case Code.NoSuchLedgerExistsOnMetadataServerException:
return true;

default:
return false;
}
}

public static ManagedLedgerException createManagedLedgerException(int bkErrorCode) {
if (bkErrorCode == BKException.Code.TooManyRequestsException) {
return new TooManyRequestsException("Too many request error from bookies");
} else if (isBkErrorNotRecoverable(bkErrorCode)) {
return new NonRecoverableLedgerException(BKException.getMessage(bkErrorCode), bkErrorCode);
if (isLedgerNotExistException(bkErrorCode)) {
return new LedgerNotExistException(BKException.getMessage(bkErrorCode));
} else {
return new NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
}
} else {
return new ManagedLedgerException(BKException.getMessage(bkErrorCode));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
Expand Down Expand Up @@ -191,19 +193,25 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
&& (exception instanceof NonRecoverableLedgerException)) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}", topicName, subName, failedReadPosition,
exception.getMessage());
if ((((NonRecoverableLedgerException) exception).isLedgerNotExistException())) {
try {
long failedLedgerId = failedReadPosition.get().getLedgerId();
Position lastPositionInLedger = PositionImpl.get(failedLedgerId,
cursor.getManagedLedger().getLedgerInfo(failedLedgerId).get().getEntries() - 1);
log.info("[{}][{}] ledger not existed, will complete the last position of the non-existed"
+ " ledger:{}", topicName, subName, lastPositionInLedger);
findEntryComplete(lastPositionInLedger, ctx);
} catch (Exception e) {
log.warn("[{}][{}] failed to expire not existed ledger, try to just expire failed position:{} : {}",
topicName, subName, failedReadPosition, e.getMessage());
findEntryComplete(failedReadPosition.get(), ctx);
}
if (exception instanceof LedgerNotExistException) {
long failedLedgerId = failedReadPosition.get().getLedgerId();
ManagedLedgerImpl ledger = ((ManagedLedgerImpl) cursor.getManagedLedger());
Position lastPositionInLedger = ledger.getOptionalLedgerInfo(failedLedgerId)
.map(ledgerInfo -> PositionImpl.get(failedLedgerId, ledgerInfo.getEntries() - 1))
.orElseGet(() -> {
Long nextExistingLedger = ledger.getNextValidLedger(failedReadPosition.get().getLedgerId());
if (nextExistingLedger == null) {
log.info("[{}] [{}] Couldn't find next next valid ledger for expiry monitor when find "
+ "entry failed {}", ledger.getName(), ledger.getName(),
failedReadPosition);
return (PositionImpl) failedReadPosition.get();
} else {
return PositionImpl.get(nextExistingLedger, -1);
}
});
log.info("[{}][{}] ledger not existed, will complete the last position of the non-existed"
+ " ledger:{}", topicName, subName, lastPositionInLedger);
findEntryComplete(lastPositionInLedger, ctx);
} else {
findEntryComplete(failedReadPosition.get(), ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.buffer.ByteBuf;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -354,6 +355,12 @@ public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
return CompletableFuture.completedFuture(build);
}

@Override
public Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId) {
final LedgerInfo build = LedgerInfo.newBuilder().setLedgerId(ledgerId).setSize(100).setEntries(20).build();
return Optional.of(build);
}

@Override
public CompletableFuture<Void> asyncTruncate() {
return CompletableFuture.completedFuture(null);
Expand Down

0 comments on commit f074db0

Please sign in to comment.