Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add last entry id for XREADs and support XREADs reply as map #3791

Merged
merged 5 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1420,10 +1420,10 @@
.collect(Collectors.toList());
} else {
List<Map.Entry<String, List<StreamEntry>>> result = new ArrayList<>(list.size());
for (Object streamObj : list) {
List<Object> stream = (List<Object>) streamObj;
String streamKey = STRING.build(stream.get(0));
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(stream.get(1));
for (Object anObj : list) {
List<Object> streamObj = (List<Object>) anObj;
String streamKey = STRING.build(streamObj.get(0));
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(streamObj.get(1));
result.add(KeyValue.of(streamKey, streamEntries));
}
return result;
Expand All @@ -1436,6 +1436,35 @@
}
};

public static final Builder<Map<String, List<StreamEntry>>> STREAM_READ_MAP_RESPONSE
= new Builder<Map<String, List<StreamEntry>>>() {
@Override
public Map<String, List<StreamEntry>> build(Object data) {
if (data == null) return null;
List list = (List) data;
if (list.isEmpty()) return Collections.emptyMap();

if (list.get(0) instanceof KeyValue) {
return ((List<KeyValue>) list).stream()
.collect(Collectors.toMap(kv -> STRING.build(kv.getKey()), kv -> STREAM_ENTRY_LIST.build(kv.getValue())));
} else {
Map<String, List<StreamEntry>> result = new HashMap<>(list.size());
for (Object anObj : list) {
List<Object> streamObj = (List<Object>) anObj;
String streamKey = STRING.build(streamObj.get(0));
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(streamObj.get(1));
result.put(streamKey, streamEntries);
}
return result;
}
}

@Override
public String toString() {
return "Map<String, List<StreamEntry>>";

Check warning on line 1464 in src/main/java/redis/clients/jedis/BuilderFactory.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/redis/clients/jedis/BuilderFactory.java#L1464

Added line #L1464 was not covered by tests
}
};

public static final Builder<List<StreamPendingEntry>> STREAM_PENDING_ENTRY_LIST = new Builder<List<StreamPendingEntry>>() {
@Override
@SuppressWarnings("unchecked")
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -2666,6 +2666,15 @@
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE);
}

public final CommandObject<Map<String, List<StreamEntry>>> xreadAsMap(
XReadParams xReadParams, Map<String, StreamEntryID> streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
Set<Map.Entry<String, StreamEntryID>> entrySet = streams.entrySet();
entrySet.forEach(entry -> args.key(entry.getKey()));
entrySet.forEach(entry -> args.add(entry.getValue()));
return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE);
}

public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xreadGroup(
String groupName, String consumer, XReadGroupParams xReadGroupParams,
Map<String, StreamEntryID> streams) {
Expand All @@ -2678,6 +2687,18 @@
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE);
}

public final CommandObject<Map<String, List<StreamEntry>>> xreadGroupAsMap(
String groupName, String consumer, XReadGroupParams xReadGroupParams,
Map<String, StreamEntryID> streams) {
CommandArguments args = commandArguments(XREADGROUP)
.add(GROUP).add(groupName).add(consumer)
.addParams(xReadGroupParams).add(STREAMS);
Set<Map.Entry<String, StreamEntryID>> entrySet = streams.entrySet();
entrySet.forEach(entry -> args.key(entry.getKey()));
entrySet.forEach(entry -> args.add(entry.getValue()));
return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE);

Check warning on line 2699 in src/main/java/redis/clients/jedis/CommandObjects.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/redis/clients/jedis/CommandObjects.java#L2693-L2699

Added lines #L2693 - L2699 were not covered by tests
}

public final CommandObject<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
for (Map.Entry<byte[], byte[]> entry : streams) {
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -9394,6 +9394,12 @@
return connection.executeCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Map<String, List<StreamEntry>> xreadAsMap(final XReadParams xReadParams, final Map<String, StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadAsMap(xReadParams, streams));
}

@Override
public long xack(final String key, final String group, final StreamEntryID... ids) {
checkIsInMultiOrPipeline();
Expand Down Expand Up @@ -9450,13 +9456,19 @@
}

@Override
public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(final String groupName,
final String consumer, final XReadGroupParams xReadGroupParams,
final Map<String, StreamEntryID> streams) {
public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(final String groupName, final String consumer,
final XReadGroupParams xReadGroupParams, final Map<String, StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Map<String, List<StreamEntry>> xreadGroupAsMap(final String groupName, final String consumer,
final XReadGroupParams xReadGroupParams, final Map<String, StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));

Check warning on line 9469 in src/main/java/redis/clients/jedis/Jedis.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/redis/clients/jedis/Jedis.java#L9468-L9469

Added lines #L9468 - L9469 were not covered by tests
}

@Override
public StreamPendingSummary xpending(final String key, final String groupName) {
checkIsInMultiOrPipeline();
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/PipeliningBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -1536,11 +1536,21 @@
return appendCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Response<Map<String, List<StreamEntry>>> xreadAsMap(XReadParams xReadParams, Map<String, StreamEntryID> streams) {
return appendCommand(commandObjects.xreadAsMap(xReadParams, streams));

Check warning on line 1541 in src/main/java/redis/clients/jedis/PipeliningBase.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/redis/clients/jedis/PipeliningBase.java#L1541

Added line #L1541 was not covered by tests
}

@Override
public Response<List<Map.Entry<String, List<StreamEntry>>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Response<Map<String, List<StreamEntry>>> xreadGroupAsMap(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return appendCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));

