Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][log] Print ZK path if write to ZK fails due to data being too large to persist #23652

Merged
merged 5 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2247,6 +2247,8 @@ public void operationFailed(ManagedLedgerException exception) {

if (State.NoLedger.equals(STATE_UPDATER.get(this))) {
if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
log.error("[{}][{}] Metadata ledger creation failed, try to persist the position in the metadata"
+ " store.", ledger.getName(), name);
persistPositionToMetaStore(mdEntry, cb);
} else {
cb.operationFailed(new ManagedLedgerException("Switch new cursor ledger failed"));
Expand Down Expand Up @@ -2921,9 +2923,7 @@ public void operationComplete() {

@Override
public void operationFailed(ManagedLedgerException exception) {
log.error("[{}][{}] Metadata ledger creation failed {}, try to persist the position in the metadata"
+ " store.", ledger.getName(), name, exception);

log.error("[{}][{}] Metadata ledger creation failed {}", ledger.getName(), name, exception);
synchronized (pendingMarkDeleteOps) {
// At this point we don't have a ledger ready
STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
Expand Down Expand Up @@ -201,10 +202,15 @@ protected void batchOperation(List<MetadataOp> ops) {
Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
.entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
.collect(Collectors.joining(", "));
List<Pair> opsForLog = ops.stream()
.filter(item -> item.size() > 256 * 1024)
.map(op -> Pair.of(op.getPath(), op.size()))
.collect(Collectors.toList());
Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
log.warn("Connection loss while executing batch operation of {} "
+ "of total data size of {}. "
+ "Retrying individual operations one-by-one.", countsByType, totalSize);
+ "Retrying individual operations one-by-one. ops whose size > 256KB: {}",
countsByType, totalSize, opsForLog);

// Retry with the individual operations
executor.schedule(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ default OpGetChildren asGetChildren() {
default OpPut asPut() {
return (OpPut) this;
}

String getPath();
}
Loading