From bab3ed05f37d75829a026faac3950fbc73e8f09b Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Thu, 23 Oct 2025 14:00:54 -0400 Subject: [PATCH] Allow overriding the keyed hashing router We want to implement a custom router (similar to what we did for the scalar router). This updates the RouterFactory interface to include a function for getting a keyedRouter (i.e. `Router>`) It's backwards compatible and should not break any usages. --- .../mantis/network/push/RouterFactory.java | 24 +++++++++++++++++++ .../mantis/network/push/Routers.java | 22 ++++------------- .../WorkerPublisherRemoteObservable.java | 4 +++- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java index 6fc97c9e8..775d93cd8 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java @@ -2,6 +2,30 @@ import rx.functions.Func1; +import java.nio.ByteBuffer; + public interface RouterFactory { public Router scalarStageToStageRouter(String name, final Func1 toBytes); + + public default Router> keyedRouter(String name, + final Func1 keyEncoder, + final Func1 valueEncoder) { + return new ConsistentHashingRouter(name, new Func1, byte[]>() { + @Override + public byte[] call(KeyValuePair kvp) { + byte[] keyBytes = kvp.getKeyBytes(); + byte[] valueBytes = valueEncoder.call(kvp.getValue()); + return + // length + opcode + notification type + key length + ByteBuffer.allocate(4 + 1 + 1 + 4 + keyBytes.length + valueBytes.length) + .putInt(1 + 1 + 4 + keyBytes.length + valueBytes.length) // length + .put((byte) 1) // opcode + .put((byte) 1) // notification type + .putInt(keyBytes.length) // key length + .put(keyBytes) // key bytes + .put(valueBytes) // value bytes + .array(); + } + }, HashFunctions.xxh3()); + }; } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java index 7e8592fba..c508eb865 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java @@ -31,26 +31,14 @@ public class Routers implements RouterFactory { public Routers() {} + /** + * Deprecated: use RouterFactory.keyedRouter instead + */ + @Deprecated public static Router> consistentHashingLegacyTcpProtocol(String name, final Func1 keyEncoder, final Func1 valueEncoder) { - return new ConsistentHashingRouter(name, new Func1, byte[]>() { - @Override - public byte[] call(KeyValuePair kvp) { - byte[] keyBytes = kvp.getKeyBytes(); - byte[] valueBytes = valueEncoder.call(kvp.getValue()); - return - // length + opcode + notification type + key length - ByteBuffer.allocate(4 + 1 + 1 + 4 + keyBytes.length + valueBytes.length) - .putInt(1 + 1 + 4 + keyBytes.length + valueBytes.length) // length - .put((byte) 1) // opcode - .put((byte) 1) // notification type - .putInt(keyBytes.length) // key length - .put(keyBytes) // key bytes - .put(valueBytes) // value bytes - .array(); - } - }, HashFunctions.xxh3()); + return new Routers().keyedRouter(name, keyEncoder, valueEncoder); } private static byte[] dataPayload(byte[] data) { diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java index 73092c926..4cbc3196f 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java @@ -135,6 +135,8 @@ private LegacyTcpPushServer> startKeyValueStage(KeyValueS Func1 valueEncoder = t1 -> stage.getOutputCodec().encode(t1); Func1 keyEncoder = t1 -> stage.getOutputKeyCodec().encode(t1); + Router> router = this.routerFactory.keyedRouter(jobName, keyEncoder, valueEncoder); + ServerConfig> config = new ServerConfig.Builder>() .name(name) .port(serverPort) @@ -144,7 +146,7 @@ private LegacyTcpPushServer> startKeyValueStage(KeyValueS .maxChunkTimeMSec(maxChunkTimeMSec()) .bufferCapacity(bufferCapacity()) .useSpscQueue(useSpsc()) - .router(Routers.consistentHashingLegacyTcpProtocol(jobName, keyEncoder, valueEncoder)) + .router(router) .build(); if (stage instanceof ScalarToGroup || stage instanceof GroupToGroup) {