Skip to content

Commit

Permalink
Merge 0ba4d7d into 3ab6bdc
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 authored Jan 30, 2024
2 parents 3ab6bdc + 0ba4d7d commit a760c5b
Show file tree
Hide file tree
Showing 12 changed files with 263 additions and 80 deletions.
16 changes: 16 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,21 @@
<version>2.10.1</version>
</dependency>

<!-- Optional dependencies -->
<!-- Client-side caching -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
<optional>true</optional>
</dependency>

<!-- UNIX socket connection support -->
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
Expand All @@ -90,6 +105,7 @@
<version>1.19.0</version>
<scope>test</scope>
</dependency>

<!-- test -->
<dependency>
<groupId>junit</groupId>
Expand Down
114 changes: 76 additions & 38 deletions src/main/java/redis/clients/jedis/ClientSideCache.java
Original file line number Diff line number Diff line change
@@ -1,71 +1,109 @@
package redis.clients.jedis;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import redis.clients.jedis.exceptions.JedisException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import redis.clients.jedis.util.SafeEncoder;

public class ClientSideCache {
public abstract class ClientSideCache {

private final Map<ByteBuffer, Object> cache;
private final Map<ByteBuffer, Set<Long>> keyHashes; // TODO: clean-up
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock readLock = rwl.readLock();
private final Lock writeLock = rwl.writeLock();

public ClientSideCache() {
this.cache = new HashMap<>();
protected ClientSideCache() {
this.keyHashes = new ConcurrentHashMap<>();
}

/**
* For testing purpose only.
* @param map
*/
ClientSideCache(Map<ByteBuffer, Object> map) {
this.cache = map;
protected ClientSideCache(Map<ByteBuffer, Set<Long>> keyHashes) {
this.keyHashes = keyHashes;
}

public final void clear() {
cache.clear();
}
public abstract void invalidateAll();

protected abstract void invalidateAll(Iterable<Long> hashes);

protected abstract void put(long hash, Object value);

public final void invalidateKeys(List list) {
protected abstract Object get(long hash);

final void invalidate(List list) {
if (list == null) {
clear();
invalidateAll();
return;
}

list.forEach(this::invalidateKey);
Set<Long> hashes = new HashSet<>();
list.forEach(key -> hashes.addAll(getHashes(key)));
invalidateAll(hashes);
// TODO: clean-up keyHashes
}

private void invalidateKey(Object key) {
if (key instanceof byte[]) {
cache.remove(convertKey((byte[]) key));
} else {
throw new JedisException("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
private Set<Long> getHashes(Object key) {
if (!(key instanceof byte[])) {
throw new AssertionError("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
}
}

protected void setKey(Object key, Object value) {
cache.put(getMapKey(key), value);
final ByteBuffer mapKey = makeKey((byte[]) key);
readLock.lock();
try {
Set<Long> hashes = keyHashes.get(mapKey);
return hashes != null ? hashes : Collections.emptySet();
} finally {
readLock.unlock();
}
}

protected <T> T getValue(Object key) {
return (T) getMapValue(key);
final <T> T getValue(Function<CommandObject<T>, T> loader, CommandObject<T> command, String... keys) {

final long hash = getHash(command);

T value = (T) get(hash);
if (value != null) {
return value;
}

value = loader.apply(command);
if (value != null) {
writeLock.lock();
try {
put(hash, value);
for (String key : keys) {
ByteBuffer mapKey = makeKey(key);
if (keyHashes.containsKey(mapKey)) {
keyHashes.get(mapKey).add(hash);
} else {
Set<Long> set = new HashSet<>();
set.add(hash);
keyHashes.put(mapKey, set);
}
}
} finally {
writeLock.unlock();
}
}

return value;
}

private Object getMapValue(Object key) {
return cache.get(getMapKey(key));
private long getHash(CommandObject command) {
// TODO:
return 0;
}

private ByteBuffer getMapKey(Object key) {
if (key instanceof byte[]) {
return convertKey((byte[]) key);
} else {
return convertKey(SafeEncoder.encode(String.valueOf(key)));
}
private ByteBuffer makeKey(String key) {
return makeKey(SafeEncoder.encode(key));
}

private static ByteBuffer convertKey(byte[] b) {
private static ByteBuffer makeKey(byte[] b) {
return ByteBuffer.wrap(b);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void discoverClusterSlots(Connection jedis) {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
if (clientSideCache != null) {
clientSideCache.clear();
clientSideCache.invalidateAll();
}
Set<String> hostAndPortKeys = new HashSet<>();

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private static void processPush(final RedisInputStream is, ClientSideCache cache
//System.out.println("PUSH: " + SafeEncoder.encodeObject(list));
if (list.size() == 2 && list.get(0) instanceof byte[]
&& Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) {
cache.invalidateKeys((List) list.get(1));
cache.invalidate((List) list.get(1));
}
}

Expand Down
18 changes: 9 additions & 9 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ public void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig co
this.commandObjects.setBroadcastAndRoundRobinConfig(this.broadcastAndRoundRobinConfig);
}

private <T> T executeClientSideCacheCommand(CommandObject<T> command, String... keys) {
if (clientSideCache == null) {
return executeCommand(command);
}

return clientSideCache.getValue((cmd) -> executeCommand(cmd), command, keys);
}

public String ping() {
return checkAndBroadcastCommand(commandObjects.ping());
}
Expand Down Expand Up @@ -749,15 +757,7 @@ public String set(String key, String value, SetParams params) {

@Override
public String get(String key) {
if (clientSideCache != null) {
String cachedValue = clientSideCache.getValue(key);
if (cachedValue != null) return cachedValue;

String value = executeCommand(commandObjects.get(key));
if (value != null) clientSideCache.setKey(key, value);
return value;
}
return executeCommand(commandObjects.get(key));
return executeClientSideCacheCommand(commandObjects.get(key), key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private void initMaster(HostAndPort master) {
pool = newPool;
LOG.info("Created connection pool to master at {}.", master);
if (clientSideCache != null) {
clientSideCache.clear();
clientSideCache.invalidateAll();
}

if (existingPool != null) {
Expand Down
50 changes: 50 additions & 0 deletions src/main/java/redis/clients/jedis/util/CaffeineCSC.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package redis.clients.jedis.util;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import redis.clients.jedis.ClientSideCache;

public class CaffeineCSC extends ClientSideCache {

private static final int DEFAULT_MAXIMUM_SIZE = 10_000;

private final Cache<Long, Object> cache;

public CaffeineCSC() {
this(DEFAULT_MAXIMUM_SIZE);
}

public CaffeineCSC(int maximumSize) {
this(Caffeine.newBuilder().maximumSize(maximumSize).build());
}

public CaffeineCSC(int maximumSize, int ttlSeconds) {
this(Caffeine.newBuilder().maximumSize(maximumSize)
.expireAfterWrite(ttlSeconds, TimeUnit.SECONDS).build());
}

public CaffeineCSC(Cache<Long, Object> caffeineCache) {
this.cache = caffeineCache;
}

@Override
public final void invalidateAll() {
cache.invalidateAll();
}

@Override
protected void invalidateAll(Iterable<Long> hashes) {
cache.invalidateAll(hashes);
}

@Override
protected void put(long hash, Object value) {
cache.put(hash, value);
}

@Override
protected Object get(long hash) {
return cache.getIfPresent(hash);
}
}
50 changes: 50 additions & 0 deletions src/main/java/redis/clients/jedis/util/GuavaCSC.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package redis.clients.jedis.util;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
import redis.clients.jedis.ClientSideCache;

public class GuavaCSC extends ClientSideCache {

private static final int DEFAULT_MAXIMUM_SIZE = 10_000;

private final Cache<Long, Object> cache;

public GuavaCSC() {
this(DEFAULT_MAXIMUM_SIZE);
}

public GuavaCSC(int maximumSize) {
this(CacheBuilder.newBuilder().maximumSize(maximumSize).build());
}

public GuavaCSC(int maximumSize, int ttlSeconds) {
this(CacheBuilder.newBuilder().maximumSize(maximumSize)
.expireAfterWrite(ttlSeconds, TimeUnit.SECONDS).build());
}

public GuavaCSC(Cache<Long, Object> guavaCache) {
this.cache = guavaCache;
}

@Override
public final void invalidateAll() {
cache.invalidateAll();
}

@Override
protected void invalidateAll(Iterable<Long> hashes) {
cache.invalidateAll(hashes);
}

@Override
protected void put(long hash, Object value) {
cache.put(hash, value);
}

@Override
protected Object get(long hash) {
return cache.getIfPresent(hash);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Set;
Expand All @@ -14,6 +13,7 @@
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.hamcrest.Matchers;
import org.junit.Test;
import redis.clients.jedis.util.MapCSC;

public class JedisClusterClientSideCacheTest extends JedisClusterTestBase {

Expand All @@ -31,7 +31,7 @@ public class JedisClusterClientSideCacheTest extends JedisClusterTestBase {

@Test
public void simple() {
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new ClientSideCache())) {
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new MapCSC())) {
jedis.set("foo", "bar");
assertEquals("bar", jedis.get("foo"));
jedis.del("foo");
Expand All @@ -41,8 +41,8 @@ public void simple() {

@Test
public void simpleWithSimpleMap() {
HashMap<ByteBuffer, Object> map = new HashMap<>();
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new ClientSideCache(map), singleConnectionPoolConfig.get())) {
HashMap<Long, Object> map = new HashMap<>();
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new MapCSC(map), singleConnectionPoolConfig.get())) {
jedis.set("foo", "bar");
assertThat(map, Matchers.aMapWithSize(0));
assertEquals("bar", jedis.get("foo"));
Expand All @@ -60,7 +60,7 @@ public void simpleWithSimpleMap() {

@Test
public void flushAll() {
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new ClientSideCache())) {
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new MapCSC())) {
jedis.set("foo", "bar");
assertEquals("bar", jedis.get("foo"));
jedis.flushAll();
Expand All @@ -70,8 +70,8 @@ public void flushAll() {

@Test
public void flushAllWithSimpleMap() {
HashMap<ByteBuffer, Object> map = new HashMap<>();
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new ClientSideCache(map), singleConnectionPoolConfig.get())) {
HashMap<Long, Object> map = new HashMap<>();
try (JedisCluster jedis = new JedisCluster(hnp, clientConfig.get(), new MapCSC(map), singleConnectionPoolConfig.get())) {
jedis.set("foo", "bar");
assertThat(map, Matchers.aMapWithSize(0));
assertEquals("bar", jedis.get("foo"));
Expand Down
Loading

0 comments on commit a760c5b

Please sign in to comment.