Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add A-A failover scenario test #3935

Merged
merged 5 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ public Cluster getCluster() {
return multiClusterMap.get(activeMultiClusterIndex);
}

public Cluster getCluster(int multiClusterIndex) {
uglide marked this conversation as resolved.
Show resolved Hide resolved
return multiClusterMap.get(multiClusterIndex);
}

public CircuitBreaker getClusterCircuitBreaker() {
return multiClusterMap.get(activeMultiClusterIndex).getCircuitBreaker();
}
Expand Down
4 changes: 4 additions & 0 deletions src/test/java/redis/clients/jedis/EndpointConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public HostAndPort getHostAndPort() {
return JedisURIHelper.getHostAndPort(endpoints.get(0));
}

public HostAndPort getHostAndPort(int index) {
return JedisURIHelper.getHostAndPort(endpoints.get(index));
}

public String getPassword() {
return password;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package redis.clients.jedis.scenario;

import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.junit.Assert.*;

public class ActiveActiveFailoverTest {
private static final Logger log = LoggerFactory.getLogger(ActiveActiveFailoverTest.class);

private static EndpointConfig endpoint;

private final FaultInjectionClient faultClient = new FaultInjectionClient();

@BeforeClass
public static void beforeClass() {
try {
ActiveActiveFailoverTest.endpoint = HostAndPorts.getRedisEndpoint("re-active-active");
} catch (IllegalArgumentException e) {
log.warn("Skipping test because no Redis endpoint is configured");
org.junit.Assume.assumeTrue(false);
}
}

@Test
public void testFailover() {

MultiClusterClientConfig.ClusterConfig[] clusterConfig = new MultiClusterClientConfig.ClusterConfig[2];

JedisClientConfig config = endpoint.getClientConfigBuilder()
.socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS)
.connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build();

clusterConfig[0] = new MultiClusterClientConfig.ClusterConfig(endpoint.getHostAndPort(0),
config, RecommendedSettings.poolConfig);
clusterConfig[1] = new MultiClusterClientConfig.ClusterConfig(endpoint.getHostAndPort(1),
config, RecommendedSettings.poolConfig);

MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder(clusterConfig);

builder.circuitBreakerSlidingWindowType(CircuitBreakerConfig.SlidingWindowType.TIME_BASED);
builder.circuitBreakerSlidingWindowSize(1); // SLIDING WINDOW SIZE IN SECONDS
builder.circuitBreakerSlidingWindowMinCalls(1);
builder.circuitBreakerFailureRateThreshold(
10.0f); // percentage of failures to trigger circuit breaker
uglide marked this conversation as resolved.
Show resolved Hide resolved

builder.retryWaitDuration(10);
builder.retryMaxAttempts(1);
builder.retryWaitDurationExponentialBackoffMultiplier(1);

class FailoverReporter implements Consumer<String> {

String currentClusterName = "not set";

boolean failoverHappened = false;

Instant failoverAt = null;

public String getCurrentClusterName() {
return currentClusterName;
}

@Override
public void accept(String clusterName) {
this.currentClusterName = clusterName;
log.info(
"\n\n====FailoverEvent=== \nJedis failover to cluster: {}\n====FailoverEvent===\n\n",
clusterName);
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved

failoverHappened = true;
failoverAt = Instant.now();
}
}

MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(
builder.build());
FailoverReporter reporter = new FailoverReporter();
provider.setClusterFailoverPostProcessor(reporter);
provider.setActiveMultiClusterIndex(1);

UnifiedJedis client = new UnifiedJedis(provider);

AtomicLong retryingThreadsCounter = new AtomicLong(0);
AtomicLong failedCommandsAfterFailover = new AtomicLong(0);
AtomicReference<Instant> lastFailedCommandAt = new AtomicReference<>();

// Start thread that imitates an application that uses the client
MultiThreadedFakeApp fakeApp = new MultiThreadedFakeApp(client, (UnifiedJedis c) -> {

long threadId = Thread.currentThread().getId();

int attempt = 0;
int maxTries = 500;
int retryingDelay = 5;
while (true) {
try {
Map<String, String> executionInfo = new HashMap<String, String>() {{
put("threadId", String.valueOf(threadId));
put("cluster", reporter.getCurrentClusterName());
}};
client.xadd("execution_log", StreamEntryID.NEW_ENTRY, executionInfo);

if (attempt > 0) {
log.info("Thread {} recovered after {} ms. Threads still not recovered: {}", threadId,
attempt * retryingDelay, retryingThreadsCounter.decrementAndGet());
}

break;
} catch (JedisConnectionException e) {

if (reporter.failoverHappened) {
long failedCommands = failedCommandsAfterFailover.incrementAndGet();
lastFailedCommandAt.set(Instant.now());
log.warn(
"Thread {} failed to execute command after failover. Failed commands after failover: {}",
threadId, failedCommands);
}

if (attempt == 0) {
long failedThreads = retryingThreadsCounter.incrementAndGet();
log.warn("Thread {} failed to execute command. Failed threads: {}", threadId,
failedThreads);
}
try {
Thread.sleep(retryingDelay);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
if (++attempt == maxTries) throw e;
}
}
return true;
}, 18);
fakeApp.setKeepExecutingForSeconds(30);
Thread t = new Thread(fakeApp);
t.start();

HashMap<String, Object> params = new HashMap<>();
params.put("bdb_id", endpoint.getBdbId());
params.put("rlutil_command", "pause_bdb");

FaultInjectionClient.TriggerActionResponse actionResponse = null;

try {
log.info("Triggering bdb_pause");
actionResponse = faultClient.triggerAction("execute_rlutil_command", params);
} catch (IOException e) {
fail("Fault Injection Server error:" + e.getMessage());
}

log.info("Action id: {}", actionResponse.getActionId());
fakeApp.setAction(actionResponse);

try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

ConnectionPool pool = provider.getCluster(1).getConnectionPool();

log.info("First connection pool state: active: {}, idle: {}", pool.getNumActive(),
pool.getNumIdle());
log.info("Full failover time: {} s",
Duration.between(reporter.failoverAt, lastFailedCommandAt.get()).getSeconds());

assertEquals(0, pool.getNumActive());
assertTrue(fakeApp.capturedExceptions().isEmpty());

client.close();
}

}
12 changes: 6 additions & 6 deletions src/test/java/redis/clients/jedis/scenario/FakeApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@

public class FakeApp implements Runnable {

private static final Logger log = LoggerFactory.getLogger(FakeApp.class);
protected static final Logger log = LoggerFactory.getLogger(FakeApp.class);

public void setKeepExecutingForSeconds(int keepExecutingForSeconds) {
this.keepExecutingForSeconds = keepExecutingForSeconds;
}

private int keepExecutingForSeconds = 60;
protected int keepExecutingForSeconds = 60;

private FaultInjectionClient.TriggerActionResponse actionResponse = null;
private final UnifiedJedis client;
private final ExecutedAction action;
private List<JedisException> exceptions = new ArrayList<>();
protected FaultInjectionClient.TriggerActionResponse actionResponse = null;
protected final UnifiedJedis client;
protected final ExecutedAction action;
protected List<JedisException> exceptions = new ArrayList<>();

@FunctionalInterface
public interface ExecutedAction {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package redis.clients.jedis.scenario;

import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MultiThreadedFakeApp extends FakeApp {

private final ExecutorService executorService;

public MultiThreadedFakeApp(UnifiedJedis client, FakeApp.ExecutedAction action, int numThreads) {
super(client, action);
this.executorService = Executors.newFixedThreadPool(numThreads);
}

@Override
public void run() {
log.info("Starting FakeApp");

int checkEachSeconds = 5;
int timeoutSeconds = 120;

while (actionResponse == null || !actionResponse.isCompleted(
Duration.ofSeconds(checkEachSeconds), Duration.ofSeconds(keepExecutingForSeconds),
Duration.ofSeconds(timeoutSeconds))) {
try {
executorService.submit(() -> action.run(client));
} catch (JedisConnectionException e) {
log.error("Error executing action", e);
exceptions.add(e);
}
}

executorService.shutdown();

try {
if (!executorService.awaitTermination(keepExecutingForSeconds, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
log.error("Error waiting for executor service to terminate", e);
}
}
}
Loading