diff --git a/pom.xml b/pom.xml index c9fd15d2cb7..e6ff5bc70b9 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,27 @@ 2.10.1 + + + + com.google.guava + guava + 33.0.0-jre + true + + + com.github.ben-manes.caffeine + caffeine + 2.9.3 + true + + + net.openhft + zero-allocation-hashing + 0.16 + true + + com.kohlschutter.junixsocket @@ -90,6 +111,7 @@ 1.19.0 test + junit diff --git a/src/main/java/redis/clients/jedis/ClientSideCache.java b/src/main/java/redis/clients/jedis/ClientSideCache.java index 62c5be28c24..c2c9248acf4 100644 --- a/src/main/java/redis/clients/jedis/ClientSideCache.java +++ b/src/main/java/redis/clients/jedis/ClientSideCache.java @@ -1,71 +1,104 @@ package redis.clients.jedis; import java.nio.ByteBuffer; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; - -import redis.clients.jedis.exceptions.JedisException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import redis.clients.jedis.util.SafeEncoder; -public class ClientSideCache { +/** + * The class to manage the client-side caching. User can provide any of implementation of this class to the client + * object; e.g. {@link redis.clients.jedis.util.CaffeineCSC CaffeineCSC} or + * {@link redis.clients.jedis.util.GuavaCSC GuavaCSC} or a custom implementation of their own. + */ +public abstract class ClientSideCache { - private final Map cache; + protected static final int DEFAULT_MAXIMUM_SIZE = 10_000; + protected static final int DEFAULT_EXPIRE_SECONDS = 100; - public ClientSideCache() { - this.cache = new HashMap<>(); - } + private final Map> keyToCommandHashes; - /** - * For testing purpose only. - * @param map - */ - ClientSideCache(Map map) { - this.cache = map; + protected ClientSideCache() { + this.keyToCommandHashes = new ConcurrentHashMap<>(); } + protected abstract void invalidateAllCommandHashes(); + + protected abstract void invalidateCommandHashes(Iterable hashes); + + protected abstract void put(long hash, Object value); + + protected abstract Object get(long hash); + + protected abstract long getCommandHash(CommandObject command); + public final void clear() { - cache.clear(); + invalidateAllKeysAndCommandHashes(); } - public final void invalidateKeys(List list) { + final void invalidate(List list) { if (list == null) { - clear(); + invalidateAllKeysAndCommandHashes(); return; } - list.forEach(this::invalidateKey); + list.forEach(this::invalidateKeyAndRespectiveCommandHashes); } - private void invalidateKey(Object key) { - if (key instanceof byte[]) { - cache.remove(convertKey((byte[]) key)); - } else { - throw new JedisException("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key)); - } + private void invalidateAllKeysAndCommandHashes() { + invalidateAllCommandHashes(); + keyToCommandHashes.clear(); } - protected void setKey(Object key, Object value) { - cache.put(getMapKey(key), value); - } + private void invalidateKeyAndRespectiveCommandHashes(Object key) { + if (!(key instanceof byte[])) { + throw new AssertionError("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key)); + } - protected T getValue(Object key) { - return (T) getMapValue(key); - } + final ByteBuffer mapKey = makeKeyForKeyToCommandHashes((byte[]) key); - private Object getMapValue(Object key) { - return cache.get(getMapKey(key)); + Set hashes = keyToCommandHashes.get(mapKey); + if (hashes != null) { + invalidateCommandHashes(hashes); + keyToCommandHashes.remove(mapKey); + } } - private ByteBuffer getMapKey(Object key) { - if (key instanceof byte[]) { - return convertKey((byte[]) key); - } else { - return convertKey(SafeEncoder.encode(String.valueOf(key))); + final T getValue(Function, T> loader, CommandObject command, String... keys) { + + final long hash = getCommandHash(command); + + T value = (T) get(hash); + if (value != null) { + return value; } + + value = loader.apply(command); + if (value != null) { + put(hash, value); + for (String key : keys) { + ByteBuffer mapKey = makeKeyForKeyToCommandHashes(key); + if (keyToCommandHashes.containsKey(mapKey)) { + keyToCommandHashes.get(mapKey).add(hash); + } else { + Set set = new HashSet<>(); + set.add(hash); + keyToCommandHashes.put(mapKey, set); + } + } + } + + return value; + } + + private ByteBuffer makeKeyForKeyToCommandHashes(String key) { + return makeKeyForKeyToCommandHashes(SafeEncoder.encode(key)); } - private static ByteBuffer convertKey(byte[] b) { + private static ByteBuffer makeKeyForKeyToCommandHashes(byte[] b) { return ByteBuffer.wrap(b); } } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 4af1261cd40..4bd82fec1ef 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -248,7 +248,7 @@ private static void processPush(final RedisInputStream is, ClientSideCache cache //System.out.println("PUSH: " + SafeEncoder.encodeObject(list)); if (list.size() == 2 && list.get(0) instanceof byte[] && Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) { - cache.invalidateKeys((List) list.get(1)); + cache.invalidate((List) list.get(1)); } } diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index ba7b36b134b..3a2dec9d771 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -295,6 +295,14 @@ public void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig co this.commandObjects.setBroadcastAndRoundRobinConfig(this.broadcastAndRoundRobinConfig); } + private T executeClientSideCacheCommand(CommandObject command, String... keys) { + if (clientSideCache == null) { + return executeCommand(command); + } + + return clientSideCache.getValue((cmd) -> executeCommand(cmd), command, keys); + } + public String ping() { return checkAndBroadcastCommand(commandObjects.ping()); } @@ -749,15 +757,7 @@ public String set(String key, String value, SetParams params) { @Override public String get(String key) { - if (clientSideCache != null) { - String cachedValue = clientSideCache.getValue(key); - if (cachedValue != null) return cachedValue; - - String value = executeCommand(commandObjects.get(key)); - if (value != null) clientSideCache.setKey(key, value); - return value; - } - return executeCommand(commandObjects.get(key)); + return executeClientSideCacheCommand(commandObjects.get(key), key); } @Override diff --git a/src/main/java/redis/clients/jedis/util/CaffeineCSC.java b/src/main/java/redis/clients/jedis/util/CaffeineCSC.java new file mode 100644 index 00000000000..3bce3504b38 --- /dev/null +++ b/src/main/java/redis/clients/jedis/util/CaffeineCSC.java @@ -0,0 +1,93 @@ +package redis.clients.jedis.util; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.util.concurrent.TimeUnit; +import net.openhft.hashing.LongHashFunction; +import redis.clients.jedis.ClientSideCache; +import redis.clients.jedis.CommandObject; +import redis.clients.jedis.args.Rawable; + +public class CaffeineCSC extends ClientSideCache { + + private static final LongHashFunction DEFAULT_HASH_FUNCTION = LongHashFunction.xx3(); + + private final Cache cache; + private final LongHashFunction function; + + public CaffeineCSC(Cache caffeineCache, LongHashFunction hashFunction) { + this.cache = caffeineCache; + this.function = hashFunction; + } + + @Override + protected final void invalidateAllCommandHashes() { + cache.invalidateAll(); + } + + @Override + protected void invalidateCommandHashes(Iterable hashes) { + cache.invalidateAll(hashes); + } + + @Override + protected void put(long hash, Object value) { + cache.put(hash, value); + } + + @Override + protected Object get(long hash) { + return cache.getIfPresent(hash); + } + + @Override + protected final long getCommandHash(CommandObject command) { + long[] nums = new long[command.getArguments().size() + 1]; + int idx = 0; + for (Rawable raw : command.getArguments()) { + nums[idx++] = function.hashBytes(raw.getRaw()); + } + nums[idx] = function.hashInt(command.getBuilder().hashCode()); + return function.hashLongs(nums); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private long maximumSize = DEFAULT_MAXIMUM_SIZE; + private long expireTime = DEFAULT_EXPIRE_SECONDS; + private final TimeUnit expireTimeUnit = TimeUnit.SECONDS; + + private LongHashFunction hashFunction = DEFAULT_HASH_FUNCTION; + + private Builder() { } + + public Builder maximumSize(int size) { + this.maximumSize = size; + return this; + } + + public Builder ttl(int seconds) { + this.expireTime = seconds; + return this; + } + + public Builder hashFunction(LongHashFunction function) { + this.hashFunction = function; + return this; + } + + public CaffeineCSC build() { + Caffeine cb = Caffeine.newBuilder(); + + cb.maximumSize(maximumSize); + + cb.expireAfterWrite(expireTime, expireTimeUnit); + + return new CaffeineCSC(cb.build(), hashFunction); + } + } +} diff --git a/src/main/java/redis/clients/jedis/util/GuavaCSC.java b/src/main/java/redis/clients/jedis/util/GuavaCSC.java new file mode 100644 index 00000000000..d9973b7dc60 --- /dev/null +++ b/src/main/java/redis/clients/jedis/util/GuavaCSC.java @@ -0,0 +1,90 @@ +package redis.clients.jedis.util; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import java.util.concurrent.TimeUnit; +import redis.clients.jedis.ClientSideCache; +import redis.clients.jedis.CommandObject; + +public class GuavaCSC extends ClientSideCache { + + private static final HashFunction DEFAULT_HASH_FUNCTION = com.google.common.hash.Hashing.fingerprint2011(); + + private final Cache cache; + private final HashFunction function; + + public GuavaCSC(Cache guavaCache, HashFunction hashFunction) { + this.cache = guavaCache; + this.function = hashFunction; + } + + @Override + protected final void invalidateAllCommandHashes() { + cache.invalidateAll(); + } + + @Override + protected void invalidateCommandHashes(Iterable hashes) { + cache.invalidateAll(hashes); + } + + @Override + protected void put(long hash, Object value) { + cache.put(hash, value); + } + + @Override + protected Object get(long hash) { + return cache.getIfPresent(hash); + } + + @Override + protected final long getCommandHash(CommandObject command) { + Hasher hasher = function.newHasher(); + command.getArguments().forEach(raw -> hasher.putBytes(raw.getRaw())); + hasher.putInt(command.getBuilder().hashCode()); + return hasher.hash().asLong(); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private long maximumSize = DEFAULT_MAXIMUM_SIZE; + private long expireTime = DEFAULT_EXPIRE_SECONDS; + private final TimeUnit expireTimeUnit = TimeUnit.SECONDS; + + private HashFunction hashFunction = DEFAULT_HASH_FUNCTION; + + private Builder() { } + + public Builder maximumSize(int size) { + this.maximumSize = size; + return this; + } + + public Builder ttl(int seconds) { + this.expireTime = seconds; + return this; + } + + public Builder hashFunction(HashFunction function) { + this.hashFunction = function; + return this; + } + + public GuavaCSC build() { + CacheBuilder cb = CacheBuilder.newBuilder(); + + cb.maximumSize(maximumSize); + + cb.expireAfterWrite(expireTime, expireTimeUnit); + + return new GuavaCSC(cb.build(), hashFunction); + } + } +} diff --git a/src/test/java/redis/clients/jedis/JedisClusterClientSideCacheTest.java b/src/test/java/redis/clients/jedis/JedisClusterClientSideCacheTest.java index 3c8bc18c5ce..60ddf002d73 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterClientSideCacheTest.java +++ b/src/test/java/redis/clients/jedis/JedisClusterClientSideCacheTest.java @@ -4,7 +4,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.Set; @@ -14,6 +13,7 @@ import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.hamcrest.Matchers; import org.junit.Test; +import redis.clients.jedis.util.MapCSC; public class JedisClusterClientSideCacheTest extends JedisClusterTestBase { @@ -31,7 +31,7 @@ public class JedisClusterClientSideCacheTest extends JedisClusterTestBase { @Test public void simple() { - try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new ClientSideCache())) { + try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new MapCSC())) { jedis.set("foo", "bar"); assertEquals("bar", jedis.get("foo")); jedis.del("foo"); @@ -41,8 +41,8 @@ public void simple() { @Test public void simpleWithSimpleMap() { - HashMap map = new HashMap<>(); - try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new ClientSideCache(map), singleConnectionPoolConfig.get())) { + HashMap map = new HashMap<>(); + try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new MapCSC(map), singleConnectionPoolConfig.get())) { jedis.set("foo", "bar"); assertThat(map, Matchers.aMapWithSize(0)); assertEquals("bar", jedis.get("foo")); @@ -60,7 +60,7 @@ public void simpleWithSimpleMap() { @Test public void flushAll() { - try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new ClientSideCache())) { + try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new MapCSC())) { jedis.set("foo", "bar"); assertEquals("bar", jedis.get("foo")); jedis.flushAll(); @@ -70,8 +70,8 @@ public void flushAll() { @Test public void flushAllWithSimpleMap() { - HashMap map = new HashMap<>(); - try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new ClientSideCache(map), singleConnectionPoolConfig.get())) { + HashMap map = new HashMap<>(); + try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new MapCSC(map), singleConnectionPoolConfig.get())) { jedis.set("foo", "bar"); assertThat(map, Matchers.aMapWithSize(0)); assertEquals("bar", jedis.get("foo")); diff --git a/src/test/java/redis/clients/jedis/JedisPooledClientSideCacheTest.java b/src/test/java/redis/clients/jedis/JedisPooledClientSideCacheTest.java index ad4313a4b72..2e641e0f3a8 100644 --- a/src/test/java/redis/clients/jedis/JedisPooledClientSideCacheTest.java +++ b/src/test/java/redis/clients/jedis/JedisPooledClientSideCacheTest.java @@ -4,7 +4,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.function.Supplier; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -12,6 +11,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import redis.clients.jedis.util.MapCSC; public class JedisPooledClientSideCacheTest { @@ -42,7 +42,7 @@ public void tearDown() throws Exception { @Test public void simple() { - try (JedisPooled jedis = new JedisPooled(hnp, clientConfig.get(), new ClientSideCache())) { + try (JedisPooled jedis = new JedisPooled(hnp, clientConfig.get(), new MapCSC())) { control.set("foo", "bar"); assertEquals("bar", jedis.get("foo")); control.del("foo"); @@ -52,8 +52,8 @@ public void simple() { @Test public void simpleWithSimpleMap() { - HashMap map = new HashMap<>(); - try (JedisPooled jedis = new JedisPooled(hnp, clientConfig.get(), new ClientSideCache(map), singleConnectionPoolConfig.get())) { + HashMap map = new HashMap<>(); + try (JedisPooled jedis = new JedisPooled(hnp, clientConfig.get(), new MapCSC(map), singleConnectionPoolConfig.get())) { control.set("foo", "bar"); assertThat(map, Matchers.aMapWithSize(0)); assertEquals("bar", jedis.get("foo")); @@ -71,7 +71,7 @@ public void simpleWithSimpleMap() { @Test public void flushAll() { - try (JedisPooled jedis = new JedisPooled(hnp, clientConfig.get(), new ClientSideCache())) { + try (JedisPooled jedis = new JedisPooled(hnp, clientConfig.get(), new MapCSC())) { control.set("foo", "bar"); assertEquals("bar", jedis.get("foo")); control.flushAll(); @@ -81,8 +81,8 @@ public void flushAll() { @Test public void flushAllWithSimpleMap() { - HashMap map = new HashMap<>(); - try (JedisPooled jedis = new JedisPooled(hnp, clientConfig.get(), new ClientSideCache(map), singleConnectionPoolConfig.get())) { + HashMap map = new HashMap<>(); + try (JedisPooled jedis = new JedisPooled(hnp, clientConfig.get(), new MapCSC(map), singleConnectionPoolConfig.get())) { control.set("foo", "bar"); assertThat(map, Matchers.aMapWithSize(0)); assertEquals("bar", jedis.get("foo")); diff --git a/src/test/java/redis/clients/jedis/JedisSentineledClientSideCacheTest.java b/src/test/java/redis/clients/jedis/JedisSentineledClientSideCacheTest.java index 9af243ffc7c..9e5f720933f 100644 --- a/src/test/java/redis/clients/jedis/JedisSentineledClientSideCacheTest.java +++ b/src/test/java/redis/clients/jedis/JedisSentineledClientSideCacheTest.java @@ -4,16 +4,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.Set; -import java.util.function.Supplier; import java.util.stream.Collectors; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.hamcrest.Matchers; import org.junit.Test; +import redis.clients.jedis.util.MapCSC; public class JedisSentineledClientSideCacheTest { @@ -28,16 +26,9 @@ public class JedisSentineledClientSideCacheTest { private static final JedisClientConfig sentinelClientConfig = DefaultJedisClientConfig.builder().resp3().build(); - private static final Supplier> singleConnectionPoolConfig - = () -> { - ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); - poolConfig.setMaxTotal(1); - return poolConfig; - }; - @Test public void simple() { - try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, masterClientConfig, new ClientSideCache(), sentinels, sentinelClientConfig)) { + try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, masterClientConfig, new MapCSC(), sentinels, sentinelClientConfig)) { jedis.set("foo", "bar"); assertEquals("bar", jedis.get("foo")); jedis.del("foo"); @@ -47,8 +38,8 @@ public void simple() { @Test public void simpleWithSimpleMap() { - HashMap map = new HashMap<>(); - try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, masterClientConfig, new ClientSideCache(map), sentinels, sentinelClientConfig)) { + HashMap map = new HashMap<>(); + try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, masterClientConfig, new MapCSC(map), sentinels, sentinelClientConfig)) { jedis.set("foo", "bar"); assertThat(map, Matchers.aMapWithSize(0)); assertEquals("bar", jedis.get("foo")); @@ -66,7 +57,7 @@ public void simpleWithSimpleMap() { @Test public void flushAll() { - try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, masterClientConfig, new ClientSideCache(), sentinels, sentinelClientConfig)) { + try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, masterClientConfig, new MapCSC(), sentinels, sentinelClientConfig)) { jedis.set("foo", "bar"); assertEquals("bar", jedis.get("foo")); jedis.flushAll(); @@ -76,8 +67,8 @@ public void flushAll() { @Test public void flushAllWithSimpleMap() { - HashMap map = new HashMap<>(); - try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, masterClientConfig, new ClientSideCache(map), sentinels, sentinelClientConfig)) { + HashMap map = new HashMap<>(); + try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, masterClientConfig, new MapCSC(map), sentinels, sentinelClientConfig)) { jedis.set("foo", "bar"); assertThat(map, Matchers.aMapWithSize(0)); assertEquals("bar", jedis.get("foo")); diff --git a/src/test/java/redis/clients/jedis/util/MapCSC.java b/src/test/java/redis/clients/jedis/util/MapCSC.java new file mode 100644 index 00000000000..eb229036eae --- /dev/null +++ b/src/test/java/redis/clients/jedis/util/MapCSC.java @@ -0,0 +1,50 @@ +package redis.clients.jedis.util; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import redis.clients.jedis.ClientSideCache; +import redis.clients.jedis.CommandObject; +import redis.clients.jedis.args.Rawable; + +public class MapCSC extends ClientSideCache { + + private final Map cache; + + public MapCSC() { + this(new ConcurrentHashMap<>()); + } + + public MapCSC(Map map) { + this.cache = map; + } + + @Override + protected final void invalidateAllCommandHashes() { + cache.clear(); + } + + @Override + protected void invalidateCommandHashes(Iterable hashes) { + hashes.forEach(hash -> cache.remove(hash)); + } + + @Override + protected void put(long hash, Object value) { + cache.put(hash, value); + } + + @Override + protected Object get(long hash) { + return cache.get(hash); + } + + @Override + protected final long getCommandHash(CommandObject command) { + long result = 1; + for (Rawable raw : command.getArguments()) { + result = 31 * result + Arrays.hashCode(raw.getRaw()); + } + return 31 * result + command.getBuilder().hashCode(); + } +}