From d41077dc8766840f8f398f73f4609b65d9001ce1 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 7 Nov 2024 11:49:28 +0800 Subject: [PATCH] [fix][broker] Fix ownership loss (#23515) Signed-off-by: Zixuan Liu (cherry picked from commit 576d34144c2dd44ed2eb0ce0b2babdf95ade874b) --- .../pulsar/broker/namespace/OwnershipCache.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 9a4534f538774..868ed2d9fc2c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -170,7 +170,22 @@ public CompletableFuture> getOwnerAsync(Namespa // If we're not the owner, we need to check if anybody else is String path = ServiceUnitUtils.path(suName); - return lockManager.readLock(path); + return lockManager.readLock(path).thenCompose(owner -> { + // If the current broker is the owner, attempt to reacquire ownership to avoid cache loss. + if (owner.isPresent() && owner.get().equals(selfOwnerInfo)) { + log.warn("Detected ownership loss for broker [{}] on namespace bundle [{}]. " + + "Attempting to reacquire ownership to maintain cache consistency.", + selfOwnerInfo, suName); + try { + return tryAcquiringOwnership(suName).thenApply(Optional::ofNullable); + } catch (Exception e) { + log.error("Failed to reacquire ownership for namespace bundle [{}] on broker [{}]: {}", + suName, selfOwnerInfo, e.getMessage(), e); + return CompletableFuture.failedFuture(e); + } + } + return CompletableFuture.completedFuture(owner); + }); } /**