diff --git a/pom.xml b/pom.xml
index cee45b01e9..fef8874ecd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,6 +133,12 @@
2.38.0
test
+
+ org.apache.httpcomponents.client5
+ httpclient5-fluent
+ 5.3.1
+ test
+
diff --git a/src/test/java/redis/clients/jedis/EndpointConfig.java b/src/test/java/redis/clients/jedis/EndpointConfig.java
index 5cb322224d..42a44a3c47 100644
--- a/src/test/java/redis/clients/jedis/EndpointConfig.java
+++ b/src/test/java/redis/clients/jedis/EndpointConfig.java
@@ -1,6 +1,8 @@
package redis.clients.jedis;
+import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import redis.clients.jedis.util.JedisURIHelper;
@@ -118,7 +120,9 @@ protected String getURISchema(boolean tls) {
}
public static HashMap loadFromJSON(String filePath) throws Exception {
- Gson gson = new Gson();
+ Gson gson = new GsonBuilder().setFieldNamingPolicy(
+ FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create();
+
HashMap configs;
try (FileReader reader = new FileReader(filePath)) {
configs = gson.fromJson(reader, new TypeToken>() {
diff --git a/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java b/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java
new file mode 100644
index 0000000000..e4fc9a8b0a
--- /dev/null
+++ b/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java
@@ -0,0 +1,100 @@
+package redis.clients.jedis.scenario;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.*;
+import redis.clients.jedis.providers.ClusterConnectionProvider;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class ClusterTopologyRefreshTest {
+
+ private static final Logger log = LoggerFactory.getLogger(ClusterTopologyRefreshTest.class);
+
+ private static EndpointConfig endpoint;
+
+ private final FaultInjectionClient faultClient = new FaultInjectionClient();
+
+ @BeforeClass
+ public static void beforeClass() {
+ try {
+ ClusterTopologyRefreshTest.endpoint = HostAndPorts.getRedisEndpoint("re-single-shard-oss-cluster");
+ } catch (IllegalArgumentException e) {
+ log.warn("Skipping test because no Redis endpoint is configured");
+ org.junit.Assume.assumeTrue(false);
+ }
+ }
+
+ @Test
+ public void testWithPool() {
+ Set jedisClusterNode = new HashSet<>();
+ jedisClusterNode.add(endpoint.getHostAndPort());
+
+ JedisClientConfig config = endpoint.getClientConfigBuilder()
+ .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS)
+ .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build();
+
+
+ ClusterConnectionProvider provider = new ClusterConnectionProvider(jedisClusterNode, config, RecommendedSettings.poolConfig);
+ ClusterConnectionProvider spyProvider = spy(provider);
+
+ try (JedisCluster client = new JedisCluster(spyProvider,
+ RecommendedSettings.MAX_RETRIES, RecommendedSettings.MAX_TOTAL_RETRIES_DURATION)) {
+ assertEquals("Was this BDB used to run this test before?", 1,
+ client.getClusterNodes().size());
+
+ AtomicLong commandsExecuted = new AtomicLong();
+
+ // Start thread that imitates an application that uses the client
+ FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> {
+ long i = commandsExecuted.getAndIncrement();
+ client.set(String.valueOf(i), String.valueOf(i));
+ return true;
+ });
+
+ Thread t = new Thread(fakeApp);
+ t.start();
+
+ HashMap params = new HashMap<>();
+ params.put("bdb_id", endpoint.getBdbId());
+ params.put("actions", "[\"reshard\",\"failover\"]");
+
+ FaultInjectionClient.TriggerActionResponse actionResponse = null;
+
+ try {
+ log.info("Triggering Resharding and Failover");
+ actionResponse = faultClient.triggerAction("sequence_of_actions", params);
+ } catch (IOException e) {
+ fail("Fault Injection Server error:" + e.getMessage());
+ }
+
+ log.info("Action id: {}", actionResponse.getActionId());
+ fakeApp.setAction(actionResponse);
+
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ assertTrue(fakeApp.capturedExceptions().isEmpty());
+
+ log.info("Commands executed: {}", commandsExecuted.get());
+ for (long i = 0; i < commandsExecuted.get(); i++) {
+ assertTrue(client.exists(String.valueOf(i)));
+ }
+
+ verify(spyProvider, atLeast(2)).renewSlotCache(any(Connection.class));
+ }
+ }
+
+}
diff --git a/src/test/java/redis/clients/jedis/scenario/ConnectionInterruptionTest.java b/src/test/java/redis/clients/jedis/scenario/ConnectionInterruptionTest.java
new file mode 100644
index 0000000000..ee584931ca
--- /dev/null
+++ b/src/test/java/redis/clients/jedis/scenario/ConnectionInterruptionTest.java
@@ -0,0 +1,182 @@
+package redis.clients.jedis.scenario;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.*;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.providers.ConnectionProvider;
+import redis.clients.jedis.providers.PooledConnectionProvider;
+import redis.clients.jedis.util.SafeEncoder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class ConnectionInterruptionTest {
+
+ private static final Logger log = LoggerFactory.getLogger(ConnectionInterruptionTest.class);
+
+ private static EndpointConfig endpoint;
+
+ private final FaultInjectionClient faultClient = new FaultInjectionClient();
+
+ @Parameterized.Parameters
+ public static Iterable> data() {
+ return Arrays.asList("dmc_restart", "network_failure");
+ }
+
+ @Parameterized.Parameter
+ public String triggerAction;
+
+ @BeforeClass
+ public static void beforeClass() {
+ try {
+ ConnectionInterruptionTest.endpoint = HostAndPorts.getRedisEndpoint("re-standalone");
+ } catch (IllegalArgumentException e) {
+ log.warn("Skipping test because no Redis endpoint is configured");
+ org.junit.Assume.assumeTrue(false);
+ }
+ }
+
+ @Test
+ public void testWithPool() {
+ ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(),
+ endpoint.getClientConfigBuilder().build(), RecommendedSettings.poolConfig);
+
+ UnifiedJedis client = new UnifiedJedis(connectionProvider, RecommendedSettings.MAX_RETRIES,
+ RecommendedSettings.MAX_TOTAL_RETRIES_DURATION);
+ String keyName = "counter";
+ client.set(keyName, "0");
+ assertEquals("0", client.get(keyName));
+
+ AtomicLong commandsExecuted = new AtomicLong();
+
+ // Start thread that imitates an application that uses the client
+ FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> {
+ assertTrue(client.incr(keyName) > 0);
+ long currentCount = commandsExecuted.getAndIncrement();
+ log.info("Command executed {}", currentCount);
+ return true;
+ });
+ fakeApp.setKeepExecutingForSeconds(RecommendedSettings.DEFAULT_TIMEOUT_MS/1000 * 2);
+ Thread t = new Thread(fakeApp);
+ t.start();
+
+ HashMap params = new HashMap<>();
+ params.put("bdb_id", endpoint.getBdbId());
+
+ FaultInjectionClient.TriggerActionResponse actionResponse = null;
+
+ try {
+ log.info("Triggering {}", triggerAction);
+ actionResponse = faultClient.triggerAction(triggerAction, params);
+ } catch (IOException e) {
+ fail("Fault Injection Server error:" + e.getMessage());
+ }
+
+ log.info("Action id: {}", actionResponse.getActionId());
+ fakeApp.setAction(actionResponse);
+
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ log.info("Commands executed: {}", commandsExecuted.get());
+ assertEquals(commandsExecuted.get(), Long.parseLong(client.get(keyName)));
+ assertTrue(fakeApp.capturedExceptions().isEmpty());
+
+ client.close();
+ }
+
+ @Test
+ public void testWithPubSub() {
+ ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(),
+ endpoint.getClientConfigBuilder().build(), RecommendedSettings.poolConfig);
+
+ UnifiedJedis client = new UnifiedJedis(connectionProvider, RecommendedSettings.MAX_RETRIES,
+ RecommendedSettings.MAX_TOTAL_RETRIES_DURATION);
+
+ AtomicLong messagesSent = new AtomicLong();
+ AtomicLong messagesReceived = new AtomicLong();
+
+ final Thread subscriberThread = getSubscriberThread(messagesReceived, connectionProvider);
+
+ // Start thread that imitates a publisher that uses the client
+ FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> {
+ log.info("Publishing message");
+ long consumed = client.publish("test", String.valueOf(messagesSent.getAndIncrement()));
+ return consumed > 0;
+ });
+ fakeApp.setKeepExecutingForSeconds(10);
+ Thread t = new Thread(fakeApp);
+ t.start();
+
+ HashMap params = new HashMap<>();
+ params.put("bdb_id", endpoint.getBdbId());
+
+ FaultInjectionClient.TriggerActionResponse actionResponse = null;
+
+ try {
+ log.info("Triggering {}", triggerAction);
+ actionResponse = faultClient.triggerAction(triggerAction, params);
+ } catch (IOException e) {
+ fail("Fault Injection Server error:" + e.getMessage());
+ }
+
+ log.info("Action id: {}", actionResponse.getActionId());
+ fakeApp.setAction(actionResponse);
+
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (subscriberThread.isAlive())
+ subscriberThread.interrupt();
+
+ assertEquals(messagesSent.get() - 1, messagesReceived.get());
+ assertTrue(fakeApp.capturedExceptions().isEmpty());
+
+ client.close();
+ }
+
+ private static Thread getSubscriberThread(AtomicLong messagesReceived,
+ ConnectionProvider connectionProvider) {
+ final JedisPubSubBase pubSub = new JedisPubSubBase() {
+
+ @Override
+ public void onMessage(String channel, String message) {
+ messagesReceived.incrementAndGet();
+ log.info("Received message: {}", message);
+ }
+
+ @Override
+ protected String encode(byte[] raw) {
+ return SafeEncoder.encode(raw);
+ }
+ };
+
+ final Thread subscriberThread = new Thread(() -> {
+ try {
+ pubSub.proceed(connectionProvider.getConnection(), "test");
+ fail("PubSub should have been interrupted");
+ } catch (JedisConnectionException e) {
+ log.info("Expected exception in Subscriber: {}", e.getMessage());
+ assertTrue(e.getMessage().contains("Unexpected end of stream."));
+ }
+ });
+ subscriberThread.start();
+ return subscriberThread;
+ }
+}
diff --git a/src/test/java/redis/clients/jedis/scenario/FakeApp.java b/src/test/java/redis/clients/jedis/scenario/FakeApp.java
new file mode 100644
index 0000000000..7e505862a2
--- /dev/null
+++ b/src/test/java/redis/clients/jedis/scenario/FakeApp.java
@@ -0,0 +1,65 @@
+package redis.clients.jedis.scenario;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.UnifiedJedis;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.exceptions.JedisException;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FakeApp implements Runnable {
+
+ private static final Logger log = LoggerFactory.getLogger(FakeApp.class);
+
+ public void setKeepExecutingForSeconds(int keepExecutingForSeconds) {
+ this.keepExecutingForSeconds = keepExecutingForSeconds;
+ }
+
+ private int keepExecutingForSeconds = 60;
+
+ private FaultInjectionClient.TriggerActionResponse actionResponse = null;
+ private final UnifiedJedis client;
+ private final ExecutedAction action;
+ private List exceptions = new ArrayList<>();
+
+ @FunctionalInterface
+ public interface ExecutedAction {
+ boolean run(UnifiedJedis client);
+ }
+
+ public FakeApp(UnifiedJedis client, ExecutedAction action) {
+ this.client = client;
+ this.action = action;
+ }
+
+ public void setAction(FaultInjectionClient.TriggerActionResponse actionResponse) {
+ this.actionResponse = actionResponse;
+ }
+
+ public List capturedExceptions() {
+ return exceptions;
+ }
+
+ public void run() {
+ log.info("Starting FakeApp");
+
+ int checkEachSeconds = 5;
+ int timeoutSeconds = 120;
+
+ while (actionResponse == null || !actionResponse.isCompleted(
+ Duration.ofSeconds(checkEachSeconds), Duration.ofSeconds(keepExecutingForSeconds),
+ Duration.ofSeconds(timeoutSeconds))) {
+ try {
+ boolean success = action.run(client);
+
+ if (!success) break;
+ } catch (JedisConnectionException e) {
+ log.error("Error executing action", e);
+ exceptions.add(e);
+ }
+ }
+ }
+}
diff --git a/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java
new file mode 100644
index 0000000000..c4e1c5717b
--- /dev/null
+++ b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java
@@ -0,0 +1,124 @@
+package redis.clients.jedis.scenario;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.fluent.Request;
+import com.google.gson.Gson;
+import org.apache.hc.client5.http.fluent.Response;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
+import org.apache.hc.core5.http.ContentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FaultInjectionClient {
+
+ private static final String BASE_URL;
+
+ static {
+ BASE_URL = System.getenv().getOrDefault("FAULT_INJECTION_API_URL", "http://127.0.0.1:20324");
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(FaultInjectionClient.class);
+
+ public static class TriggerActionResponse {
+ private final String actionId;
+
+ private Instant lastRequestTime = null;
+
+ private Instant completedAt = null;
+
+ private Instant firstRequestAt = null;
+
+ public TriggerActionResponse(String actionId) {
+ this.actionId = actionId;
+ }
+
+ public String getActionId() {
+ return actionId;
+ }
+
+ public boolean isCompleted(Duration checkInterval, Duration delayAfter, Duration timeout) {
+ if (completedAt != null) {
+ return Duration.between(completedAt, Instant.now()).compareTo(delayAfter) >= 0;
+ }
+
+ if (firstRequestAt != null && Duration.between(firstRequestAt, Instant.now())
+ .compareTo(timeout) >= 0) {
+ throw new RuntimeException("Timeout");
+ }
+
+ if (lastRequestTime == null || Duration.between(lastRequestTime, Instant.now())
+ .compareTo(checkInterval) >= 0) {
+ lastRequestTime = Instant.now();
+
+ if (firstRequestAt == null) {
+ firstRequestAt = lastRequestTime;
+ }
+
+ CloseableHttpClient httpClient = getHttpClient();
+
+ Request request = Request.get(BASE_URL + "/action/" + actionId);
+
+ try {
+ Response response = request.execute(httpClient);
+ String result = response.returnContent().asString();
+
+ log.info("Action status: {}", result);
+
+ if (result.contains("success")) {
+ completedAt = Instant.now();
+ return Duration.between(completedAt, Instant.now()).compareTo(delayAfter) >= 0;
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeException("Fault injection proxy error ", e);
+ }
+ }
+ return false;
+ }
+ }
+
+ private static CloseableHttpClient getHttpClient() {
+ RequestConfig requestConfig = RequestConfig.custom()
+ .setConnectionRequestTimeout(5000, TimeUnit.MILLISECONDS)
+ .setResponseTimeout(5000, TimeUnit.MILLISECONDS).build();
+
+ return HttpClientBuilder.create()
+ .setDefaultRequestConfig(requestConfig).build();
+ }
+
+ public TriggerActionResponse triggerAction(String actionType, HashMap parameters)
+ throws IOException {
+ Gson gson = new GsonBuilder().setFieldNamingPolicy(
+ FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create();
+
+ HashMap payload = new HashMap<>();
+ payload.put("type", actionType);
+ payload.put("parameters", parameters);
+
+ String jsonString = gson.toJson(payload);
+
+ CloseableHttpClient httpClient = getHttpClient();
+ Request request = Request.post(BASE_URL + "/action");
+ request.bodyString(jsonString, ContentType.APPLICATION_JSON);
+
+ try {
+ String result = request.execute(httpClient).returnContent().asString();
+ return gson.fromJson(result, new TypeToken() {
+ }.getType());
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+}
diff --git a/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java b/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java
new file mode 100644
index 0000000000..aa1671ad9e
--- /dev/null
+++ b/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java
@@ -0,0 +1,31 @@
+package redis.clients.jedis.scenario;
+
+import redis.clients.jedis.ConnectionPoolConfig;
+
+import java.time.Duration;
+
+public class RecommendedSettings {
+
+ public static ConnectionPoolConfig poolConfig;
+
+ static {
+ poolConfig = new ConnectionPoolConfig();
+ ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
+ poolConfig.setMaxTotal(8);
+ poolConfig.setMaxIdle(8);
+ poolConfig.setMinIdle(0);
+ poolConfig.setBlockWhenExhausted(true);
+ poolConfig.setMaxWait(Duration.ofSeconds(1));
+ poolConfig.setTestWhileIdle(true);
+ poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(1));
+ }
+
+ public static int MAX_RETRIES = 5;
+
+ public static Duration MAX_TOTAL_RETRIES_DURATION = Duration.ofSeconds(10);
+
+ public static int DEFAULT_TIMEOUT_MS = 5000;
+
+
+
+}