Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support client-side caching from UnifiedJedis #3691

Merged
merged 15 commits into from
Jan 17, 2024
Merged
21 changes: 17 additions & 4 deletions src/main/java/redis/clients/jedis/ClientSideCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,27 @@

public class ClientSideCache {

private final Map<ByteBuffer, Object> cache = new HashMap<>();
private final Map<ByteBuffer, Object> cache;

protected ClientSideCache() {
public ClientSideCache() {
this.cache = new HashMap<>();
}

protected void invalidateKeys(List list) {
/**
* For testing purpose only.
* @param map
*/
ClientSideCache(Map<ByteBuffer, Object> map) {
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
this.cache = map;
}

public final void clear() {
cache.clear();
}

public final void invalidateKeys(List list) {
if (list == null) {
cache.clear();
clear();
return;
}

Expand Down
35 changes: 26 additions & 9 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ public Connection(final HostAndPort hostAndPort) {
}

public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this(new DefaultJedisSocketFactory(hostAndPort, clientConfig));
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeFromClientConfig(clientConfig);
this(new DefaultJedisSocketFactory(hostAndPort, clientConfig), clientConfig);
}

public Connection(final JedisSocketFactory socketFactory) {
Expand All @@ -65,7 +63,15 @@ public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clie
this.socketFactory = socketFactory;
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeFromClientConfig(clientConfig);
initializeConnection(clientConfig);
}

public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig, ClientSideCache csCache) {
this.socketFactory = socketFactory;
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeConnection(clientConfig);
initializeClientSideCache(csCache);
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down Expand Up @@ -122,10 +128,6 @@ public void rollbackTimeout() {
}
}

final void setClientSideCache(ClientSideCache clientSideCache) {
this.clientSideCache = clientSideCache;
}

public Object executeCommand(final ProtocolCommand cmd) {
return executeCommand(new CommandArguments(cmd));
}
Expand Down Expand Up @@ -389,7 +391,7 @@ private static boolean validateClientInfo(String info) {
return true;
}

private void initializeFromClientConfig(final JedisClientConfig config) {
private void initializeConnection(final JedisClientConfig config) {
try {
connect();

Expand Down Expand Up @@ -516,4 +518,19 @@ public boolean ping() {
}
return true;
}

private void initializeClientSideCache(ClientSideCache csCache) {
this.clientSideCache = csCache;
if (clientSideCache != null) {
if (protocol != RedisProtocol.RESP3) {
throw new JedisException("Client side caching is only supported with RESP3.");
}

sendCommand(Protocol.Command.CLIENT, "TRACKING", "ON");
String reply = getStatusCodeReply();
if (!"OK".equals(reply)) {
throw new JedisException("Could not enable client tracking. Reply: " + reply);
}
}
}
}
18 changes: 13 additions & 5 deletions src/main/java/redis/clients/jedis/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,27 @@ public class ConnectionFactory implements PooledObjectFactory<Connection> {
private static final Logger logger = LoggerFactory.getLogger(ConnectionFactory.class);

private final JedisSocketFactory jedisSocketFactory;

private final JedisClientConfig clientConfig;
private ClientSideCache clientSideCache = null;
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved

public ConnectionFactory(final HostAndPort hostAndPort) {
this.clientConfig = DefaultJedisClientConfig.builder().build();
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort);
}

public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this.clientConfig = DefaultJedisClientConfig.copyConfig(clientConfig);
this.clientConfig = clientConfig;
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort, this.clientConfig);
}

public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache csCache) {
this.clientConfig = clientConfig;
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort, this.clientConfig);
this.clientSideCache = csCache;
}

public ConnectionFactory(final JedisSocketFactory jedisSocketFactory, final JedisClientConfig clientConfig) {
this.clientConfig = DefaultJedisClientConfig.copyConfig(clientConfig);
this.clientConfig = clientConfig;
this.jedisSocketFactory = jedisSocketFactory;
}

Expand All @@ -54,9 +60,11 @@ public void destroyObject(PooledObject<Connection> pooledConnection) throws Exce

@Override
public PooledObject<Connection> makeObject() throws Exception {
Connection jedis = null;
try {
jedis = new Connection(jedisSocketFactory, clientConfig);
Connection jedis = clientSideCache == null
? new Connection(jedisSocketFactory, clientConfig)
: new Connection(jedisSocketFactory, clientConfig, clientSideCache);
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved

return new DefaultPooledObject<>(jedis);
} catch (JedisException je) {
logger.debug("Error while makeObject", je);
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig));
}

