Skip to content

Commit

Permalink
Consider null values in empty StreamPendingSummary (#3793)
Browse files Browse the repository at this point in the history
* Consider null values in empty StreamPendingSummary

* java stream
  • Loading branch information
sazzad16 authored Mar 29, 2024
1 parent a02b2eb commit a3b710b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 32 deletions.
17 changes: 8 additions & 9 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1812,15 +1812,14 @@ public StreamPendingSummary build(Object data) {
}

List<Object> objectList = (List<Object>) data;
long total = BuilderFactory.LONG.build(objectList.get(0));
String minId = SafeEncoder.encode((byte[]) objectList.get(1));
String maxId = SafeEncoder.encode((byte[]) objectList.get(2));
List<List<Object>> consumerObjList = (List<List<Object>>) objectList.get(3);
Map<String, Long> map = new HashMap<>(consumerObjList.size());
for (List<Object> consumerObj : consumerObjList) {
map.put(SafeEncoder.encode((byte[]) consumerObj.get(0)), Long.parseLong(SafeEncoder.encode((byte[]) consumerObj.get(1))));
}
return new StreamPendingSummary(total, new StreamEntryID(minId), new StreamEntryID(maxId), map);
long total = LONG.build(objectList.get(0));
StreamEntryID minId = STREAM_ENTRY_ID.build(objectList.get(1));
StreamEntryID maxId = STREAM_ENTRY_ID.build(objectList.get(2));
Map<String, Long> map = objectList.get(3) == null ? null
: ((List<List<Object>>) objectList.get(3)).stream().collect(
Collectors.toMap(pair -> STRING.build(pair.get(0)),
pair -> Long.parseLong(STRING.build(pair.get(1)))));
return new StreamPendingSummary(total, minId, maxId, map);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,13 +599,22 @@ public void xack() {

@Test
public void xpendingWithParams() {
final String stream = "xpendeing-stream";

assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, true));

// Get the summary from empty stream
StreamPendingSummary emptySummary = jedis.xpending(stream, "xpendeing-group");
assertEquals(0, emptySummary.getTotal());
assertNull(emptySummary.getMinId());
assertNull(emptySummary.getMaxId());
assertNull(emptySummary.getConsumerMessageCount());

Map<String, String> map = new HashMap<>();
map.put("f1", "v1");
StreamEntryID id1 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
StreamEntryID id1 = jedis.xadd(stream, (StreamEntryID) null, map);

assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false));

Map<String, StreamEntryID> streamQeury1 = singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
Map<String, StreamEntryID> streamQeury1 = singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

