Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Client-side caching through URI/URL #3703

Merged
merged 18 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,27 @@
<version>2.10.1</version>
</dependency>

<!-- Optional dependencies -->
<!-- Client-side caching -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.16</version>
<optional>true</optional>
</dependency>

<!-- UNIX socket connection support -->
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
Expand All @@ -90,6 +111,7 @@
<version>1.19.0</version>
<scope>test</scope>
</dependency>

<!-- test -->
<dependency>
<groupId>junit</groupId>
Expand Down
108 changes: 68 additions & 40 deletions src/main/java/redis/clients/jedis/ClientSideCache.java
Original file line number Diff line number Diff line change
@@ -1,71 +1,99 @@
package redis.clients.jedis;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import redis.clients.jedis.exceptions.JedisException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import redis.clients.jedis.util.SafeEncoder;

public class ClientSideCache {
public abstract class ClientSideCache {

private final Map<ByteBuffer, Object> cache;
private final Map<ByteBuffer, Set<Long>> keyHashes;
private final ReentrantLock writeLock = new ReentrantLock();

public ClientSideCache() {
this.cache = new HashMap<>();
protected ClientSideCache() {
this.keyHashes = new ConcurrentHashMap<>();
}

/**
* For testing purpose only.
* @param map
*/
ClientSideCache(Map<ByteBuffer, Object> map) {
this.cache = map;
}
public abstract void invalidateAll();

public final void clear() {
cache.clear();
}
protected abstract void invalidateAll(Iterable<Long> hashes);

public final void invalidateKeys(List list) {
final void invalidate(List list) {
if (list == null) {
clear();
invalidateAll();
return;
}

list.forEach(this::invalidateKey);
list.forEach(this::invalidate0);
}

private void invalidateKey(Object key) {
if (key instanceof byte[]) {
cache.remove(convertKey((byte[]) key));
} else {
throw new JedisException("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
private void invalidate0(Object key) {
if (!(key instanceof byte[])) {
throw new AssertionError("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
}
}

protected void setKey(Object key, Object value) {
cache.put(getMapKey(key), value);
}
final ByteBuffer mapKey = makeKey((byte[]) key);

protected <T> T getValue(Object key) {
return (T) getMapValue(key);
Set<Long> hashes = keyHashes.get(mapKey);
if (hashes != null) {
writeLock.lock();
try {
invalidateAll(hashes);
keyHashes.remove(mapKey);
} finally {
writeLock.unlock();
}
}
}

private Object getMapValue(Object key) {
return cache.get(getMapKey(key));
}
protected abstract void put(long hash, Object value);

protected abstract Object get(long hash);

private ByteBuffer getMapKey(Object key) {
if (key instanceof byte[]) {
return convertKey((byte[]) key);
} else {
return convertKey(SafeEncoder.encode(String.valueOf(key)));
final <T> T getValue(Function<CommandObject<T>, T> loader, CommandObject<T> command, String... keys) {

final long hash = getHash(command);

T value = (T) get(hash);
if (value != null) {
return value;
}

value = loader.apply(command);
if (value != null) {
writeLock.lock();
try {
put(hash, value);
for (String key : keys) {
ByteBuffer mapKey = makeKey(key);
if (keyHashes.containsKey(mapKey)) {
keyHashes.get(mapKey).add(hash);
} else {
Set<Long> set = new HashSet<>();
set.add(hash);
keyHashes.put(mapKey, set);
}
}
} finally {
writeLock.unlock();
}
}

return value;
}

protected abstract long getHash(CommandObject command);

private ByteBuffer makeKey(String key) {
return makeKey(SafeEncoder.encode(key));
}

private static ByteBuffer convertKey(byte[] b) {
private static ByteBuffer makeKey(byte[] b) {
return ByteBuffer.wrap(b);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void discoverClusterSlots(Connection jedis) {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
if (clientSideCache != null) {
clientSideCache.clear();
clientSideCache.invalidateAll();
}
Set<String> hostAndPortKeys = new HashSet<>();

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public JedisPooled() {
* @param url
*/
public JedisPooled(final String url) {
this(URI.create(url));
super(url);
}

/**
Expand Down Expand Up @@ -76,7 +76,7 @@ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig client
}

public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache csCache) {
super(new PooledConnectionProvider(hostAndPort, clientConfig, csCache), clientConfig.getRedisProtocol(), csCache);
super(hostAndPort, clientConfig, csCache);
}

public JedisPooled(PooledObjectFactory<Connection> factory) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private static void processPush(final RedisInputStream is, ClientSideCache cache
//System.out.println("PUSH: " + SafeEncoder.encodeObject(list));
if (list.size() == 2 && list.get(0) instanceof byte[]
&& Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) {
cache.invalidateKeys((List) list.get(1));
cache.invalidate((List) list.get(1));
}
}

Expand Down
26 changes: 15 additions & 11 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public UnifiedJedis(final URI uri) {
this(JedisURIHelper.getHostAndPort(uri), DefaultJedisClientConfig.builder()
.user(JedisURIHelper.getUser(uri)).password(JedisURIHelper.getPassword(uri))
.database(JedisURIHelper.getDBIndex(uri)).protocol(JedisURIHelper.getRedisProtocol(uri))
.ssl(JedisURIHelper.isRedisSSLScheme(uri)).build());
.ssl(JedisURIHelper.isRedisSSLScheme(uri)).build(), JedisURIHelper.getClientSideCache(uri));
}

public UnifiedJedis(final URI uri, JedisClientConfig config) {
Expand All @@ -85,13 +85,17 @@ public UnifiedJedis(final URI uri, JedisClientConfig config) {
.protocol(JedisURIHelper.getRedisProtocol(uri))
.ssl(JedisURIHelper.isRedisSSLScheme(uri)).sslSocketFactory(config.getSslSocketFactory())
.sslParameters(config.getSslParameters()).hostnameVerifier(config.getHostnameVerifier())
.build());
.build(), JedisURIHelper.getClientSideCache(uri));
}

public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(new PooledConnectionProvider(hostAndPort, clientConfig), clientConfig.getRedisProtocol());
}

public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache clientSideCache) {
this(new PooledConnectionProvider(hostAndPort, clientConfig, clientSideCache), clientConfig.getRedisProtocol(), clientSideCache);
}

public UnifiedJedis(ConnectionProvider provider) {
this(new DefaultCommandExecutor(provider), provider);
}
Expand Down Expand Up @@ -295,6 +299,14 @@ public void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig co
this.commandObjects.setBroadcastAndRoundRobinConfig(this.broadcastAndRoundRobinConfig);
}

private <T> T executeClientSideCacheCommand(CommandObject<T> command, String... keys) {
if (clientSideCache == null) {
return executeCommand(command);
}

return clientSideCache.getValue((cmd) -> executeCommand(cmd), command, keys);
}

public String ping() {
return checkAndBroadcastCommand(commandObjects.ping());
}
Expand Down Expand Up @@ -749,15 +761,7 @@ public String set(String key, String value, SetParams params) {

@Override
public String get(String key) {
if (clientSideCache != null) {
String cachedValue = clientSideCache.getValue(key);
if (cachedValue != null) return cachedValue;

String value = executeCommand(commandObjects.get(key));
if (value != null) clientSideCache.setKey(key, value);
return value;
}
return executeCommand(commandObjects.get(key));
return executeClientSideCacheCommand(commandObjects.get(key), key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private void initMaster(HostAndPort master) {
pool = newPool;
LOG.info("Created connection pool to master at {}.", master);
if (clientSideCache != null) {
clientSideCache.clear();
clientSideCache.invalidateAll();
}

if (existingPool != null) {
Expand Down
95 changes: 95 additions & 0 deletions src/main/java/redis/clients/jedis/util/CaffeineCSC.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package redis.clients.jedis.util;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import net.openhft.hashing.LongHashFunction;
import redis.clients.jedis.ClientSideCache;
import redis.clients.jedis.CommandObject;
import redis.clients.jedis.args.Rawable;

public class CaffeineCSC extends ClientSideCache {

private static final int DEFAULT_MAXIMUM_SIZE = 10_000;
private static final int DEFAULT_EXPIRE_SECONDS = 100;
private static final LongHashFunction DEFAULT_HASH_FUNCTION = LongHashFunction.xx3();

private final Cache<Long, Object> cache;
private final LongHashFunction function;

public CaffeineCSC(Cache<Long, Object> caffeineCache, LongHashFunction hashFunction) {
this.cache = caffeineCache;
this.function = hashFunction;
}

@Override
public final void invalidateAll() {
cache.invalidateAll();
}

@Override
protected void invalidateAll(Iterable<Long> hashes) {
cache.invalidateAll(hashes);
}

@Override
protected void put(long hash, Object value) {
cache.put(hash, value);
}

@Override
protected Object get(long hash) {
return cache.getIfPresent(hash);
}

@Override
protected final long getHash(CommandObject command) {
long[] nums = new long[command.getArguments().size() + 1];
int idx = 0;
for (Rawable raw : command.getArguments()) {
nums[idx++] = function.hashBytes(raw.getRaw());
}
nums[idx] = function.hashInt(command.getBuilder().hashCode());
return function.hashLongs(nums);
}

public static Builder builder() {
return new Builder();
}

public static class Builder {

private long maximumSize = DEFAULT_MAXIMUM_SIZE;
private long expireTime = DEFAULT_EXPIRE_SECONDS;
private final TimeUnit expireTimeUnit = TimeUnit.SECONDS;

private LongHashFunction hashFunction = DEFAULT_HASH_FUNCTION;

private Builder() { }

public Builder maximumSize(int size) {
this.maximumSize = size;
return this;
}

public Builder ttl(int seconds) {
this.expireTime = seconds;
return this;
}

public Builder hashFunction(LongHashFunction function) {
this.hashFunction = function;
return this;
}

public CaffeineCSC build() {
Caffeine cb = Caffeine.newBuilder();

cb.maximumSize(maximumSize);

cb.expireAfterWrite(expireTime, expireTimeUnit);

return new CaffeineCSC(cb.build(), hashFunction);
}
}
}
Loading
Loading