Skip to content

Commit

Permalink
Replace synchronized with j.u.c.l.ReentrantLock for Loom (#3480)
Browse files Browse the repository at this point in the history
Co-authored-by: M Sazzadul Hoque <[email protected]>
  • Loading branch information
babanin and sazzad16 authored Jul 16, 2024
1 parent ba98da7 commit e113b9c
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 33 deletions.
9 changes: 8 additions & 1 deletion src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.json.JSONArray;
Expand Down Expand Up @@ -50,6 +52,7 @@ protected RedisProtocol getProtocol() {
return protocol;
}

private Lock mapperLock = new ReentrantLock(true);
private volatile JsonObjectMapper jsonObjectMapper;
private final AtomicInteger searchDialect = new AtomicInteger(0);

Expand Down Expand Up @@ -4435,11 +4438,15 @@ public final CommandObject<Object> tFunctionCallAsync(String library, String fun
private JsonObjectMapper getJsonObjectMapper() {
JsonObjectMapper localRef = this.jsonObjectMapper;
if (Objects.isNull(localRef)) {
synchronized (this) {
mapperLock.lock();

try {
localRef = this.jsonObjectMapper;
if (Objects.isNull(localRef)) {
this.jsonObjectMapper = localRef = new DefaultGsonObjectMapper();
}
} finally {
mapperLock.unlock();
}
}
return localRef;
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/redis/clients/jedis/JedisSentinelPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
Expand All @@ -28,7 +30,7 @@ public class JedisSentinelPool extends Pool<Jedis> {

private volatile HostAndPort currentHostMaster;

private final Object initPoolLock = new Object();
private final Lock initPoolLock = new ReentrantLock(true);

public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
final JedisClientConfig masterClientConfig, final JedisClientConfig sentinelClientConfig) {
Expand Down Expand Up @@ -213,7 +215,9 @@ public HostAndPort getCurrentHostMaster() {
}

private void initMaster(HostAndPort master) {
synchronized (initPoolLock) {
initPoolLock.lock();

try {
if (!master.equals(currentHostMaster)) {
currentHostMaster = master;
factory.setHostAndPort(currentHostMaster);
Expand All @@ -223,6 +227,8 @@ private void initMaster(HostAndPort master) {

LOG.info("Created JedisSentinelPool to master at {}", master);
}
} finally {
initPoolLock.unlock();
}
}

Expand Down
16 changes: 11 additions & 5 deletions src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import redis.clients.jedis.Builder;
Expand Down Expand Up @@ -106,9 +108,7 @@ private Builder<ResultSet> getBuilder(String graphName) {
}

private void createBuilder(String graphName) {
synchronized (builders) {
builders.putIfAbsent(graphName, new ResultSetBuilder(new GraphCacheImpl(graphName)));
}
builders.computeIfAbsent(graphName, graphNameKey -> new ResultSetBuilder(new GraphCacheImpl(graphNameKey)));
}

private class GraphCacheImpl implements GraphCache {
Expand Down Expand Up @@ -144,6 +144,8 @@ private class GraphCacheList {
private final String name;
private final String query;
private final List<String> data = new CopyOnWriteArrayList<>();

private final Lock dataLock = new ReentrantLock(true);

/**
*
Expand All @@ -164,14 +166,18 @@ public GraphCacheList(String name, String procedure) {
*/
public String getCachedData(int index) {
if (index >= data.size()) {
synchronized (data) {
dataLock.lock();

try {
if (index >= data.size()) {
getProcedureInfo();
}
} finally {
dataLock.unlock();
}
}

return data.get(index);

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package redis.clients.jedis.mcf;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
Expand All @@ -17,6 +19,7 @@
*/
@Experimental
public class CircuitBreakerFailoverBase implements AutoCloseable {
private final Lock lock = new ReentrantLock(true);

protected final MultiClusterPooledConnectionProvider provider;

Expand All @@ -32,29 +35,34 @@ public void close() {
/**
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios
*/
protected synchronized void clusterFailover(CircuitBreaker circuitBreaker) {
protected void clusterFailover(CircuitBreaker circuitBreaker) {
lock.lock();

try {
// Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent
if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {

// Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent
if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {
// Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.
// To recover/transition from this forced state the user will need to manually failback
circuitBreaker.transitionToForcedOpenState();

// Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.
// To recover/transition from this forced state the user will need to manually failback
circuitBreaker.transitionToForcedOpenState();
// Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand()
// to use the next cluster's connection pool - according to the configuration's prioritization/order
int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();

// Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand()
// to use the next cluster's connection pool - according to the configuration's prioritization/order
int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();
// Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
}

// Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
}

// Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail
else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " +
"provided with an additional cluster/database endpoint according to its prioritized sequence. " +
"If applicable, consider failing back OR restarting with an available cluster/database endpoint");
// Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail
else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " +
"provided with an additional cluster/database endpoint according to its prioritized sequence. " +
"If applicable, consider failing back OR restarting with an available cluster/database endpoint");
}
} finally {
lock.unlock();
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
Expand Down Expand Up @@ -54,6 +56,8 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider
* provided at startup via the MultiClusterClientConfig. All traffic will be routed according to this index.
*/
private volatile Integer activeMultiClusterIndex = 1;

private final Lock activeClusterIndexLock = new ReentrantLock(true);

/**
* Indicates the final cluster/database endpoint (connection pool), according to the pre-configured list
Expand Down Expand Up @@ -162,8 +166,9 @@ public int incrementActiveMultiClusterIndex() {

// Field-level synchronization is used to avoid the edge case in which
// setActiveMultiClusterIndex(int multiClusterIndex) is called at the same time
synchronized (activeMultiClusterIndex) {

activeClusterIndexLock.lock();

try {
String originalClusterName = getClusterCircuitBreaker().getName();

// Only increment if it can pass this validation otherwise we will need to check for NULL in the data path
Expand All @@ -185,6 +190,8 @@ public int incrementActiveMultiClusterIndex() {
incrementActiveMultiClusterIndex();

else log.warn("Cluster/database endpoint successfully updated from '{}' to '{}'", originalClusterName, circuitBreaker.getName());
} finally {
activeClusterIndexLock.unlock();
}

return activeMultiClusterIndex;
Expand Down Expand Up @@ -229,11 +236,13 @@ public void validateTargetConnection(int multiClusterIndex) {
* Special care should be taken to confirm cluster/database availability AND
* potentially cross-cluster replication BEFORE using this capability.
*/
public synchronized void setActiveMultiClusterIndex(int multiClusterIndex) {
public void setActiveMultiClusterIndex(int multiClusterIndex) {

// Field-level synchronization is used to avoid the edge case in which
// incrementActiveMultiClusterIndex() is called at the same time
synchronized (activeMultiClusterIndex) {
activeClusterIndexLock.lock();

try {

// Allows an attempt to reset the current cluster from a FORCED_OPEN to CLOSED state in the event that no failover is possible
if (activeMultiClusterIndex == multiClusterIndex &&
Expand All @@ -256,6 +265,8 @@ public synchronized void setActiveMultiClusterIndex(int multiClusterIndex) {

activeMultiClusterIndex = multiClusterIndex;
lastClusterCircuitBreakerForcedOpen = false;
} finally {
activeClusterIndexLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
Expand Down Expand Up @@ -43,7 +45,7 @@ public class SentineledConnectionProvider implements ConnectionProvider {

private final long subscribeRetryWaitTimeMillis;

private final Object initPoolLock = new Object();
private final Lock initPoolLock = new ReentrantLock(true);

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
Expand Down Expand Up @@ -95,7 +97,9 @@ public HostAndPort getCurrentMaster() {
}

private void initMaster(HostAndPort master) {
synchronized (initPoolLock) {
initPoolLock.lock();

try {
if (!master.equals(currentMaster)) {
currentMaster = master;

Expand All @@ -114,6 +118,8 @@ private void initMaster(HostAndPort master) {
existingPool.close();
}
}
} finally {
initPoolLock.unlock();
}
}

Expand Down

0 comments on commit e113b9c

Please sign in to comment.