Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,13 @@ public StreamEntry build(Object data) {
while (hashIterator.hasNext()) {
map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
}
return new StreamEntry(entryID, map);
Long idle = null;
Long times = null;
if (objectList.size() >= 4) {
idle = LONG.build(objectList.get(2));
times = LONG.build(objectList.get(3));
}
return new StreamEntry(entryID, map, idle, times);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly unused code

}

@Override
Expand Down Expand Up @@ -1418,7 +1424,19 @@ public List<StreamEntry> build(Object data) {
while (hashIterator.hasNext()) {
map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
}
responses.add(new StreamEntry(entryID, map));
Long idle = null;
Long times = null;
if (res.size() >= 4) {
Object idleObj = res.get(2);
Object timesObj = res.get(3);
idle = (idleObj instanceof Long) ? (Long) idleObj : Long.valueOf(STRING.build(idleObj));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Useless ternary - type is byteArray and it should be parsed to String via builder

times = (timesObj instanceof Long) ? (Long) timesObj : Long.valueOf(STRING.build(timesObj));
if (idle != null && times != null && idle == 0L && times == 0L) {
idle = null;
times = null;
}
}
responses.add(new StreamEntry(entryID, map, idle, times));
}

return responses;
Expand Down Expand Up @@ -1969,7 +1987,19 @@ public List<StreamEntryBinary> build(Object data) {
while (hashIterator.hasNext()) {
map.put(BINARY.build(hashIterator.next()), BINARY.build(hashIterator.next()));
}
responses.add(new StreamEntryBinary(entryID, map));
Long idle = null;
Long times = null;
if (res.size() >= 4) {
Object idleObj = res.get(2);
Object timesObj = res.get(3);
idle = (idleObj instanceof Long) ? (Long) idleObj : Long.valueOf(STRING.build(idleObj));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

times = (timesObj instanceof Long) ? (Long) timesObj : Long.valueOf(STRING.build(timesObj));
if (idle != null && times != null && idle == 0L && times == 0L) {
idle = null;
times = null;
}
}
responses.add(new StreamEntryBinary(entryID, map, idle, times));
}

