Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consider null values in empty StreamPendingSummary #3793

Merged
merged 2 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1812,15 +1812,14 @@ public StreamPendingSummary build(Object data) {
}

List<Object> objectList = (List<Object>) data;
long total = BuilderFactory.LONG.build(objectList.get(0));
String minId = SafeEncoder.encode((byte[]) objectList.get(1));
String maxId = SafeEncoder.encode((byte[]) objectList.get(2));
List<List<Object>> consumerObjList = (List<List<Object>>) objectList.get(3);
Map<String, Long> map = new HashMap<>(consumerObjList.size());
for (List<Object> consumerObj : consumerObjList) {
map.put(SafeEncoder.encode((byte[]) consumerObj.get(0)), Long.parseLong(SafeEncoder.encode((byte[]) consumerObj.get(1))));
}
return new StreamPendingSummary(total, new StreamEntryID(minId), new StreamEntryID(maxId), map);
long total = LONG.build(objectList.get(0));
StreamEntryID minId = STREAM_ENTRY_ID.build(objectList.get(1));
StreamEntryID maxId = STREAM_ENTRY_ID.build(objectList.get(2));
Map<String, Long> map = objectList.get(3) == null ? null
: ((List<List<Object>>) objectList.get(3)).stream().collect(
Collectors.toMap(pair -> STRING.build(pair.get(0)),
pair -> Long.parseLong(STRING.build(pair.get(1)))));
return new StreamPendingSummary(total, minId, maxId, map);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,13 +599,22 @@ public void xack() {

@Test
public void xpendingWithParams() {
final String stream = "xpendeing-stream";

assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, true));

// Get the summary from empty stream
StreamPendingSummary emptySummary = jedis.xpending(stream, "xpendeing-group");
assertEquals(0, emptySummary.getTotal());
assertNull(emptySummary.getMinId());
assertNull(emptySummary.getMaxId());
assertNull(emptySummary.getConsumerMessageCount());

Map<String, String> map = new HashMap<>();
map.put("f1", "v1");
StreamEntryID id1 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
StreamEntryID id1 = jedis.xadd(stream, (StreamEntryID) null, map);

assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false));

Map<String, StreamEntryID> streamQeury1 = singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
Map<String, StreamEntryID> streamQeury1 = singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

// Read the event from Stream put it on pending
List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xpendeing-group",
Expand All @@ -614,8 +623,14 @@ public void xpendingWithParams() {
assertEquals(1, range.get(0).getValue().size());
assertEquals(map, range.get(0).getValue().get(0).getFields());

// Get the summary about the pending messages
StreamPendingSummary pendingSummary = jedis.xpending(stream, "xpendeing-group");
assertEquals(1, pendingSummary.getTotal());
assertEquals(id1, pendingSummary.getMinId());
assertEquals(1l, pendingSummary.getConsumerMessageCount().get("xpendeing-consumer").longValue());

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
List<StreamPendingEntry> pendingRange = jedis.xpending(stream, "xpendeing-group",
new XPendingParams().count(3).consumer("xpendeing-consumer"));
assertEquals(1, pendingRange.size());
assertEquals(id1, pendingRange.get(0).getID());
Expand All @@ -624,40 +639,41 @@ public void xpendingWithParams() {
assertTrue(pendingRange.get(0).toString().contains("xpendeing-consumer"));

// Without consumer
pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", new XPendingParams().count(3));
pendingRange = jedis.xpending(stream, "xpendeing-group", new XPendingParams().count(3));
assertEquals(1, pendingRange.size());
assertEquals(id1, pendingRange.get(0).getID());
assertEquals(1, pendingRange.get(0).getDeliveredTimes());
assertEquals("xpendeing-consumer", pendingRange.get(0).getConsumerName());

// with idle
pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
pendingRange = jedis.xpending(stream, "xpendeing-group",
new XPendingParams().idle(Duration.ofMinutes(1).toMillis()).count(3));
assertEquals(0, pendingRange.size());
}

@Test
public void xpendingRange() {
final String stream = "xpendeing-stream";
Map<String, String> map = new HashMap<>();
map.put("foo", "bar");
StreamEntryID m1 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
StreamEntryID m2 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false);
StreamEntryID m1 = jedis.xadd(stream, (StreamEntryID) null, map);
StreamEntryID m2 = jedis.xadd(stream, (StreamEntryID) null, map);
jedis.xgroupCreate(stream, "xpendeing-group", null, false);

// read 1 message from the group with each consumer
Map<String, StreamEntryID> streamQeury = singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
Map<String, StreamEntryID> streamQeury = singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
jedis.xreadGroup("xpendeing-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury);
jedis.xreadGroup("xpendeing-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury);

List<StreamPendingEntry> response = jedis.xpending("xpendeing-stream", "xpendeing-group",
List<StreamPendingEntry> response = jedis.xpending(stream, "xpendeing-group",
XPendingParams.xPendingParams("(0", "+", 5));
assertEquals(2, response.size());
assertEquals(m1, response.get(0).getID());
assertEquals("consumer1", response.get(0).getConsumerName());
assertEquals(m2, response.get(1).getID());
assertEquals("consumer2", response.get(1).getConsumerName());

response = jedis.xpending("xpendeing-stream", "xpendeing-group",
response = jedis.xpending(stream, "xpendeing-group",
XPendingParams.xPendingParams(StreamEntryID.MINIMUM_ID, StreamEntryID.MAXIMUM_ID, 5));
assertEquals(2, response.size());
assertEquals(m1, response.get(0).getID());
Expand All @@ -668,18 +684,19 @@ public void xpendingRange() {

@Test
public void xclaimWithParams() {
final String stream = "xpendeing-stream";
Map<String, String> map = new HashMap<>();
map.put("f1", "v1");
jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
jedis.xadd(stream, (StreamEntryID) null, map);

assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false));
assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, false));

// Read the event from Stream put it on pending
jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));
singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
List<StreamPendingEntry> pendingRange = jedis.xpending(stream, "xpendeing-group",
XPendingParams.xPendingParams().count(3).consumer("xpendeing-consumer"));

// Sleep for 100ms so we can claim events pending for more than 50ms
Expand All @@ -689,7 +706,7 @@ public void xclaimWithParams() {
e.printStackTrace();
}

List<StreamEntry> streamEntrys = jedis.xclaim("xpendeing-stream", "xpendeing-group",
List<StreamEntry> streamEntrys = jedis.xclaim(stream, "xpendeing-group",
"xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0),
pendingRange.get(0).getID());
assertEquals(1, streamEntrys.size());
Expand All @@ -699,18 +716,19 @@ public void xclaimWithParams() {

@Test
public void xclaimJustId() {
final String stream = "xpendeing-stream";
Map<String, String> map = new HashMap<>();
map.put("f1", "v1");
jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
jedis.xadd(stream, (StreamEntryID) null, map);

assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false));
assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, false));

// Read the event from Stream put it on pending
jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));
singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
List<StreamPendingEntry> pendingRange = jedis.xpending(stream, "xpendeing-group",
XPendingParams.xPendingParams().count(3).consumer("xpendeing-consumer"));
// Sleep for 100ms so we can claim events pending for more than 50ms
try {
Expand All @@ -719,7 +737,7 @@ public void xclaimJustId() {
e.printStackTrace();
}

List<StreamEntryID> streamEntryIDS = jedis.xclaimJustId("xpendeing-stream", "xpendeing-group",
List<StreamEntryID> streamEntryIDS = jedis.xclaimJustId(stream, "xpendeing-group",
"xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0),
pendingRange.get(0).getID());
assertEquals(1, streamEntryIDS.size());
Expand Down
Loading