Skip to content

Commit

Permalink
Support execute the read-only command on replica nodes (#3848)
Browse files Browse the repository at this point in the history
Co-authored-by: M Sazzadul Hoque <[email protected]>
  • Loading branch information
jjz921024 and sazzad16 authored Aug 12, 2024
1 parent e14b899 commit 0856245
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 5 deletions.
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,11 @@ private void initializeFromClientConfig(final JedisClientConfig config) {
}
}

// set readonly flag to ALL connections (including master nodes) when enable read from replica
if (config.isReadOnlyForReplica()) {
fireAndForgetMsg.add(new CommandArguments(Command.READONLY));
}

for (CommandArguments arg : fireAndForgetMsg) {
sendCommand(arg);
}
Expand Down
25 changes: 21 additions & 4 deletions src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ public final class DefaultJedisClientConfig implements JedisClientConfig {

private final ClientSetInfoConfig clientSetInfoConfig;

private final boolean readOnlyForReplica;

private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMillis, int soTimeoutMillis,
int blockingSocketTimeoutMillis, Supplier<RedisCredentials> credentialsProvider, int database,
String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier, HostAndPortMapper hostAndPortMapper,
ClientSetInfoConfig clientSetInfoConfig) {
ClientSetInfoConfig clientSetInfoConfig, boolean readOnlyForReplica) {
this.redisProtocol = protocol;
this.connectionTimeoutMillis = connectionTimeoutMillis;
this.socketTimeoutMillis = soTimeoutMillis;
Expand All @@ -44,6 +46,7 @@ private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMi
this.hostnameVerifier = hostnameVerifier;
this.hostAndPortMapper = hostAndPortMapper;
this.clientSetInfoConfig = clientSetInfoConfig;
this.readOnlyForReplica = readOnlyForReplica;
}

@Override
Expand Down Expand Up @@ -122,6 +125,11 @@ public ClientSetInfoConfig getClientSetInfoConfig() {
return clientSetInfoConfig;
}