// Read the event from Stream put it on pending
List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xpendeing-group",
Expand All @@ -614,8 +623,14 @@ public void xpendingWithParams() {
assertEquals(1, range.get(0).getValue().size());
assertEquals(map, range.get(0).getValue().get(0).getFields());

// Get the summary about the pending messages
StreamPendingSummary pendingSummary = jedis.xpending(stream, "xpendeing-group");
assertEquals(1, pendingSummary.getTotal());
assertEquals(id1, pendingSummary.getMinId());
assertEquals(1l, pendingSummary.getConsumerMessageCount().get("xpendeing-consumer").longValue());

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
List<StreamPendingEntry> pendingRange = jedis.xpending(stream, "xpendeing-group",
new XPendingParams().count(3).consumer("xpendeing-consumer"));
assertEquals(1, pendingRange.size());
assertEquals(id1, pendingRange.get(0).getID());
Expand All @@ -624,40 +639,41 @@ public void xpendingWithParams() {
assertTrue(pendingRange.get(0).toString().contains("xpendeing-consumer"));

// Without consumer
pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", new XPendingParams().count(3));
pendingRange = jedis.xpending(stream, "xpendeing-group", new XPendingParams().count(3));
assertEquals(1, pendingRange.size());
assertEquals(id1, pendingRange.get(0).getID());
assertEquals(1, pendingRange.get(0).getDeliveredTimes());
assertEquals("xpendeing-consumer", pendingRange.get(0).getConsumerName());

// with idle
pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
pendingRange = jedis.xpending(stream, "xpendeing-group",
new XPendingParams().idle(Duration.ofMinutes(1).toMillis()).count(3));
assertEquals(0, pendingRange.size());
}

@Test
public void xpendingRange() {
final String stream = "xpendeing-stream";
Map<String, String> map = new HashMap<>();
map.put("foo", "bar");
StreamEntryID m1 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
StreamEntryID m2 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false);
StreamEntryID m1 = jedis.xadd(stream, (StreamEntryID) null, map);
StreamEntryID m2 = jedis.xadd(stream, (StreamEntryID) null, map);
jedis.xgroupCreate(stream, "xpendeing-group", null, false);

// read 1 message from the group with each consumer
Map<String, StreamEntryID> streamQeury = singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
Map<String, StreamEntryID> streamQeury = singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
jedis.xreadGroup("xpendeing-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury);
jedis.xreadGroup("xpendeing-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury);

List<StreamPendingEntry> response = jedis.xpending("xpendeing-stream", "xpendeing-group",
List<StreamPendingEntry> response = jedis.xpending(stream, "xpendeing-group",
XPendingParams.xPendingParams("(0", "+", 5));
assertEquals(2, response.size());
assertEquals(m1, response.get(0).getID());
assertEquals("consumer1", response.get(0).getConsumerName());
assertEquals(m2, response.get(1).getID());
assertEquals("consumer2", response.get(1).getConsumerName());

response = jedis.xpending("xpendeing-stream", "xpendeing-group",
response = jedis.xpending(stream, "xpendeing-group",
XPendingParams.xPendingParams(StreamEntryID.MINIMUM_ID, StreamEntryID.MAXIMUM_ID, 5));
assertEquals(2, response.size());
assertEquals(m1, response.get(0).getID());
Expand All @@ -668,18 +684,19 @@ public void xpendingRange() {

@Test
public void xclaimWithParams() {
final String stream = "xpendeing-stream";
Map<String, String> map = new HashMap<>();
map.put("f1", "v1");
jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
jedis.xadd(stream, (StreamEntryID) null, map);

assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false));
assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, false));

// Read the event from Stream put it on pending
jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));
singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
List<StreamPendingEntry> pendingRange = jedis.xpending(stream, "xpendeing-group",
XPendingParams.xPendingParams().count(3).consumer("xpendeing-consumer"));

// Sleep for 100ms so we can claim events pending for more than 50ms
Expand All @@ -689,7 +706,7 @@ public void xclaimWithParams() {
e.printStackTrace();
}

List<StreamEntry> streamEntrys = jedis.xclaim("xpendeing-stream", "xpendeing-group",
List<StreamEntry> streamEntrys = jedis.xclaim(stream, "xpendeing-group",
"xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0),
pendingRange.get(0).getID());
assertEquals(1, streamEntrys.size());
Expand All @@ -699,18 +716,19 @@ public void xclaimWithParams() {

@Test
public void xclaimJustId() {
final String stream = "xpendeing-stream";
Map<String, String> map = new HashMap<>();
map.put("f1", "v1");
jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
jedis.xadd(stream, (StreamEntryID) null, map);

assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false));
assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, false));

// Read the event from Stream put it on pending
jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));
singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
List<StreamPendingEntry> pendingRange = jedis.xpending(stream, "xpendeing-group",
XPendingParams.xPendingParams().count(3).consumer("xpendeing-consumer"));
// Sleep for 100ms so we can claim events pending for more than 50ms
try {
Expand All @@ -719,7 +737,7 @@ public void xclaimJustId() {
e.printStackTrace();
}

List<StreamEntryID> streamEntryIDS = jedis.xclaimJustId("xpendeing-stream", "xpendeing-group",
List<StreamEntryID> streamEntryIDS = jedis.xclaimJustId(stream, "xpendeing-group",
"xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0),
pendingRange.get(0).getID());
assertEquals(1, streamEntryIDS.size());
Expand Down

0 comments on commit a3b710b

Please sign in to comment.