public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache csCache) {
this(new ConnectionFactory(hostAndPort, clientConfig, csCache));
}

public ConnectionPool(PooledObjectFactory<Connection> factory) {
super(factory);
}
Expand All @@ -19,6 +23,11 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig,
this(new ConnectionFactory(hostAndPort, clientConfig), poolConfig);
}

public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache csCache,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig, csCache), poolConfig);
}

public ConnectionPool(PooledObjectFactory<Connection> factory,
GenericObjectPoolConfig<Connection> poolConfig) {
super(factory, poolConfig);
Expand Down
44 changes: 0 additions & 44 deletions src/main/java/redis/clients/jedis/JedisClientSideCache.java

This file was deleted.

54 changes: 44 additions & 10 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

Expand Down Expand Up @@ -198,28 +197,63 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
Duration.ofMillis((long) clientConfig.getSocketTimeoutMillis() * maxAttempts), poolConfig);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol());
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol());
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol());
private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
public JedisCluster(ClusterConnectionProvider provider, int maxAttempts,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache) {
this(clusterNodes, clientConfig, clientSideCache, DEFAULT_MAX_ATTEMPTS,
Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()));
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
int maxAttempts, Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol(), clientSideCache);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
int maxAttempts, Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
DEFAULT_MAX_ATTEMPTS, Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()),
clientConfig.getRedisProtocol(), clientSideCache);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
}

private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
RedisProtocol protocol, ClientSideCache clientSideCache) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol, clientSideCache);
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
}

public Map<String, ConnectionPool> getClusterNodes() {
Expand Down
43 changes: 39 additions & 4 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class JedisClusterInfoCache {

private final GenericObjectPoolConfig<Connection> poolConfig;
private final JedisClientConfig clientConfig;
private final ClientSideCache clientSideCache;
private final Set<HostAndPort> startNodes;

private static final int MASTER_NODE_INDEX = 2;
Expand All @@ -61,19 +62,35 @@ public void run() {
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, null, startNodes);
this(clientConfig, null, null, startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache csCache, final Set<HostAndPort> startNodes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency and alignment multiple constructors should follow an additive pattern. This means, adding the piece (csCache) as the last variable in this new constructor.

this(clientConfig, csCache, null, startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, poolConfig, startNodes, null);
this(clientConfig, null, poolConfig, startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache csCache,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same

final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, csCache, poolConfig, startNodes, null);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes,
final Duration topologyRefreshPeriod) {
this(clientConfig, null, poolConfig, startNodes, topologyRefreshPeriod);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache csCache,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same

final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes,
final Duration topologyRefreshPeriod) {
this.poolConfig = poolConfig;
this.clientConfig = clientConfig;
this.clientSideCache = csCache;
this.startNodes = startNodes;
if (topologyRefreshPeriod != null) {
logger.info("Cluster topology refresh start, period: {}, startNodes: {}", topologyRefreshPeriod, startNodes);
Expand Down Expand Up @@ -209,6 +226,9 @@ private void discoverClusterSlots(Connection jedis) {
try {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
if (clientSideCache != null) {
clientSideCache.clear();
}
Set<String> hostAndPortKeys = new HashSet<>();

for (Object slotInfoObj : slotsInfo) {
Expand Down Expand Up @@ -270,15 +290,30 @@ public ConnectionPool setupNodeIfNotExist(final HostAndPort node) {
ConnectionPool existingPool = nodes.get(nodeKey);
if (existingPool != null) return existingPool;

ConnectionPool nodePool = poolConfig == null ? new ConnectionPool(node, clientConfig)
: new ConnectionPool(node, clientConfig, poolConfig);
ConnectionPool nodePool = createNodePool(node);
nodes.put(nodeKey, nodePool);
return nodePool;
} finally {
w.unlock();
}
}

private ConnectionPool createNodePool(HostAndPort node) {
if (poolConfig == null) {
if (clientSideCache == null) {
return new ConnectionPool(node, clientConfig);
} else {
return new ConnectionPool(node, clientConfig, clientSideCache);
}
} else {
if (clientSideCache == null) {
return new ConnectionPool(node, clientConfig, poolConfig);
} else {
return new ConnectionPool(node, clientConfig, clientSideCache, poolConfig);
}
}
}

public void assignSlotToNode(int slot, HostAndPort targetNode) {
w.lock();
try {
Expand Down
Loading
Loading