diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index c7349f3e89..684a6d6172 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1736,15 +1736,14 @@ public StreamPendingSummary build(Object data) { } List objectList = (List) data; - long total = BuilderFactory.LONG.build(objectList.get(0)); - String minId = SafeEncoder.encode((byte[]) objectList.get(1)); - String maxId = SafeEncoder.encode((byte[]) objectList.get(2)); - List> consumerObjList = (List>) objectList.get(3); - Map map = new HashMap<>(consumerObjList.size()); - for (List consumerObj : consumerObjList) { - map.put(SafeEncoder.encode((byte[]) consumerObj.get(0)), Long.parseLong(SafeEncoder.encode((byte[]) consumerObj.get(1)))); - } - return new StreamPendingSummary(total, new StreamEntryID(minId), new StreamEntryID(maxId), map); + long total = LONG.build(objectList.get(0)); + StreamEntryID minId = STREAM_ENTRY_ID.build(objectList.get(1)); + StreamEntryID maxId = STREAM_ENTRY_ID.build(objectList.get(2)); + Map map = objectList.get(3) == null ? null + : ((List>) objectList.get(3)).stream().collect( + Collectors.toMap(pair -> STRING.build(pair.get(0)), + pair -> Long.parseLong(STRING.build(pair.get(1))))); + return new StreamPendingSummary(total, minId, maxId, map); } @Override diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java index 6b28f9ed19..7c985b9e47 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java @@ -520,14 +520,22 @@ public void xack() { @Test public void xpendingWithParams() { + final String stream = "xpendeing-stream"; + + assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, true)); + + // Get the summary from empty stream + StreamPendingSummary emptySummary = jedis.xpending(stream, "xpendeing-group"); + assertEquals(0, emptySummary.getTotal()); + assertNull(emptySummary.getMinId()); + assertNull(emptySummary.getMaxId()); + assertNull(emptySummary.getConsumerMessageCount()); + Map map = new HashMap<>(); map.put("f1", "v1"); - StreamEntryID id1 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map); + StreamEntryID id1 = jedis.xadd(stream, (StreamEntryID) null, map); - assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false)); - - Map streamQeury1 = singletonMap( - "xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury1 = singletonMap(stream, StreamEntryID.UNRECEIVED_ENTRY); // Read the event from Stream put it on pending List>> range = jedis.xreadGroup("xpendeing-group", @@ -536,8 +544,14 @@ public void xpendingWithParams() { assertEquals(1, range.get(0).getValue().size()); assertEquals(map, range.get(0).getValue().get(0).getFields()); + // Get the summary about the pending messages + StreamPendingSummary pendingSummary = jedis.xpending(stream, "xpendeing-group"); + assertEquals(1, pendingSummary.getTotal()); + assertEquals(id1, pendingSummary.getMinId()); + assertEquals(1l, pendingSummary.getConsumerMessageCount().get("xpendeing-consumer").longValue()); + // Get the pending event - List pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", + List pendingRange = jedis.xpending(stream, "xpendeing-group", new XPendingParams().count(3).consumer("xpendeing-consumer")); assertEquals(1, pendingRange.size()); assertEquals(id1, pendingRange.get(0).getID()); @@ -546,33 +560,33 @@ public void xpendingWithParams() { assertTrue(pendingRange.get(0).toString().contains("xpendeing-consumer")); // Without consumer - pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", new XPendingParams().count(3)); + pendingRange = jedis.xpending(stream, "xpendeing-group", new XPendingParams().count(3)); assertEquals(1, pendingRange.size()); assertEquals(id1, pendingRange.get(0).getID()); assertEquals(1, pendingRange.get(0).getDeliveredTimes()); assertEquals("xpendeing-consumer", pendingRange.get(0).getConsumerName()); // with idle - pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", + pendingRange = jedis.xpending(stream, "xpendeing-group", new XPendingParams().idle(Duration.ofMinutes(1).toMillis()).count(3)); assertEquals(0, pendingRange.size()); } @Test public void xpendingRange() { + final String stream = "xpendeing-stream"; Map map = new HashMap<>(); map.put("foo", "bar"); - StreamEntryID m1 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map); - StreamEntryID m2 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map); - jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false); + StreamEntryID m1 = jedis.xadd(stream, (StreamEntryID) null, map); + StreamEntryID m2 = jedis.xadd(stream, (StreamEntryID) null, map); + jedis.xgroupCreate(stream, "xpendeing-group", null, false); // read 1 message from the group with each consumer - Map streamQeury = singletonMap( - "xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury = singletonMap(stream, StreamEntryID.UNRECEIVED_ENTRY); jedis.xreadGroup("xpendeing-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury); jedis.xreadGroup("xpendeing-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury); - List response = jedis.xpending("xpendeing-stream", "xpendeing-group", + List response = jedis.xpending(stream, "xpendeing-group", XPendingParams.xPendingParams("(0", "+", 5)); assertEquals(2, response.size()); assertEquals(m1, response.get(0).getID()); @@ -580,7 +594,7 @@ public void xpendingRange() { assertEquals(m2, response.get(1).getID()); assertEquals("consumer2", response.get(1).getConsumerName()); - response = jedis.xpending("xpendeing-stream", "xpendeing-group", + response = jedis.xpending(stream, "xpendeing-group", XPendingParams.xPendingParams(StreamEntryID.MINIMUM_ID, StreamEntryID.MAXIMUM_ID, 5)); assertEquals(2, response.size()); assertEquals(m1, response.get(0).getID()); @@ -591,18 +605,19 @@ public void xpendingRange() { @Test public void xclaimWithParams() { + final String stream = "xpendeing-stream"; Map map = new HashMap<>(); map.put("f1", "v1"); - jedis.xadd("xpendeing-stream", (StreamEntryID) null, map); + jedis.xadd(stream, (StreamEntryID) null, map); - assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false)); + assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, false)); // Read the event from Stream put it on pending jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap(stream, StreamEntryID.UNRECEIVED_ENTRY)); // Get the pending event - List pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", + List pendingRange = jedis.xpending(stream, "xpendeing-group", XPendingParams.xPendingParams().count(3).consumer("xpendeing-consumer")); // Sleep for 100ms so we can claim events pending for more than 50ms @@ -612,7 +627,7 @@ public void xclaimWithParams() { e.printStackTrace(); } - List streamEntrys = jedis.xclaim("xpendeing-stream", "xpendeing-group", + List streamEntrys = jedis.xclaim(stream, "xpendeing-group", "xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0), pendingRange.get(0).getID()); assertEquals(1, streamEntrys.size()); @@ -622,18 +637,19 @@ public void xclaimWithParams() { @Test public void xclaimJustId() { + final String stream = "xpendeing-stream"; Map map = new HashMap<>(); map.put("f1", "v1"); - jedis.xadd("xpendeing-stream", (StreamEntryID) null, map); + jedis.xadd(stream, (StreamEntryID) null, map); - assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false)); + assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, false)); // Read the event from Stream put it on pending jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap(stream, StreamEntryID.UNRECEIVED_ENTRY)); // Get the pending event - List pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", + List pendingRange = jedis.xpending(stream, "xpendeing-group", XPendingParams.xPendingParams().count(3).consumer("xpendeing-consumer")); // Sleep for 100ms so we can claim events pending for more than 50ms try { @@ -642,7 +658,7 @@ public void xclaimJustId() { e.printStackTrace(); } - List streamEntryIDS = jedis.xclaimJustId("xpendeing-stream", "xpendeing-group", + List streamEntryIDS = jedis.xclaimJustId(stream, "xpendeing-group", "xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0), pendingRange.get(0).getID()); assertEquals(1, streamEntryIDS.size());