Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop connection fetching before sync/exec #3756

Merged
merged 1 commit into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/AbstractTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ protected AbstractTransaction() {
super(new CommandObjects());
}

protected AbstractTransaction(CommandObjects commandObjects) {
super(commandObjects);
}

public abstract void multi();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public final class MultiClusterClientConfig {
private static final float CIRCUIT_BREAKER_SLOW_CALL_RATE_THRESHOLD_DEFAULT = 100.0f; // measured as percentage
private static final List<Class> CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT = Arrays.asList(JedisConnectionException.class);

private static final List<Class<? extends Throwable>> FALLBACK_EXCEPTIONS_DEFAULT =
Arrays.asList(CallNotPermittedException.class, JedisConnectionException.class);
private static final List<Class<? extends Throwable>> FALLBACK_EXCEPTIONS_DEFAULT = Arrays.asList(CallNotPermittedException.class);

private final ClusterConfig[] clusterConfigs;

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/TransactionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ public abstract class TransactionBase extends AbstractTransaction {
protected TransactionBase() {
super();
}

protected TransactionBase(CommandObjects commandObjects) {
super(commandObjects);
}
}
4 changes: 2 additions & 2 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4849,7 +4849,7 @@ public PipelineBase pipelined() {
if (provider == null) {
throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass());
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider);
return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider, commandObjects);
} else {
return new Pipeline(provider.getConnection(), true);
}
Expand All @@ -4859,7 +4859,7 @@ public AbstractTransaction multi() {
if (provider == null) {
throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass());
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterTransaction((MultiClusterPooledConnectionProvider) provider);
return new MultiClusterTransaction((MultiClusterPooledConnectionProvider) provider, true, commandObjects);
} else {
return new Transaction(provider.getConnection(), true, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class MultiClusterPipeline extends PipelineBase implements Closeable {
private final CircuitBreakerFailoverConnectionProvider failoverProvider;
private final Queue<KeyValue<CommandArguments, Response<?>>> commands = new LinkedList<>();

@Deprecated
public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider) {
super(new CommandObjects());

Expand All @@ -31,6 +32,11 @@ public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider)
}
}

public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider, CommandObjects commandObjects) {
super(commandObjects);
this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(pooledProvider);
}

@Override
protected final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
CommandArguments args = commandObject.getArguments();
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class MultiClusterTransaction extends TransactionBase {
* called with this object.
* @param provider
*/
@Deprecated
public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider) {
this(provider, true);
}
Expand All @@ -49,6 +50,7 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider) {
* @param provider
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
@Deprecated
public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti) {
this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider);

Expand All @@ -60,6 +62,21 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, bo
if (doMulti) multi();
}

/**
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param provider
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param commandObjects command objects
*/
public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti, CommandObjects commandObjects) {
super(commandObjects);
this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider);

if (doMulti) multi();
}

@Override
public final void multi() {
appendCommand(new CommandObject<>(new CommandArguments(MULTI), NO_OP_BUILDER));
Expand Down
32 changes: 23 additions & 9 deletions src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import redis.clients.jedis.MultiClusterClientConfig;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.exceptions.JedisAccessControlException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.util.IOUtils;

Expand Down Expand Up @@ -68,7 +69,7 @@ public void pipelineWithSwitch() {
AbstractPipeline pipe = client.pipelined();
pipe.set("pstr", "foobar");
pipe.hset("phash", "foo", "bar");
//provider.incrementActiveMultiClusterIndex();
provider.incrementActiveMultiClusterIndex();
pipe.sync();
}

Expand All @@ -85,7 +86,7 @@ public void transactionWithSwitch() {
AbstractTransaction tx = client.multi();
tx.set("tstr", "foobar");
tx.hset("thash", "foo", "bar");
//provider.incrementActiveMultiClusterIndex();
provider.incrementActiveMultiClusterIndex();
assertEquals(Arrays.asList("OK", Long.valueOf(1L)), tx.exec());
}

Expand All @@ -109,9 +110,19 @@ public void commandFailover() {

UnifiedJedis jedis = new UnifiedJedis(cacheProvider);

assertFalse(failoverReporter.failedOver);
log.info("Starting calls to Redis");
String key = "hash-" + System.nanoTime();
log.info("Starting calls to Redis");
assertFalse(failoverReporter.failedOver);
for (int attempt = 0; attempt < 10; attempt++) {
try {
jedis.hset(key, "f1", "v1");
} catch (JedisConnectionException jce) {
//
}
assertFalse(failoverReporter.failedOver);
}

// should failover now
jedis.hset(key, "f1", "v1");
assertTrue(failoverReporter.failedOver);

Expand All @@ -129,19 +140,22 @@ public void pipelineFailover() {
MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder(
getClusterConfigs(clientConfig, hostPort_1, hostPort_2))
.circuitBreakerSlidingWindowMinCalls(slidingWindowMinCalls)
.circuitBreakerSlidingWindowSize(slidingWindowSize);
.circuitBreakerSlidingWindowSize(slidingWindowSize)
.fallbackExceptionList(Arrays.asList(JedisConnectionException.class));

RedisFailoverReporter failoverReporter = new RedisFailoverReporter();
MultiClusterPooledConnectionProvider cacheProvider = new MultiClusterPooledConnectionProvider(builder.build());
cacheProvider.setClusterFailoverPostProcessor(failoverReporter);

UnifiedJedis jedis = new UnifiedJedis(cacheProvider);

assertFalse(failoverReporter.failedOver);
String key = "hash-" + System.nanoTime();
log.info("Starting calls to Redis");
assertFalse(failoverReporter.failedOver);
AbstractPipeline pipe = jedis.pipelined();
String key = "hash-" + System.nanoTime();
assertFalse(failoverReporter.failedOver);
pipe.hset(key, "f1", "v1");
assertFalse(failoverReporter.failedOver);
pipe.sync();
assertTrue(failoverReporter.failedOver);

Expand All @@ -168,9 +182,9 @@ public void failoverFromAuthError() {

UnifiedJedis jedis = new UnifiedJedis(cacheProvider);

assertFalse(failoverReporter.failedOver);
log.info("Starting calls to Redis");
String key = "hash-" + System.nanoTime();
log.info("Starting calls to Redis");
assertFalse(failoverReporter.failedOver);
jedis.hset(key, "f1", "v1");
assertTrue(failoverReporter.failedOver);

Expand Down
Loading