Skip to content

Commit

Permalink
Add last entry id for XREADs and support XREADs reply as map (#3791)
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Jul 10, 2024
1 parent 1bca985 commit be7e61f
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 60 deletions.
37 changes: 33 additions & 4 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1420,10 +1420,10 @@ public List<Map.Entry<String, List<StreamEntry>>> build(Object data) {
.collect(Collectors.toList());
} else {
List<Map.Entry<String, List<StreamEntry>>> result = new ArrayList<>(list.size());
for (Object streamObj : list) {
List<Object> stream = (List<Object>) streamObj;
String streamKey = STRING.build(stream.get(0));
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(stream.get(1));
for (Object anObj : list) {
List<Object> streamObj = (List<Object>) anObj;
String streamKey = STRING.build(streamObj.get(0));
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(streamObj.get(1));
result.add(KeyValue.of(streamKey, streamEntries));
}
return result;
Expand All @@ -1436,6 +1436,35 @@ public String toString() {
}
};

public static final Builder<Map<String, List<StreamEntry>>> STREAM_READ_MAP_RESPONSE
= new Builder<Map<String, List<StreamEntry>>>() {
@Override
public Map<String, List<StreamEntry>> build(Object data) {
if (data == null) return null;
List list = (List) data;
if (list.isEmpty()) return Collections.emptyMap();

if (list.get(0) instanceof KeyValue) {
return ((List<KeyValue>) list).stream()
.collect(Collectors.toMap(kv -> STRING.build(kv.getKey()), kv -> STREAM_ENTRY_LIST.build(kv.getValue())));
} else {
Map<String, List<StreamEntry>> result = new HashMap<>(list.size());
for (Object anObj : list) {
List<Object> streamObj = (List<Object>) anObj;
String streamKey = STRING.build(streamObj.get(0));
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(streamObj.get(1));
result.put(streamKey, streamEntries);
}
return result;
}
}

@Override
public String toString() {
return "Map<String, List<StreamEntry>>";
}
};

public static final Builder<List<StreamPendingEntry>> STREAM_PENDING_ENTRY_LIST = new Builder<List<StreamPendingEntry>>() {
@Override
@SuppressWarnings("unchecked")
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -2658,6 +2658,15 @@ public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xread(
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE);
}

public final CommandObject<Map<String, List<StreamEntry>>> xreadAsMap(
XReadParams xReadParams, Map<String, StreamEntryID> streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
Set<Map.Entry<String, StreamEntryID>> entrySet = streams.entrySet();
entrySet.forEach(entry -> args.key(entry.getKey()));
entrySet.forEach(entry -> args.add(entry.getValue()));
return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE);
}

public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xreadGroup(
String groupName, String consumer, XReadGroupParams xReadGroupParams,
Map<String, StreamEntryID> streams) {
Expand All @@ -2670,6 +2679,18 @@ public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xreadGrou
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE);
}

public final CommandObject<Map<String, List<StreamEntry>>> xreadGroupAsMap(
String groupName, String consumer, XReadGroupParams xReadGroupParams,
Map<String, StreamEntryID> streams) {
CommandArguments args = commandArguments(XREADGROUP)
.add(GROUP).add(groupName).add(consumer)
.addParams(xReadGroupParams).add(STREAMS);
Set<Map.Entry<String, StreamEntryID>> entrySet = streams.entrySet();
entrySet.forEach(entry -> args.key(entry.getKey()));
entrySet.forEach(entry -> args.add(entry.getValue()));
return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE);
}

