diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index ddd6071776..ed383c72db 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1377,7 +1377,13 @@ public StreamEntry build(Object data) { while (hashIterator.hasNext()) { map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next())); } - return new StreamEntry(entryID, map); + Long idle = null; + Long times = null; + if (objectList.size() >= 4) { + idle = LONG.build(objectList.get(2)); + times = LONG.build(objectList.get(3)); + } + return new StreamEntry(entryID, map, idle, times); } @Override @@ -1418,7 +1424,19 @@ public List build(Object data) { while (hashIterator.hasNext()) { map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next())); } - responses.add(new StreamEntry(entryID, map)); + Long idle = null; + Long times = null; + if (res.size() >= 4) { + Object idleObj = res.get(2); + Object timesObj = res.get(3); + idle = (idleObj instanceof Long) ? (Long) idleObj : Long.valueOf(STRING.build(idleObj)); + times = (timesObj instanceof Long) ? (Long) timesObj : Long.valueOf(STRING.build(timesObj)); + if (idle != null && times != null && idle == 0L && times == 0L) { + idle = null; + times = null; + } + } + responses.add(new StreamEntry(entryID, map, idle, times)); } return responses; @@ -1969,7 +1987,19 @@ public List build(Object data) { while (hashIterator.hasNext()) { map.put(BINARY.build(hashIterator.next()), BINARY.build(hashIterator.next())); } - responses.add(new StreamEntryBinary(entryID, map)); + Long idle = null; + Long times = null; + if (res.size() >= 4) { + Object idleObj = res.get(2); + Object timesObj = res.get(3); + idle = (idleObj instanceof Long) ? (Long) idleObj : Long.valueOf(STRING.build(idleObj)); + times = (timesObj instanceof Long) ? (Long) timesObj : Long.valueOf(STRING.build(timesObj)); + if (idle != null && times != null && idle == 0L && times == 0L) { + idle = null; + times = null; + } + } + responses.add(new StreamEntryBinary(entryID, map, idle, times)); } return responses; diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index ccc185188a..be0301deb5 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -331,7 +331,7 @@ public static enum Keyword implements Rawable { AGGREGATE, ALPHA, BY, GET, LIMIT, NO, NOSORT, ONE, SET, STORE, WEIGHTS, WITHSCORE, WITHSCORES, RESETSTAT, REWRITE, RESET, FLUSH, EXISTS, LOAD, LEN, HELP, SCHEDULE, MATCH, COUNT, TYPE, KEYS, REFCOUNT, ENCODING, IDLETIME, FREQ, REPLACE, GETNAME, SETNAME, SETINFO, LIST, ID, KILL, PERSIST, - STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK, + STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK, CLAIM, RETRYCOUNT, STREAM, GROUPS, CONSUMERS, JUSTID, WITHVALUES, NOMKSTREAM, MINID, CREATECONSUMER, SETUSER, GETUSER, DELUSER, WHOAMI, USERS, CAT, GENPASS, LOG, SAVE, DRYRUN, COPY, AUTH, AUTH2, NX, XX, EX, PX, EXAT, PXAT, ABSTTL, KEEPTTL, INCR, LT, GT, CH, INFO, PAUSE, UNPAUSE, UNBLOCK, diff --git a/src/main/java/redis/clients/jedis/params/XReadGroupParams.java b/src/main/java/redis/clients/jedis/params/XReadGroupParams.java index a761f166b2..7af8aa0b4d 100644 --- a/src/main/java/redis/clients/jedis/params/XReadGroupParams.java +++ b/src/main/java/redis/clients/jedis/params/XReadGroupParams.java @@ -10,6 +10,7 @@ public class XReadGroupParams implements IParams { private Integer count = null; private Integer block = null; private boolean noack = false; + private Long claim = null; public static XReadGroupParams xReadGroupParams() { return new XReadGroupParams(); @@ -30,6 +31,12 @@ public XReadGroupParams noAck() { return this; } + public XReadGroupParams claim(long minIdleMillis) { + this.claim = minIdleMillis; + return this; + } + + @Override public void addParams(CommandArguments args) { if (count != null) { @@ -41,6 +48,9 @@ public void addParams(CommandArguments args) { if (noack) { args.add(Keyword.NOACK); } + if (claim != null) { + args.add(Keyword.CLAIM).add(claim); + } } @Override @@ -48,11 +58,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; XReadGroupParams that = (XReadGroupParams) o; - return noack == that.noack && Objects.equals(count, that.count) && Objects.equals(block, that.block); + return noack == that.noack && Objects.equals(count, that.count) && Objects.equals(block, that.block) + && Objects.equals(claim, that.claim); } @Override public int hashCode() { - return Objects.hash(count, block, noack); + return Objects.hash(count, block, noack, claim); } } diff --git a/src/main/java/redis/clients/jedis/resps/StreamEntry.java b/src/main/java/redis/clients/jedis/resps/StreamEntry.java index 0d76ad1e12..2b8e515261 100644 --- a/src/main/java/redis/clients/jedis/resps/StreamEntry.java +++ b/src/main/java/redis/clients/jedis/resps/StreamEntry.java @@ -12,10 +12,28 @@ public class StreamEntry implements Serializable { private StreamEntryID id; private Map fields; + private Long idleTime; // milliseconds since last delivery (claimed entries) + private Long deliveredTimes; // delivery count (claimed entries) + public StreamEntry(StreamEntryID id, Map fields) { this.id = id; this.fields = fields; } + public StreamEntry(StreamEntryID id, Map fields, Long idleTime, Long deliveredTimes) { + this.id = id; + this.fields = fields; + this.idleTime = idleTime; + this.deliveredTimes = deliveredTimes; + } + + public Long getIdleTime() { + return idleTime; + } + + public Long getDeliveredTimes() { + return deliveredTimes; + } + public StreamEntryID getID() { return id; diff --git a/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java b/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java index 24f0b5f5fb..26d906cd1a 100644 --- a/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java +++ b/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java @@ -12,10 +12,28 @@ public class StreamEntryBinary implements Serializable { private StreamEntryID id; private Map fields; + private Long idleTime; // milliseconds since last delivery (claimed entries) + private Long deliveredTimes; // delivery count (claimed entries) + public StreamEntryBinary(StreamEntryID id, Map fields) { this.id = id; this.fields = fields; } + public StreamEntryBinary(StreamEntryID id, Map fields, Long idleTime, Long deliveredTimes) { + this.id = id; + this.fields = fields; + this.idleTime = idleTime; + this.deliveredTimes = deliveredTimes; + } + + public Long getIdleTime() { + return idleTime; + } + + public Long getDeliveredTimes() { + return deliveredTimes; + } + public StreamEntryID getID() { return id; @@ -39,4 +57,4 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN this.id = (StreamEntryID) in.readUnshared(); this.fields = (Map) in.readUnshared(); } -} \ No newline at end of file +} diff --git a/src/test/java/io/redis/test/utils/RedisVersion.java b/src/test/java/io/redis/test/utils/RedisVersion.java index bbaab9b4c0..59d3769265 100644 --- a/src/test/java/io/redis/test/utils/RedisVersion.java +++ b/src/test/java/io/redis/test/utils/RedisVersion.java @@ -7,6 +7,8 @@ public class RedisVersion implements Comparable { public static final RedisVersion V7_4 = RedisVersion.of("7.4"); public static final RedisVersion V8_0_0_PRE = RedisVersion.of("7.9.0"); public static final RedisVersion V8_0_0 = RedisVersion.of("8.0.0"); + public static final String V8_4_RC1_STRING = "8.3.224"; + public static final RedisVersion V8_4_RC1 = RedisVersion.of(V8_4_RC1_STRING); public static final RedisVersion V8_4_0 = RedisVersion.of("8.4.0"); private final int major; @@ -91,4 +93,4 @@ public boolean isGreaterThan(RedisVersion other) { public static int compare(RedisVersion v1, RedisVersion v2) { return v1.compareTo(v2); } -} \ No newline at end of file +} diff --git a/src/test/java/redis/clients/jedis/commands/commandobjects/CommandObjectsStreamCommandsTest.java b/src/test/java/redis/clients/jedis/commands/commandobjects/CommandObjectsStreamCommandsTest.java index 30bfd654e8..1a8f74154f 100644 --- a/src/test/java/redis/clients/jedis/commands/commandobjects/CommandObjectsStreamCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/commandobjects/CommandObjectsStreamCommandsTest.java @@ -1,5 +1,9 @@ package redis.clients.jedis.commands.commandobjects; + +import io.redis.test.annotations.SinceRedisVersion; + +import static io.redis.test.utils.RedisVersion.V8_4_RC1_STRING; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -1102,4 +1106,121 @@ public void testXReadGroupAsMap() { assertThat(xreadGroupConsumer2.get(streamKey).get(0).getID(), equalTo(secondMessageId)); assertThat(xreadGroupConsumer2.get(streamKey).get(0).getFields(), equalTo(messageBody)); } + + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupWithClaimReturnsPendingThenNewEntries_commandObjects() throws InterruptedException { + String key = "co-claim-stream"; + String group = "co-claim-group"; + String consumer = "c1"; + + exec(commandObjects.del(key)); + exec(commandObjects.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true)); + + // Make 3 entries pending + exec(commandObjects.xadd(key, new StreamEntryID("1-0"), Collections.singletonMap("f", "v"))); + exec(commandObjects.xadd(key, new StreamEntryID("2-0"), Collections.singletonMap("f", "v"))); + exec(commandObjects.xadd(key, new StreamEntryID("3-0"), Collections.singletonMap("f", "v"))); + + Map stream = Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + exec(commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(3), stream)); + + Thread.sleep(60); + + // Add two fresh entries + exec(commandObjects.xadd(key, new StreamEntryID("4-0"), Collections.singletonMap("f", "v"))); + exec(commandObjects.xadd(key, new StreamEntryID("5-0"), Collections.singletonMap("f", "v"))); + + List>> messages = exec( + commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(5).claim(50), stream)); + + assertThat(messages, hasSize(1)); + List entries = messages.get(0).getValue(); + org.junit.jupiter.api.Assertions.assertEquals(5, entries.size()); + + for (int i = 0; i < 3; i++) { + org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getIdleTime()); + org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getDeliveredTimes()); + } + for (int i = 3; i < 5; i++) { + org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getIdleTime()); + org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getDeliveredTimes()); + } + } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupWithClaimNoEligiblePendingReturnsOnlyNewEntries_commandObjects() { + String key = "co-claim-noeligible"; + String group = "co-claim-group"; + String consumer = "c1"; + + exec(commandObjects.del(key)); + exec(commandObjects.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true)); + + // Put 2 entries in PEL + exec(commandObjects.xadd(key, new StreamEntryID("1-0"), Collections.singletonMap("f", "v"))); + exec(commandObjects.xadd(key, new StreamEntryID("2-0"), Collections.singletonMap("f", "v"))); + Map stream = Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + exec(commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(2), stream)); + + // Add two new entries that should be returned + exec(commandObjects.xadd(key, new StreamEntryID("3-0"), Collections.singletonMap("f", "v"))); + exec(commandObjects.xadd(key, new StreamEntryID("4-0"), Collections.singletonMap("f", "v"))); + + List>> messages = exec( + commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(4).claim(500), stream)); + + assertThat(messages, hasSize(1)); + List entries = messages.get(0).getValue(); + org.junit.jupiter.api.Assertions.assertEquals(2, entries.size()); + for (StreamEntry e : entries) { + org.junit.jupiter.api.Assertions.assertNull(e.getIdleTime()); + org.junit.jupiter.api.Assertions.assertNull(e.getDeliveredTimes()); + } + } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupWithClaimAndNoAckDoesNotAddNewEntriesToPEL_commandObjects() throws InterruptedException { + String key = "co-claim-noack"; + String group = "co-claim-group"; + String consumer = "c1"; + + exec(commandObjects.del(key)); + exec(commandObjects.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true)); + + // Make 3 entries pending + exec(commandObjects.xadd(key, new StreamEntryID("1-0"), java.util.Collections.singletonMap("f", "v"))); + exec(commandObjects.xadd(key, new StreamEntryID("2-0"), java.util.Collections.singletonMap("f", "v"))); + exec(commandObjects.xadd(key, new StreamEntryID("3-0"), java.util.Collections.singletonMap("f", "v"))); + java.util.Map stream = java.util.Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + exec(commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(3), stream)); + + // Wait then add fresh entries + Thread.sleep(60); + exec(commandObjects.xadd(key, new StreamEntryID("4-0"), java.util.Collections.singletonMap("f", "v"))); + exec(commandObjects.xadd(key, new StreamEntryID("5-0"), java.util.Collections.singletonMap("f", "v"))); + + // Read with CLAIM and NOACK + java.util.List>> messages = exec( + commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(5).claim(50).noAck(), stream)); + + assertThat(messages, hasSize(1)); + java.util.List entries = messages.get(0).getValue(); + org.junit.jupiter.api.Assertions.assertEquals(5, entries.size()); + for (int i = 0; i < 3; i++) { + org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getIdleTime()); + org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getDeliveredTimes()); + } + for (int i = 3; i < 5; i++) { + org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getIdleTime()); + org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getDeliveredTimes()); + } + + Long acked = exec(commandObjects.xack(key, group, new StreamEntryID("4-0"), new StreamEntryID("5-0"))); + org.junit.jupiter.api.Assertions.assertEquals(0L, acked); + } + } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java index 85b1a11a44..95ff2f191b 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import static io.redis.test.utils.RedisVersion.V8_4_RC1_STRING; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; @@ -527,4 +528,127 @@ public void testXtrimWithAcknowledged() { assertTrue(jedis.xlen(STREAM_KEY_1) <= 5); // Should not exceed original length } + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupBinaryWithClaimReturnsPendingThenNewEntries() throws InterruptedException { + // Make 3 entries pending + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("3-0"), HASH_1); + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroupBinary(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streams); + + Thread.sleep(60); + + // Add new entries + jedis.xadd(STREAM_KEY_1, new XAddParams().id("4-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("5-0"), HASH_2); + + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(5).claim(50), streams); + + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(5, entries.size()); + + for (int i = 0; i < 3; i++) { + assertNotNull(entries.get(i).getIdleTime()); + assertNotNull(entries.get(i).getDeliveredTimes()); + } + for (int i = 3; i < 5; i++) { + assertNull(entries.get(i).getIdleTime()); + assertNull(entries.get(i).getDeliveredTimes()); + } + } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupBinaryWithClaimNoEligiblePendingReturnsOnlyNewEntries() { + // Make 2 entries pending but below min-idle + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroupBinary(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2), streams); + + // Add new entries that should be returned + jedis.xadd(STREAM_KEY_1, new XAddParams().id("3-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("4-0"), HASH_2); + + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(4).claim(500), streams); + + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(2, entries.size()); + for (StreamEntryBinary e : entries) { + assertNull(e.getIdleTime()); + assertNull(e.getDeliveredTimes()); + } + } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupBinaryWithClaimRespectsCountAndReturnsPendingFirst() throws InterruptedException { + // Make 4 entries pending + for (int i = 1; i <= 4; i++) { + jedis.xadd(STREAM_KEY_1, new XAddParams().id(i + "-0"), HASH_1); + } + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroupBinary(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(4), streams); + + Thread.sleep(60); + + // Add 2 fresh entries + jedis.xadd(STREAM_KEY_1, new XAddParams().id("5-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("6-0"), HASH_2); + + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2).claim(50), streams); + + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(2, entries.size()); + for (StreamEntryBinary e : entries) { + assertNotNull(e.getIdleTime()); + assertNotNull(e.getDeliveredTimes()); + } + } + + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupBinaryWithClaimAndNoAckDoesNotAddNewEntriesToPEL() throws InterruptedException { + // Make 3 entries pending + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("3-0"), HASH_1); + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroupBinary(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streams); + + // Wait, then add fresh entries + Thread.sleep(60); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("4-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("5-0"), HASH_2); + + // Read with CLAIM and NOACK + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(5).claim(50).noAck(), streams); + + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(5, entries.size()); + for (int i = 0; i < 3; i++) { + assertNotNull(entries.get(i).getIdleTime()); + assertNotNull(entries.get(i).getDeliveredTimes()); + } + for (int i = 3; i < 5; i++) { + assertNull(entries.get(i).getIdleTime()); + assertNull(entries.get(i).getDeliveredTimes()); + } + + // NOACK must ensure fresh entries are not part of PEL; acknowledging them should be a no-op + long ackedNew = jedis.xack(STREAM_KEY_1, GROUP_NAME, "4-0".getBytes(), "5-0".getBytes()); + assertEquals(0L, ackedNew); + } + } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java index 691b7fa427..a35447e081 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java @@ -1,5 +1,6 @@ package redis.clients.jedis.commands.jedis; +import static io.redis.test.utils.RedisVersion.V8_4_RC1_STRING; import static java.util.Collections.singleton; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; @@ -1426,4 +1427,171 @@ public void transaction() { assertEquals(id2.get(), entries.get(1).getID()); assertEquals(map, entries.get(1).getFields()); } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupWithClaimReturnsPendingThenNewEntries() throws InterruptedException { + String key = "xrg-claim-stream"; + String group = "xrg-claim-group"; + String consumer = "c1"; + + jedis.del(key); + jedis.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true); + + // Make three entries pending (in PEL) + jedis.xadd(key, new StreamEntryID("1-0"), singletonMap("f", "v")); + jedis.xadd(key, new StreamEntryID("2-0"), singletonMap("f", "v")); + jedis.xadd(key, new StreamEntryID("3-0"), singletonMap("f", "v")); + Map streams = singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(3), streams); + + // Wait a bit so they become eligible according to min-idle + Thread.sleep(60); + + // Add two new (never delivered) entries + jedis.xadd(key, new StreamEntryID("4-0"), singletonMap("f", "v")); + jedis.xadd(key, new StreamEntryID("5-0"), singletonMap("f", "v")); + + List>> messages = jedis.xreadGroup( + group, consumer, XReadGroupParams.xReadGroupParams().count(5).claim(50), streams); + + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(5, entries.size()); + + // First 3 must be claimed PEL entries with metadata + for (int i = 0; i < 3; i++) { + assertNotNull(entries.get(i).getIdleTime()); + assertNotNull(entries.get(i).getDeliveredTimes()); + } + // Last 2 are fresh entries with no metadata + for (int i = 3; i < 5; i++) { + assertNull(entries.get(i).getIdleTime()); + assertNull(entries.get(i).getDeliveredTimes()); + } + } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupWithClaimNoEligiblePendingReturnsOnlyNewEntries() throws InterruptedException { + String key = "xrg-claim-noeligible"; + String group = "xrg-claim-group"; + String consumer = "c1"; + + jedis.del(key); + jedis.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true); + + // Put 2 entries in PEL but do NOT wait enough for min-idle + jedis.xadd(key, new StreamEntryID("1-0"), singletonMap("f", "v")); + jedis.xadd(key, new StreamEntryID("2-0"), singletonMap("f", "v")); + Map streams = singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(2), streams); + + // Add new entries that should be returned instead of pending ones + jedis.xadd(key, new StreamEntryID("3-0"), singletonMap("f", "v")); + jedis.xadd(key, new StreamEntryID("4-0"), singletonMap("f", "v")); + + List>> messages = jedis.xreadGroup( + group, consumer, XReadGroupParams.xReadGroupParams().count(4).claim(500), streams); + + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(2, entries.size()); + for (StreamEntry e : entries) { + assertNull(e.getIdleTime()); + assertNull(e.getDeliveredTimes()); + } + } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupWithClaimRespectsCountAndReturnsPendingFirst() throws InterruptedException { + String key = "xrg-claim-count"; + String group = "xrg-claim-group"; + String consumer = "c1"; + + jedis.del(key); + jedis.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true); + + // Make 4 entries pending + for (int i = 1; i <= 4; i++) { + jedis.xadd(key, new StreamEntryID(i + "-0"), singletonMap("f", "v")); + } + Map streams = singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(4), streams); + + Thread.sleep(60); + + // Add 2 fresh entries + jedis.xadd(key, new StreamEntryID("5-0"), singletonMap("f", "v")); + jedis.xadd(key, new StreamEntryID("6-0"), singletonMap("f", "v")); + + List>> messages = jedis.xreadGroup( + group, consumer, XReadGroupParams.xReadGroupParams().count(2).claim(50), streams); + + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(2, entries.size()); + for (StreamEntry e : entries) { + assertNotNull(e.getIdleTime()); + assertNotNull(e.getDeliveredTimes()); + } + } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupWithClaimAndNoAckDoesNotAddNewEntriesToPEL() throws InterruptedException { + String key = "xrg-claim-noack"; + String group = "xrg-claim-group"; + String consumer = "c1"; + + jedis.del(key); + jedis.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true); + + // Make three entries pending (in PEL) + jedis.xadd(key, new StreamEntryID("1-0"), singletonMap("f", "v")); + jedis.xadd(key, new StreamEntryID("2-0"), singletonMap("f", "v")); + jedis.xadd(key, new StreamEntryID("3-0"), singletonMap("f", "v")); + Map streams = singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(3), streams); + + // Verify PEL contains the 3 pending entries + List pendingBefore = jedis.xpending(key, group, XPendingParams.xPendingParams().count(10)); + assertEquals(3, pendingBefore.size()); + + // Wait so PEL entries cross the idle threshold, then add two new entries + Thread.sleep(60); + jedis.xadd(key, new StreamEntryID("4-0"), singletonMap("f", "v")); + jedis.xadd(key, new StreamEntryID("5-0"), singletonMap("f", "v")); + + // Read with CLAIM and NOACK: pending first (with metadata), then fresh entries (no metadata) + List>> messages = jedis.xreadGroup( + group, consumer, XReadGroupParams.xReadGroupParams().count(5).claim(50).noAck(), streams); + + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(5, entries.size()); + + for (int i = 0; i < 3; i++) { + assertNotNull(entries.get(i).getIdleTime()); + assertNotNull(entries.get(i).getDeliveredTimes()); + } + for (int i = 3; i < 5; i++) { + assertNull(entries.get(i).getIdleTime()); + assertNull(entries.get(i).getDeliveredTimes()); + } + + // NOACK must not add the fresh entries to PEL; only the original 3 should remain pending + List pendingAfter = jedis.xpending(key, group, XPendingParams.xPendingParams().count(20)); + assertEquals(3, pendingAfter.size()); + boolean has4 = false, has5 = false; + for (StreamPendingEntry pe : pendingAfter) { + String id = pe.getID().toString(); + if ("4-0".equals(id)) has4 = true; + if ("5-0".equals(id)) has5 = true; + } + assertFalse(has4); + assertFalse(has5); + } + } diff --git a/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java b/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java index a1ad0fd23d..b007c8354d 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java +++ b/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import static io.redis.test.utils.RedisVersion.V8_4_RC1_STRING; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; @@ -542,4 +543,86 @@ public void testXtrimWithAcknowledged() { assertTrue(jedis.xlen(STREAM_KEY_1) <= 5); // Should not exceed original length } + + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupBinaryWithClaimIncludesMetadata() throws InterruptedException { + setUpTestStream(); + + // Add initial entries and read to create PEL entries + for (int i = 1; i <= 3; i++) { + jedis.xadd(STREAM_KEY_1, new XAddParams().id(i + "-0"), HASH_1); + } + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroupBinary(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streams); + + // Wait so pending entries cross idle threshold + Thread.sleep(60); + + // Add new entries + jedis.xadd(STREAM_KEY_1, new XAddParams().id("4-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("5-0"), HASH_2); + + // Read with CLAIM + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, + XReadGroupParams.xReadGroupParams().count(5).claim(50), + streams); + + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(5, entries.size()); + + // First 3 are claimed pending entries and must contain metadata + for (int i = 0; i < 3; i++) { + assertNotNull(entries.get(i).getIdleTime()); + assertNotNull(entries.get(i).getDeliveredTimes()); + assertTrue(entries.get(i).getDeliveredTimes() >= 1); + } + + // Last 2 are new entries without claim metadata + for (int i = 3; i < 5; i++) { + assertNull(entries.get(i).getIdleTime()); + assertNull(entries.get(i).getDeliveredTimes()); + } + } + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupBinaryWithClaimAndNoAckDoesNotAddNewEntriesToPEL() throws InterruptedException { + setUpTestStream(); + + // Make 3 entries pending + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("3-0"), HASH_1); + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroupBinary(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streams); + + // Wait then add fresh entries + Thread.sleep(60); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("4-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("5-0"), HASH_2); + + // Read with CLAIM and NOACK + List>> messages = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(5).claim(50).noAck(), streams); + + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(5, entries.size()); + for (int i = 0; i < 3; i++) { + assertNotNull(entries.get(i).getIdleTime()); + assertNotNull(entries.get(i).getDeliveredTimes()); + } + for (int i = 3; i < 5; i++) { + assertNull(entries.get(i).getIdleTime()); + assertNull(entries.get(i).getDeliveredTimes()); + } + + long ackedNew = jedis.xack(STREAM_KEY_1, GROUP_NAME, "4-0".getBytes(), "5-0".getBytes()); + assertEquals(0L, ackedNew); + } + + } diff --git a/src/test/java/redis/clients/jedis/commands/unified/StreamsCommandsTestBase.java b/src/test/java/redis/clients/jedis/commands/unified/StreamsCommandsTestBase.java index ecbd23020b..650db00c92 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/StreamsCommandsTestBase.java +++ b/src/test/java/redis/clients/jedis/commands/unified/StreamsCommandsTestBase.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import static io.redis.test.utils.RedisVersion.V8_4_RC1_STRING; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; @@ -815,4 +816,85 @@ public void xdelexNotAcknowledged() { assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED, result.get(0)); } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupWithClaimReturnsPendingThenNewEntries() throws InterruptedException { + setUpTestStream(); + + // Add initial entries and read them with the consumer group to create PEL entries + populateTestStreamWithValues(STREAM_KEY_1, 3, HASH_1); + Map streamQuery = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streamQuery); + + // Wait so the PEL entries cross the idle threshold + Thread.sleep(60); + + // Add new entries that should come after claimed PEL entries + jedis.xadd(STREAM_KEY_1, new StreamEntryID("4-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new StreamEntryID("5-0"), HASH_2); + + // Read with CLAIM so idle PEL entries are returned first, followed by new entries + List>> messages = jedis.xreadGroup( + GROUP_NAME, CONSUMER_NAME, + XReadGroupParams.xReadGroupParams().count(5).claim(50), + streamQuery); + + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(5, entries.size()); + + // First 3 should be claimed pending entries with metadata + for (int i = 0; i < 3; i++) { + assertNotNull(entries.get(i).getIdleTime()); + assertNotNull(entries.get(i).getDeliveredTimes()); + assertTrue(entries.get(i).getDeliveredTimes() >= 1); + } + + // Last 2 are new entries, without claim metadata + for (int i = 3; i < 5; i++) { + assertNull(entries.get(i).getIdleTime()); + assertNull(entries.get(i).getDeliveredTimes()); + } + } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupWithClaimAndNoAckDoesNotAddNewEntriesToPEL() throws InterruptedException { + setUpTestStream(); + + // Make 3 entries pending + populateTestStreamWithValues(STREAM_KEY_1, 3, HASH_1); + Map streamQuery = singletonMap(STREAM_KEY_1, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streamQuery); + + // Wait so PEL entries cross the idle threshold, then add fresh entries + Thread.sleep(60); + jedis.xadd(STREAM_KEY_1, new StreamEntryID("4-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new StreamEntryID("5-0"), HASH_2); + + // Read with CLAIM and NOACK: pending first (with metadata), then fresh entries (no metadata) + List>> messages = jedis.xreadGroup( + GROUP_NAME, CONSUMER_NAME, + XReadGroupParams.xReadGroupParams().count(5).claim(50).noAck(), + streamQuery); + + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(5, entries.size()); + + for (int i = 0; i < 3; i++) { + assertNotNull(entries.get(i).getIdleTime()); + assertNotNull(entries.get(i).getDeliveredTimes()); + } + for (int i = 3; i < 5; i++) { + assertNull(entries.get(i).getIdleTime()); + assertNull(entries.get(i).getDeliveredTimes()); + } + + StreamPendingSummary sumAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME); + assertEquals(3L, sumAfter.getTotal()); + } + } diff --git a/src/test/java/redis/clients/jedis/commands/unified/pipeline/BinaryStreamsPipelineCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pipeline/BinaryStreamsPipelineCommandsTest.java index c22559e7a2..375b414422 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pipeline/BinaryStreamsPipelineCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pipeline/BinaryStreamsPipelineCommandsTest.java @@ -1,5 +1,7 @@ package redis.clients.jedis.commands.unified.pipeline; +import io.redis.test.annotations.SinceRedisVersion; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedClass; @@ -18,6 +20,8 @@ import java.util.List; import java.util.Map; +import static io.redis.test.utils.RedisVersion.V8_4_RC1; +import static io.redis.test.utils.RedisVersion.V8_4_RC1_STRING; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; @@ -115,7 +119,7 @@ public void xreadBinary() { Response>>> response = pipe.xreadBinary( XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0")); - + pipe.sync(); List>> actualEntries = response.get(); @@ -221,4 +225,155 @@ public void xreadGroupBinaryAsMapMultipleStreams() { assertThat(actualEntries.get(STREAM_KEY_2), equalsStreamEntries(stream2Entries)); } + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupBinaryWithClaimReturnsPendingThenNewEntries_pipeline() throws InterruptedException { + // Make 3 entries pending + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("3-0"), HASH_1); + + Response>>> resp = pipe.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + pipe.sync(); + assertThat(resp.get(), hasSize(1)); + + Thread.sleep(60); + + // Add two fresh entries + jedis.xadd(STREAM_KEY_1, new XAddParams().id("4-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("5-0"), HASH_2); + + Response>>> claimResp = pipe.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(5).claim(50), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + pipe.sync(); + + List>> messages = claimResp.get(); + assertThat(messages, hasSize(1)); + List entries = messages.get(0).getValue(); + org.junit.jupiter.api.Assertions.assertEquals(5, entries.size()); + + for (int i = 0; i < 3; i++) { + org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getIdleTime()); + org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getDeliveredTimes()); + } + for (int i = 3; i < 5; i++) { + org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getIdleTime()); + org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getDeliveredTimes()); + } + } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupBinaryWithClaimNoEligiblePendingReturnsOnlyNewEntries_pipeline() { + // Make 2 entries pending but below min-idle + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + + Response>>> resp = pipe.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + pipe.sync(); + assertThat(resp.get(), hasSize(1)); + + // Add fresh entries that should be returned + jedis.xadd(STREAM_KEY_1, new XAddParams().id("3-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("4-0"), HASH_2); + + Response>>> claimResp = pipe.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(4).claim(500), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + pipe.sync(); + + List>> messages = claimResp.get(); + assertThat(messages, hasSize(1)); + List entries = messages.get(0).getValue(); + org.junit.jupiter.api.Assertions.assertEquals(2, entries.size()); + for (StreamEntryBinary e : entries) { + org.junit.jupiter.api.Assertions.assertNull(e.getIdleTime()); + org.junit.jupiter.api.Assertions.assertNull(e.getDeliveredTimes()); + } + } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupBinaryWithClaimRespectsCountAndReturnsPendingFirst_pipeline() throws InterruptedException { + // Make 3 entries pending + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("3-0"), HASH_1); + + Response>>> resp = pipe.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + pipe.sync(); + assertThat(resp.get(), hasSize(1)); + + Thread.sleep(60); + + // Add new entries + jedis.xadd(STREAM_KEY_1, new XAddParams().id("4-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("5-0"), HASH_2); + + Response>>> claimResp = pipe.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2).claim(50), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + pipe.sync(); + + List>> messages = claimResp.get(); + assertThat(messages, hasSize(1)); + List entries = messages.get(0).getValue(); + org.junit.jupiter.api.Assertions.assertEquals(2, entries.size()); + for (StreamEntryBinary e : entries) { + org.junit.jupiter.api.Assertions.assertNotNull(e.getIdleTime()); + org.junit.jupiter.api.Assertions.assertNotNull(e.getDeliveredTimes()); + } + } + + + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupBinaryWithClaimAndNoAckDoesNotAddNewEntriesToPEL_pipeline() throws InterruptedException { + // Make 3 entries pending + jedis.xadd(STREAM_KEY_1, new XAddParams().id("1-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("2-0"), HASH_2); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("3-0"), HASH_1); + + Response>>> first = pipe.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + pipe.sync(); + assertThat(first.get(), hasSize(1)); + + // Wait then add fresh entries + Thread.sleep(60); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("4-0"), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id("5-0"), HASH_2); + + // Read with CLAIM and NOACK + Response>>> resp = pipe.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(5).claim(50).noAck(), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + pipe.sync(); + + List>> messages = resp.get(); + assertThat(messages, hasSize(1)); + List entries = messages.get(0).getValue(); + org.junit.jupiter.api.Assertions.assertEquals(5, entries.size()); + for (int i = 0; i < 3; i++) { + org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getIdleTime()); + org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getDeliveredTimes()); + } + for (int i = 3; i < 5; i++) { + org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getIdleTime()); + org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getDeliveredTimes()); + } + + long acked = jedis.xack(STREAM_KEY_1, GROUP_NAME, "4-0".getBytes(), "5-0".getBytes()); + org.junit.jupiter.api.Assertions.assertEquals(0L, acked); + } + } diff --git a/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java index 4a6fba0101..aed9ee31b6 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java @@ -1,5 +1,6 @@ package redis.clients.jedis.commands.unified.pipeline; +import static io.redis.test.utils.RedisVersion.V8_4_RC1_STRING; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.both; @@ -1323,4 +1324,171 @@ public void xinfoStreamFullWithPending() { List consumerPendingEntry = consumer.getPending().get(0); assertEquals(id1, consumerPendingEntry.get(0)); } + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupWithClaimReturnsPendingThenNewEntries_pipeline() throws InterruptedException { + String key = "xrgp-claim-stream"; + String group = "xrgp-claim-group"; + String consumer = "c1"; + + jedis.del(key); + jedis.xgroupCreate(key, group, null, true); + + // Make 3 entries pending + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + Map streams = singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + pipe.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(3), streams); + pipe.sync(); + + Thread.sleep(60); + + // Add two fresh entries + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + + Response>>> resp = + pipe.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(5).claim(50), streams); + pipe.sync(); + + List>> messages = resp.get(); + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(5, entries.size()); + + for (int i = 0; i < 3; i++) { + assertNotNull(entries.get(i).getIdleTime()); + assertNotNull(entries.get(i).getDeliveredTimes()); + } + for (int i = 3; i < 5; i++) { + org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getIdleTime()); + org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getDeliveredTimes()); + } + } + + @Test + @SinceRedisVersion("8.3.90") + public void xreadGroupWithClaimNoEligiblePendingReturnsOnlyNewEntries_pipeline() { + String key = "xrgp-claim-noeligible"; + String group = "xrgp-claim-group"; + String consumer = "c1"; + + jedis.del(key); + jedis.xgroupCreate(key, group, null, true); + + // Make 2 entries pending but keep below min-idle + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + Map streams = singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + Response>>> first = + pipe.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(2), streams); + pipe.sync(); + assertThat(first.get(), hasSize(1)); + + // Add new entries that should be returned + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + + Response>>> resp = + pipe.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(4).claim(500), streams); + pipe.sync(); + + List>> messages = resp.get(); + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(2, entries.size()); + for (StreamEntry e : entries) { + org.junit.jupiter.api.Assertions.assertNull(e.getIdleTime()); + org.junit.jupiter.api.Assertions.assertNull(e.getDeliveredTimes()); + } + } + + @Test + @SinceRedisVersion("8.3.90") + public void xreadGroupWithClaimRespectsCountAndReturnsPendingFirst_pipeline() throws InterruptedException { + String key = "xrgp-claim-count"; + String group = "xrgp-claim-group"; + String consumer = "c1"; + + jedis.del(key); + jedis.xgroupCreate(key, group, null, true); + + // Make 3 entries pending + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + Map streams = singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + pipe.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(3), streams); + pipe.sync(); + + Thread.sleep(60); + + // Add new entries + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + + Response>>> resp = + pipe.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(2).claim(50), streams); + pipe.sync(); + + List>> messages = resp.get(); + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(2, entries.size()); + for (StreamEntry e : entries) { + assertNotNull(e.getIdleTime()); + assertNotNull(e.getDeliveredTimes()); + } + } + + + @Test + @SinceRedisVersion(V8_4_RC1_STRING) + public void xreadGroupWithClaimAndNoAckDoesNotAddNewEntriesToPEL_pipeline() throws InterruptedException { + String key = "xrgp-claim-noack"; + String group = "xrgp-claim-group"; + String consumer = "c1"; + + jedis.del(key); + jedis.xgroupCreate(key, group, null, true); + + // Make 3 entries pending + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + Map streams = singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + pipe.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(3), streams); + pipe.sync(); + + // Wait, then add fresh entries + Thread.sleep(60); + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + jedis.xadd(key, (StreamEntryID) null, singletonMap("f", "v")); + + // Read with CLAIM and NOACK + Response>>> resp = + pipe.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(5).claim(50).noAck(), streams); + pipe.sync(); + + List>> messages = resp.get(); + assertEquals(1, messages.size()); + List entries = messages.get(0).getValue(); + assertEquals(5, entries.size()); + + for (int i = 0; i < 3; i++) { + assertNotNull(entries.get(i).getIdleTime()); + assertNotNull(entries.get(i).getDeliveredTimes()); + } + for (int i = 3; i < 5; i++) { + org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getIdleTime()); + org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getDeliveredTimes()); + } + + // NOACK must not add the fresh entries to PEL; only the original 3 should remain pending + List pendingAfter = jedis.xpending(key, group, XPendingParams.xPendingParams().count(20)); + assertEquals(3, pendingAfter.size()); + } + } diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledCommandsTestHelper.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledCommandsTestHelper.java index 7136142e80..1b4b01be5a 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledCommandsTestHelper.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledCommandsTestHelper.java @@ -13,7 +13,10 @@ public static JedisPooled getPooled(RedisProtocol redisProtocol) { public static void clearData() { try (Jedis node = new Jedis(nodeInfo.getHostAndPort())) { - node.auth(nodeInfo.getPassword()); + String password = nodeInfo.getPassword(); + if (password != null && !password.isEmpty()) { + node.auth(password); + } node.flushAll(); } }