Skip to content

Commit

Permalink
Merge 66f6336 into 56ef26a
Browse files Browse the repository at this point in the history
  • Loading branch information
killergerbah authored Mar 18, 2024
2 parents 56ef26a + 66f6336 commit 28eb185
Show file tree
Hide file tree
Showing 17 changed files with 394 additions and 17 deletions.
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -4258,6 +4258,16 @@ public final CommandObject<Object> tFunctionCallAsync(String library, String fun
}
// RedisGears commands

// Transaction commands
public final CommandObject<String> watch(String... keys) {
return new CommandObject<>(commandArguments(WATCH).keys((Object[]) keys), BuilderFactory.STRING);
}

public final CommandObject<String> watch(byte[]... keys) {
return new CommandObject<>(commandArguments(WATCH).keys((Object[]) keys), BuilderFactory.STRING);
}
// Transaction commands

/**
* Get the instance for JsonObjectMapper if not null, otherwise a new instance reference with
* default implementation will be created and returned.
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.executors.ClusterCommandExecutor;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

Expand Down Expand Up @@ -222,6 +223,22 @@ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Durati
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
}

/**
* Constructor that allows CommandObjects to be customized. The RedisProtocol specified will be written into the given
* CommandObjects.
*
* @param provider The ClusterConnectionProvider.
* @param maxAttempts Max number of attempts execute a command.
* @param maxTotalRetriesDuration Max amount of time to execute a command.
* @param commandObjects The CommandObjects.
* @param protocol The RedisProtocol that will be written into the given CommandObjects.
*/
public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
ClusterCommandObjects commandObjects, RedisProtocol protocol) {
super(new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider, commandObjects,
protocol);
}

public Map<String, ConnectionPool> getClusterNodes() {
return ((ClusterConnectionProvider) provider).getNodes();
}
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.executors.CommandExecutor;
import redis.clients.jedis.executors.DefaultCommandExecutor;
import redis.clients.jedis.providers.PooledConnectionProvider;
import redis.clients.jedis.util.JedisURIHelper;
import redis.clients.jedis.util.Pool;
Expand Down Expand Up @@ -394,6 +396,19 @@ public JedisPooled(PooledConnectionProvider provider) {
super(provider);
}

/**
* Constructor that allows CommandObjects to be customized. The RedisProtocol specified will be written into the given
* CommandObjects.
*
* @param provider The PooledConnectionProvider.
* @param commandObjects The CommandObjects.
* @param redisProtocol The RedisProtocol that will be written into the given CommandObjects.
*/
public JedisPooled(PooledConnectionProvider provider, CommandObjects commandObjects,
RedisProtocol redisProtocol) {
super(new DefaultCommandExecutor(provider), provider, commandObjects, redisProtocol);
}

public final Pool<Connection> getPool() {
return ((PooledConnectionProvider) provider).getPool();
}
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/redis/clients/jedis/JedisSentineled.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.executors.CommandExecutor;
import redis.clients.jedis.executors.DefaultCommandExecutor;
import redis.clients.jedis.providers.SentineledConnectionProvider;

public class JedisSentineled extends UnifiedJedis {
Expand All @@ -23,6 +25,18 @@ public JedisSentineled(SentineledConnectionProvider sentineledConnectionProvider
super(sentineledConnectionProvider);
}

/**
* Constructor that allows CommandObjects to be customized. The RedisProtocol specified will be written into the given
* CommandObjects.
*
* @param provider The SentineledConnectionProvider.
* @param commandObjects The CommandObjects.
* @param redisProtocol The RedisProtocol that will be written into the given CommandObjects.
*/
public JedisSentineled(SentineledConnectionProvider provider, CommandObjects commandObjects, RedisProtocol redisProtocol) {
super(new DefaultCommandExecutor(provider), provider, commandObjects, redisProtocol);
}

public HostAndPort getCurrentMaster() {
return ((SentineledConnectionProvider) provider).getCurrentMaster();
}
Expand Down
19 changes: 15 additions & 4 deletions src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,23 @@ public Pipeline(Connection connection) {
}

public Pipeline(Connection connection, boolean closeConnection) {
super(new CommandObjects());
this(connection, commandObjects(connection), closeConnection);
}

private static CommandObjects commandObjects(Connection connection) {
RedisProtocol proto = connection.getRedisProtocol();
CommandObjects commandObjects = new CommandObjects();
if (proto != null) commandObjects.setProtocol(proto);
return commandObjects;
}

public Pipeline(Connection connection, CommandObjects commandObjects, boolean closeConnection) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
RedisProtocol proto = this.connection.getRedisProtocol();
if (proto != null) this.commandObjects.setProtocol(proto);
setGraphCommands(new GraphCommandObjects(this.connection));
GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
setGraphCommands(graphCommandObjects);
}

