From fbd7f9ff48f811b20f8e82022ef33307fee609ae Mon Sep 17 00:00:00 2001 From: Igor Malinovskiy Date: Mon, 27 May 2024 19:56:55 +0200 Subject: [PATCH 1/6] Add POC DMC restart test --- pom.xml | 5 + .../redis/clients/jedis/EndpointConfig.java | 6 +- .../jedis/scenario/DMCRestartTest.java | 86 +++++++++++++++ .../jedis/scenario/FaultInjectionClient.java | 101 ++++++++++++++++++ 4 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 src/test/java/redis/clients/jedis/scenario/DMCRestartTest.java create mode 100644 src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java diff --git a/pom.xml b/pom.xml index 1ae12c300c..0dc25f38ca 100644 --- a/pom.xml +++ b/pom.xml @@ -133,6 +133,11 @@ 2.38.0 test + + org.apache.httpcomponents.client5 + httpclient5-fluent + 5.3.1 + diff --git a/src/test/java/redis/clients/jedis/EndpointConfig.java b/src/test/java/redis/clients/jedis/EndpointConfig.java index 5cb322224d..42a44a3c47 100644 --- a/src/test/java/redis/clients/jedis/EndpointConfig.java +++ b/src/test/java/redis/clients/jedis/EndpointConfig.java @@ -1,6 +1,8 @@ package redis.clients.jedis; +import com.google.gson.FieldNamingPolicy; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; import redis.clients.jedis.util.JedisURIHelper; @@ -118,7 +120,9 @@ protected String getURISchema(boolean tls) { } public static HashMap loadFromJSON(String filePath) throws Exception { - Gson gson = new Gson(); + Gson gson = new GsonBuilder().setFieldNamingPolicy( + FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create(); + HashMap configs; try (FileReader reader = new FileReader(filePath)) { configs = gson.fromJson(reader, new TypeToken>() { diff --git a/src/test/java/redis/clients/jedis/scenario/DMCRestartTest.java b/src/test/java/redis/clients/jedis/scenario/DMCRestartTest.java new file mode 100644 index 0000000000..19737080ef --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/DMCRestartTest.java @@ -0,0 +1,86 @@ +package redis.clients.jedis.scenario; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.*; +import redis.clients.jedis.misc.AutomaticFailoverTest; +import redis.clients.jedis.providers.ConnectionProvider; +import redis.clients.jedis.providers.PooledConnectionProvider; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; + +import static org.junit.Assert.*; + +public class DMCRestartTest { + + private static final Logger log = LoggerFactory.getLogger(AutomaticFailoverTest.class); + + private final EndpointConfig endpoint = HostAndPorts.getRedisEndpoint("re-standalone"); + + private final FaultInjectionClient faultClient = new FaultInjectionClient(); + + @Test + public void testWithPool() { + + // Validate recommended configuration + ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); + poolConfig.setMaxTotal(8); + poolConfig.setMaxIdle(8); + poolConfig.setMinIdle(0); + poolConfig.setBlockWhenExhausted(true); + poolConfig.setMaxWait(Duration.ofSeconds(1)); + poolConfig.setTestWhileIdle(true); + poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(1)); + + // Retry settings + int maxAttempts = 3; + Duration maxTotalRetriesDuration = Duration.ofSeconds(10); + + ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(), + endpoint.getClientConfigBuilder().build(), new ConnectionPoolConfig()); + + UnifiedJedis client = new UnifiedJedis(connectionProvider, maxAttempts, + maxTotalRetriesDuration); + String keyName = "counter"; + client.set(keyName, "0"); + assertEquals("0", client.get(keyName)); + + HashMap params = new HashMap<>(); + params.put("bdb_id", endpoint.getBdbId()); + + FaultInjectionClient.TriggerActionResponse actionResponse = null; + + try { + log.info("Triggering DMC restart"); + actionResponse = faultClient.triggerAction("dmc_restart", params); + } catch (IOException e) { + fail("Fault Injection Server error:" + e.getMessage()); + } + + log.info("Action id: {}", actionResponse.getActionId()); + + int checkEachSeconds = 5; + int keepExecutingForSeconds = 60; + int timeoutSeconds = 120; + long commandsExecuted = 0; + + while (!actionResponse.isCompleted(Duration.ofSeconds(checkEachSeconds), + Duration.ofSeconds(keepExecutingForSeconds), Duration.ofSeconds(timeoutSeconds))) { + assertTrue (client.incr(keyName) > 0); + commandsExecuted++; + } + + log.info("Commands executed: {}", commandsExecuted); + log.info("Test took {} seconds", + Duration.between(actionResponse.getFirstRequestAt(), Instant.now()).getSeconds()); + + assertEquals(commandsExecuted, Long.parseLong(client.get(keyName))); + + client.close(); + } + +} diff --git a/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java new file mode 100644 index 0000000000..fc824cd0a9 --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java @@ -0,0 +1,101 @@ +package redis.clients.jedis.scenario; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; + +import com.google.gson.FieldNamingPolicy; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import org.apache.hc.client5.http.fluent.Request; +import com.google.gson.Gson; +import org.apache.hc.client5.http.fluent.Response; +import org.apache.hc.core5.http.ContentType; + +public class FaultInjectionClient { + + private static final String BASE_URL = "http://127.0.0.1:20324"; + + public static class TriggerActionResponse { + private final String actionId; + + private Instant lastRequestTime = null; + + private Instant completedAt = null; + + private Instant firstRequestAt = null; + + public TriggerActionResponse(String actionId) { + this.actionId = actionId; + } + + public String getActionId() { + return actionId; + } + + public Instant getFirstRequestAt() { + return completedAt; + } + + public boolean isCompleted(Duration checkInterval, Duration delayAfter, Duration timeout) { + if (completedAt != null) { + return Duration.between(completedAt, Instant.now()).compareTo(delayAfter) >= 0; + } + + if (firstRequestAt != null && Duration.between(firstRequestAt, Instant.now()) + .compareTo(timeout) >= 0) { + throw new RuntimeException("Timeout"); + } + + if (lastRequestTime == null || Duration.between(lastRequestTime, Instant.now()) + .compareTo(checkInterval) >= 0) { + lastRequestTime = Instant.now(); + + if (firstRequestAt == null) { + firstRequestAt = lastRequestTime; + } + + Request request = Request.get(BASE_URL + "/action/" + actionId); + try { + Response response = request.execute(); + String result = response.returnContent().asString(); + + if (result.contains("success")) { + completedAt = Instant.now(); + return Duration.between(completedAt, Instant.now()).compareTo(delayAfter) >= 0; + } + + } catch (IOException e) { + throw new RuntimeException("Fault injection proxy error ", e); + } + } + return false; + } + } + + public TriggerActionResponse triggerAction(String actionType, HashMap parameters) + throws IOException { + Gson gson = new GsonBuilder().setFieldNamingPolicy( + FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create(); + + HashMap payload = new HashMap<>(); + payload.put("type", actionType); + payload.put("parameters", parameters); + + String jsonString = gson.toJson(payload); + + Request request = Request.post(BASE_URL + "/action"); + request.bodyString(jsonString, ContentType.APPLICATION_JSON); + + try { + String result = request.execute().returnContent().asString(); + return gson.fromJson(result, new TypeToken() { + }.getType()); + } catch (IOException e) { + e.printStackTrace(); + throw e; + } + } + +} From a69ca3ed950b0157970b4feb7c26a470189a26da Mon Sep 17 00:00:00 2001 From: Igor Malinovskiy Date: Tue, 28 May 2024 12:22:34 +0200 Subject: [PATCH 2/6] Cleanup --- .../jedis/scenario/DMCRestartTest.java | 63 ++++++++++++++----- 1 file changed, 48 insertions(+), 15 deletions(-) diff --git a/src/test/java/redis/clients/jedis/scenario/DMCRestartTest.java b/src/test/java/redis/clients/jedis/scenario/DMCRestartTest.java index 19737080ef..4c2894d61b 100644 --- a/src/test/java/redis/clients/jedis/scenario/DMCRestartTest.java +++ b/src/test/java/redis/clients/jedis/scenario/DMCRestartTest.java @@ -10,7 +10,6 @@ import java.io.IOException; import java.time.Duration; -import java.time.Instant; import java.util.HashMap; import static org.junit.Assert.*; @@ -23,6 +22,42 @@ public class DMCRestartTest { private final FaultInjectionClient faultClient = new FaultInjectionClient(); + public static class FakeApp implements Runnable { + + private FaultInjectionClient.TriggerActionResponse actionResponse = null; + private final UnifiedJedis client; + private long commandsExecuted = 0; + private final String keyName; + + public FakeApp(UnifiedJedis client, String keyName) { + this.client = client; + this.keyName = keyName; + } + + public void setAction(FaultInjectionClient.TriggerActionResponse actionResponse) { + this.actionResponse = actionResponse; + } + + public long getExecutedCommandsCount() { + return commandsExecuted; + } + + public void run() { + log.info("Starting FakeApp"); + + int checkEachSeconds = 5; + int keepExecutingForSeconds = 60; + int timeoutSeconds = 120; + + while (actionResponse == null || !actionResponse.isCompleted( + Duration.ofSeconds(checkEachSeconds), Duration.ofSeconds(keepExecutingForSeconds), + Duration.ofSeconds(timeoutSeconds))) { + assertTrue(client.incr(keyName) > 0); + commandsExecuted++; + } + } + } + @Test public void testWithPool() { @@ -49,6 +84,11 @@ public void testWithPool() { client.set(keyName, "0"); assertEquals("0", client.get(keyName)); + // Start thread that imitates an application that uses the client + FakeApp fakeApp = new FakeApp(client, keyName); + Thread t = new Thread(fakeApp); + t.start(); + HashMap params = new HashMap<>(); params.put("bdb_id", endpoint.getBdbId()); @@ -62,23 +102,16 @@ public void testWithPool() { } log.info("Action id: {}", actionResponse.getActionId()); + fakeApp.setAction(actionResponse); - int checkEachSeconds = 5; - int keepExecutingForSeconds = 60; - int timeoutSeconds = 120; - long commandsExecuted = 0; - - while (!actionResponse.isCompleted(Duration.ofSeconds(checkEachSeconds), - Duration.ofSeconds(keepExecutingForSeconds), Duration.ofSeconds(timeoutSeconds))) { - assertTrue (client.incr(keyName) > 0); - commandsExecuted++; + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - log.info("Commands executed: {}", commandsExecuted); - log.info("Test took {} seconds", - Duration.between(actionResponse.getFirstRequestAt(), Instant.now()).getSeconds()); - - assertEquals(commandsExecuted, Long.parseLong(client.get(keyName))); + log.info("Commands executed: {}", fakeApp.getExecutedCommandsCount()); + assertEquals(fakeApp.getExecutedCommandsCount(), Long.parseLong(client.get(keyName))); client.close(); } From 3f51f8c5eccebc3f5ceda43010d634708591dac0 Mon Sep 17 00:00:00 2001 From: Igor Malinovskiy Date: Thu, 6 Jun 2024 20:13:43 +0200 Subject: [PATCH 3/6] More tests --- .../jedis/scenario/DMCRestartTest.java | 119 ------------------ .../jedis/scenario/FaultInjectionClient.java | 21 +++- .../clients/jedis/scenario/RetryingTest.java | 86 +++++++++++++ 3 files changed, 106 insertions(+), 120 deletions(-) delete mode 100644 src/test/java/redis/clients/jedis/scenario/DMCRestartTest.java create mode 100644 src/test/java/redis/clients/jedis/scenario/RetryingTest.java diff --git a/src/test/java/redis/clients/jedis/scenario/DMCRestartTest.java b/src/test/java/redis/clients/jedis/scenario/DMCRestartTest.java deleted file mode 100644 index 4c2894d61b..0000000000 --- a/src/test/java/redis/clients/jedis/scenario/DMCRestartTest.java +++ /dev/null @@ -1,119 +0,0 @@ -package redis.clients.jedis.scenario; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import redis.clients.jedis.*; -import redis.clients.jedis.misc.AutomaticFailoverTest; -import redis.clients.jedis.providers.ConnectionProvider; -import redis.clients.jedis.providers.PooledConnectionProvider; - -import java.io.IOException; -import java.time.Duration; -import java.util.HashMap; - -import static org.junit.Assert.*; - -public class DMCRestartTest { - - private static final Logger log = LoggerFactory.getLogger(AutomaticFailoverTest.class); - - private final EndpointConfig endpoint = HostAndPorts.getRedisEndpoint("re-standalone"); - - private final FaultInjectionClient faultClient = new FaultInjectionClient(); - - public static class FakeApp implements Runnable { - - private FaultInjectionClient.TriggerActionResponse actionResponse = null; - private final UnifiedJedis client; - private long commandsExecuted = 0; - private final String keyName; - - public FakeApp(UnifiedJedis client, String keyName) { - this.client = client; - this.keyName = keyName; - } - - public void setAction(FaultInjectionClient.TriggerActionResponse actionResponse) { - this.actionResponse = actionResponse; - } - - public long getExecutedCommandsCount() { - return commandsExecuted; - } - - public void run() { - log.info("Starting FakeApp"); - - int checkEachSeconds = 5; - int keepExecutingForSeconds = 60; - int timeoutSeconds = 120; - - while (actionResponse == null || !actionResponse.isCompleted( - Duration.ofSeconds(checkEachSeconds), Duration.ofSeconds(keepExecutingForSeconds), - Duration.ofSeconds(timeoutSeconds))) { - assertTrue(client.incr(keyName) > 0); - commandsExecuted++; - } - } - } - - @Test - public void testWithPool() { - - // Validate recommended configuration - ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); - poolConfig.setMaxTotal(8); - poolConfig.setMaxIdle(8); - poolConfig.setMinIdle(0); - poolConfig.setBlockWhenExhausted(true); - poolConfig.setMaxWait(Duration.ofSeconds(1)); - poolConfig.setTestWhileIdle(true); - poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(1)); - - // Retry settings - int maxAttempts = 3; - Duration maxTotalRetriesDuration = Duration.ofSeconds(10); - - ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(), - endpoint.getClientConfigBuilder().build(), new ConnectionPoolConfig()); - - UnifiedJedis client = new UnifiedJedis(connectionProvider, maxAttempts, - maxTotalRetriesDuration); - String keyName = "counter"; - client.set(keyName, "0"); - assertEquals("0", client.get(keyName)); - - // Start thread that imitates an application that uses the client - FakeApp fakeApp = new FakeApp(client, keyName); - Thread t = new Thread(fakeApp); - t.start(); - - HashMap params = new HashMap<>(); - params.put("bdb_id", endpoint.getBdbId()); - - FaultInjectionClient.TriggerActionResponse actionResponse = null; - - try { - log.info("Triggering DMC restart"); - actionResponse = faultClient.triggerAction("dmc_restart", params); - } catch (IOException e) { - fail("Fault Injection Server error:" + e.getMessage()); - } - - log.info("Action id: {}", actionResponse.getActionId()); - fakeApp.setAction(actionResponse); - - try { - t.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - log.info("Commands executed: {}", fakeApp.getExecutedCommandsCount()); - assertEquals(fakeApp.getExecutedCommandsCount(), Long.parseLong(client.get(keyName))); - - client.close(); - } - -} diff --git a/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java index fc824cd0a9..306e4786d5 100644 --- a/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java +++ b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java @@ -4,19 +4,27 @@ import java.time.Duration; import java.time.Instant; import java.util.HashMap; +import java.util.concurrent.TimeUnit; import com.google.gson.FieldNamingPolicy; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; +import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.fluent.Request; import com.google.gson.Gson; import org.apache.hc.client5.http.fluent.Response; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.core5.http.ContentType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FaultInjectionClient { private static final String BASE_URL = "http://127.0.0.1:20324"; + private static final Logger log = LoggerFactory.getLogger(FaultInjectionClient.class); + public static class TriggerActionResponse { private final String actionId; @@ -56,11 +64,22 @@ public boolean isCompleted(Duration checkInterval, Duration delayAfter, Duration firstRequestAt = lastRequestTime; } + RequestConfig requestConfig = RequestConfig.custom() + .setConnectionRequestTimeout(5000, TimeUnit.MILLISECONDS) + .setResponseTimeout(5000, TimeUnit.MILLISECONDS).build(); + + CloseableHttpClient httpClient = HttpClientBuilder.create() + .setDefaultRequestConfig(requestConfig) + .build(); + Request request = Request.get(BASE_URL + "/action/" + actionId); + try { - Response response = request.execute(); + Response response = request.execute(httpClient); String result = response.returnContent().asString(); + log.info("Action status: {}", result); + if (result.contains("success")) { completedAt = Instant.now(); return Duration.between(completedAt, Instant.now()).compareTo(delayAfter) >= 0; diff --git a/src/test/java/redis/clients/jedis/scenario/RetryingTest.java b/src/test/java/redis/clients/jedis/scenario/RetryingTest.java new file mode 100644 index 0000000000..0bedcadf71 --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/RetryingTest.java @@ -0,0 +1,86 @@ +package redis.clients.jedis.scenario; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.*; +import redis.clients.jedis.providers.ConnectionProvider; +import redis.clients.jedis.providers.PooledConnectionProvider; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.*; + +@RunWith(Parameterized.class) +public class RetryingTest { + + private static final Logger log = LoggerFactory.getLogger(RetryingTest.class); + + private final EndpointConfig endpoint = HostAndPorts.getRedisEndpoint("re-standalone"); + + private final FaultInjectionClient faultClient = new FaultInjectionClient(); + + @Parameterized.Parameters + public static Iterable data() { + return Arrays.asList(/*"dmc_restart",*/ "network_failure"); + } + + @Parameterized.Parameter + public String triggerAction; + + @Test + public void testWithPool() { + ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(), + endpoint.getClientConfigBuilder().build(), RecommendedSettings.poolConfig); + + UnifiedJedis client = new UnifiedJedis(connectionProvider, RecommendedSettings.MAX_RETRIES, + RecommendedSettings.MAX_TOTAL_RETRIES_DURATION); + String keyName = "counter"; + client.set(keyName, "0"); + assertEquals("0", client.get(keyName)); + + AtomicLong commandsExecuted = new AtomicLong(); + + // Start thread that imitates an application that uses the client + FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> { + assertTrue(client.incr(keyName) > 0); + commandsExecuted.getAndIncrement(); + return true; + }); + Thread t = new Thread(fakeApp); + t.start(); + + HashMap params = new HashMap<>(); + params.put("bdb_id", endpoint.getBdbId()); + + FaultInjectionClient.TriggerActionResponse actionResponse = null; + + try { + log.info("Triggering {}", triggerAction); + actionResponse = faultClient.triggerAction(triggerAction, params); + } catch (IOException e) { + fail("Fault Injection Server error:" + e.getMessage()); + } + + log.info("Action id: {}", actionResponse.getActionId()); + fakeApp.setAction(actionResponse); + + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + log.info("Commands executed: {}", commandsExecuted.get()); + assertEquals(commandsExecuted.get(), Long.parseLong(client.get(keyName))); + assertTrue(fakeApp.capturedExceptions().isEmpty()); + + client.close(); + } + +} From 4040368c9a756816bd668b9484b463dc6cec2725 Mon Sep 17 00:00:00 2001 From: Igor Malinovskiy Date: Thu, 6 Jun 2024 20:17:41 +0200 Subject: [PATCH 4/6] Add missing files --- .../scenario/ClusterTopologyRefreshTest.java | 89 +++++++++++++++++++ .../redis/clients/jedis/scenario/FakeApp.java | 63 +++++++++++++ .../jedis/scenario/RecommendedSettings.java | 31 +++++++ 3 files changed, 183 insertions(+) create mode 100644 src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java create mode 100644 src/test/java/redis/clients/jedis/scenario/FakeApp.java create mode 100644 src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java diff --git a/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java b/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java new file mode 100644 index 0000000000..59e3f9121c --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java @@ -0,0 +1,89 @@ +package redis.clients.jedis.scenario; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.*; +import redis.clients.jedis.providers.ClusterConnectionProvider; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class ClusterTopologyRefreshTest { + + private static final Logger log = LoggerFactory.getLogger(ClusterTopologyRefreshTest.class); + + private final EndpointConfig endpoint = HostAndPorts.getRedisEndpoint( + "re-single-shard-oss-cluster"); + + private final FaultInjectionClient faultClient = new FaultInjectionClient(); + + @Test + public void testWithPool() { + Set jedisClusterNode = new HashSet<>(); + jedisClusterNode.add(endpoint.getHostAndPort()); + + JedisClientConfig config = endpoint.getClientConfigBuilder() + .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS) + .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build(); + + + ClusterConnectionProvider provider = new ClusterConnectionProvider(jedisClusterNode, config, RecommendedSettings.poolConfig); + ClusterConnectionProvider spyProvider = spy(provider); + + try (JedisCluster client = new JedisCluster(spyProvider, + RecommendedSettings.MAX_RETRIES, RecommendedSettings.MAX_TOTAL_RETRIES_DURATION)) { + assertEquals(1, client.getClusterNodes().size()); + + AtomicLong commandsExecuted = new AtomicLong(); + + // Start thread that imitates an application that uses the client + FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> { + long i = commandsExecuted.getAndIncrement(); + client.set(String.valueOf(i), String.valueOf(i)); + return true; + }); + + Thread t = new Thread(fakeApp); + t.start(); + + HashMap params = new HashMap<>(); + params.put("bdb_id", endpoint.getBdbId()); + params.put("actions", "[\"reshard\",\"failover\"]"); + + FaultInjectionClient.TriggerActionResponse actionResponse = null; + + try { + log.info("Triggering Resharding and Failover"); + actionResponse = faultClient.triggerAction("sequence_of_actions", params); + } catch (IOException e) { + fail("Fault Injection Server error:" + e.getMessage()); + } + + log.info("Action id: {}", actionResponse.getActionId()); + fakeApp.setAction(actionResponse); + + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + assertTrue(fakeApp.capturedExceptions().isEmpty()); + + log.info("Commands executed: {}", commandsExecuted.get()); + for (long i = 0; i < commandsExecuted.get(); i++) { + assertTrue(client.exists(String.valueOf(i))); + } + + verify(spyProvider, atLeast(2)).renewSlotCache(any(Connection.class)); + } + } + +} diff --git a/src/test/java/redis/clients/jedis/scenario/FakeApp.java b/src/test/java/redis/clients/jedis/scenario/FakeApp.java new file mode 100644 index 0000000000..fedc721629 --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/FakeApp.java @@ -0,0 +1,63 @@ +package redis.clients.jedis.scenario; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.UnifiedJedis; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisException; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +public class FakeApp implements Runnable { + + private static final Logger log = LoggerFactory.getLogger(FakeApp.class); + + private FaultInjectionClient.TriggerActionResponse actionResponse = null; + private final UnifiedJedis client; + private final ExecutedAction action; + private List exceptions = new ArrayList<>(); + + @FunctionalInterface + public interface ExecutedAction { + boolean run(UnifiedJedis client); + } + + public FakeApp(UnifiedJedis client, ExecutedAction action) { + this.client = client; + this.action = action; + } + + public void setAction(FaultInjectionClient.TriggerActionResponse actionResponse) { + this.actionResponse = actionResponse; + } + + public List capturedExceptions() { + return exceptions; + } + + public void run() { + log.info("Starting FakeApp"); + + int checkEachSeconds = 5; + int keepExecutingForSeconds = 60; + int timeoutSeconds = 120; + + while (actionResponse == null || !actionResponse.isCompleted( + Duration.ofSeconds(checkEachSeconds), Duration.ofSeconds(keepExecutingForSeconds), + Duration.ofSeconds(timeoutSeconds))) { + try { + boolean success = action.run(client); + + if (!success) + break; + } catch (JedisConnectionException e) { + log.error("Error executing action", e); + exceptions.add(e); + } + } + } +} diff --git a/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java b/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java new file mode 100644 index 0000000000..b5040aba08 --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java @@ -0,0 +1,31 @@ +package redis.clients.jedis.scenario; + +import redis.clients.jedis.ConnectionPoolConfig; + +import java.time.Duration; + +public class RecommendedSettings { + + public static ConnectionPoolConfig poolConfig; + + static { + poolConfig = new ConnectionPoolConfig(); + ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); + poolConfig.setMaxTotal(8); + poolConfig.setMaxIdle(8); + poolConfig.setMinIdle(0); + poolConfig.setBlockWhenExhausted(true); + poolConfig.setMaxWait(Duration.ofSeconds(1)); + poolConfig.setTestWhileIdle(true); + poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(1)); + } + + public static int MAX_RETRIES = 10; + + public static Duration MAX_TOTAL_RETRIES_DURATION = Duration.ofSeconds(60); + + public static int DEFAULT_TIMEOUT_MS = 2000; + + + +} From 3de941e151eab9c7f050dd3d887f9302feb9866f Mon Sep 17 00:00:00 2001 From: Igor Malinovskiy Date: Tue, 11 Jun 2024 17:21:16 +0200 Subject: [PATCH 5/6] Clean up scenario tests --- .../scenario/ClusterTopologyRefreshTest.java | 17 +- .../scenario/ConnectionInterruptionTest.java | 182 ++++++++++++++++++ .../redis/clients/jedis/scenario/FakeApp.java | 14 +- .../jedis/scenario/FaultInjectionClient.java | 20 +- .../jedis/scenario/RecommendedSettings.java | 6 +- .../clients/jedis/scenario/RetryingTest.java | 86 --------- 6 files changed, 219 insertions(+), 106 deletions(-) create mode 100644 src/test/java/redis/clients/jedis/scenario/ConnectionInterruptionTest.java delete mode 100644 src/test/java/redis/clients/jedis/scenario/RetryingTest.java diff --git a/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java b/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java index 59e3f9121c..e4fc9a8b0a 100644 --- a/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java +++ b/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java @@ -1,5 +1,6 @@ package redis.clients.jedis.scenario; +import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,11 +20,20 @@ public class ClusterTopologyRefreshTest { private static final Logger log = LoggerFactory.getLogger(ClusterTopologyRefreshTest.class); - private final EndpointConfig endpoint = HostAndPorts.getRedisEndpoint( - "re-single-shard-oss-cluster"); + private static EndpointConfig endpoint; private final FaultInjectionClient faultClient = new FaultInjectionClient(); + @BeforeClass + public static void beforeClass() { + try { + ClusterTopologyRefreshTest.endpoint = HostAndPorts.getRedisEndpoint("re-single-shard-oss-cluster"); + } catch (IllegalArgumentException e) { + log.warn("Skipping test because no Redis endpoint is configured"); + org.junit.Assume.assumeTrue(false); + } + } + @Test public void testWithPool() { Set jedisClusterNode = new HashSet<>(); @@ -39,7 +49,8 @@ public void testWithPool() { try (JedisCluster client = new JedisCluster(spyProvider, RecommendedSettings.MAX_RETRIES, RecommendedSettings.MAX_TOTAL_RETRIES_DURATION)) { - assertEquals(1, client.getClusterNodes().size()); + assertEquals("Was this BDB used to run this test before?", 1, + client.getClusterNodes().size()); AtomicLong commandsExecuted = new AtomicLong(); diff --git a/src/test/java/redis/clients/jedis/scenario/ConnectionInterruptionTest.java b/src/test/java/redis/clients/jedis/scenario/ConnectionInterruptionTest.java new file mode 100644 index 0000000000..ee584931ca --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/ConnectionInterruptionTest.java @@ -0,0 +1,182 @@ +package redis.clients.jedis.scenario; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.*; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.providers.ConnectionProvider; +import redis.clients.jedis.providers.PooledConnectionProvider; +import redis.clients.jedis.util.SafeEncoder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.*; + +@RunWith(Parameterized.class) +public class ConnectionInterruptionTest { + + private static final Logger log = LoggerFactory.getLogger(ConnectionInterruptionTest.class); + + private static EndpointConfig endpoint; + + private final FaultInjectionClient faultClient = new FaultInjectionClient(); + + @Parameterized.Parameters + public static Iterable data() { + return Arrays.asList("dmc_restart", "network_failure"); + } + + @Parameterized.Parameter + public String triggerAction; + + @BeforeClass + public static void beforeClass() { + try { + ConnectionInterruptionTest.endpoint = HostAndPorts.getRedisEndpoint("re-standalone"); + } catch (IllegalArgumentException e) { + log.warn("Skipping test because no Redis endpoint is configured"); + org.junit.Assume.assumeTrue(false); + } + } + + @Test + public void testWithPool() { + ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(), + endpoint.getClientConfigBuilder().build(), RecommendedSettings.poolConfig); + + UnifiedJedis client = new UnifiedJedis(connectionProvider, RecommendedSettings.MAX_RETRIES, + RecommendedSettings.MAX_TOTAL_RETRIES_DURATION); + String keyName = "counter"; + client.set(keyName, "0"); + assertEquals("0", client.get(keyName)); + + AtomicLong commandsExecuted = new AtomicLong(); + + // Start thread that imitates an application that uses the client + FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> { + assertTrue(client.incr(keyName) > 0); + long currentCount = commandsExecuted.getAndIncrement(); + log.info("Command executed {}", currentCount); + return true; + }); + fakeApp.setKeepExecutingForSeconds(RecommendedSettings.DEFAULT_TIMEOUT_MS/1000 * 2); + Thread t = new Thread(fakeApp); + t.start(); + + HashMap params = new HashMap<>(); + params.put("bdb_id", endpoint.getBdbId()); + + FaultInjectionClient.TriggerActionResponse actionResponse = null; + + try { + log.info("Triggering {}", triggerAction); + actionResponse = faultClient.triggerAction(triggerAction, params); + } catch (IOException e) { + fail("Fault Injection Server error:" + e.getMessage()); + } + + log.info("Action id: {}", actionResponse.getActionId()); + fakeApp.setAction(actionResponse); + + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + log.info("Commands executed: {}", commandsExecuted.get()); + assertEquals(commandsExecuted.get(), Long.parseLong(client.get(keyName))); + assertTrue(fakeApp.capturedExceptions().isEmpty()); + + client.close(); + } + + @Test + public void testWithPubSub() { + ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(), + endpoint.getClientConfigBuilder().build(), RecommendedSettings.poolConfig); + + UnifiedJedis client = new UnifiedJedis(connectionProvider, RecommendedSettings.MAX_RETRIES, + RecommendedSettings.MAX_TOTAL_RETRIES_DURATION); + + AtomicLong messagesSent = new AtomicLong(); + AtomicLong messagesReceived = new AtomicLong(); + + final Thread subscriberThread = getSubscriberThread(messagesReceived, connectionProvider); + + // Start thread that imitates a publisher that uses the client + FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> { + log.info("Publishing message"); + long consumed = client.publish("test", String.valueOf(messagesSent.getAndIncrement())); + return consumed > 0; + }); + fakeApp.setKeepExecutingForSeconds(10); + Thread t = new Thread(fakeApp); + t.start(); + + HashMap params = new HashMap<>(); + params.put("bdb_id", endpoint.getBdbId()); + + FaultInjectionClient.TriggerActionResponse actionResponse = null; + + try { + log.info("Triggering {}", triggerAction); + actionResponse = faultClient.triggerAction(triggerAction, params); + } catch (IOException e) { + fail("Fault Injection Server error:" + e.getMessage()); + } + + log.info("Action id: {}", actionResponse.getActionId()); + fakeApp.setAction(actionResponse); + + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + if (subscriberThread.isAlive()) + subscriberThread.interrupt(); + + assertEquals(messagesSent.get() - 1, messagesReceived.get()); + assertTrue(fakeApp.capturedExceptions().isEmpty()); + + client.close(); + } + + private static Thread getSubscriberThread(AtomicLong messagesReceived, + ConnectionProvider connectionProvider) { + final JedisPubSubBase pubSub = new JedisPubSubBase() { + + @Override + public void onMessage(String channel, String message) { + messagesReceived.incrementAndGet(); + log.info("Received message: {}", message); + } + + @Override + protected String encode(byte[] raw) { + return SafeEncoder.encode(raw); + } + }; + + final Thread subscriberThread = new Thread(() -> { + try { + pubSub.proceed(connectionProvider.getConnection(), "test"); + fail("PubSub should have been interrupted"); + } catch (JedisConnectionException e) { + log.info("Expected exception in Subscriber: {}", e.getMessage()); + assertTrue(e.getMessage().contains("Unexpected end of stream.")); + } + }); + subscriberThread.start(); + return subscriberThread; + } +} diff --git a/src/test/java/redis/clients/jedis/scenario/FakeApp.java b/src/test/java/redis/clients/jedis/scenario/FakeApp.java index fedc721629..7e505862a2 100644 --- a/src/test/java/redis/clients/jedis/scenario/FakeApp.java +++ b/src/test/java/redis/clients/jedis/scenario/FakeApp.java @@ -10,12 +10,16 @@ import java.util.ArrayList; import java.util.List; -import static org.junit.Assert.assertTrue; - public class FakeApp implements Runnable { private static final Logger log = LoggerFactory.getLogger(FakeApp.class); + public void setKeepExecutingForSeconds(int keepExecutingForSeconds) { + this.keepExecutingForSeconds = keepExecutingForSeconds; + } + + private int keepExecutingForSeconds = 60; + private FaultInjectionClient.TriggerActionResponse actionResponse = null; private final UnifiedJedis client; private final ExecutedAction action; @@ -23,7 +27,7 @@ public class FakeApp implements Runnable { @FunctionalInterface public interface ExecutedAction { - boolean run(UnifiedJedis client); + boolean run(UnifiedJedis client); } public FakeApp(UnifiedJedis client, ExecutedAction action) { @@ -43,7 +47,6 @@ public void run() { log.info("Starting FakeApp"); int checkEachSeconds = 5; - int keepExecutingForSeconds = 60; int timeoutSeconds = 120; while (actionResponse == null || !actionResponse.isCompleted( @@ -52,8 +55,7 @@ public void run() { try { boolean success = action.run(client); - if (!success) - break; + if (!success) break; } catch (JedisConnectionException e) { log.error("Error executing action", e); exceptions.add(e); diff --git a/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java index 306e4786d5..f6f13ffe16 100644 --- a/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java +++ b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java @@ -64,13 +64,7 @@ public boolean isCompleted(Duration checkInterval, Duration delayAfter, Duration firstRequestAt = lastRequestTime; } - RequestConfig requestConfig = RequestConfig.custom() - .setConnectionRequestTimeout(5000, TimeUnit.MILLISECONDS) - .setResponseTimeout(5000, TimeUnit.MILLISECONDS).build(); - - CloseableHttpClient httpClient = HttpClientBuilder.create() - .setDefaultRequestConfig(requestConfig) - .build(); + CloseableHttpClient httpClient = getHttpClient(); Request request = Request.get(BASE_URL + "/action/" + actionId); @@ -93,6 +87,15 @@ public boolean isCompleted(Duration checkInterval, Duration delayAfter, Duration } } + private static CloseableHttpClient getHttpClient() { + RequestConfig requestConfig = RequestConfig.custom() + .setConnectionRequestTimeout(5000, TimeUnit.MILLISECONDS) + .setResponseTimeout(5000, TimeUnit.MILLISECONDS).build(); + + return HttpClientBuilder.create() + .setDefaultRequestConfig(requestConfig).build(); + } + public TriggerActionResponse triggerAction(String actionType, HashMap parameters) throws IOException { Gson gson = new GsonBuilder().setFieldNamingPolicy( @@ -104,11 +107,12 @@ public TriggerActionResponse triggerAction(String actionType, HashMap() { }.getType()); } catch (IOException e) { diff --git a/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java b/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java index b5040aba08..aa1671ad9e 100644 --- a/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java +++ b/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java @@ -20,11 +20,11 @@ public class RecommendedSettings { poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(1)); } - public static int MAX_RETRIES = 10; + public static int MAX_RETRIES = 5; - public static Duration MAX_TOTAL_RETRIES_DURATION = Duration.ofSeconds(60); + public static Duration MAX_TOTAL_RETRIES_DURATION = Duration.ofSeconds(10); - public static int DEFAULT_TIMEOUT_MS = 2000; + public static int DEFAULT_TIMEOUT_MS = 5000; diff --git a/src/test/java/redis/clients/jedis/scenario/RetryingTest.java b/src/test/java/redis/clients/jedis/scenario/RetryingTest.java deleted file mode 100644 index 0bedcadf71..0000000000 --- a/src/test/java/redis/clients/jedis/scenario/RetryingTest.java +++ /dev/null @@ -1,86 +0,0 @@ -package redis.clients.jedis.scenario; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import redis.clients.jedis.*; -import redis.clients.jedis.providers.ConnectionProvider; -import redis.clients.jedis.providers.PooledConnectionProvider; - -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.concurrent.atomic.AtomicLong; - -import static org.junit.Assert.*; - -@RunWith(Parameterized.class) -public class RetryingTest { - - private static final Logger log = LoggerFactory.getLogger(RetryingTest.class); - - private final EndpointConfig endpoint = HostAndPorts.getRedisEndpoint("re-standalone"); - - private final FaultInjectionClient faultClient = new FaultInjectionClient(); - - @Parameterized.Parameters - public static Iterable data() { - return Arrays.asList(/*"dmc_restart",*/ "network_failure"); - } - - @Parameterized.Parameter - public String triggerAction; - - @Test - public void testWithPool() { - ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(), - endpoint.getClientConfigBuilder().build(), RecommendedSettings.poolConfig); - - UnifiedJedis client = new UnifiedJedis(connectionProvider, RecommendedSettings.MAX_RETRIES, - RecommendedSettings.MAX_TOTAL_RETRIES_DURATION); - String keyName = "counter"; - client.set(keyName, "0"); - assertEquals("0", client.get(keyName)); - - AtomicLong commandsExecuted = new AtomicLong(); - - // Start thread that imitates an application that uses the client - FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> { - assertTrue(client.incr(keyName) > 0); - commandsExecuted.getAndIncrement(); - return true; - }); - Thread t = new Thread(fakeApp); - t.start(); - - HashMap params = new HashMap<>(); - params.put("bdb_id", endpoint.getBdbId()); - - FaultInjectionClient.TriggerActionResponse actionResponse = null; - - try { - log.info("Triggering {}", triggerAction); - actionResponse = faultClient.triggerAction(triggerAction, params); - } catch (IOException e) { - fail("Fault Injection Server error:" + e.getMessage()); - } - - log.info("Action id: {}", actionResponse.getActionId()); - fakeApp.setAction(actionResponse); - - try { - t.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - log.info("Commands executed: {}", commandsExecuted.get()); - assertEquals(commandsExecuted.get(), Long.parseLong(client.get(keyName))); - assertTrue(fakeApp.capturedExceptions().isEmpty()); - - client.close(); - } - -} From e9bca010948ec24e6cfbe8eb7f8e9671367d8c04 Mon Sep 17 00:00:00 2001 From: Igor Malinovskiy Date: Fri, 12 Jul 2024 14:11:14 +0200 Subject: [PATCH 6/6] Address review suggestions --- pom.xml | 1 + .../clients/jedis/scenario/FaultInjectionClient.java | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 0dc25f38ca..2799dc0465 100644 --- a/pom.xml +++ b/pom.xml @@ -137,6 +137,7 @@ org.apache.httpcomponents.client5 httpclient5-fluent 5.3.1 + test diff --git a/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java index f6f13ffe16..c4e1c5717b 100644 --- a/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java +++ b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java @@ -21,7 +21,11 @@ public class FaultInjectionClient { - private static final String BASE_URL = "http://127.0.0.1:20324"; + private static final String BASE_URL; + + static { + BASE_URL = System.getenv().getOrDefault("FAULT_INJECTION_API_URL", "http://127.0.0.1:20324"); + } private static final Logger log = LoggerFactory.getLogger(FaultInjectionClient.class); @@ -42,10 +46,6 @@ public String getActionId() { return actionId; } - public Instant getFirstRequestAt() { - return completedAt; - } - public boolean isCompleted(Duration checkInterval, Duration delayAfter, Duration timeout) { if (completedAt != null) { return Duration.between(completedAt, Instant.now()).compareTo(delayAfter) >= 0;