return responses;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public static enum Keyword implements Rawable {
AGGREGATE, ALPHA, BY, GET, LIMIT, NO, NOSORT, ONE, SET, STORE, WEIGHTS, WITHSCORE, WITHSCORES,
RESETSTAT, REWRITE, RESET, FLUSH, EXISTS, LOAD, LEN, HELP, SCHEDULE, MATCH, COUNT, TYPE, KEYS,
REFCOUNT, ENCODING, IDLETIME, FREQ, REPLACE, GETNAME, SETNAME, SETINFO, LIST, ID, KILL, PERSIST,
STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK,
STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK, CLAIM,
RETRYCOUNT, STREAM, GROUPS, CONSUMERS, JUSTID, WITHVALUES, NOMKSTREAM, MINID, CREATECONSUMER,
SETUSER, GETUSER, DELUSER, WHOAMI, USERS, CAT, GENPASS, LOG, SAVE, DRYRUN, COPY, AUTH, AUTH2,
NX, XX, EX, PX, EXAT, PXAT, ABSTTL, KEEPTTL, INCR, LT, GT, CH, INFO, PAUSE, UNPAUSE, UNBLOCK,
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/redis/clients/jedis/params/XReadGroupParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class XReadGroupParams implements IParams {
private Integer count = null;
private Integer block = null;
private boolean noack = false;
private Long claim = null;

public static XReadGroupParams xReadGroupParams() {
return new XReadGroupParams();
Expand All @@ -30,6 +31,12 @@ public XReadGroupParams noAck() {
return this;
}

public XReadGroupParams claim(long minIdleMillis) {
this.claim = minIdleMillis;
return this;
}


@Override
public void addParams(CommandArguments args) {
if (count != null) {
Expand All @@ -41,18 +48,22 @@ public void addParams(CommandArguments args) {
if (noack) {
args.add(Keyword.NOACK);
}
if (claim != null) {
args.add(Keyword.CLAIM).add(claim);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
XReadGroupParams that = (XReadGroupParams) o;
return noack == that.noack && Objects.equals(count, that.count) && Objects.equals(block, that.block);
return noack == that.noack && Objects.equals(count, that.count) && Objects.equals(block, that.block)
&& Objects.equals(claim, that.claim);
}

@Override
public int hashCode() {
return Objects.hash(count, block, noack);
return Objects.hash(count, block, noack, claim);
}
}
18 changes: 18 additions & 0 deletions src/main/java/redis/clients/jedis/resps/StreamEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,28 @@ public class StreamEntry implements Serializable {
private StreamEntryID id;
private Map<String, String> fields;

private Long idleTime; // milliseconds since last delivery (claimed entries)
private Long deliveredTimes; // delivery count (claimed entries)

public StreamEntry(StreamEntryID id, Map<String, String> fields) {
this.id = id;
this.fields = fields;
}
public StreamEntry(StreamEntryID id, Map<String, String> fields, Long idleTime, Long deliveredTimes) {
this.id = id;
this.fields = fields;
this.idleTime = idleTime;
this.deliveredTimes = deliveredTimes;
}

public Long getIdleTime() {
return idleTime;
}

public Long getDeliveredTimes() {
return deliveredTimes;
}


public StreamEntryID getID() {
return id;
Expand Down
20 changes: 19 additions & 1 deletion src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,28 @@ public class StreamEntryBinary implements Serializable {
private StreamEntryID id;
private Map<byte[], byte[]> fields;

private Long idleTime; // milliseconds since last delivery (claimed entries)
private Long deliveredTimes; // delivery count (claimed entries)

public StreamEntryBinary(StreamEntryID id, Map<byte[], byte[]> fields) {
this.id = id;
this.fields = fields;
}
public StreamEntryBinary(StreamEntryID id, Map<byte[], byte[]> fields, Long idleTime, Long deliveredTimes) {
this.id = id;
this.fields = fields;
this.idleTime = idleTime;
this.deliveredTimes = deliveredTimes;
}

public Long getIdleTime() {
return idleTime;
}

public Long getDeliveredTimes() {
return deliveredTimes;
}


public StreamEntryID getID() {
return id;
Expand All @@ -39,4 +57,4 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
this.id = (StreamEntryID) in.readUnshared();
this.fields = (Map<byte[], byte[]>) in.readUnshared();
}
}
}
4 changes: 3 additions & 1 deletion src/test/java/io/redis/test/utils/RedisVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public class RedisVersion implements Comparable<RedisVersion> {
public static final RedisVersion V7_4 = RedisVersion.of("7.4");
public static final RedisVersion V8_0_0_PRE = RedisVersion.of("7.9.0");
public static final RedisVersion V8_0_0 = RedisVersion.of("8.0.0");
public static final String V8_4_RC1_STRING = "8.3.224";
public static final RedisVersion V8_4_RC1 = RedisVersion.of(V8_4_RC1_STRING);
public static final RedisVersion V8_4_0 = RedisVersion.of("8.4.0");

private final int major;
Expand Down Expand Up @@ -91,4 +93,4 @@ public boolean isGreaterThan(RedisVersion other) {
public static int compare(RedisVersion v1, RedisVersion v2) {
return v1.compareTo(v2);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package redis.clients.jedis.commands.commandobjects;


import io.redis.test.annotations.SinceRedisVersion;

import static io.redis.test.utils.RedisVersion.V8_4_RC1_STRING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -1102,4 +1106,121 @@ public void testXReadGroupAsMap() {
assertThat(xreadGroupConsumer2.get(streamKey).get(0).getID(), equalTo(secondMessageId));
assertThat(xreadGroupConsumer2.get(streamKey).get(0).getFields(), equalTo(messageBody));
}


@Test
@SinceRedisVersion(V8_4_RC1_STRING)
public void xreadGroupWithClaimReturnsPendingThenNewEntries_commandObjects() throws InterruptedException {
String key = "co-claim-stream";
String group = "co-claim-group";
String consumer = "c1";

exec(commandObjects.del(key));
exec(commandObjects.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true));

// Make 3 entries pending
exec(commandObjects.xadd(key, new StreamEntryID("1-0"), Collections.singletonMap("f", "v")));
exec(commandObjects.xadd(key, new StreamEntryID("2-0"), Collections.singletonMap("f", "v")));
exec(commandObjects.xadd(key, new StreamEntryID("3-0"), Collections.singletonMap("f", "v")));

Map<String, StreamEntryID> stream = Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
exec(commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(3), stream));

Thread.sleep(60);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lessen time waiting or think of a way to modify tests.


// Add two fresh entries
exec(commandObjects.xadd(key, new StreamEntryID("4-0"), Collections.singletonMap("f", "v")));
exec(commandObjects.xadd(key, new StreamEntryID("5-0"), Collections.singletonMap("f", "v")));

List<Map.Entry<String, List<StreamEntry>>> messages = exec(
commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(5).claim(50), stream));

assertThat(messages, hasSize(1));
List<StreamEntry> entries = messages.get(0).getValue();
org.junit.jupiter.api.Assertions.assertEquals(5, entries.size());

for (int i = 0; i < 3; i++) {
org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getIdleTime());
org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getDeliveredTimes());
}
for (int i = 3; i < 5; i++) {
org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getIdleTime());
org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getDeliveredTimes());
}
}

@Test
@SinceRedisVersion(V8_4_RC1_STRING)
public void xreadGroupWithClaimNoEligiblePendingReturnsOnlyNewEntries_commandObjects() {
String key = "co-claim-noeligible";
String group = "co-claim-group";
String consumer = "c1";

exec(commandObjects.del(key));
exec(commandObjects.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true));

// Put 2 entries in PEL
exec(commandObjects.xadd(key, new StreamEntryID("1-0"), Collections.singletonMap("f", "v")));
exec(commandObjects.xadd(key, new StreamEntryID("2-0"), Collections.singletonMap("f", "v")));
Map<String, StreamEntryID> stream = Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
exec(commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(2), stream));

// Add two new entries that should be returned
exec(commandObjects.xadd(key, new StreamEntryID("3-0"), Collections.singletonMap("f", "v")));
exec(commandObjects.xadd(key, new StreamEntryID("4-0"), Collections.singletonMap("f", "v")));

List<Map.Entry<String, List<StreamEntry>>> messages = exec(
commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(4).claim(500), stream));

