Skip to content

Commit

Permalink
More Cluster commands (#2605)
Browse files Browse the repository at this point in the history
* prepare

* CLUSTER RESET (ii)

* CLUSTER FAILOVER with option

* CLUSTER MYID

* CLUSTER REPLICAS

* READWRITE
  • Loading branch information
sazzad16 committed Jul 28, 2021
1 parent 6f701ae commit ffd6d73
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 10 deletions.
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1479,6 +1479,10 @@ public void cluster(final byte[]... args) {
sendCommand(CLUSTER, args);
}

public void cluster(Protocol.ClusterKeyword keyword, final byte[]... args) {
sendCommand(CLUSTER, joinParameters(keyword.getRaw(), args));
}

public void asking() {
sendCommand(ASKING);
}
Expand All @@ -1503,6 +1507,10 @@ public void readonly() {
sendCommand(READONLY);
}

public void readwrite() {
sendCommand(READWRITE);
}

public void geoadd(final byte[] key, final double longitude, final double latitude,
final byte[] member) {
sendCommand(GEOADD, key, toByteArray(longitude), toByteArray(latitude), member);
Expand Down
33 changes: 32 additions & 1 deletion src/main/java/redis/clients/jedis/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;

import redis.clients.jedis.Protocol.ClusterKeyword;
import redis.clients.jedis.Protocol.SentinelKeyword;
import redis.clients.jedis.args.ClusterFailoverOption;
import redis.clients.jedis.args.ClusterResetType;
import redis.clients.jedis.args.ListDirection;
import redis.clients.jedis.commands.Commands;
import redis.clients.jedis.params.*;
Expand Down Expand Up @@ -1183,17 +1186,33 @@ public void cluster(final String subcommand) {
}

public void clusterNodes() {
cluster(Protocol.CLUSTER_NODES);
cluster(ClusterKeyword.NODES);
}

public void clusterReplicas(final String nodeId) {
cluster(ClusterKeyword.REPLICAS, SafeEncoder.encode(nodeId));
}

public void clusterMeet(final String ip, final int port) {
cluster(Protocol.CLUSTER_MEET, ip, String.valueOf(port));
}

/**
* @deprecated Use {@link Client#clusterReset(redis.clients.jedis.args.ClusterResetType)}.
*/
@Deprecated
public void clusterReset(final ClusterReset resetType) {
cluster(Protocol.CLUSTER_RESET, resetType.name());
}

public void clusterReset(final ClusterResetType resetType) {
if (resetType == null) {
cluster(ClusterKeyword.RESET);
} else {
cluster(ClusterKeyword.RESET, resetType.getRaw());
}
}

public void clusterAddSlots(final int... slots) {
cluster(Protocol.CLUSTER_ADDSLOTS, slots);
}
Expand Down Expand Up @@ -1277,10 +1296,22 @@ public void clusterFailover() {
cluster(Protocol.CLUSTER_FAILOVER);
}

public void clusterFailover(ClusterFailoverOption failoverOption) {
if (failoverOption == null) {
cluster(ClusterKeyword.FAILOVER);
} else {
cluster(ClusterKeyword.FAILOVER, failoverOption.getRaw());
}
}

public void clusterSlots() {
cluster(Protocol.CLUSTER_SLOTS);
}

public void clusterMyId() {
cluster(ClusterKeyword.MYID);
}

public void geoadd(final String key, final double longitude, final double latitude,
final String member) {
geoadd(SafeEncoder.encode(key), longitude, latitude, SafeEncoder.encode(member));
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/redis/clients/jedis/ClusterReset.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package redis.clients.jedis;

import redis.clients.jedis.args.ClusterResetType;

/**
* @deprecated Use {@link ClusterResetType}.
*/
@Deprecated
public enum ClusterReset {
SOFT, HARD
}
38 changes: 33 additions & 5 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3816,6 +3816,20 @@ public ScanResult<Tuple> zscan(final String key, final String cursor, final Scan
return new ScanResult<>(newcursor, results);
}

@Override
public String readonly() {
checkIsInMultiOrPipeline();
client.readonly();
return client.getStatusCodeReply();
}

@Override
public String readwrite() {
checkIsInMultiOrPipeline();
client.readwrite();
return client.getStatusCodeReply();
}

@Override
public String clusterNodes() {
checkIsInMultiOrPipeline();
Expand All @@ -3824,10 +3838,10 @@ public String clusterNodes() {
}

@Override
public String readonly() {
public String clusterReplicas(final String nodeId) {
checkIsInMultiOrPipeline();
client.readonly();
return client.getStatusCodeReply();
client.clusterReplicas(nodeId);
return client.getBulkReply();
}

@Override
Expand All @@ -3844,6 +3858,13 @@ public String clusterReset(final ClusterReset resetType) {
return client.getStatusCodeReply();
}

@Override
public String clusterReset(final ClusterResetType resetType) {
checkIsInMultiOrPipeline();
client.clusterReset(resetType);
return client.getStatusCodeReply();
}

@Override
public String clusterAddSlots(final int... slots) {
checkIsInMultiOrPipeline();
Expand Down Expand Up @@ -3950,9 +3971,9 @@ public List<String> clusterSlaves(final String nodeId) {
}

@Override
public String clusterFailover() {
public String clusterFailover(ClusterFailoverOption failoverOption) {
checkIsInMultiOrPipeline();
client.clusterFailover();
client.clusterFailover(failoverOption);
return client.getStatusCodeReply();
}

Expand All @@ -3963,6 +3984,13 @@ public List<Object> clusterSlots() {
return client.getObjectMultiBulkReply();
}

@Override
public String clusterMyId() {
checkIsInMultiOrPipeline();
client.clusterMyId();
return client.getBulkReply();
}

public String asking() {
checkIsInMultiOrPipeline();
client.asking();
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public final class Protocol {
public static final String CLUSTER_SLAVES = "slaves";
public static final String CLUSTER_FAILOVER = "failover";
public static final String CLUSTER_SLOTS = "slots";

public static final String PUBSUB_CHANNELS = "channels";
public static final String PUBSUB_NUMSUB = "numsub";
public static final String PUBSUB_NUM_PAT = "numpat";
Expand Down Expand Up @@ -264,7 +265,7 @@ public static enum Command implements ProtocolCommand {
BITPOS, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL,
DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE,
HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING, PFADD, PFCOUNT, PFMERGE,
READONLY, GEOADD, GEODIST, GEOHASH, GEOPOS, GEORADIUS, GEORADIUS_RO, GEORADIUSBYMEMBER,
READONLY, READWRITE, GEOADD, GEODIST, GEOHASH, GEOPOS, GEORADIUS, GEORADIUS_RO, GEORADIUSBYMEMBER,
GEORADIUSBYMEMBER_RO, MODULE, BITFIELD, HSTRLEN, TOUCH, SWAPDB, MEMORY, XADD, XLEN, XDEL,
XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM, XAUTOCLAIM, ACL, XINFO,
BITFIELD_RO, LPOS, SMISMEMBER, ZMSCORE, BZPOPMIN, BZPOPMAX, BLMOVE, LMOVE, COPY;
Expand Down Expand Up @@ -321,4 +322,19 @@ public byte[] getRaw() {
return raw;
}
}

public static enum ClusterKeyword implements Rawable {
MEET, RESET, INFO, FAILOVER, SLOTS, FORCE, TAKEOVER, NODES, REPLICAS, MYID;

private final byte[] raw;

private ClusterKeyword() {
raw = SafeEncoder.encode(name());
}

@Override
public byte[] getRaw() {
return raw;
}
}
}
19 changes: 19 additions & 0 deletions src/main/java/redis/clients/jedis/args/ClusterFailoverOption.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package redis.clients.jedis.args;

import redis.clients.jedis.util.SafeEncoder;

public enum ClusterFailoverOption implements Rawable {

FORCE, TAKEOVER;

private final byte[] raw;

private ClusterFailoverOption() {
this.raw = SafeEncoder.encode(name());
}

@Override
public byte[] getRaw() {
return raw;
}
}
19 changes: 19 additions & 0 deletions src/main/java/redis/clients/jedis/args/ClusterResetType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package redis.clients.jedis.args;

import redis.clients.jedis.util.SafeEncoder;

public enum ClusterResetType implements Rawable {

SOFT, HARD;

private final byte[] raw;

private ClusterResetType() {
this.raw = SafeEncoder.encode(name());
}

@Override
public byte[] getRaw() {
return raw;
}
}
33 changes: 31 additions & 2 deletions src/main/java/redis/clients/jedis/commands/ClusterCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,19 @@
import java.util.List;

import redis.clients.jedis.ClusterReset;
import redis.clients.jedis.args.ClusterResetType;
import redis.clients.jedis.args.ClusterFailoverOption;

public interface ClusterCommands {

String readonly();

String readwrite();

String clusterNodes();

String clusterReplicas(String nodeId);

String clusterMeet(String ip, int port);

String clusterAddSlots(int... slots);
Expand Down Expand Up @@ -37,13 +46,33 @@ public interface ClusterCommands {

String clusterReplicate(String nodeId);

/**
* {@code CLUSTER SLAVES} command is deprecated since Redis 5.
* @deprecated Use {@link ClusterCommands#clusterReplicas(java.lang.String)}.
*/
@Deprecated
List<String> clusterSlaves(String nodeId);

String clusterFailover();
default String clusterFailover() {
return clusterFailover(null);
}

String clusterFailover(ClusterFailoverOption failoverOption);

List<Object> clusterSlots();

/**
* @deprecated Use {@link ClusterCommands#clusterReset(redis.clients.jedis.args.ClusterResetType)}.
*/
@Deprecated
String clusterReset(ClusterReset resetType);

String readonly();
/**
* {@code resetType} can be null for default behavior.
* @param resetType
* @return OK
*/
String clusterReset(ClusterResetType resetType);

String clusterMyId();
}
10 changes: 9 additions & 1 deletion src/test/java/redis/clients/jedis/tests/JedisClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public void testCalculateConnectionPerSlot() {
}

@Test
public void testReadonly() throws Exception {
public void testReadonlyAndReadwrite() throws Exception {
node1.clusterMeet(LOCAL_IP, nodeInfoSlave2.getPort());
JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, nodeSlave2);

Expand All @@ -261,6 +261,7 @@ public void testReadonly() throws Exception {
break;
}
}

try {
nodeSlave2.get("test");
fail();
Expand All @@ -269,6 +270,13 @@ public void testReadonly() throws Exception {
nodeSlave2.readonly();
nodeSlave2.get("test");

nodeSlave2.readwrite();
try {
nodeSlave2.get("test");
fail();
} catch (JedisMovedDataException e) {
}

nodeSlave2.clusterReset(ClusterReset.SOFT);
nodeSlave2.flushDB();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ClusterReset;
import redis.clients.jedis.args.ClusterResetType;
import redis.clients.jedis.tests.HostAndPortUtil;
import redis.clients.jedis.tests.utils.JedisClusterTestUtil;

Expand Down Expand Up @@ -57,6 +58,14 @@ public void testClusterSoftReset() {
assertEquals(1, node1.clusterNodes().split("\n").length);
}

@Test
public void testClusterSoftReset2() {
node1.clusterMeet("127.0.0.1", nodeInfo2.getPort());
assertTrue(node1.clusterNodes().split("\n").length > 1);
node1.clusterReset(ClusterResetType.SOFT);
assertEquals(1, node1.clusterNodes().split("\n").length);
}

@Test
public void testClusterHardReset() {
String nodeId = JedisClusterTestUtil.getNodeId(node1.clusterNodes());
Expand All @@ -65,6 +74,14 @@ public void testClusterHardReset() {
assertNotEquals(nodeId, newNodeId);
}

@Test
public void testClusterHardReset2() {
String nodeId = JedisClusterTestUtil.getNodeId(node1.clusterNodes());
node1.clusterReset(ClusterResetType.HARD);
String newNodeId = JedisClusterTestUtil.getNodeId(node1.clusterNodes());
assertNotEquals(nodeId, newNodeId);
}

@Test
public void clusterSetSlotImporting() {
node2.clusterAddSlots(6000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ public void readonly() {
}
}

@Test
public void readwrite() {
try {
jedis.readwrite();
} catch (JedisDataException e) {
assertTrue("ERR This instance has cluster support disabled".equalsIgnoreCase(e.getMessage()));
}
}

@Test
public void monitor() {
new Thread(new Runnable() {
Expand Down

0 comments on commit ffd6d73

Please sign in to comment.