@Override
Expand Down
29 changes: 23 additions & 6 deletions src/main/java/redis/clients/jedis/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ public Transaction(Connection connection) {
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
public Transaction(Connection connection, boolean doMulti) {
this(connection, doMulti, false);
this(connection, new CommandObjects(), doMulti, false);
}


/**
* Creates a new transaction.
*
Expand All @@ -73,9 +74,27 @@ public Transaction(Connection connection, boolean doMulti) {
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public Transaction(Connection connection, boolean doMulti, boolean closeConnection) {
this(connection, new CommandObjects(), doMulti, closeConnection);
}

/**
* Creates a new transaction.
*
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param connection connection
* @param commandObjects commandObjects
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public Transaction(Connection connection, CommandObjects commandObjects, boolean doMulti, boolean closeConnection) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
setGraphCommands(new GraphCommandObjects(this.connection));
GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
setGraphCommands(graphCommandObjects);
if (doMulti) multi();
}

Expand All @@ -88,16 +107,14 @@ public final void multi() {

@Override
public String watch(final String... keys) {
connection.sendCommand(WATCH, keys);
String status = connection.getStatusCodeReply();
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}

@Override
public String watch(final byte[]... keys) {
connection.sendCommand(WATCH, keys);
String status = connection.getStatusCodeReply();
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,12 @@ public UnifiedJedis(CommandExecutor executor) {
this(executor, (ConnectionProvider) null);
}

private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider) {
public UnifiedJedis(CommandExecutor executor, ConnectionProvider provider) {
this(executor, provider, new CommandObjects());
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects) {
public UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects) {
this(executor, provider, commandObjects, null);
if (this.provider != null) {
try (Connection conn = this.provider.getConnection()) {
Expand All @@ -223,7 +223,7 @@ private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, Comm
}
}

private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects,
public UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects,
RedisProtocol protocol) {
this.provider = provider;
this.executor = executor;
Expand Down Expand Up @@ -4859,7 +4859,7 @@ public PipelineBase pipelined() {
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider, commandObjects);
} else {
return new Pipeline(provider.getConnection(), true);
return new Pipeline(provider.getConnection(), commandObjects, true);
}
}

Expand All @@ -4869,7 +4869,7 @@ public AbstractTransaction multi() {
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterTransaction((MultiClusterPooledConnectionProvider) provider, true, commandObjects);
} else {
return new Transaction(provider.getConnection(), true, true);
return new Transaction(provider.getConnection(), commandObjects, true, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public final void multi() {
*/
@Override
public final String watch(String... keys) {
appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), NO_OP_BUILDER));
appendCommand(commandObjects.watch(keys));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
Expand All @@ -102,7 +102,7 @@ public final String watch(String... keys) {
*/
@Override
public final String watch(byte[]... keys) {
appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), NO_OP_BUILDER));
appendCommand(commandObjects.watch(keys));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package redis.clients.jedis.util.prefix;

import redis.clients.jedis.ClusterCommandArguments;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.commands.ProtocolCommand;

public class ClusterCommandArgumentsWithPrefixedKeys extends ClusterCommandArguments {
private final byte[] prefixBytes;
private final String prefixString;

public ClusterCommandArgumentsWithPrefixedKeys(ProtocolCommand command, String prefixString, byte[] prefixBytes) {
super(command);
this.prefixString = prefixString;
this.prefixBytes = prefixBytes;
}

public CommandArguments key(Object key) {
return super.key(Prefixer.prefixKey(key, prefixString, prefixBytes));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package redis.clients.jedis.util.prefix;

import redis.clients.jedis.ClusterCommandArguments;
import redis.clients.jedis.ClusterCommandObjects;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.util.SafeEncoder;

public class ClusterCommandObjectsWithPrefixedKeys extends ClusterCommandObjects {
private final String prefixString;
private final byte[] prefixBytes;

public ClusterCommandObjectsWithPrefixedKeys(String prefixString) {
this.prefixString = prefixString;
prefixBytes = SafeEncoder.encode(prefixString);
}

@Override
protected ClusterCommandArguments commandArguments(ProtocolCommand command) {
return new ClusterCommandArgumentsWithPrefixedKeys(command, prefixString, prefixBytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package redis.clients.jedis.util.prefix;

import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.commands.ProtocolCommand;

public class CommandArgumentsWithPrefixedKeys extends CommandArguments {
private final byte[] prefixBytes;
private final String prefixString;

public CommandArgumentsWithPrefixedKeys(ProtocolCommand command, String prefixString, byte[] prefixBytes) {
super(command);
this.prefixString = prefixString;
this.prefixBytes = prefixBytes;
}

public CommandArguments key(Object key) {
return super.key(Prefixer.prefixKey(key, prefixString, prefixBytes));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package redis.clients.jedis.util.prefix;

import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.CommandObjects;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.util.SafeEncoder;

public class CommandObjectsWithPrefixedKeys extends CommandObjects {
private final String prefixString;
private final byte[] prefixBytes;

public CommandObjectsWithPrefixedKeys(String prefixString) {
this.prefixString = prefixString;
prefixBytes = SafeEncoder.encode(prefixString);
}

@Override
protected CommandArguments commandArguments(ProtocolCommand command) {
return new CommandArgumentsWithPrefixedKeys(command, prefixString, prefixBytes);
}
}
34 changes: 34 additions & 0 deletions src/main/java/redis/clients/jedis/util/prefix/Prefixer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package redis.clients.jedis.util.prefix;

import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.args.RawableFactory;

final class Prefixer {
private Prefixer() {
}

static Object prefixKey(Object key, String prefixString, byte[] prefixBytes) {
if (key instanceof Rawable) {
byte[] raw = ((Rawable) key).getRaw();
return RawableFactory.from(prefixKeyWithBytes(raw, prefixBytes));
}

if (key instanceof byte[]) {
return prefixKeyWithBytes((byte[]) key, prefixBytes);
}

if (key instanceof String) {
String raw = (String) key;
return prefixString + raw;
}

throw new IllegalArgumentException("\"" + key.toString() + "\" is not a valid argument.");
}

private static byte[] prefixKeyWithBytes(byte[] key, byte[] prefixBytes) {
byte[] namespaced = new byte[prefixBytes.length + key.length];
System.arraycopy(prefixBytes, 0, namespaced, 0, prefixBytes.length);
System.arraycopy(key, 0, namespaced, prefixBytes.length, key.length);
return namespaced;
}
}
Loading

0 comments on commit 28eb185

Please sign in to comment.