From fc603d261d542859936125348f67537b46e9dba0 Mon Sep 17 00:00:00 2001 From: Igor Malinovskiy Date: Wed, 17 Jul 2024 14:43:35 +0200 Subject: [PATCH] Add Scenario tests (#3847) * Add POC DMC restart test * Cleanup * More tests * Add missing files * Clean up scenario tests * Address review suggestions --- pom.xml | 6 + .../redis/clients/jedis/EndpointConfig.java | 6 +- .../scenario/ClusterTopologyRefreshTest.java | 100 ++++++++++ .../scenario/ConnectionInterruptionTest.java | 182 ++++++++++++++++++ .../redis/clients/jedis/scenario/FakeApp.java | 65 +++++++ .../jedis/scenario/FaultInjectionClient.java | 124 ++++++++++++ .../jedis/scenario/RecommendedSettings.java | 31 +++ 7 files changed, 513 insertions(+), 1 deletion(-) create mode 100644 src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java create mode 100644 src/test/java/redis/clients/jedis/scenario/ConnectionInterruptionTest.java create mode 100644 src/test/java/redis/clients/jedis/scenario/FakeApp.java create mode 100644 src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java create mode 100644 src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java diff --git a/pom.xml b/pom.xml index cee45b01e9..fef8874ecd 100644 --- a/pom.xml +++ b/pom.xml @@ -133,6 +133,12 @@ 2.38.0 test + + org.apache.httpcomponents.client5 + httpclient5-fluent + 5.3.1 + test + diff --git a/src/test/java/redis/clients/jedis/EndpointConfig.java b/src/test/java/redis/clients/jedis/EndpointConfig.java index 5cb322224d..42a44a3c47 100644 --- a/src/test/java/redis/clients/jedis/EndpointConfig.java +++ b/src/test/java/redis/clients/jedis/EndpointConfig.java @@ -1,6 +1,8 @@ package redis.clients.jedis; +import com.google.gson.FieldNamingPolicy; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; import redis.clients.jedis.util.JedisURIHelper; @@ -118,7 +120,9 @@ protected String getURISchema(boolean tls) { } public static HashMap loadFromJSON(String filePath) throws Exception { - Gson gson = new Gson(); + Gson gson = new GsonBuilder().setFieldNamingPolicy( + FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create(); + HashMap configs; try (FileReader reader = new FileReader(filePath)) { configs = gson.fromJson(reader, new TypeToken>() { diff --git a/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java b/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java new file mode 100644 index 0000000000..e4fc9a8b0a --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/ClusterTopologyRefreshTest.java @@ -0,0 +1,100 @@ +package redis.clients.jedis.scenario; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.*; +import redis.clients.jedis.providers.ClusterConnectionProvider; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class ClusterTopologyRefreshTest { + + private static final Logger log = LoggerFactory.getLogger(ClusterTopologyRefreshTest.class); + + private static EndpointConfig endpoint; + + private final FaultInjectionClient faultClient = new FaultInjectionClient(); + + @BeforeClass + public static void beforeClass() { + try { + ClusterTopologyRefreshTest.endpoint = HostAndPorts.getRedisEndpoint("re-single-shard-oss-cluster"); + } catch (IllegalArgumentException e) { + log.warn("Skipping test because no Redis endpoint is configured"); + org.junit.Assume.assumeTrue(false); + } + } + + @Test + public void testWithPool() { + Set jedisClusterNode = new HashSet<>(); + jedisClusterNode.add(endpoint.getHostAndPort()); + + JedisClientConfig config = endpoint.getClientConfigBuilder() + .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS) + .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build(); + + + ClusterConnectionProvider provider = new ClusterConnectionProvider(jedisClusterNode, config, RecommendedSettings.poolConfig); + ClusterConnectionProvider spyProvider = spy(provider); + + try (JedisCluster client = new JedisCluster(spyProvider, + RecommendedSettings.MAX_RETRIES, RecommendedSettings.MAX_TOTAL_RETRIES_DURATION)) { + assertEquals("Was this BDB used to run this test before?", 1, + client.getClusterNodes().size()); + + AtomicLong commandsExecuted = new AtomicLong(); + + // Start thread that imitates an application that uses the client + FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> { + long i = commandsExecuted.getAndIncrement(); + client.set(String.valueOf(i), String.valueOf(i)); + return true; + }); + + Thread t = new Thread(fakeApp); + t.start(); + + HashMap params = new HashMap<>(); + params.put("bdb_id", endpoint.getBdbId()); + params.put("actions", "[\"reshard\",\"failover\"]"); + + FaultInjectionClient.TriggerActionResponse actionResponse = null; + + try { + log.info("Triggering Resharding and Failover"); + actionResponse = faultClient.triggerAction("sequence_of_actions", params); + } catch (IOException e) { + fail("Fault Injection Server error:" + e.getMessage()); + } + + log.info("Action id: {}", actionResponse.getActionId()); + fakeApp.setAction(actionResponse); + + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + assertTrue(fakeApp.capturedExceptions().isEmpty()); + + log.info("Commands executed: {}", commandsExecuted.get()); + for (long i = 0; i < commandsExecuted.get(); i++) { + assertTrue(client.exists(String.valueOf(i))); + } + + verify(spyProvider, atLeast(2)).renewSlotCache(any(Connection.class)); + } + } + +} diff --git a/src/test/java/redis/clients/jedis/scenario/ConnectionInterruptionTest.java b/src/test/java/redis/clients/jedis/scenario/ConnectionInterruptionTest.java new file mode 100644 index 0000000000..ee584931ca --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/ConnectionInterruptionTest.java @@ -0,0 +1,182 @@ +package redis.clients.jedis.scenario; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.*; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.providers.ConnectionProvider; +import redis.clients.jedis.providers.PooledConnectionProvider; +import redis.clients.jedis.util.SafeEncoder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.*; + +@RunWith(Parameterized.class) +public class ConnectionInterruptionTest { + + private static final Logger log = LoggerFactory.getLogger(ConnectionInterruptionTest.class); + + private static EndpointConfig endpoint; + + private final FaultInjectionClient faultClient = new FaultInjectionClient(); + + @Parameterized.Parameters + public static Iterable data() { + return Arrays.asList("dmc_restart", "network_failure"); + } + + @Parameterized.Parameter + public String triggerAction; + + @BeforeClass + public static void beforeClass() { + try { + ConnectionInterruptionTest.endpoint = HostAndPorts.getRedisEndpoint("re-standalone"); + } catch (IllegalArgumentException e) { + log.warn("Skipping test because no Redis endpoint is configured"); + org.junit.Assume.assumeTrue(false); + } + } + + @Test + public void testWithPool() { + ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(), + endpoint.getClientConfigBuilder().build(), RecommendedSettings.poolConfig); + + UnifiedJedis client = new UnifiedJedis(connectionProvider, RecommendedSettings.MAX_RETRIES, + RecommendedSettings.MAX_TOTAL_RETRIES_DURATION); + String keyName = "counter"; + client.set(keyName, "0"); + assertEquals("0", client.get(keyName)); + + AtomicLong commandsExecuted = new AtomicLong(); + + // Start thread that imitates an application that uses the client + FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> { + assertTrue(client.incr(keyName) > 0); + long currentCount = commandsExecuted.getAndIncrement(); + log.info("Command executed {}", currentCount); + return true; + }); + fakeApp.setKeepExecutingForSeconds(RecommendedSettings.DEFAULT_TIMEOUT_MS/1000 * 2); + Thread t = new Thread(fakeApp); + t.start(); + + HashMap params = new HashMap<>(); + params.put("bdb_id", endpoint.getBdbId()); + + FaultInjectionClient.TriggerActionResponse actionResponse = null; + + try { + log.info("Triggering {}", triggerAction); + actionResponse = faultClient.triggerAction(triggerAction, params); + } catch (IOException e) { + fail("Fault Injection Server error:" + e.getMessage()); + } + + log.info("Action id: {}", actionResponse.getActionId()); + fakeApp.setAction(actionResponse); + + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + log.info("Commands executed: {}", commandsExecuted.get()); + assertEquals(commandsExecuted.get(), Long.parseLong(client.get(keyName))); + assertTrue(fakeApp.capturedExceptions().isEmpty()); + + client.close(); + } + + @Test + public void testWithPubSub() { + ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(), + endpoint.getClientConfigBuilder().build(), RecommendedSettings.poolConfig); + + UnifiedJedis client = new UnifiedJedis(connectionProvider, RecommendedSettings.MAX_RETRIES, + RecommendedSettings.MAX_TOTAL_RETRIES_DURATION); + + AtomicLong messagesSent = new AtomicLong(); + AtomicLong messagesReceived = new AtomicLong(); + + final Thread subscriberThread = getSubscriberThread(messagesReceived, connectionProvider); + + // Start thread that imitates a publisher that uses the client + FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> { + log.info("Publishing message"); + long consumed = client.publish("test", String.valueOf(messagesSent.getAndIncrement())); + return consumed > 0; + }); + fakeApp.setKeepExecutingForSeconds(10); + Thread t = new Thread(fakeApp); + t.start(); + + HashMap params = new HashMap<>(); + params.put("bdb_id", endpoint.getBdbId()); + + FaultInjectionClient.TriggerActionResponse actionResponse = null; + + try { + log.info("Triggering {}", triggerAction); + actionResponse = faultClient.triggerAction(triggerAction, params); + } catch (IOException e) { + fail("Fault Injection Server error:" + e.getMessage()); + } + + log.info("Action id: {}", actionResponse.getActionId()); + fakeApp.setAction(actionResponse); + + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + if (subscriberThread.isAlive()) + subscriberThread.interrupt(); + + assertEquals(messagesSent.get() - 1, messagesReceived.get()); + assertTrue(fakeApp.capturedExceptions().isEmpty()); + + client.close(); + } + + private static Thread getSubscriberThread(AtomicLong messagesReceived, + ConnectionProvider connectionProvider) { + final JedisPubSubBase pubSub = new JedisPubSubBase() { + + @Override + public void onMessage(String channel, String message) { + messagesReceived.incrementAndGet(); + log.info("Received message: {}", message); + } + + @Override + protected String encode(byte[] raw) { + return SafeEncoder.encode(raw); + } + }; + + final Thread subscriberThread = new Thread(() -> { + try { + pubSub.proceed(connectionProvider.getConnection(), "test"); + fail("PubSub should have been interrupted"); + } catch (JedisConnectionException e) { + log.info("Expected exception in Subscriber: {}", e.getMessage()); + assertTrue(e.getMessage().contains("Unexpected end of stream.")); + } + }); + subscriberThread.start(); + return subscriberThread; + } +} diff --git a/src/test/java/redis/clients/jedis/scenario/FakeApp.java b/src/test/java/redis/clients/jedis/scenario/FakeApp.java new file mode 100644 index 0000000000..7e505862a2 --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/FakeApp.java @@ -0,0 +1,65 @@ +package redis.clients.jedis.scenario; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.UnifiedJedis; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisException; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +public class FakeApp implements Runnable { + + private static final Logger log = LoggerFactory.getLogger(FakeApp.class); + + public void setKeepExecutingForSeconds(int keepExecutingForSeconds) { + this.keepExecutingForSeconds = keepExecutingForSeconds; + } + + private int keepExecutingForSeconds = 60; + + private FaultInjectionClient.TriggerActionResponse actionResponse = null; + private final UnifiedJedis client; + private final ExecutedAction action; + private List exceptions = new ArrayList<>(); + + @FunctionalInterface + public interface ExecutedAction { + boolean run(UnifiedJedis client); + } + + public FakeApp(UnifiedJedis client, ExecutedAction action) { + this.client = client; + this.action = action; + } + + public void setAction(FaultInjectionClient.TriggerActionResponse actionResponse) { + this.actionResponse = actionResponse; + } + + public List capturedExceptions() { + return exceptions; + } + + public void run() { + log.info("Starting FakeApp"); + + int checkEachSeconds = 5; + int timeoutSeconds = 120; + + while (actionResponse == null || !actionResponse.isCompleted( + Duration.ofSeconds(checkEachSeconds), Duration.ofSeconds(keepExecutingForSeconds), + Duration.ofSeconds(timeoutSeconds))) { + try { + boolean success = action.run(client); + + if (!success) break; + } catch (JedisConnectionException e) { + log.error("Error executing action", e); + exceptions.add(e); + } + } + } +} diff --git a/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java new file mode 100644 index 0000000000..c4e1c5717b --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/FaultInjectionClient.java @@ -0,0 +1,124 @@ +package redis.clients.jedis.scenario; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +import com.google.gson.FieldNamingPolicy; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.fluent.Request; +import com.google.gson.Gson; +import org.apache.hc.client5.http.fluent.Response; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; +import org.apache.hc.core5.http.ContentType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FaultInjectionClient { + + private static final String BASE_URL; + + static { + BASE_URL = System.getenv().getOrDefault("FAULT_INJECTION_API_URL", "http://127.0.0.1:20324"); + } + + private static final Logger log = LoggerFactory.getLogger(FaultInjectionClient.class); + + public static class TriggerActionResponse { + private final String actionId; + + private Instant lastRequestTime = null; + + private Instant completedAt = null; + + private Instant firstRequestAt = null; + + public TriggerActionResponse(String actionId) { + this.actionId = actionId; + } + + public String getActionId() { + return actionId; + } + + public boolean isCompleted(Duration checkInterval, Duration delayAfter, Duration timeout) { + if (completedAt != null) { + return Duration.between(completedAt, Instant.now()).compareTo(delayAfter) >= 0; + } + + if (firstRequestAt != null && Duration.between(firstRequestAt, Instant.now()) + .compareTo(timeout) >= 0) { + throw new RuntimeException("Timeout"); + } + + if (lastRequestTime == null || Duration.between(lastRequestTime, Instant.now()) + .compareTo(checkInterval) >= 0) { + lastRequestTime = Instant.now(); + + if (firstRequestAt == null) { + firstRequestAt = lastRequestTime; + } + + CloseableHttpClient httpClient = getHttpClient(); + + Request request = Request.get(BASE_URL + "/action/" + actionId); + + try { + Response response = request.execute(httpClient); + String result = response.returnContent().asString(); + + log.info("Action status: {}", result); + + if (result.contains("success")) { + completedAt = Instant.now(); + return Duration.between(completedAt, Instant.now()).compareTo(delayAfter) >= 0; + } + + } catch (IOException e) { + throw new RuntimeException("Fault injection proxy error ", e); + } + } + return false; + } + } + + private static CloseableHttpClient getHttpClient() { + RequestConfig requestConfig = RequestConfig.custom() + .setConnectionRequestTimeout(5000, TimeUnit.MILLISECONDS) + .setResponseTimeout(5000, TimeUnit.MILLISECONDS).build(); + + return HttpClientBuilder.create() + .setDefaultRequestConfig(requestConfig).build(); + } + + public TriggerActionResponse triggerAction(String actionType, HashMap parameters) + throws IOException { + Gson gson = new GsonBuilder().setFieldNamingPolicy( + FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create(); + + HashMap payload = new HashMap<>(); + payload.put("type", actionType); + payload.put("parameters", parameters); + + String jsonString = gson.toJson(payload); + + CloseableHttpClient httpClient = getHttpClient(); + Request request = Request.post(BASE_URL + "/action"); + request.bodyString(jsonString, ContentType.APPLICATION_JSON); + + try { + String result = request.execute(httpClient).returnContent().asString(); + return gson.fromJson(result, new TypeToken() { + }.getType()); + } catch (IOException e) { + e.printStackTrace(); + throw e; + } + } + +} diff --git a/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java b/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java new file mode 100644 index 0000000000..aa1671ad9e --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/RecommendedSettings.java @@ -0,0 +1,31 @@ +package redis.clients.jedis.scenario; + +import redis.clients.jedis.ConnectionPoolConfig; + +import java.time.Duration; + +public class RecommendedSettings { + + public static ConnectionPoolConfig poolConfig; + + static { + poolConfig = new ConnectionPoolConfig(); + ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); + poolConfig.setMaxTotal(8); + poolConfig.setMaxIdle(8); + poolConfig.setMinIdle(0); + poolConfig.setBlockWhenExhausted(true); + poolConfig.setMaxWait(Duration.ofSeconds(1)); + poolConfig.setTestWhileIdle(true); + poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(1)); + } + + public static int MAX_RETRIES = 5; + + public static Duration MAX_TOTAL_RETRIES_DURATION = Duration.ofSeconds(10); + + public static int DEFAULT_TIMEOUT_MS = 5000; + + + +}