Skip to content

Commit

Permalink
Merge branch 'master' into emb-examples
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Feb 27, 2024
2 parents 5d58ae6 + 6f3db46 commit f217596
Show file tree
Hide file tree
Showing 14 changed files with 338 additions and 8 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20231013</version>
<version>20240205</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
Expand All @@ -79,7 +79,7 @@
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
<artifactId>junixsocket-core</artifactId>
<version>2.8.3</version>
<version>2.9.0</version>
<type>pom</type>
<scope>test</scope>
</dependency>
Expand Down
47 changes: 47 additions & 0 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,53 @@ public Map<String, CommandInfo> build(Object data) {
}
};

public static final Builder<Map<String, LatencyLatestInfo>> LATENCY_LATEST_RESPONSE = new Builder<Map<String, LatencyLatestInfo>>() {
@Override
public Map<String, LatencyLatestInfo> build(Object data) {
if (data == null) {
return null;
}

List<Object> rawList = (List<Object>) data;
Map<String, LatencyLatestInfo> map = new HashMap<>(rawList.size());

for (Object rawLatencyLatestInfo : rawList) {
if (rawLatencyLatestInfo == null) {
continue;
}

LatencyLatestInfo latestInfo = LatencyLatestInfo.LATENCY_LATEST_BUILDER.build(rawLatencyLatestInfo);
String name = latestInfo.getCommand();
map.put(name, latestInfo);
}

return map;
}
};

public static final Builder<List<LatencyHistoryInfo>> LATENCY_HISTORY_RESPONSE = new Builder<List<LatencyHistoryInfo>>() {
@Override
public List<LatencyHistoryInfo> build(Object data) {
if (data == null) {
return null;
}

List<Object> rawList = (List<Object>) data;
List<LatencyHistoryInfo> response = new ArrayList<>(rawList.size());

for (Object rawLatencyHistoryInfo : rawList) {
if (rawLatencyHistoryInfo == null) {
continue;
}

LatencyHistoryInfo historyInfo = LatencyHistoryInfo.LATENCY_HISTORY_BUILDER.build(rawLatencyHistoryInfo);
response.add(historyInfo);
}

return response;
}
};

private static final Builder<List<List<Long>>> CLUSTER_SHARD_SLOTS_RANGES = new Builder<List<List<Long>>>() {

@Override
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.Arrays;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
Expand Down Expand Up @@ -9280,6 +9281,26 @@ public String latencyDoctor() {
return connection.getBulkReply();
}

public Map<String, LatencyLatestInfo> latencyLatest() {
checkIsInMultiOrPipeline();
connection.sendCommand(LATENCY, LATEST);
return BuilderFactory.LATENCY_LATEST_RESPONSE.build(connection.getOne());
}

public List<LatencyHistoryInfo> latencyHistory(LatencyEvent event) {
checkIsInMultiOrPipeline();
connection.sendCommand(new CommandArguments(LATENCY).add(HISTORY).add(event));
return BuilderFactory.LATENCY_HISTORY_RESPONSE.build(connection.getOne());
}

public long latencyReset(LatencyEvent... events) {
checkIsInMultiOrPipeline();
CommandArguments arguments = new CommandArguments(LATENCY).add(Keyword.RESET);
Arrays.stream(events).forEach(arguments::add);
connection.sendCommand(arguments);
return connection.getIntegerReply();
}

@Override
public StreamEntryID xadd(final String key, final StreamEntryID id, final Map<String, String> hash) {
checkIsInMultiOrPipeline();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/JedisPubSubBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private void process() {
} else {
throw new JedisException("Unknown message type: " + reply);
}
} while (isSubscribed());
} while (!Thread.currentThread().isInterrupted() && isSubscribed());

// /* Invalidate instance since this thread is no longer listening */
// this.client = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private void process() {
} else {
throw new JedisException("Unknown message type: " + reply);
}
} while (isSubscribed());
} while (!Thread.currentThread().isInterrupted() && isSubscribed());