Check warning on line 1551 in src/main/java/redis/clients/jedis/PipeliningBase.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/redis/clients/jedis/PipeliningBase.java#L1551

Added line #L1551 was not covered by tests
}

@Override
public Response<Object> eval(String script) {
return appendCommand(commandObjects.eval(script));
Expand Down
51 changes: 44 additions & 7 deletions src/main/java/redis/clients/jedis/StreamEntryID.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
/**
* Should be used only with XADD
*
* <code>
* XADD mystream * field1 value1
* </code>
* {@code XADD mystream * field1 value1}
*/
public static final StreamEntryID NEW_ENTRY = new StreamEntryID() {

Expand All @@ -97,11 +95,31 @@ public String toString() {
/**
* Should be used only with XGROUP CREATE
*
* <code>
* XGROUP CREATE mystream consumer-group-name $
* </code>
* {@code XGROUP CREATE mystream consumer-group-name $}
*/
public static final StreamEntryID LAST_ENTRY = new StreamEntryID() {
public static final StreamEntryID XGROUP_LAST_ENTRY = new StreamEntryID() {

private static final long serialVersionUID = 1L;

@Override
public String toString() {
return "$";
}
};

/**
* @deprecated Use {@link StreamEntryID#XGROUP_LAST_ENTRY} for XREADGROUP command or
* {@link StreamEntryID#XREAD_NEW_ENTRY} for XREAD command.
*/
@Deprecated
public static final StreamEntryID LAST_ENTRY = XGROUP_LAST_ENTRY;

/**
* Should be used only with XREAD
*
* {@code XREAD BLOCK 5000 COUNT 100 STREAMS mystream $}
*/
public static final StreamEntryID XREAD_NEW_ENTRY = new StreamEntryID() {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -129,6 +147,7 @@ public String toString() {
/**
* Can be used in XRANGE, XREVRANGE and XPENDING commands.
*/
// TODO: FIRST_ID ?
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
public static final StreamEntryID MINIMUM_ID = new StreamEntryID() {

private static final long serialVersionUID = 1L;
Expand All @@ -142,6 +161,8 @@ public String toString() {
/**
* Can be used in XRANGE, XREVRANGE and XPENDING commands.
*/
// TODO: LAST_ID ?
// TODO: unify with XREAD_LAST_ENTRY ??
public static final StreamEntryID MAXIMUM_ID = new StreamEntryID() {

private static final long serialVersionUID = 1L;
Expand All @@ -151,4 +172,20 @@ public String toString() {
return "+";
}
};

/**
* Should be used only with XREAD
*
* {@code XREAD STREAMS mystream +}
*/
// TODO: unify with MAXIMUM_ID ??
public static final StreamEntryID XREAD_LAST_ENTRY = new StreamEntryID() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be XREAD_LAST_ENTRY = MAXIMUM_ID, to make it obvious that it is only an alias. Somehow I find it more informative to have different names, depending on the usage context.


private static final long serialVersionUID = 1L;

@Override
public String toString() {
return "+";
}
};
}
11 changes: 11 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3066,12 +3066,23 @@
return executeCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Map<String, List<StreamEntry>> xreadAsMap(XReadParams xReadParams, Map<String, StreamEntryID> streams) {
return executeCommand(commandObjects.xreadAsMap(xReadParams, streams));

Check warning on line 3071 in src/main/java/redis/clients/jedis/UnifiedJedis.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/redis/clients/jedis/UnifiedJedis.java#L3071

Added line #L3071 was not covered by tests
}

@Override
public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Map<String, List<StreamEntry>> xreadGroupAsMap(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));

Check warning on line 3083 in src/main/java/redis/clients/jedis/UnifiedJedis.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/redis/clients/jedis/UnifiedJedis.java#L3083

Added line #L3083 was not covered by tests
}

@Override
public byte[] xadd(byte[] key, XAddParams params, Map<byte[], byte[]> hash) {
return executeCommand(commandObjects.xadd(key, params, hash));
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/redis/clients/jedis/commands/StreamCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,19 @@ List<Map.Entry<String, List<StreamEntry>>> xread(XReadParams xReadParams,
/**
* XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
*/
Map<String, List<StreamEntry>> xreadAsMap(XReadParams xReadParams,
Map<String, StreamEntryID> streams);

/**
* XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
*/
List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams);

/**
* XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
*/
Map<String, List<StreamEntry>> xreadGroupAsMap(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams);

}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,19 @@ Response<List<Map.Entry<String, List<StreamEntry>>>> xread(XReadParams xReadPara
/**
* XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
*/
Response<Map<String, List<StreamEntry>>> xreadAsMap(XReadParams xReadParams,
Map<String, StreamEntryID> streams);

/**
* XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
*/
Response<List<Map.Entry<String, List<StreamEntry>>>> xreadGroup(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams);

/**
* XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
*/
Response<Map<String, List<StreamEntry>>> xreadGroupAsMap(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams);

}
Loading
Loading