@Override
public boolean isReadOnlyForReplica() {
return readOnlyForReplica;
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -149,6 +157,8 @@ public static class Builder {

private ClientSetInfoConfig clientSetInfoConfig = ClientSetInfoConfig.DEFAULT;

private boolean readOnlyForReplicas = false;

private Builder() {
}

Expand All @@ -160,7 +170,8 @@ public DefaultJedisClientConfig build() {

return new DefaultJedisClientConfig(redisProtocol, connectionTimeoutMillis, socketTimeoutMillis,
blockingSocketTimeoutMillis, credentialsProvider, database, clientName, ssl,
sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig);
sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig,
readOnlyForReplicas);
}

/**
Expand Down Expand Up @@ -255,6 +266,11 @@ public Builder clientSetInfoConfig(ClientSetInfoConfig setInfoConfig) {
this.clientSetInfoConfig = setInfoConfig;
return this;
}

public Builder readOnlyForReplicas() {
this.readOnlyForReplicas = true;
return this;
}
}

public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int soTimeoutMillis,
Expand All @@ -264,7 +280,8 @@ public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int s
return new DefaultJedisClientConfig(null,
connectionTimeoutMillis, soTimeoutMillis, blockingSocketTimeoutMillis,
new DefaultRedisCredentialsProvider(new DefaultRedisCredentials(user, password)), database,
clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null);
clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null,
false);
}

public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
Expand All @@ -273,6 +290,6 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
copy.getBlockingSocketTimeoutMillis(), copy.getCredentialsProvider(),
copy.getDatabase(), copy.getClientName(), copy.isSsl(), copy.getSslSocketFactory(),
copy.getSslParameters(), copy.getHostnameVerifier(), copy.getHostAndPortMapper(),
copy.getClientSetInfoConfig());
copy.getClientSetInfoConfig(), copy.isReadOnlyForReplica());
}
}
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ default HostAndPortMapper getHostAndPortMapper() {
return null;
}

default boolean isReadOnlyForReplica() {
return false;
}

/**
* Modify the behavior of internally executing CLIENT SETINFO command.
* @return CLIENT SETINFO config
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.executors.ClusterCommandExecutor;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

Expand Down Expand Up @@ -266,4 +267,11 @@ public ClusterPipeline pipelined() {
public AbstractTransaction transaction(boolean doMulti) {
throw new UnsupportedOperationException();
}

public final <T> T executeCommandToReplica(CommandObject<T> commandObject) {
if (!(executor instanceof ClusterCommandExecutor)) {
throw new UnsupportedOperationException("Support only execute to replica in ClusterCommandExecutor");
}
return ((ClusterCommandExecutor) executor).executeCommandToReplica(commandObject);
}
}
34 changes: 34 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class JedisClusterInfoCache {
private final Map<String, ConnectionPool> nodes = new HashMap<>();
private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS];
private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS];
private final List<ConnectionPool>[] replicaSlots;

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
Expand Down Expand Up @@ -85,6 +86,11 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig,
topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(),
topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS);
}
if (clientConfig.isReadOnlyForReplica()) {
replicaSlots = new ArrayList[Protocol.CLUSTER_HASHSLOTS];
} else {
replicaSlots = null;
}
}

/**
Expand Down Expand Up @@ -144,6 +150,8 @@ public void discoverClusterNodesAndSlots(Connection jedis) {
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
} else if (clientConfig.isReadOnlyForReplica()) {
assignSlotsToReplicaNode(slotNums, targetNode);
}
}
}
Expand Down Expand Up @@ -236,6 +244,8 @@ private void discoverClusterSlots(Connection jedis) {
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
} else if (clientConfig.isReadOnlyForReplica()) {
assignSlotsToReplicaNode(slotNums, targetNode);
}
}
}
Expand Down Expand Up @@ -307,6 +317,21 @@ public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode)
}
}

public void assignSlotsToReplicaNode(List<Integer> targetSlots, HostAndPort targetNode) {
w.lock();
try {
ConnectionPool targetPool = setupNodeIfNotExist(targetNode);
for (Integer slot : targetSlots) {
if (replicaSlots[slot] == null) {
replicaSlots[slot] = new ArrayList<>();
}
replicaSlots[slot].add(targetPool);
}
} finally {
w.unlock();
}
}

public ConnectionPool getNode(String nodeKey) {
r.lock();
try {
Expand Down Expand Up @@ -338,6 +363,15 @@ public HostAndPort getSlotNode(int slot) {
}
}

public List<ConnectionPool> getSlotReplicaPools(int slot) {
r.lock();
try {
return replicaSlots[slot];
} finally {
r.unlock();
}
}

public Map<String, ConnectionPool> getNodes() {
r.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ public final <T> T broadcastCommand(CommandObject<T> commandObject) {

@Override
public final <T> T executeCommand(CommandObject<T> commandObject) {
return doExecuteCommand(commandObject, false);
}

public final <T> T executeCommandToReplica(CommandObject<T> commandObject) {
return doExecuteCommand(commandObject, true);
}

private <T> T doExecuteCommand(CommandObject<T> commandObject, boolean toReplica) {
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);

JedisRedirectionException redirect = null;
Expand All @@ -88,7 +96,8 @@ public final <T> T executeCommand(CommandObject<T> commandObject) {
connection.executeCommand(Protocol.Command.ASKING);
}
} else {
connection = provider.getConnection(commandObject.getArguments());
connection = toReplica ? provider.getReplicaConnection(commandObject.getArguments())
: provider.getConnection(commandObject.getArguments());
}

return execute(connection, commandObject);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.ClusterCommandArguments;
Expand Down Expand Up @@ -102,6 +104,11 @@ public Connection getConnection(CommandArguments args) {
return slot >= 0 ? getConnectionFromSlot(slot) : getConnection();
}

public Connection getReplicaConnection(CommandArguments args) {
final int slot = ((ClusterCommandArguments) args).getCommandHashSlot();
return slot >= 0 ? getReplicaConnectionFromSlot(slot) : getConnection();
}

@Override
public Connection getConnection() {
// In antirez's redis-rb-cluster implementation, getRandomConnection always
Expand Down Expand Up @@ -158,6 +165,25 @@ public Connection getConnectionFromSlot(int slot) {
}
}

public Connection getReplicaConnectionFromSlot(int slot) {
List<ConnectionPool> connectionPools = cache.getSlotReplicaPools(slot);
ThreadLocalRandom random = ThreadLocalRandom.current();
if (connectionPools != null && !connectionPools.isEmpty()) {
// pick up randomly a connection
int idx = random.nextInt(connectionPools.size());
return connectionPools.get(idx).getResource();
}

renewSlotCache();
connectionPools = cache.getSlotReplicaPools(slot);
if (connectionPools != null && !connectionPools.isEmpty()) {
int idx = random.nextInt(connectionPools.size());
return connectionPools.get(idx).getResource();
}

return getConnectionFromSlot(slot);
}

@Override
public Map<String, ConnectionPool> getConnectionMap() {
return Collections.unmodifiableMap(getNodes());
Expand Down
27 changes: 27 additions & 0 deletions src/test/java/redis/clients/jedis/JedisClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,33 @@ public void testReadonlyAndReadwrite() throws Exception {
nodeSlave2.flushDB();
}

@Test
public void testReadFromReplicas() throws Exception {
node1.clusterMeet(LOCAL_IP, nodeInfoSlave2.getPort());
JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, nodeSlave2);

for (String nodeInfo : node2.clusterNodes().split("\n")) {
if (nodeInfo.contains("myself")) {
nodeSlave2.clusterReplicate(nodeInfo.split(" ")[0]);
break;
}
}

DefaultJedisClientConfig READ_REPLICAS_CLIENT_CONFIG
= DefaultJedisClientConfig.builder().password("cluster").readOnlyForReplicas().build();
ClusterCommandObjects commandObjects = new ClusterCommandObjects();
try (JedisCluster jedisCluster = new JedisCluster(nodeInfo1, READ_REPLICAS_CLIENT_CONFIG,
DEFAULT_REDIRECTIONS, DEFAULT_POOL_CONFIG)) {
assertEquals("OK", jedisCluster.set("test", "read-from-replicas"));

assertEquals("read-from-replicas", jedisCluster.executeCommandToReplica(commandObjects.get("test")));
// TODO: ensure data being served from replica node(s)
}

nodeSlave2.clusterReset(ClusterResetType.SOFT);
nodeSlave2.flushDB();
}

/**
* slot->nodes 15363 node3 e
*/
Expand Down

0 comments on commit 0856245

Please sign in to comment.