assertThat(messages, hasSize(1));
List<StreamEntry> entries = messages.get(0).getValue();
org.junit.jupiter.api.Assertions.assertEquals(2, entries.size());
for (StreamEntry e : entries) {
org.junit.jupiter.api.Assertions.assertNull(e.getIdleTime());
org.junit.jupiter.api.Assertions.assertNull(e.getDeliveredTimes());
}
}

@Test
@SinceRedisVersion(V8_4_RC1_STRING)
public void xreadGroupWithClaimAndNoAckDoesNotAddNewEntriesToPEL_commandObjects() throws InterruptedException {
String key = "co-claim-noack";
String group = "co-claim-group";
String consumer = "c1";

exec(commandObjects.del(key));
exec(commandObjects.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true));

// Make 3 entries pending
exec(commandObjects.xadd(key, new StreamEntryID("1-0"), java.util.Collections.singletonMap("f", "v")));
exec(commandObjects.xadd(key, new StreamEntryID("2-0"), java.util.Collections.singletonMap("f", "v")));
exec(commandObjects.xadd(key, new StreamEntryID("3-0"), java.util.Collections.singletonMap("f", "v")));
java.util.Map<String, StreamEntryID> stream = java.util.Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
exec(commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(3), stream));

// Wait then add fresh entries
Thread.sleep(60);
exec(commandObjects.xadd(key, new StreamEntryID("4-0"), java.util.Collections.singletonMap("f", "v")));
exec(commandObjects.xadd(key, new StreamEntryID("5-0"), java.util.Collections.singletonMap("f", "v")));

// Read with CLAIM and NOACK
java.util.List<java.util.Map.Entry<String, java.util.List<StreamEntry>>> messages = exec(
commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(5).claim(50).noAck(), stream));

assertThat(messages, hasSize(1));
java.util.List<StreamEntry> entries = messages.get(0).getValue();
org.junit.jupiter.api.Assertions.assertEquals(5, entries.size());
for (int i = 0; i < 3; i++) {
org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getIdleTime());
org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getDeliveredTimes());
}
for (int i = 3; i < 5; i++) {
org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getIdleTime());
org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getDeliveredTimes());
}

Long acked = exec(commandObjects.xack(key, group, new StreamEntryID("4-0"), new StreamEntryID("5-0")));
org.junit.jupiter.api.Assertions.assertEquals(0L, acked);
}

}
Loading
Loading