Skip to content

Commit

Permalink
Iteration on key-prefixing POC
Browse files Browse the repository at this point in the history
- Demonstrated automatic key-prefixing for all subclasses of
  UnifiedJedis: JedisCluster, JedisPooled, and JedisSentineled
- Key-prefixing is possible as long as the underlying CommandObjects can
  be customized.
- CommandObjects cannot use commandArguments in its constructor since
  in the specific case of key-prefixing, commandArguments depends on the
  child constructor running first. So we lose caching of argument-less
  CommandObjects.
- Based on this POC, the minimum changes required to jedis would be:
  - public constructors that allow UnifiedJedis and its subclasses to
    take a custom CommandObjects.
  - Consistent use of supplied CommandObjects throughout code (e.g. in
    Pipeline, Transaction, etc).
  - Removal of caching of argument-less CommandObjects in the
    constructor of CommandObjects.
- Applications can then supply CommandObjects with custom behavior as
  necessary. Sample classes CommandObjectsWithPrefixedKeys that
  implement the behavior of prefixed keys, etc are provided but these
  can be supplied by the application as long as required constructors
  are available.
  • Loading branch information
R-J Lim committed Mar 14, 2024
1 parent a0efc76 commit d1e4110
Show file tree
Hide file tree
Showing 15 changed files with 233 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import redis.clients.jedis.commands.ProtocolCommand;