public final CommandObject<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
for (Map.Entry<byte[], byte[]> entry : streams) {
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -9390,6 +9390,12 @@ public List<Map.Entry<String, List<StreamEntry>>> xread(final XReadParams xReadP
return connection.executeCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Map<String, List<StreamEntry>> xreadAsMap(final XReadParams xReadParams, final Map<String, StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadAsMap(xReadParams, streams));
}

@Override
public long xack(final String key, final String group, final StreamEntryID... ids) {
checkIsInMultiOrPipeline();
Expand Down Expand Up @@ -9446,13 +9452,19 @@ public long xtrim(final String key, final XTrimParams params) {
}

@Override
public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(final String groupName,
final String consumer, final XReadGroupParams xReadGroupParams,
final Map<String, StreamEntryID> streams) {
public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(final String groupName, final String consumer,
final XReadGroupParams xReadGroupParams, final Map<String, StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Map<String, List<StreamEntry>> xreadGroupAsMap(final String groupName, final String consumer,
final XReadGroupParams xReadGroupParams, final Map<String, StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));
}

@Override
public StreamPendingSummary xpending(final String key, final String groupName) {
checkIsInMultiOrPipeline();
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/PipeliningBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -1527,11 +1527,21 @@ public Response<List<Map.Entry<String, List<StreamEntry>>>> xread(XReadParams xR
return appendCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Response<Map<String, List<StreamEntry>>> xreadAsMap(XReadParams xReadParams, Map<String, StreamEntryID> streams) {
return appendCommand(commandObjects.xreadAsMap(xReadParams, streams));
}

@Override
public Response<List<Map.Entry<String, List<StreamEntry>>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Response<Map<String, List<StreamEntry>>> xreadGroupAsMap(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return appendCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Response<Object> eval(String script) {
return appendCommand(commandObjects.eval(script));
Expand Down
57 changes: 48 additions & 9 deletions src/main/java/redis/clients/jedis/StreamEntryID.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
/**
* Should be used only with XADD
*
* <code>
* XADD mystream * field1 value1
* </code>
* {@code XADD mystream * field1 value1}
*/
public static final StreamEntryID NEW_ENTRY = new StreamEntryID() {

Expand All @@ -97,11 +95,31 @@ public String toString() {
/**
* Should be used only with XGROUP CREATE
*
* <code>
* XGROUP CREATE mystream consumer-group-name $
* </code>
* {@code XGROUP CREATE mystream consumer-group-name $}
*/
public static final StreamEntryID LAST_ENTRY = new StreamEntryID() {
public static final StreamEntryID XGROUP_LAST_ENTRY = new StreamEntryID() {

private static final long serialVersionUID = 1L;

@Override
public String toString() {
return "$";
}
};

/**
* @deprecated Use {@link StreamEntryID#XGROUP_LAST_ENTRY} for XGROUP CREATE command or
* {@link StreamEntryID#XREAD_NEW_ENTRY} for XREAD command.
*/
@Deprecated
public static final StreamEntryID LAST_ENTRY = XGROUP_LAST_ENTRY;

/**
* Should be used only with XREAD
*
* {@code XREAD BLOCK 5000 COUNT 100 STREAMS mystream $}
*/
public static final StreamEntryID XREAD_NEW_ENTRY = new StreamEntryID() {

private static final long serialVersionUID = 1L;

Expand All @@ -114,9 +132,9 @@ public String toString() {
/**
* Should be used only with XREADGROUP
* <p>
* {@code XREADGROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >}
* {@code XREADGROUP GROUP mygroup myconsumer STREAMS mystream >}
*/
public static final StreamEntryID UNRECEIVED_ENTRY = new StreamEntryID() {
public static final StreamEntryID XREADGROUP_UNDELIVERED_ENTRY = new StreamEntryID() {

private static final long serialVersionUID = 1L;

Expand All @@ -126,6 +144,12 @@ public String toString() {
}
};

/**
* @deprecated Use {@link StreamEntryID#XREADGROUP_UNDELIVERED_ENTRY}.
*/
@Deprecated
public static final StreamEntryID UNRECEIVED_ENTRY = XREADGROUP_UNDELIVERED_ENTRY;

/**
* Can be used in XRANGE, XREVRANGE and XPENDING commands.
*/
Expand All @@ -151,4 +175,19 @@ public String toString() {
return "+";
}
};

/**
* Should be used only with XREAD
*
* {@code XREAD STREAMS mystream +}
*/
public static final StreamEntryID XREAD_LAST_ENTRY = new StreamEntryID() {

private static final long serialVersionUID = 1L;

@Override
public String toString() {
return "+";
}
};
}
11 changes: 11 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3056,12 +3056,23 @@ public List<Map.Entry<String, List<StreamEntry>>> xread(XReadParams xReadParams,
return executeCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Map<String, List<StreamEntry>> xreadAsMap(XReadParams xReadParams, Map<String, StreamEntryID> streams) {
return executeCommand(commandObjects.xreadAsMap(xReadParams, streams));
}

@Override
public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Map<String, List<StreamEntry>> xreadGroupAsMap(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));
}

@Override
public byte[] xadd(byte[] key, XAddParams params, Map<byte[], byte[]> hash) {
return executeCommand(commandObjects.xadd(key, params, hash));
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/redis/clients/jedis/commands/StreamCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,19 @@ List<Map.Entry<String, List<StreamEntry>>> xread(XReadParams xReadParams,
/**
* XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
*/
Map<String, List<StreamEntry>> xreadAsMap(XReadParams xReadParams,
Map<String, StreamEntryID> streams);

/**
* XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
*/
List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams);

/**
* XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
*/
Map<String, List<StreamEntry>> xreadGroupAsMap(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams);

}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,19 @@ Response<List<Map.Entry<String, List<StreamEntry>>>> xread(XReadParams xReadPara
/**
* XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
*/
Response<Map<String, List<StreamEntry>>> xreadAsMap(XReadParams xReadParams,
Map<String, StreamEntryID> streams);

/**
* XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
*/
Response<List<Map.Entry<String, List<StreamEntry>>>> xreadGroup(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams);

/**
* XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
*/
Response<Map<String, List<StreamEntry>>> xreadGroupAsMap(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams);

}
Loading

0 comments on commit be7e61f

Please sign in to comment.