diff --git a/src/main/java/redis/clients/jedis/AbstractTransaction.java b/src/main/java/redis/clients/jedis/AbstractTransaction.java index ed6f397caa..2a551224fa 100644 --- a/src/main/java/redis/clients/jedis/AbstractTransaction.java +++ b/src/main/java/redis/clients/jedis/AbstractTransaction.java @@ -9,6 +9,10 @@ protected AbstractTransaction() { super(new CommandObjects()); } + protected AbstractTransaction(CommandObjects commandObjects) { + super(commandObjects); + } + public abstract void multi(); /** diff --git a/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java b/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java index 980bbb91f9..15956ebed4 100644 --- a/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java +++ b/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java @@ -40,8 +40,7 @@ public final class MultiClusterClientConfig { private static final float CIRCUIT_BREAKER_SLOW_CALL_RATE_THRESHOLD_DEFAULT = 100.0f; // measured as percentage private static final List CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT = Arrays.asList(JedisConnectionException.class); - private static final List> FALLBACK_EXCEPTIONS_DEFAULT = - Arrays.asList(CallNotPermittedException.class, JedisConnectionException.class); + private static final List> FALLBACK_EXCEPTIONS_DEFAULT = Arrays.asList(CallNotPermittedException.class); private final ClusterConfig[] clusterConfigs; diff --git a/src/main/java/redis/clients/jedis/TransactionBase.java b/src/main/java/redis/clients/jedis/TransactionBase.java index 805a1120c4..efdf332700 100644 --- a/src/main/java/redis/clients/jedis/TransactionBase.java +++ b/src/main/java/redis/clients/jedis/TransactionBase.java @@ -9,4 +9,8 @@ public abstract class TransactionBase extends AbstractTransaction { protected TransactionBase() { super(); } + + protected TransactionBase(CommandObjects commandObjects) { + super(commandObjects); + } } diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index eab452a1f1..a43ffc1090 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -4849,7 +4849,7 @@ public PipelineBase pipelined() { if (provider == null) { throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass()); } else if (provider instanceof MultiClusterPooledConnectionProvider) { - return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider); + return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider, commandObjects); } else { return new Pipeline(provider.getConnection(), true); } @@ -4859,7 +4859,7 @@ public AbstractTransaction multi() { if (provider == null) { throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass()); } else if (provider instanceof MultiClusterPooledConnectionProvider) { - return new MultiClusterTransaction((MultiClusterPooledConnectionProvider) provider); + return new MultiClusterTransaction((MultiClusterPooledConnectionProvider) provider, true, commandObjects); } else { return new Transaction(provider.getConnection(), true, true); } diff --git a/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java b/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java index d4052dae7b..00c0ba1d91 100644 --- a/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java +++ b/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java @@ -20,6 +20,7 @@ public class MultiClusterPipeline extends PipelineBase implements Closeable { private final CircuitBreakerFailoverConnectionProvider failoverProvider; private final Queue>> commands = new LinkedList<>(); + @Deprecated public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider) { super(new CommandObjects()); @@ -31,6 +32,11 @@ public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider) } } + public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider, CommandObjects commandObjects) { + super(commandObjects); + this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(pooledProvider); + } + @Override protected final Response appendCommand(CommandObject commandObject) { CommandArguments args = commandObject.getArguments(); diff --git a/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java b/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java index 540911f2d6..d759ce1da0 100644 --- a/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java +++ b/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java @@ -38,6 +38,7 @@ public class MultiClusterTransaction extends TransactionBase { * called with this object. * @param provider */ + @Deprecated public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider) { this(provider, true); } @@ -49,6 +50,7 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider) { * @param provider * @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI */ + @Deprecated public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti) { this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider); @@ -60,6 +62,21 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, bo if (doMulti) multi(); } + /** + * A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should + * be {@code doMulti=false}. + * + * @param provider + * @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI + * @param commandObjects command objects + */ + public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti, CommandObjects commandObjects) { + super(commandObjects); + this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider); + + if (doMulti) multi(); + } + @Override public final void multi() { appendCommand(new CommandObject<>(new CommandArguments(MULTI), NO_OP_BUILDER)); diff --git a/src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java b/src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java index dc03c52f39..c6b25d764b 100644 --- a/src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java +++ b/src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java @@ -26,6 +26,7 @@ import redis.clients.jedis.MultiClusterClientConfig; import redis.clients.jedis.UnifiedJedis; import redis.clients.jedis.exceptions.JedisAccessControlException; +import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; import redis.clients.jedis.util.IOUtils; @@ -68,7 +69,7 @@ public void pipelineWithSwitch() { AbstractPipeline pipe = client.pipelined(); pipe.set("pstr", "foobar"); pipe.hset("phash", "foo", "bar"); - //provider.incrementActiveMultiClusterIndex(); + provider.incrementActiveMultiClusterIndex(); pipe.sync(); } @@ -85,7 +86,7 @@ public void transactionWithSwitch() { AbstractTransaction tx = client.multi(); tx.set("tstr", "foobar"); tx.hset("thash", "foo", "bar"); - //provider.incrementActiveMultiClusterIndex(); + provider.incrementActiveMultiClusterIndex(); assertEquals(Arrays.asList("OK", Long.valueOf(1L)), tx.exec()); } @@ -109,9 +110,19 @@ public void commandFailover() { UnifiedJedis jedis = new UnifiedJedis(cacheProvider); - assertFalse(failoverReporter.failedOver); - log.info("Starting calls to Redis"); String key = "hash-" + System.nanoTime(); + log.info("Starting calls to Redis"); + assertFalse(failoverReporter.failedOver); + for (int attempt = 0; attempt < 10; attempt++) { + try { + jedis.hset(key, "f1", "v1"); + } catch (JedisConnectionException jce) { + // + } + assertFalse(failoverReporter.failedOver); + } + + // should failover now jedis.hset(key, "f1", "v1"); assertTrue(failoverReporter.failedOver); @@ -129,7 +140,8 @@ public void pipelineFailover() { MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder( getClusterConfigs(clientConfig, hostPort_1, hostPort_2)) .circuitBreakerSlidingWindowMinCalls(slidingWindowMinCalls) - .circuitBreakerSlidingWindowSize(slidingWindowSize); + .circuitBreakerSlidingWindowSize(slidingWindowSize) + .fallbackExceptionList(Arrays.asList(JedisConnectionException.class)); RedisFailoverReporter failoverReporter = new RedisFailoverReporter(); MultiClusterPooledConnectionProvider cacheProvider = new MultiClusterPooledConnectionProvider(builder.build()); @@ -137,11 +149,13 @@ public void pipelineFailover() { UnifiedJedis jedis = new UnifiedJedis(cacheProvider); - assertFalse(failoverReporter.failedOver); + String key = "hash-" + System.nanoTime(); log.info("Starting calls to Redis"); + assertFalse(failoverReporter.failedOver); AbstractPipeline pipe = jedis.pipelined(); - String key = "hash-" + System.nanoTime(); + assertFalse(failoverReporter.failedOver); pipe.hset(key, "f1", "v1"); + assertFalse(failoverReporter.failedOver); pipe.sync(); assertTrue(failoverReporter.failedOver); @@ -168,9 +182,9 @@ public void failoverFromAuthError() { UnifiedJedis jedis = new UnifiedJedis(cacheProvider); - assertFalse(failoverReporter.failedOver); - log.info("Starting calls to Redis"); String key = "hash-" + System.nanoTime(); + log.info("Starting calls to Redis"); + assertFalse(failoverReporter.failedOver); jedis.hset(key, "f1", "v1"); assertTrue(failoverReporter.failedOver);