Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace synchronized with j.u.c.l.ReentrantLock for Loom #3480

Merged
merged 5 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.json.JSONArray;
Expand Down Expand Up @@ -50,6 +52,7 @@ protected RedisProtocol getProtocol() {
return protocol;
}

private Lock mapperLock = new ReentrantLock(true);
private volatile JsonObjectMapper jsonObjectMapper;
private final AtomicInteger searchDialect = new AtomicInteger(0);

Expand Down Expand Up @@ -4424,22 +4427,22 @@ public final CommandObject<Object> tFunctionCallAsync(String library, String fun
/**
* Get the instance for JsonObjectMapper if not null, otherwise a new instance reference with
* default implementation will be created and returned.
* <p>This process of checking whether or not
* the instance reference exists follows <a
* href="https://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java"
* target="_blank">'double-checked lock optimization'</a> approach to reduce the overhead of
* acquiring a lock by testing the lock criteria (the "lock hint") before acquiring the lock.</p>
*
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
* @return the JsonObjectMapper instance reference
* @see DefaultGsonObjectMapper
*/
private JsonObjectMapper getJsonObjectMapper() {
JsonObjectMapper localRef = this.jsonObjectMapper;
if (Objects.isNull(localRef)) {
synchronized (this) {
mapperLock.lock();

try {
localRef = this.jsonObjectMapper;
if (Objects.isNull(localRef)) {
this.jsonObjectMapper = localRef = new DefaultGsonObjectMapper();
}
} finally {
mapperLock.unlock();
}
}
return localRef;
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/redis/clients/jedis/JedisSentinelPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
Expand All @@ -28,7 +30,7 @@ public class JedisSentinelPool extends Pool<Jedis> {

private volatile HostAndPort currentHostMaster;

private final Object initPoolLock = new Object();
private final Lock initPoolLock = new ReentrantLock(true);

public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
final JedisClientConfig masterClientConfig, final JedisClientConfig sentinelClientConfig) {
Expand Down Expand Up @@ -213,7 +215,9 @@ public HostAndPort getCurrentHostMaster() {
}

private void initMaster(HostAndPort master) {
synchronized (initPoolLock) {
initPoolLock.lock();

try {
if (!master.equals(currentHostMaster)) {
currentHostMaster = master;
factory.setHostAndPort(currentHostMaster);
Expand All @@ -223,6 +227,8 @@ private void initMaster(HostAndPort master) {

LOG.info("Created JedisSentinelPool to master at {}", master);
}
} finally {
initPoolLock.unlock();
}
}

Expand Down
16 changes: 11 additions & 5 deletions src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import redis.clients.jedis.Builder;
Expand Down Expand Up @@ -106,9 +108,7 @@ private Builder<ResultSet> getBuilder(String graphName) {
}

private void createBuilder(String graphName) {
synchronized (builders) {
builders.putIfAbsent(graphName, new ResultSetBuilder(new GraphCacheImpl(graphName)));
}
builders.computeIfAbsent(graphName, graphNameKey -> new ResultSetBuilder(new GraphCacheImpl(graphNameKey)));
}

private class GraphCacheImpl implements GraphCache {
Expand Down Expand Up @@ -144,6 +144,8 @@ private class GraphCacheList {
private final String name;
private final String query;
private final List<String> data = new CopyOnWriteArrayList<>();

private final Lock dataLock = new ReentrantLock(true);

/**
*
Expand All @@ -164,14 +166,18 @@ public GraphCacheList(String name, String procedure) {
*/
public String getCachedData(int index) {
if (index >= data.size()) {
synchronized (data) {
dataLock.lock();

try {
if (index >= data.size()) {
getProcedureInfo();
}
} finally {
dataLock.unlock();
}
}

return data.get(index);

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package redis.clients.jedis.mcf;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
Expand All @@ -17,6 +19,7 @@
*/
@Experimental
public class CircuitBreakerFailoverBase implements AutoCloseable {
private final Lock lock = new ReentrantLock(true);

protected final MultiClusterPooledConnectionProvider provider;

Expand All @@ -32,29 +35,34 @@ public void close() {
/**
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios
*/
protected synchronized void clusterFailover(CircuitBreaker circuitBreaker) {
protected void clusterFailover(CircuitBreaker circuitBreaker) {
lock.lock();

try {
// Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent
if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {

// Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent
if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {
// Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.
// To recover/transition from this forced state the user will need to manually failback
circuitBreaker.transitionToForcedOpenState();

// Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.
// To recover/transition from this forced state the user will need to manually failback
circuitBreaker.transitionToForcedOpenState();
// Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand()
// to use the next cluster's connection pool - according to the configuration's prioritization/order
int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();

// Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand()
// to use the next cluster's connection pool - according to the configuration's prioritization/order
int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();
// Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
}

// Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
}

// Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail
else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " +
"provided with an additional cluster/database endpoint according to its prioritized sequence. " +
"If applicable, consider failing back OR restarting with an available cluster/database endpoint");
// Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail
else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " +
"provided with an additional cluster/database endpoint according to its prioritized sequence. " +
"If applicable, consider failing back OR restarting with an available cluster/database endpoint");
}
} finally {
lock.unlock();
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
Expand Down Expand Up @@ -54,6 +56,8 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider
* provided at startup via the MultiClusterClientConfig. All traffic will be routed according to this index.
*/
private volatile Integer activeMultiClusterIndex = 1;

private final Lock activeClusterIndexLock = new ReentrantLock(true);

/**
* Indicates the final cluster/database endpoint (connection pool), according to the pre-configured list
Expand Down Expand Up @@ -162,8 +166,9 @@ public int incrementActiveMultiClusterIndex() {

// Field-level synchronization is used to avoid the edge case in which
// setActiveMultiClusterIndex(int multiClusterIndex) is called at the same time
synchronized (activeMultiClusterIndex) {

activeClusterIndexLock.lock();

try {
String originalClusterName = getClusterCircuitBreaker().getName();

// Only increment if it can pass this validation otherwise we will need to check for NULL in the data path
Expand All @@ -185,6 +190,8 @@ public int incrementActiveMultiClusterIndex() {
incrementActiveMultiClusterIndex();

else log.warn("Cluster/database endpoint successfully updated from '{}' to '{}'", originalClusterName, circuitBreaker.getName());
} finally {
activeClusterIndexLock.unlock();
}

return activeMultiClusterIndex;
Expand Down Expand Up @@ -229,11 +236,13 @@ public void validateTargetConnection(int multiClusterIndex) {
* Special care should be taken to confirm cluster/database availability AND
* potentially cross-cluster replication BEFORE using this capability.
*/
public synchronized void setActiveMultiClusterIndex(int multiClusterIndex) {
public void setActiveMultiClusterIndex(int multiClusterIndex) {

// Field-level synchronization is used to avoid the edge case in which
// incrementActiveMultiClusterIndex() is called at the same time
synchronized (activeMultiClusterIndex) {
activeClusterIndexLock.lock();

try {

// Allows an attempt to reset the current cluster from a FORCED_OPEN to CLOSED state in the event that no failover is possible
if (activeMultiClusterIndex == multiClusterIndex &&
Expand All @@ -256,6 +265,8 @@ public synchronized void setActiveMultiClusterIndex(int multiClusterIndex) {

activeMultiClusterIndex = multiClusterIndex;
lastClusterCircuitBreakerForcedOpen = false;
} finally {
activeClusterIndexLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
Expand Down Expand Up @@ -43,7 +45,7 @@ public class SentineledConnectionProvider implements ConnectionProvider {

private final long subscribeRetryWaitTimeMillis;

private final Object initPoolLock = new Object();
private final Lock initPoolLock = new ReentrantLock(true);

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
Expand Down Expand Up @@ -95,7 +97,9 @@ public HostAndPort getCurrentMaster() {
}

private void initMaster(HostAndPort master) {
synchronized (initPoolLock) {
initPoolLock.lock();

try {
if (!master.equals(currentMaster)) {
currentMaster = master;

Expand All @@ -114,6 +118,8 @@ private void initMaster(HostAndPort master) {
existingPool.close();
}
}
} finally {
initPoolLock.unlock();
}
}

Expand Down
Loading