public class ClusterCommandObjectsWithPrefixedKeys extends ClusterCommandObjects {
// For the purposes of this demonstration, the prefix is assigned statically.
// Additional changes are required to prevent the parent class CommandObjects
// from calling commandArguments in its constructor, which would be a prerequisite
// to making this field into an instance field.
public static String PREFIX_STRING;
private final String prefixString;

public ClusterCommandObjectsWithPrefixedKeys(String prefixString) {
this.prefixString = prefixString;
}

@Override
protected ClusterCommandArguments commandArguments(ProtocolCommand command) {
return new ClusterCommandArgumentsWithPrefixedKeys(command, PREFIX_STRING);
return new ClusterCommandArgumentsWithPrefixedKeys(command, prefixString);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package redis.clients.jedis;

import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.args.RawableFactory;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.util.SafeEncoder;

public class CommandArgumentsWithPrefixedKeys extends CommandArguments {
private final byte[] prefix;
private final String prefixString;

public CommandArgumentsWithPrefixedKeys(ProtocolCommand command, String prefixString) {
super(command);
this.prefixString = prefixString;
prefix = SafeEncoder.encode(prefixString);
}

public CommandArguments key(Object key) {
return super.key(namespacedKey(key));
}

private Object namespacedKey(Object key) {
if (key instanceof Rawable) {
byte[] raw = ((Rawable) key).getRaw();
return RawableFactory.from(namespacedKeyBytes(raw));
}

if (key instanceof byte[]) {
return namespacedKeyBytes((byte[]) key);
}

if (key instanceof String) {
String raw = (String) key;
return prefixString + raw;
}

throw new IllegalArgumentException("\"" + key.toString() + "\" is not a valid argument.");
}

private byte[] namespacedKeyBytes(byte[] key) {
byte[] namespaced = new byte[prefix.length + key.length];
System.arraycopy(prefix, 0, namespaced, 0, prefix.length);
System.arraycopy(key, 0, namespaced, prefix.length, key.length);
return namespaced;
}
}
25 changes: 6 additions & 19 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,16 @@ protected CommandArguments commandArguments(ProtocolCommand command) {
return new CommandArguments(command);
}

private final CommandObject<String> PING_COMMAND_OBJECT = new CommandObject<>(commandArguments(PING), BuilderFactory.STRING);

public final CommandObject<String> ping() {
return PING_COMMAND_OBJECT;
return new CommandObject<>(commandArguments(PING), BuilderFactory.STRING);
}

private final CommandObject<String> FLUSHALL_COMMAND_OBJECT = new CommandObject<>(commandArguments(FLUSHALL), BuilderFactory.STRING);

public final CommandObject<String> flushAll() {
return FLUSHALL_COMMAND_OBJECT;
return new CommandObject<>(commandArguments(FLUSHALL), BuilderFactory.STRING);
}

private final CommandObject<String> FLUSHDB_COMMAND_OBJECT = new CommandObject<>(commandArguments(FLUSHDB), BuilderFactory.STRING);

public final CommandObject<String> flushDB() {
return FLUSHDB_COMMAND_OBJECT;
return new CommandObject<>(commandArguments(FLUSHDB), BuilderFactory.STRING);
}

public final CommandObject<String> configSet(String parameter, String value) {
Expand Down Expand Up @@ -2818,10 +2812,8 @@ public final CommandObject<String> scriptLoad(String script, String sampleKey) {
return new CommandObject<>(commandArguments(SCRIPT).add(LOAD).add(script).processKey(sampleKey), BuilderFactory.STRING);
}

private final CommandObject<String> SCRIPT_FLUSH_COMMAND_OBJECT = new CommandObject<>(commandArguments(SCRIPT).add(FLUSH), BuilderFactory.STRING);

public final CommandObject<String> scriptFlush() {
return SCRIPT_FLUSH_COMMAND_OBJECT;
return new CommandObject<>(commandArguments(SCRIPT).add(FLUSH), BuilderFactory.STRING);
}

public final CommandObject<String> scriptFlush(String sampleKey) {
Expand All @@ -2832,10 +2824,8 @@ public final CommandObject<String> scriptFlush(String sampleKey, FlushMode flush
return new CommandObject<>(commandArguments(SCRIPT).add(FLUSH).add(flushMode).processKey(sampleKey), BuilderFactory.STRING);
}

private final CommandObject<String> SCRIPT_KILL_COMMAND_OBJECT = new CommandObject<>(commandArguments(SCRIPT).add(KILL), BuilderFactory.STRING);

public final CommandObject<String> scriptKill() {
return SCRIPT_KILL_COMMAND_OBJECT;
return new CommandObject<>(commandArguments(SCRIPT).add(KILL), BuilderFactory.STRING);
}

public final CommandObject<String> scriptKill(String sampleKey) {
Expand Down Expand Up @@ -2863,11 +2853,8 @@ public final CommandObject<String> scriptKill(byte[] sampleKey) {
return new CommandObject<>(commandArguments(SCRIPT).add(KILL).processKey(sampleKey), BuilderFactory.STRING);
}

private final CommandObject<String> SLOWLOG_RESET_COMMAND_OBJECT
= new CommandObject<>(commandArguments(SLOWLOG).add(Keyword.RESET), BuilderFactory.STRING);

public final CommandObject<String> slowlogReset() {
return SLOWLOG_RESET_COMMAND_OBJECT;
return new CommandObject<>(commandArguments(SLOWLOG).add(Keyword.RESET), BuilderFactory.STRING);
}

public final CommandObject<Object> fcall(String name, List<String> keys, List<String> args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package redis.clients.jedis;

import redis.clients.jedis.commands.ProtocolCommand;

public class CommandObjectsWithPrefixedKeys extends CommandObjects {
private final String prefixString;

public CommandObjectsWithPrefixedKeys(String prefixString) {
this.prefixString = prefixString;
}

@Override
protected CommandArguments commandArguments(ProtocolCommand command) {
return new CommandArgumentsWithPrefixedKeys(command, prefixString);
}
}
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.executors.ClusterCommandExecutor;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

Expand Down Expand Up @@ -222,6 +223,10 @@ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Durati
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
}

public JedisCluster(ClusterCommandExecutor executor, ClusterConnectionProvider provider, ClusterCommandObjects commandObjects, RedisProtocol protocol) {
super(executor, provider, commandObjects, protocol);
}

public Map<String, ConnectionPool> getClusterNodes() {
return ((ClusterConnectionProvider) provider).getNodes();
}
Expand Down

This file was deleted.

6 changes: 6 additions & 0 deletions src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.executors.CommandExecutor;
import redis.clients.jedis.providers.PooledConnectionProvider;
import redis.clients.jedis.util.JedisURIHelper;
import redis.clients.jedis.util.Pool;
Expand Down Expand Up @@ -394,6 +395,11 @@ public JedisPooled(PooledConnectionProvider provider) {
super(provider);
}

public JedisPooled(CommandExecutor executor, PooledConnectionProvider provider, CommandObjects commandObjects,
RedisProtocol redisProtocol) {
super(executor, provider, commandObjects, redisProtocol);
}

public final Pool<Connection> getPool() {
return ((PooledConnectionProvider) provider).getPool();
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/JedisSentineled.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.executors.CommandExecutor;
import redis.clients.jedis.providers.SentineledConnectionProvider;

public class JedisSentineled extends UnifiedJedis {
Expand All @@ -23,6 +24,10 @@ public JedisSentineled(SentineledConnectionProvider sentineledConnectionProvider
super(sentineledConnectionProvider);
}

public JedisSentineled(CommandExecutor executor, SentineledConnectionProvider sentineledConnectionProvider, CommandObjects commandObjects, RedisProtocol redisProtocol) {
super(executor, sentineledConnectionProvider, commandObjects, redisProtocol);
}

public HostAndPort getCurrentMaster() {
return ((SentineledConnectionProvider) provider).getCurrentMaster();
}
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ public Pipeline(Connection connection) {
}

public Pipeline(Connection connection, boolean closeConnection) {
super(new CommandObjects());
this(connection, new CommandObjects(), closeConnection);
}

public Pipeline(Connection connection, CommandObjects commandObjects, boolean closeConnection) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
RedisProtocol proto = this.connection.getRedisProtocol();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4859,7 +4859,7 @@ public PipelineBase pipelined() {
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider, commandObjects);
} else {
return new Pipeline(provider.getConnection(), true);
return new Pipeline(provider.getConnection(), commandObjects, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package redis.clients.jedis;

import org.junit.Before;
import redis.clients.jedis.args.ClusterResetType;
import redis.clients.jedis.executors.ClusterCommandExecutor;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.JedisClusterTestUtil;

import java.time.Duration;
import java.util.Collections;

public class JedisClusterPrefixedKeysTest extends PrefixedKeysTest {
private static final DefaultJedisClientConfig DEFAULT_CLIENT_CONFIG = DefaultJedisClientConfig.builder().password("cluster").build();
private static final int DEFAULT_TIMEOUT = 2000;
private static final int DEFAULT_REDIRECTIONS = 5;
private static final ConnectionPoolConfig DEFAULT_POOL_CONFIG = new ConnectionPoolConfig();
private static final HostAndPort HOST_AND_PORT = new HostAndPort("127.0.0.1", 7379);

@Before
public void setUp() throws InterruptedException {
Jedis jedis = new Jedis(HOST_AND_PORT);
jedis.auth("cluster");
jedis.clusterReset(ClusterResetType.HARD);
jedis.flushAll();

int[] slots = new int[Protocol.CLUSTER_HASHSLOTS];

for (int i = 0; i < Protocol.CLUSTER_HASHSLOTS; ++i) {
slots[i] = i;
}

jedis.clusterAddSlots(slots);
JedisClusterTestUtil.waitForClusterReady(jedis);
}

@Before
public void tearDown() throws InterruptedException {
Jedis jedis = new Jedis(HOST_AND_PORT);
jedis.auth("cluster");
jedis.clusterReset(ClusterResetType.HARD);
jedis.flushAll();
}

@Override
public UnifiedJedis prefixingJedis() {
ClusterConnectionProvider connectionProvider = new ClusterConnectionProvider(Collections.singleton(HOST_AND_PORT), DEFAULT_CLIENT_CONFIG);
ClusterCommandExecutor executor = new ClusterCommandExecutor(connectionProvider, 5, Duration.ofSeconds(5 * DEFAULT_TIMEOUT));
ClusterCommandObjectsWithPrefixedKeys commandObjects = new ClusterCommandObjectsWithPrefixedKeys("test-prefix:");
return new JedisCluster(executor, connectionProvider, commandObjects, DEFAULT_CLIENT_CONFIG.getRedisProtocol());
}

@Override
public UnifiedJedis nonPrefixingJedis() {
return new JedisCluster(HOST_AND_PORT, DEFAULT_CLIENT_CONFIG, DEFAULT_REDIRECTIONS, DEFAULT_POOL_CONFIG);
}
}

This file was deleted.

Loading

0 comments on commit d1e4110

Please sign in to comment.