Skip to content

Commit

Permalink
CURATOR-621: Fix write acquire after downgrade InterProcessReadWriteL…
Browse files Browse the repository at this point in the history
…ock (#445)

Currently, in downgrading the write lock of InterProcessReadWriteLock, the read lock could have a larger sorting sequence than the contending write-acquire; this cause the contending write-acquire to succeed after downgrading.

This commit solves this by using the write lock's sorting sequence for read lock in downgrading.
  • Loading branch information
kezhuw authored Apr 1, 2023
1 parent 37728a4 commit 2c283bd
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,7 @@ public static class WriteLock extends InternalInterProcessMutex
{
public WriteLock(CuratorFramework client, String basePath, byte[] lockData)
{
super(client, basePath, WRITE_LOCK_NAME, lockData, 1, new SortingLockInternalsDriver() {
@Override
public PredicateResults getsTheLock(
CuratorFramework client,
List<String> children,
String sequenceNodeName,
int maxLeases
) throws Exception {
return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
}
});
super(client, basePath, WRITE_LOCK_NAME, lockData, 1, new SortingLockInternalsDriver());
}

@Override
Expand All @@ -138,6 +128,15 @@ public static class ReadLock extends InternalInterProcessMutex {
public ReadLock(CuratorFramework client, String basePath, byte[] lockData, WriteLock writeLock)
{
super(client, basePath, READ_LOCK_NAME, lockData, Integer.MAX_VALUE, new SortingLockInternalsDriver() {
@Override
protected String getSortingSequence() {
String writePath = writeLock.getLockPath();
if (writePath != null) {
return fixForSorting(writePath, WRITE_LOCK_NAME);
}
return null;
}

@Override
public PredicateResults getsTheLock(
CuratorFramework client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,28 @@ public PredicateResults getsTheLock(CuratorFramework client, List<String> childr
return new PredicateResults(pathToWatch, getsTheLock);
}

protected String getSortingSequence() {
return null;
}

@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{

CreateMode createMode = CreateMode.EPHEMERAL_SEQUENTIAL;
String sequence = getSortingSequence();
if (sequence != null) {
path += sequence;
createMode = CreateMode.EPHEMERAL;
}
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(createMode).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(createMode).forPath(path);
}
return ourPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import com.google.common.collect.Lists;
Expand All @@ -45,6 +46,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class TestInterProcessReadWriteLock extends BaseClassForTests
Expand Down Expand Up @@ -239,6 +241,47 @@ public void testDowngrading() throws Exception
}
}

@Test
public void testContendingDowngrading() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
ExecutorService executor = Executors.newCachedThreadPool();
try
{
client.start();

InterProcessReadWriteLock lock1 = new InterProcessReadWriteLock(client, "/lock");
lock1.writeLock().acquire();

CountDownLatch ready = new CountDownLatch(1);
Future<?> writeAcquire = executor.submit(() -> {
ready.countDown();
InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client, "/lock");
lock2.writeLock().acquire();
fail("expect no acquire");
return null;
});

ready.await();
// Let lock2 have chance to do write-acquire before downgrading.
Thread.sleep(20);

assertTrue(lock1.readLock().acquire(5, TimeUnit.SECONDS));

lock1.writeLock().release();
// We still hold read lock, other write-acquire should block.
assertThrows(TimeoutException.class, () -> {
// Let lock2 have chance to respond to write-release
writeAcquire.get(20, TimeUnit.MILLISECONDS);
});
}
finally
{
TestCleanState.closeAndTestClean(client);
executor.shutdown();
}
}

@Test
public void testBasic() throws Exception
{
Expand Down

0 comments on commit 2c283bd

Please sign in to comment.