Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client-side caching by command arguments #3698

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,21 @@
<version>2.10.1</version>
</dependency>

<!-- Optional dependencies -->
<!-- Client-side caching -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
<optional>true</optional>
</dependency>

<!-- UNIX socket connection support -->
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
Expand All @@ -90,6 +105,7 @@
<version>1.19.0</version>
<scope>test</scope>
</dependency>

<!-- test -->
<dependency>
<groupId>junit</groupId>
Expand Down
114 changes: 76 additions & 38 deletions src/main/java/redis/clients/jedis/ClientSideCache.java
Original file line number Diff line number Diff line change
@@ -1,71 +1,109 @@
package redis.clients.jedis;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import redis.clients.jedis.exceptions.JedisException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import redis.clients.jedis.util.SafeEncoder;

public class ClientSideCache {
public abstract class ClientSideCache {

private final Map<ByteBuffer, Object> cache;
private final Map<ByteBuffer, Set<Long>> keyHashes; // TODO: clean-up
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock readLock = rwl.readLock();
private final Lock writeLock = rwl.writeLock();

public ClientSideCache() {
this.cache = new HashMap<>();
protected ClientSideCache() {
this.keyHashes = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that we are only adding stuff to this map. Some more housekeeping will be needed, otherwise the map will only grow, we need to remove unused keys and stuff, at some point. Could this be in itself a Cache, either Caffeine, or Guava?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now, I'm not sure which would be a better approach for clean-up. I'm not thinking about this ATM as I hope it can be done without effecting public APIs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding TODO comments for now.

}

/**
* For testing purpose only.
* @param map
*/
ClientSideCache(Map<ByteBuffer, Object> map) {
this.cache = map;
protected ClientSideCache(Map<ByteBuffer, Set<Long>> keyHashes) {
this.keyHashes = keyHashes;
}

public final void clear() {
cache.clear();
}
public abstract void invalidateAll();

protected abstract void invalidateAll(Iterable<Long> hashes);

protected abstract void put(long hash, Object value);

public final void invalidateKeys(List list) {
protected abstract Object get(long hash);

final void invalidate(List list) {
if (list == null) {
clear();
invalidateAll();
return;
}

list.forEach(this::invalidateKey);
Set<Long> hashes = new HashSet<>();
list.forEach(key -> hashes.addAll(getHashes(key)));
invalidateAll(hashes);
// TODO: clean-up keyHashes
}

private void invalidateKey(Object key) {
if (key instanceof byte[]) {
cache.remove(convertKey((byte[]) key));
} else {
throw new JedisException("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
private Set<Long> getHashes(Object key) {
if (!(key instanceof byte[])) {
throw new AssertionError("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
}
}

protected void setKey(Object key, Object value) {
cache.put(getMapKey(key), value);
final ByteBuffer mapKey = makeKey((byte[]) key);
readLock.lock();
try {
Set<Long> hashes = keyHashes.get(mapKey);
return hashes != null ? hashes : Collections.emptySet();
} finally {
readLock.unlock();
}
}

protected <T> T getValue(Object key) {
return (T) getMapValue(key);
final <T> T getValue(Function<CommandObject<T>, T> loader, CommandObject<T> command, String... keys) {

final long hash = getHash(command);
gerzse marked this conversation as resolved.
Show resolved Hide resolved

T value = (T) get(hash);
if (value != null) {
return value;
}

value = loader.apply(command);
if (value != null) {
writeLock.lock();
try {
put(hash, value);
for (String key : keys) {
ByteBuffer mapKey = makeKey(key);
if (keyHashes.containsKey(mapKey)) {
keyHashes.get(mapKey).add(hash);
} else {
Set<Long> set = new HashSet<>();
set.add(hash);
keyHashes.put(mapKey, set);
}
}
} finally {
writeLock.unlock();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there are cached values for this key? Should we remove them, given that the key was removed on the server, most probably? Should there be an else for the if (value != null)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean, what if there are cached values for this "command"? In that case we have already returned the result in Line#70.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I had "command" in mind, not "key", and if there is anything cached, you don't even end up in this code.

Now that I re-read this, this might be a clean-up scenario: you have a command that is not cached, read from the server, get null, you could clean up potential other commands that were cached for the same key(s).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems redundant as it should be covered by void invalidate(List list) (previously void invalidateKeys(List list)).


return value;
}

private Object getMapValue(Object key) {
return cache.get(getMapKey(key));
private long getHash(CommandObject command) {
// TODO:
return 0;
}

private ByteBuffer getMapKey(Object key) {
if (key instanceof byte[]) {
return convertKey((byte[]) key);
} else {
return convertKey(SafeEncoder.encode(String.valueOf(key)));
}
private ByteBuffer makeKey(String key) {
return makeKey(SafeEncoder.encode(key));
}

private static ByteBuffer convertKey(byte[] b) {
private static ByteBuffer makeKey(byte[] b) {
return ByteBuffer.wrap(b);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void discoverClusterSlots(Connection jedis) {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
if (clientSideCache != null) {
clientSideCache.clear();
clientSideCache.invalidateAll();
}
Set<String> hostAndPortKeys = new HashSet<>();

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private static void processPush(final RedisInputStream is, ClientSideCache cache
//System.out.println("PUSH: " + SafeEncoder.encodeObject(list));
if (list.size() == 2 && list.get(0) instanceof byte[]
&& Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) {
cache.invalidateKeys((List) list.get(1));
cache.invalidate((List) list.get(1));
}
}

Expand Down
18 changes: 9 additions & 9 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ public void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig co
this.commandObjects.setBroadcastAndRoundRobinConfig(this.broadcastAndRoundRobinConfig);
}

private <T> T executeClientSideCacheCommand(CommandObject<T> command, String... keys) {
if (clientSideCache == null) {
return executeCommand(command);
}

return clientSideCache.getValue((cmd) -> executeCommand(cmd), command, keys);
}

public String ping() {
return checkAndBroadcastCommand(commandObjects.ping());
}
Expand Down Expand Up @@ -749,15 +757,7 @@ public String set(String key, String value, SetParams params) {

@Override
public String get(String key) {
if (clientSideCache != null) {
String cachedValue = clientSideCache.getValue(key);
if (cachedValue != null) return cachedValue;

String value = executeCommand(commandObjects.get(key));
if (value != null) clientSideCache.setKey(key, value);
return value;
}
return executeCommand(commandObjects.get(key));
return executeClientSideCacheCommand(commandObjects.get(key), key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private void initMaster(HostAndPort master) {
pool = newPool;
LOG.info("Created connection pool to master at {}.", master);
if (clientSideCache != null) {
clientSideCache.clear();
clientSideCache.invalidateAll();
}

if (existingPool != null) {
Expand Down
50 changes: 50 additions & 0 deletions src/main/java/redis/clients/jedis/util/CaffeineCSC.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package redis.clients.jedis.util;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import redis.clients.jedis.ClientSideCache;

public class CaffeineCSC extends ClientSideCache {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought that comes to my mind when I read this: the general advice is to favor composition over inheritance. Maybe we could model this via composition, put an interface on top of CaffeineCSC and GuavaCSC and inject that interface into ClientSideCache?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClientSideCache itself is that interface (abstract class). It is an optional parameter for our actual client classes.


private static final int DEFAULT_MAXIMUM_SIZE = 10_000;

private final Cache<Long, Object> cache;

public CaffeineCSC() {
this(DEFAULT_MAXIMUM_SIZE);
}

public CaffeineCSC(int maximumSize) {
this(Caffeine.newBuilder().maximumSize(maximumSize).build());
}

public CaffeineCSC(int maximumSize, int ttlSeconds) {
this(Caffeine.newBuilder().maximumSize(maximumSize)
.expireAfterWrite(ttlSeconds, TimeUnit.SECONDS).build());
}

public CaffeineCSC(Cache<Long, Object> caffeineCache) {
this.cache = caffeineCache;
}

@Override
public final void invalidateAll() {
cache.invalidateAll();
}

@Override
protected void invalidateAll(Iterable<Long> hashes) {
cache.invalidateAll(hashes);
}

@Override
protected void put(long hash, Object value) {
cache.put(hash, value);
}

@Override
protected Object get(long hash) {
return cache.getIfPresent(hash);
}
}
50 changes: 50 additions & 0 deletions src/main/java/redis/clients/jedis/util/GuavaCSC.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package redis.clients.jedis.util;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
import redis.clients.jedis.ClientSideCache;

public class GuavaCSC extends ClientSideCache {

private static final int DEFAULT_MAXIMUM_SIZE = 10_000;

private final Cache<Long, Object> cache;

public GuavaCSC() {
this(DEFAULT_MAXIMUM_SIZE);
}

public GuavaCSC(int maximumSize) {
this(CacheBuilder.newBuilder().maximumSize(maximumSize).build());
}

public GuavaCSC(int maximumSize, int ttlSeconds) {
this(CacheBuilder.newBuilder().maximumSize(maximumSize)
.expireAfterWrite(ttlSeconds, TimeUnit.SECONDS).build());
}

public GuavaCSC(Cache<Long, Object> guavaCache) {
this.cache = guavaCache;
}

@Override
public final void invalidateAll() {
cache.invalidateAll();
}

@Override
protected void invalidateAll(Iterable<Long> hashes) {
cache.invalidateAll(hashes);
}

@Override
protected void put(long hash, Object value) {
cache.put(hash, value);
}

@Override
protected Object get(long hash) {
return cache.getIfPresent(hash);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Set;
Expand All @@ -14,6 +13,7 @@
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.hamcrest.Matchers;
import org.junit.Test;
import redis.clients.jedis.util.MapCSC;

public class JedisClusterClientSideCacheTest extends JedisClusterTestBase {

Expand All @@ -31,7 +31,7 @@ public class JedisClusterClientSideCacheTest extends JedisClusterTestBase {

@Test
public void simple() {
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new ClientSideCache())) {
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new MapCSC())) {
jedis.set("foo", "bar");
assertEquals("bar", jedis.get("foo"));
jedis.del("foo");
Expand All @@ -41,8 +41,8 @@ public void simple() {

@Test
public void simpleWithSimpleMap() {
HashMap<ByteBuffer, Object> map = new HashMap<>();
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new ClientSideCache(map), singleConnectionPoolConfig.get())) {
HashMap<Long, Object> map = new HashMap<>();
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new MapCSC(map), singleConnectionPoolConfig.get())) {
jedis.set("foo", "bar");
assertThat(map, Matchers.aMapWithSize(0));
assertEquals("bar", jedis.get("foo"));
Expand All @@ -60,7 +60,7 @@ public void simpleWithSimpleMap() {

@Test
public void flushAll() {
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new ClientSideCache())) {
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new MapCSC())) {
jedis.set("foo", "bar");
assertEquals("bar", jedis.get("foo"));
jedis.flushAll();
Expand All @@ -70,8 +70,8 @@ public void flushAll() {

@Test
public void flushAllWithSimpleMap() {
HashMap<ByteBuffer, Object> map = new HashMap<>();
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new ClientSideCache(map), singleConnectionPoolConfig.get())) {
HashMap<Long, Object> map = new HashMap<>();
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new MapCSC(map), singleConnectionPoolConfig.get())) {
jedis.set("foo", "bar");
assertThat(map, Matchers.aMapWithSize(0));
assertEquals("bar", jedis.get("foo"));
Expand Down
Loading
Loading