Skip to content

Commit

Permalink
Support [S]PUBLISH in pipelines and transactions (redis#3859)
Browse files Browse the repository at this point in the history
1. We already have PUBLISH in simple Pipeline.
2. We have SPUBLISH only in JedisCluster; so it's only added in ClusterPipeline.
3. We don't have transaction with cluster. So obviously [S]PUBLISH isn't implemented for such case.

---
* Support PUBLISH in Transaction(s)

* Support SPUBLISH in ClusterPipeline
  • Loading branch information
sazzad16 authored Jun 10, 2024
1 parent 4ca869e commit 811dca0
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 11 deletions.
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/AbstractTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,12 @@ protected AbstractTransaction(CommandObjects commandObjects) {
public Response<Long> waitReplicas(int replicas, long timeout) {
return appendCommand(commandObjects.waitReplicas(replicas, timeout));
}

public Response<Long> publish(String channel, String message) {
return appendCommand(commandObjects.publish(channel, message));
}

public Response<Long> publish(byte[] channel, byte[] message) {
return appendCommand(commandObjects.publish(channel, message));
}
}
18 changes: 13 additions & 5 deletions src/main/java/redis/clients/jedis/ClusterPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ private static ClusterCommandObjects createClusterCommandObjects(RedisProtocol p
return cco;
}

/**
* This method must be called after constructor, if graph commands are going to be used.
*/
public void prepareGraphCommands() {
super.prepareGraphCommands(provider);
}

@Override
public void close() {
try {
Expand All @@ -65,10 +72,11 @@ protected Connection getConnection(HostAndPort nodeKey) {
return provider.getConnection(nodeKey);
}

/**
* This method must be called after constructor, if graph commands are going to be used.
*/
public void prepareGraphCommands() {
super.prepareGraphCommands(provider);
public Response<Long> spublish(String channel, String message) {
return appendCommand(commandObjects.spublish(channel, message));
}

public Response<Long> spublish(byte[] channel, byte[] message) {
return appendCommand(commandObjects.spublish(channel, message));
}
}
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ protected Object readProtocolWithCheckingBroken() {
try {
return Protocol.read(inputStream);
// Object read = Protocol.read(inputStream);
// System.out.println(SafeEncoder.encodeObject(read));
// System.out.println(redis.clients.jedis.util.SafeEncoder.encodeObject(read));
// return read;
} catch (JedisConnectionException exc) {
broken = true;
Expand Down
12 changes: 12 additions & 0 deletions src/test/java/redis/clients/jedis/ClusterPipeliningTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,18 @@ public void testEvalshaKeyAndArgWithBinary() {
}
}

@Test
public void spublishInPipeline() {
try (JedisCluster jedis = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
ClusterPipeline pipelined = jedis.pipelined();
Response<Long> p1 = pipelined.publish("foo", "bar");
Response<Long> p2 = pipelined.publish("foo".getBytes(), "bar".getBytes());
pipelined.sync();
assertEquals(0, p1.get().longValue());
assertEquals(0, p2.get().longValue());
}
}

@Test
public void simple() { // TODO: move into 'redis.clients.jedis.commands.unified.cluster' package
try (JedisCluster jedis = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/redis/clients/jedis/PipeliningTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public void pipelineResponseWithinPipeline() {
}

@Test
public void pipelineWithPubSub() {
public void publishInPipeline() {
Pipeline pipelined = jedis.pipelined();
Response<Long> p1 = pipelined.publish("foo", "bar");
Response<Long> p2 = pipelined.publish("foo".getBytes(), "bar".getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,29 @@ public void transaction() {

@Test
public void watch() {
List<Object> resp;
try (AbstractTransaction tx = jedis.transaction(false)) {
assertEquals("OK", tx.watch("mykey", "somekey"));
tx.multi();

jedis.set("mykey", "bar");

tx.set("mykey", "foo");
resp = tx.exec();
assertNull(tx.exec());

assertEquals("bar", jedis.get("mykey"));
}
}

@Test
public void publishInTransaction() {
try (AbstractTransaction tx = jedis.multi()) {
Response<Long> p1 = tx.publish("foo", "bar");
Response<Long> p2 = tx.publish("foo".getBytes(), "bar".getBytes());
tx.exec();

assertEquals(0, p1.get().longValue());
assertEquals(0, p2.get().longValue());
}
assertNull(resp);
assertEquals("bar", jedis.get("mykey"));
}

@Test
Expand Down

0 comments on commit 811dca0

Please sign in to comment.