// /* Invalidate instance since this thread is no longer listening */
// this.client = null;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ protected final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
queue = pipelinedResponses.get(nodeKey);
connection = connections.get(nodeKey);
} else {
pipelinedResponses.putIfAbsent(nodeKey, new LinkedList<>());
queue = pipelinedResponses.get(nodeKey);

Connection newOne = getConnection(nodeKey);
connections.putIfAbsent(nodeKey, newOne);
connection = connections.get(nodeKey);
if (connection != newOne) {
log.debug("Duplicate connection to {}, closing it.", nodeKey);
IOUtils.closeQuietly(newOne);
}

pipelinedResponses.putIfAbsent(nodeKey, new LinkedList<>());
queue = pipelinedResponses.get(nodeKey);
}

connection.sendCommand(commandObject.getArguments());
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public static enum Keyword implements Rawable {
REV, WITHCOORD, WITHDIST, WITHHASH, ANY, FROMMEMBER, FROMLONLAT, BYRADIUS, BYBOX, BYLEX, BYSCORE,
STOREDIST, TO, FORCE, TIMEOUT, DB, UNLOAD, ABORT, IDX, MINMATCHLEN, WITHMATCHLEN, FULL,
DELETE, LIBRARYNAME, WITHCODE, DESCRIPTION, GETKEYS, GETKEYSANDFLAGS, DOCS, FILTERBY, DUMP,
MODULE, ACLCAT, PATTERN, DOCTOR, USAGE, SAMPLES, PURGE, STATS, LOADEX, CONFIG, ARGS, RANK,
MODULE, ACLCAT, PATTERN, DOCTOR, LATEST, HISTORY, USAGE, SAMPLES, PURGE, STATS, LOADEX, CONFIG, ARGS, RANK,
NOW, VERSION, ADDR, SKIPME, USER, LADDR,
CHANNELS, NUMPAT, NUMSUB, SHARDCHANNELS, SHARDNUMSUB;

Expand Down
24 changes: 24 additions & 0 deletions src/main/java/redis/clients/jedis/args/LatencyEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package redis.clients.jedis.args;

import redis.clients.jedis.util.SafeEncoder;

public enum LatencyEvent implements Rawable {

ACTIVE_DEFRAG_CYCLE("active-defrag-cycle"), AOF_FSYNC_ALWAYS("aof-fsync-always"), AOF_STAT("aof-stat"),
AOF_REWRITE_DIFF_WRITE("aof-rewrite-diff-write"), AOF_RENAME("aof-rename"), AOF_WRITE("aof-write"),
AOF_WRITE_ACTIVE_CHILD("aof-write-active-child"), AOF_WRITE_ALONE("aof-write-alone"),
AOF_WRITE_PENDING_FSYNC("aof-write-pending-fsync"), COMMAND("command"), EXPIRE_CYCLE("expire-cycle"),
EVICTION_CYCLE("eviction-cycle"), EVICTION_DEL("eviction-del"), FAST_COMMAND("fast-command"),
FORK("fork"), RDB_UNLINK_TEMP_FILE("rdb-unlink-temp-file");

private final byte[] raw;

private LatencyEvent(String s) {
raw = SafeEncoder.encode(s);
}

@Override
public byte[] getRaw() {
return raw;
}
}
12 changes: 12 additions & 0 deletions src/main/java/redis/clients/jedis/commands/ServerCommands.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package redis.clients.jedis.commands;

import redis.clients.jedis.args.FlushMode;
import redis.clients.jedis.args.LatencyEvent;
import redis.clients.jedis.args.SaveMode;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.params.LolwutParams;
import redis.clients.jedis.params.ShutdownParams;
import redis.clients.jedis.resps.LatencyHistoryInfo;
import redis.clients.jedis.resps.LatencyLatestInfo;
import redis.clients.jedis.util.KeyValue;

import java.util.List;
import java.util.Map;

public interface ServerCommands {

/**
Expand Down Expand Up @@ -246,4 +252,10 @@ default void shutdown(SaveMode saveMode) throws JedisException {
* @return the report
*/
String latencyDoctor();

Map<String, LatencyLatestInfo> latencyLatest();

List<LatencyHistoryInfo> latencyHistory(LatencyEvent events);

long latencyReset(LatencyEvent... events);
}
38 changes: 38 additions & 0 deletions src/main/java/redis/clients/jedis/resps/LatencyHistoryInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package redis.clients.jedis.resps;

import redis.clients.jedis.Builder;

import java.util.List;

import static redis.clients.jedis.BuilderFactory.LONG;

public class LatencyHistoryInfo {

private final long timestamp;
private final long latency;

public LatencyHistoryInfo(long timestamp, long latency) {
this.timestamp = timestamp;
this.latency = latency;
}

public long getTimestamp() {
return timestamp;
}

public long getLatency() {
return latency;
}

public static final Builder<LatencyHistoryInfo> LATENCY_HISTORY_BUILDER = new Builder<LatencyHistoryInfo>() {
@Override
public LatencyHistoryInfo build(Object data) {
List<Object> commandData = (List<Object>) data;

long timestamp = LONG.build(commandData.get(0));
long latency = LONG.build(commandData.get(1));

return new LatencyHistoryInfo(timestamp, latency);
}
};
}
53 changes: 53 additions & 0 deletions src/main/java/redis/clients/jedis/resps/LatencyLatestInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package redis.clients.jedis.resps;

import redis.clients.jedis.Builder;

import java.util.List;

import static redis.clients.jedis.BuilderFactory.LONG;
import static redis.clients.jedis.BuilderFactory.STRING;

public class LatencyLatestInfo {

private final String command;
private final long timestamp;
private final long lastEventLatency;
private final long maxEventLatency;

public LatencyLatestInfo(String command, long timestamp, long lastEventLatency, long maxEventLatency) {
this.command = command;
this.timestamp = timestamp;
this.lastEventLatency = lastEventLatency;
this.maxEventLatency = maxEventLatency;
}

public String getCommand() {
return command;
}

public long getTimestamp() {
return timestamp;
}

public long getLastEventLatency() {
return lastEventLatency;
}

public long getMaxEventLatency() {
return maxEventLatency;
}

public static final Builder<LatencyLatestInfo> LATENCY_LATEST_BUILDER = new Builder<LatencyLatestInfo>() {
@Override
public LatencyLatestInfo build(Object data) {
List<Object> commandData = (List<Object>) data;

String command = STRING.build(commandData.get(0));
long timestamp = LONG.build(commandData.get(1));
long lastEventLatency = LONG.build(commandData.get(2));
long maxEventLatency = LONG.build(commandData.get(3));

return new LatencyLatestInfo(command, timestamp, lastEventLatency, maxEventLatency);
}
};
}
59 changes: 59 additions & 0 deletions src/test/java/redis/clients/jedis/JedisPubSubBaseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package redis.clients.jedis;

import junit.framework.TestCase;
import redis.clients.jedis.util.SafeEncoder;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static redis.clients.jedis.Protocol.ResponseKeyword.MESSAGE;
import static redis.clients.jedis.Protocol.ResponseKeyword.SUBSCRIBE;

public class JedisPubSubBaseTest extends TestCase {

public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException {
// setup
final JedisPubSubBase<String> pubSub = new JedisPubSubBase<String>() {

@Override
public void onMessage(String channel, String message) {
fail("this should not happen when thread is interrupted");
}

@Override
protected String encode(byte[] raw) {
return SafeEncoder.encode(raw);
}
};

final Connection mockConnection = mock(Connection.class);
final List<Object> mockSubscribe = Arrays.asList(
SUBSCRIBE.getRaw(), "channel".getBytes(), 1L
);
final List<Object> mockResponse = Arrays.asList(
MESSAGE.getRaw(), "channel".getBytes(), "message".getBytes()
);

when(mockConnection.getUnflushedObject()).

thenReturn(mockSubscribe, mockResponse);


final CountDownLatch countDownLatch = new CountDownLatch(1);
// action
final Thread thread = new Thread(() -> {
Thread.currentThread().interrupt();
pubSub.proceed(mockConnection, "channel");

countDownLatch.countDown();
});
thread.start();

assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS));

}
}
Loading

0 comments on commit f217596

Please sign in to comment.