Skip to content

Commit

Permalink
[fix][metadata] Cleanup state when lock revalidation gets `LockBusyEx…
Browse files Browse the repository at this point in the history
…ception` (#17700)

### Motivation

In the production environment,  we found two brokers holding the same valid locks. and one has an exceptional revalidate future with `lockBusyException`. after reading the code, there may forget the reset the cache and complete expire exception when getting lockBusyException.

(cherry picked from commit 955ae34)
  • Loading branch information
mattisonchao authored and codelipenghui committed Sep 20, 2022
1 parent 0529c57 commit 65c059a
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private void handleSessionEvent(SessionEvent se) {
if (se == SessionEvent.SessionReestablished) {
log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
for (ResourceLockImpl<T> lock : locks.values()) {
futures.add(lock.revalidate(lock.getValue()));
futures.add(lock.revalidate(lock.getValue(), true));
}

} else if (se == SessionEvent.Reconnected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down Expand Up @@ -127,7 +128,7 @@ synchronized CompletableFuture<Void> acquire(T newValue) {
.thenRun(() -> result.complete(null))
.exceptionally(ex -> {
if (ex.getCause() instanceof LockBusyException) {
revalidate(newValue)
revalidate(newValue, false)
.thenAccept(__ -> result.complete(null))
.exceptionally(ex1 -> {
result.completeExceptionally(ex1);
Expand Down Expand Up @@ -184,38 +185,21 @@ synchronized void lockWasInvalidated() {
}

log.info("Lock on resource {} was invalidated", path);
revalidate(value)
.thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
.exceptionally(ex -> {
synchronized (ResourceLockImpl.this) {
if (ex.getCause() instanceof BadVersionException) {
log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
state = State.Released;
expiredFuture.complete(null);
} else {
// We failed to revalidate the lock due to connectivity issue
// Continue assuming we hold the lock, until we can revalidate it, either
// on Reconnected or SessionReestablished events.
revalidateAfterReconnection = true;
log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
ex.getCause().getMessage());
}
}
return null;
});
revalidate(value, true)
.thenRun(() -> log.info("Successfully revalidated the lock on {}", path));
}

synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
if (revalidateAfterReconnection) {
revalidateAfterReconnection = false;
log.warn("Revalidate lock at {} after reconnection", path);
return revalidate(value);
return revalidate(value, true);
} else {
return CompletableFuture.completedFuture(null);
}
}

synchronized CompletableFuture<Void> revalidate(T newValue) {
synchronized CompletableFuture<Void> revalidate(T newValue, boolean revalidateAfterReconnection) {
if (revalidateFuture == null || revalidateFuture.isDone()) {
revalidateFuture = doRevalidate(newValue);
} else {
Expand All @@ -233,6 +217,26 @@ synchronized CompletableFuture<Void> revalidate(T newValue) {
});
revalidateFuture = newFuture;
}
revalidateFuture.exceptionally(ex -> {
synchronized (ResourceLockImpl.this) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (!revalidateAfterReconnection || realCause instanceof BadVersionException
|| realCause instanceof LockBusyException) {
log.warn("Failed to revalidate the lock at {}. Marked as expired. {}",
path, realCause.getMessage());
state = State.Released;
expiredFuture.complete(null);
} else {
// We failed to revalidate the lock due to connectivity issue
// Continue assuming we hold the lock, until we can revalidate it, either
// on Reconnected or SessionReestablished events.
ResourceLockImpl.this.revalidateAfterReconnection = true;
log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
realCause.getMessage());
}
}
return null;
});
return revalidateFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -29,6 +31,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import lombok.Cleanup;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand Down Expand Up @@ -293,4 +296,60 @@ public void revalidateLockOnDifferentSession(String provider, Supplier<String> u
assertEquals(new String(store1.get(path2).join().get().getValue()), "\"value-1\"");
});
}

@Test(dataProvider = "impl")
public void testCleanUpStateWhenRevalidationGotLockBusy(String provider, Supplier<String> urlSupplier)
throws Exception {

if (provider.equals("Memory") || provider.equals("RocksDB")) {
// Local memory provider doesn't really have the concept of multiple sessions
return;
}

@Cleanup
MetadataStoreExtended store1 = MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
@Cleanup
MetadataStoreExtended store2 = MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());

@Cleanup
CoordinationService cs1 = new CoordinationServiceImpl(store1);
@Cleanup
LockManager<String> lm1 = cs1.getLockManager(String.class);

@Cleanup
CoordinationService cs2 = new CoordinationServiceImpl(store2);
@Cleanup
LockManager<String> lm2 = cs2.getLockManager(String.class);

String path1 = newKey();

ResourceLock<String> lock1 = lm1.acquireLock(path1, "value-1").join();
AtomicReference<ResourceLock<String>> lock2 = new AtomicReference<>();
// lock 2 will steal the distributed lock first.
Awaitility.await().until(()-> {
// Ensure steal the lock success.
try {
lock2.set(lm2.acquireLock(path1, "value-1").join());
return true;
} catch (Exception ex) {
return false;
}
});

// Since we can steal the lock repeatedly, we don't know which one will get it.
// But we can verify the final state.
Awaitility.await().untilAsserted(() -> {
if (lock1.getLockExpiredFuture().isDone()) {
assertTrue(lm1.listLocks(path1).join().isEmpty());
assertFalse(lock2.get().getLockExpiredFuture().isDone());
} else if (lock2.get().getLockExpiredFuture().isDone()) {
assertTrue(lm2.listLocks(path1).join().isEmpty());
assertFalse(lock1.getLockExpiredFuture().isDone());
} else {
fail("unexpected behaviour");
}
});
}
}

0 comments on commit 65c059a

Please sign in to comment.