diff --git a/pom.xml b/pom.xml index 4e84d69..7b478ce 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ vertx-tcp-eventbus-bridge - 4.3.2-SNAPSHOT + 999-SNAPSHOT Vert.x TCP EventBus Bridge @@ -54,7 +54,11 @@ io.vertx vertx-bridge-common - ${project.version} + + + + io.vertx + vertx-json-schema @@ -73,6 +77,11 @@ test-jar test + + io.vertx + vertx-web-client + test + junit junit diff --git a/src/client/nodejs/package.json b/src/client/nodejs/package.json index 13a1a99..645ea79 100644 --- a/src/client/nodejs/package.json +++ b/src/client/nodejs/package.json @@ -11,4 +11,4 @@ "scripts": { "test": "mocha ./test/index.js" } -} \ No newline at end of file +} diff --git a/src/main/java/examples/HttpBridgeExample.java b/src/main/java/examples/HttpBridgeExample.java new file mode 100644 index 0000000..6648791 --- /dev/null +++ b/src/main/java/examples/HttpBridgeExample.java @@ -0,0 +1,60 @@ +package examples; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.http.WebSocketBase; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.bridge.PermittedOptions; +import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions; +import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge; + +public class HttpBridgeExample extends AbstractVerticle { + + public static void main(String[] args) { + Vertx.vertx().deployVerticle(new HttpBridgeExample()); + } + + @Override + public void start(Promise start) { + vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value")))); + vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body())); + vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi"))); + + JsonRPCBridgeOptions options = new JsonRPCBridgeOptions() + .addInboundPermitted(new PermittedOptions().setAddress("hello")) + .addInboundPermitted(new PermittedOptions().setAddress("echo")) + .addInboundPermitted(new PermittedOptions().setAddress("ping")) + .addOutboundPermitted(new PermittedOptions().setAddress("echo")) + .addOutboundPermitted(new PermittedOptions().setAddress("hello")) + .addOutboundPermitted(new PermittedOptions().setAddress("ping")); + + Handler bridge = JsonRPCStreamEventBusBridge.httpSocketHandler(vertx, options, null); + + vertx + .createHttpServer() + .requestHandler(req -> { + if ("/".equals(req.path())) { + req.response() + .putHeader(HttpHeaders.CONTENT_TYPE, "text/html") + .sendFile("http.html"); + } else if ("/jsonrpc".equals(req.path())){ + bridge.handle(req); + } else { + req.response().setStatusCode(404).end("Not Found"); + } + }) + .listen(8080) + .onFailure(start::fail) + .onSuccess(server -> { + System.out.println("Server listening at http://localhost:8080"); + start.complete(); + }); + + } +} diff --git a/src/main/java/examples/HttpSSEBridgeExample.java b/src/main/java/examples/HttpSSEBridgeExample.java new file mode 100644 index 0000000..42ee4ac --- /dev/null +++ b/src/main/java/examples/HttpSSEBridgeExample.java @@ -0,0 +1,66 @@ +package examples; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.bridge.PermittedOptions; +import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions; +import io.vertx.ext.eventbus.bridge.tcp.impl.HttpJsonRPCStreamEventBusBridgeImpl; + +import java.util.function.Consumer; + +public class HttpSSEBridgeExample extends AbstractVerticle { + + public static void main(String[] args) { + Vertx.vertx().deployVerticle(new HttpSSEBridgeExample()); + } + + @Override + public void start(Promise start) { + // just to have some messages flowing around + vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value")))); + vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body())); + vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi"))); + + // use explicit class because SSE method is not on the interface currently + HttpJsonRPCStreamEventBusBridgeImpl bridge = new HttpJsonRPCStreamEventBusBridgeImpl( + vertx, + new JsonRPCBridgeOptions() + .addInboundPermitted(new PermittedOptions().setAddress("hello")) + .addInboundPermitted(new PermittedOptions().setAddress("echo")) + .addInboundPermitted(new PermittedOptions().setAddress("test")) + .addOutboundPermitted(new PermittedOptions().setAddress("echo")) + .addOutboundPermitted(new PermittedOptions().setAddress("test")) + .addOutboundPermitted(new PermittedOptions().setAddress("ping")), + event -> event.complete(true) + ); + + vertx + .createHttpServer() + .requestHandler(req -> { + // this is where any http request will land + // serve the base HTML application + if ("/".equals(req.path())) { + req.response() + .putHeader(HttpHeaders.CONTENT_TYPE, "text/html") + .sendFile("sse.html"); + } else if ("/jsonrpc".equals(req.path())) { + bridge.handle(req); + } else if ("/jsonrpc-sse".equals(req.path())) { + bridge.handleSSE(req, (int) (Math.random() * 100_000), new JsonObject().put("address", "ping")); + } else { + req.response().setStatusCode(404).end("Not Found"); + } + }) + .listen(8080) + .onFailure(start::fail) + .onSuccess(server -> { + System.out.println("Server listening at http://localhost:8080"); + start.complete(); + }); + } +} diff --git a/src/main/java/examples/WebsocketBridgeExample.java b/src/main/java/examples/WebsocketBridgeExample.java new file mode 100644 index 0000000..f5010c5 --- /dev/null +++ b/src/main/java/examples/WebsocketBridgeExample.java @@ -0,0 +1,70 @@ +package examples; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.WebSocketBase; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.bridge.PermittedOptions; +import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions; +import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge; + +public class WebsocketBridgeExample extends AbstractVerticle { + + public static void main(String[] args) { + Vertx.vertx().deployVerticle(new WebsocketBridgeExample()); + } + + @Override + public void start(Promise start) { + vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value")))); + vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body())); + vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi"))); + + JsonRPCBridgeOptions options = new JsonRPCBridgeOptions() + .addInboundPermitted(new PermittedOptions().setAddress("hello")) + .addInboundPermitted(new PermittedOptions().setAddress("echo")) + .addInboundPermitted(new PermittedOptions().setAddress("ping")) + .addOutboundPermitted(new PermittedOptions().setAddress("echo")) + .addOutboundPermitted(new PermittedOptions().setAddress("hello")) + .addOutboundPermitted(new PermittedOptions().setAddress("ping")) + // if set to false, then websockets messages are received on frontend as binary frames + .setWebsocketsTextAsFrame(true); + + Handler bridge = JsonRPCStreamEventBusBridge.webSocketHandler(vertx, options, null); + + vertx + .createHttpServer() + .requestHandler(req -> { + // this is where any http request will land + if ("/jsonrpc".equals(req.path())) { + // we switch from HTTP to WebSocket + req.toWebSocket() + .onFailure(err -> { + err.printStackTrace(); + req.response().setStatusCode(500).end(err.getMessage()); + }) + .onSuccess(bridge::handle); + } else { + // serve the base HTML application + if ("/".equals(req.path())) { + req.response() + .putHeader(HttpHeaders.CONTENT_TYPE, "text/html") + .sendFile("ws.html"); + } else { + // 404 all the rest + req.response().setStatusCode(404).end("Not Found"); + } + } + }) + .listen(8080) + .onFailure(start::fail) + .onSuccess(server -> { + System.out.println("Server listening at http://localhost:8080"); + start.complete(); + }); + } +} diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/BridgeEvent.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/BridgeEvent.java index 1485c22..3bedd21 100644 --- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/BridgeEvent.java +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/BridgeEvent.java @@ -22,6 +22,7 @@ import io.vertx.core.Future; import io.vertx.core.json.JsonObject; import io.vertx.core.net.NetSocket; +import io.vertx.ext.bridge.BaseBridgeEvent; /** * Represents an event that occurs on the event bus bridge. @@ -31,7 +32,7 @@ * @author Tim Fox */ @VertxGen -public interface BridgeEvent extends io.vertx.ext.bridge.BaseBridgeEvent { +public interface BridgeEvent extends BaseBridgeEvent { /** * Get the raw JSON message for the event. This will be null for SOCKET_CREATED or SOCKET_CLOSED events as there is @@ -41,7 +42,10 @@ public interface BridgeEvent extends io.vertx.ext.bridge.BaseBridgeEvent { * @return this reference, so it can be used fluently */ @Fluent - BridgeEvent setRawMessage(JsonObject message); + BridgeEvent setRawMessage(JsonObject message); + + // TODO: this will cause problems with WebSockets as they don't share any common base interface + // this will be a breaking change to users, as the return type is now generic, is this OK? /** * Get the SockJSSocket instance corresponding to the event @@ -49,5 +53,5 @@ public interface BridgeEvent extends io.vertx.ext.bridge.BaseBridgeEvent { * @return the SockJSSocket instance */ @CacheReturn - NetSocket socket(); + T socket(); } diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCBridgeOptions.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCBridgeOptions.java new file mode 100644 index 0000000..f4efa81 --- /dev/null +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCBridgeOptions.java @@ -0,0 +1,127 @@ +package io.vertx.ext.eventbus.bridge.tcp; + +import io.vertx.codegen.annotations.DataObject; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.bridge.BridgeOptions; +import io.vertx.ext.bridge.BridgeOptionsConverter; +import io.vertx.ext.bridge.PermittedOptions; + +import java.util.List; + +@DataObject(generateConverter = true) +public class JsonRPCBridgeOptions extends BridgeOptions { + private boolean websocketsTextAsFrame; + + public JsonRPCBridgeOptions() {} + + /** + * Creates a new instance of {@link JsonRPCBridgeOptions} by copying the content of another {@link JsonRPCBridgeOptions} + * + * @param that the {@link JsonRPCBridgeOptions} to copy. + */ + public JsonRPCBridgeOptions(JsonRPCBridgeOptions that) { + super(that); + this.websocketsTextAsFrame = that.websocketsTextAsFrame; + } + + /** + * Creates a new instance of {@link JsonRPCBridgeOptions} from its JSON representation. + * This method uses the generated converter. + * + * @param json the serialized {@link JsonRPCBridgeOptions} + * @see BridgeOptionsConverter + */ + public JsonRPCBridgeOptions(JsonObject json) { + JsonRPCBridgeOptionsConverter.fromJson(json, this); + } + + /** + * Serializes the current {@link JsonRPCBridgeOptions} to JSON. This method uses the generated converter. + * + * @return the serialized object + */ + public JsonObject toJson() { + JsonObject json = new JsonObject(); + JsonRPCBridgeOptionsConverter.toJson(this, json); + return json; + } + + /** + * Sets whether to use text format for websockets frames for the current {@link JsonRPCBridgeOptions}. + * + * @param websocketsTextAsFrame the choice whether to use text format + * @return the current {@link JsonRPCBridgeOptions}. + */ + public JsonRPCBridgeOptions setWebsocketsTextAsFrame(boolean websocketsTextAsFrame) { + this.websocketsTextAsFrame = websocketsTextAsFrame; + return this; + } + + /** + * @return whether to use text format for websockets frames. + */ + public boolean getWebsocketsTextAsFrame() { + return websocketsTextAsFrame; + } + + + /** + * Adds an inbound permitted option to the current {@link JsonRPCBridgeOptions}. + * + * @param permitted the inbound permitted + * @return the current {@link JsonRPCBridgeOptions}. + */ + public JsonRPCBridgeOptions addInboundPermitted(PermittedOptions permitted) { + super.addInboundPermitted(permitted); + return this; + } + + /** + * @return the list of inbound permitted options. Empty if none. + */ + public List getInboundPermitteds() { + return super.getInboundPermitteds(); + } + + /** + * Sets the list of inbound permitted options. + * + * @param inboundPermitted the list to use, must not be {@link null}. This method use the direct list reference + * (and doesn't create a copy). + * @return the current {@link JsonRPCBridgeOptions}. + */ + public JsonRPCBridgeOptions setInboundPermitteds(List inboundPermitted) { + super.setInboundPermitteds(inboundPermitted); + return this; + } + + /** + * Adds an outbound permitted option to the current {@link JsonRPCBridgeOptions}. + * + * @param permitted the outbound permitted + * @return the current {@link JsonRPCBridgeOptions}. + */ + public JsonRPCBridgeOptions addOutboundPermitted(PermittedOptions permitted) { + super.addOutboundPermitted(permitted); + return this; + } + + /** + * @return the list of outbound permitted options. Empty if none. + */ + public List getOutboundPermitteds() { + return super.getOutboundPermitteds(); + } + + /** + * Sets the list of outbound permitted options. + * + * @param outboundPermitted the list to use, must not be {@link null}. This method use the direct list reference + * (and doesn't create a copy). + * @return the current {@link JsonRPCBridgeOptions}. + */ + public JsonRPCBridgeOptions setOutboundPermitteds(List outboundPermitted) { + super.setOutboundPermitteds(outboundPermitted); + return this; + } +} diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCStreamEventBusBridge.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCStreamEventBusBridge.java new file mode 100644 index 0000000..ceb6487 --- /dev/null +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCStreamEventBusBridge.java @@ -0,0 +1,72 @@ +/* + * Copyright 2015 Red Hat, Inc. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.ext.eventbus.bridge.tcp; + +import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.WebSocketBase; +import io.vertx.core.net.NetSocket; +import io.vertx.ext.bridge.BridgeOptions; +import io.vertx.ext.eventbus.bridge.tcp.impl.HttpJsonRPCStreamEventBusBridgeImpl; +import io.vertx.ext.eventbus.bridge.tcp.impl.JsonRPCStreamEventBusBridgeImpl; +import io.vertx.ext.eventbus.bridge.tcp.impl.TCPJsonRPCStreamEventBusBridgeImpl; +import io.vertx.ext.eventbus.bridge.tcp.impl.WebsocketJsonRPCStreamEventBusBridgeImpl; + +/** + * JSONRPC stream EventBus bridge for Vert.x + * + * @author Paulo Lopes + */ +@VertxGen +public interface JsonRPCStreamEventBusBridge { + + static Handler netSocketHandler(Vertx vertx) { + return netSocketHandler(vertx, null, null); + } + + static Handler netSocketHandler(Vertx vertx, JsonRPCBridgeOptions options) { + return netSocketHandler(vertx, options, null); + } + + static Handler netSocketHandler(Vertx vertx, JsonRPCBridgeOptions options, Handler> eventHandler) { + return new TCPJsonRPCStreamEventBusBridgeImpl(vertx, options, eventHandler); + } + + static Handler webSocketHandler(Vertx vertx) { + return webSocketHandler(vertx, null, null); + } + + static Handler webSocketHandler(Vertx vertx, JsonRPCBridgeOptions options) { + return webSocketHandler(vertx, options, null); + } + static Handler webSocketHandler(Vertx vertx, JsonRPCBridgeOptions options, Handler> eventHandler) { + return new WebsocketJsonRPCStreamEventBusBridgeImpl(vertx, options, eventHandler); + } + + static Handler httpSocketHandler(Vertx vertx) { + return httpSocketHandler(vertx, null, null); + } + + static Handler httpSocketHandler(Vertx vertx, JsonRPCBridgeOptions options) { + return httpSocketHandler(vertx, options, null); + } + + static Handler httpSocketHandler(Vertx vertx, JsonRPCBridgeOptions options, Handler> eventHandler) { + return new HttpJsonRPCStreamEventBusBridgeImpl(vertx, options, eventHandler); + } +} diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java index 5abbdda..cd10836 100644 --- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java @@ -22,6 +22,7 @@ import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.NetSocket; import io.vertx.ext.bridge.BridgeOptions; import io.vertx.ext.eventbus.bridge.tcp.impl.TcpEventBusBridgeImpl; @@ -44,7 +45,7 @@ static TcpEventBusBridge create(Vertx vertx, BridgeOptions options) { static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions) { return new TcpEventBusBridgeImpl(vertx, options, netServerOptions,null); } - static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions,Handler eventHandler) { + static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions, Handler> eventHandler) { return new TcpEventBusBridgeImpl(vertx, options, netServerOptions,eventHandler); } /** diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/BridgeEventImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/BridgeEventImpl.java index d18d11a..fafe1af 100644 --- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/BridgeEventImpl.java +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/BridgeEventImpl.java @@ -20,7 +20,6 @@ import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.json.JsonObject; -import io.vertx.core.net.NetSocket; import io.vertx.ext.bridge.BridgeEventType; import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent; @@ -28,14 +27,14 @@ * @author Tim Fox * @author grant@iowntheinter.net */ -class BridgeEventImpl implements BridgeEvent { +class BridgeEventImpl implements BridgeEvent { private final BridgeEventType type; private final JsonObject rawMessage; - private final NetSocket socket; + private final T socket; private final Promise promise; - public BridgeEventImpl(BridgeEventType type, JsonObject rawMessage, NetSocket socket) { + public BridgeEventImpl(BridgeEventType type, JsonObject rawMessage, T socket) { this.type = type; this.rawMessage = rawMessage; this.socket = socket; @@ -58,7 +57,7 @@ public JsonObject getRawMessage() { } @Override - public BridgeEvent setRawMessage(JsonObject message) { + public BridgeEvent setRawMessage(JsonObject message) { if (message != rawMessage) { rawMessage.clear().mergeIn(message); } @@ -71,7 +70,7 @@ public void handle(AsyncResult asyncResult) { } @Override - public NetSocket socket() { + public T socket() { return socket; } @@ -114,5 +113,4 @@ public boolean tryFail(Throwable cause) { public boolean tryFail(String failureMessage) { return promise.tryFail(failureMessage); } - } diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/HttpJsonRPCStreamEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/HttpJsonRPCStreamEventBusBridgeImpl.java new file mode 100644 index 0000000..6baab4c --- /dev/null +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/HttpJsonRPCStreamEventBusBridgeImpl.java @@ -0,0 +1,118 @@ +package io.vertx.ext.eventbus.bridge.tcp.impl; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.bridge.BridgeEventType; +import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent; +import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +public class HttpJsonRPCStreamEventBusBridgeImpl extends JsonRPCStreamEventBusBridgeImpl { + + // http client cannot reply in the same request in which it originally received + // a response so the replies map should be persistent across http request + final Map> replies = new ConcurrentHashMap<>(); + + public HttpJsonRPCStreamEventBusBridgeImpl(Vertx vertx, JsonRPCBridgeOptions options, Handler> bridgeEventHandler) { + super(vertx, options, bridgeEventHandler); + } + + @Override + public void handle(HttpServerRequest socket) { + checkCallHook( + // process the new socket according to the event handler + () -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CREATED, null, socket), + // on success + () -> { + final Map> registry = new ConcurrentHashMap<>(); + + socket.exceptionHandler(t -> { + log.error(t.getMessage(), t); + registry.values().forEach(MessageConsumer::unregister); + registry.clear(); + }).handler(buffer -> { + // TODO: handle content type + + // TODO: body may be an array (batching) + final JsonObject msg = new JsonObject(buffer); + + if (this.isInvalid(msg)) { + return; + } + + final String method = msg.getString("method"); + final Object id = msg.getValue("id"); + HttpServerResponse response = socket + .response() + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .endHandler(handler -> { + registry.values().forEach(MessageConsumer::unregister); + // normal close, trigger the event + checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CLOSED, null, socket)); + registry.clear(); + }); + Consumer writer; + if (method.equals("register")) { + response.setChunked(true); + writer = payload -> response.write(payload.encode()); + } else { + writer = payload -> response.end(payload.encode()); + } + dispatch(writer, method, id, msg, registry, replies); + }); + }, + // on failure + () -> socket.response().setStatusCode(500).setStatusMessage("Internal Server Error").end()); + } + + // TODO: Discuss. Currently we are only adding such methods because SSE doesn't have a body, maybe we could + // instead mandate some query params in the request to signal SSE. but bodyHandler is not invoked + // in that case so how to handle the request. endHandler or check query params first before applying + // bodyHandler ? + public void handleSSE(HttpServerRequest socket, Object id, JsonObject msg) { + checkCallHook( + // process the new socket according to the event handler + () -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CREATED, null, socket), + () -> { + final Map> registry = new ConcurrentHashMap<>(); + + socket.exceptionHandler(t -> { + log.error(t.getMessage(), t); + registry.values().forEach(MessageConsumer::unregister); + registry.clear(); + }); + + HttpServerResponse response = socket.response().setChunked(true).putHeader(HttpHeaders.CONTENT_TYPE, + "text/event-stream").endHandler(handler -> { + checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CLOSED, null, socket)); + registry.values().forEach(MessageConsumer::unregister); + registry.clear(); + }); + + Consumer writer = payload -> { + JsonObject result = payload.getJsonObject("result"); + if (result != null) { + String address = result.getString("address"); + if (address != null) { + response.write("event: " + address + "\n"); + response.write("data: " + payload.encode() + "\n\n"); + } + } + }; + register(writer, id, msg, registry, replies); + }, + () -> socket.response().setStatusCode(500).setStatusMessage("Internal Server Error").end() + ); + } + + +} diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/JsonRPCStreamEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/JsonRPCStreamEventBusBridgeImpl.java new file mode 100644 index 0000000..29c827e --- /dev/null +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/JsonRPCStreamEventBusBridgeImpl.java @@ -0,0 +1,399 @@ +/* + * Copyright 2015 Red Hat, Inc. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.ext.eventbus.bridge.tcp.impl; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.*; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.bridge.BridgeEventType; +import io.vertx.ext.bridge.BridgeOptions; +import io.vertx.ext.bridge.PermittedOptions; +import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent; +import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions; +import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper; +import io.vertx.json.schema.Draft; +import io.vertx.json.schema.JsonSchema; +import io.vertx.json.schema.JsonSchemaOptions; +import io.vertx.json.schema.OutputUnit; +import io.vertx.json.schema.Validator; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Abstract TCP EventBus bridge. Handles all common socket operations but has no knowledge on the payload. + * + * @author Paulo Lopes + */ +public abstract class JsonRPCStreamEventBusBridgeImpl implements Handler { + + protected static final Logger log = LoggerFactory.getLogger(JsonRPCStreamEventBusBridgeImpl.class); + protected static final JsonObject EMPTY = new JsonObject(Collections.emptyMap()); + + protected final Vertx vertx; + + protected final EventBus eb; + + protected final Map compiledREs = new HashMap<>(); + protected final JsonRPCBridgeOptions options; + protected final Handler> bridgeEventHandler; + + private final Validator requestValidator; + + public JsonRPCStreamEventBusBridgeImpl(Vertx vertx, JsonRPCBridgeOptions options, Handler> eventHandler) { + this.vertx = vertx; + this.eb = vertx.eventBus(); + this.options = options != null ? options : new JsonRPCBridgeOptions(); + this.bridgeEventHandler = eventHandler; + this.requestValidator = getRequestValidator(); + } + + private Validator getRequestValidator() { + String path = "io/vertx/ext/eventbus/bridge/tcp/impl/protocol/jsonrpc.scehma.json"; + try { + Buffer buffer = vertx.fileSystem().readFileBlocking(path); + JsonObject jsonObject = buffer.toJsonObject(); + return Validator.create(JsonSchema.of(jsonObject), + new JsonSchemaOptions().setDraft(Draft.DRAFT202012).setBaseUri("https://vertx.io")); + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + + protected boolean isInvalid(JsonObject object) { + OutputUnit outputUnit = requestValidator.validate(object); + if (!outputUnit.getValid()) { + log.error("Invalid message. Error: " + outputUnit.getErrors() + " . Message: " + object); + return true; + } + return false; + } + + protected void dispatch(Consumer socket, String method, Object id, JsonObject msg, Map> registry, Map> replies) { + switch (method) { + case "send": + checkCallHook( + () -> new BridgeEventImpl<>(BridgeEventType.SEND, msg, null), + () -> send(socket, id, msg, registry, replies), + () -> JsonRPCHelper.error(id, -32040, "access_denied", socket) + ); + break; + case "publish": + checkCallHook( + () -> new BridgeEventImpl<>(BridgeEventType.SEND, msg, null), + () -> publish(socket, id, msg, registry, replies), + () -> JsonRPCHelper.error(id, -32040, "access_denied", socket) + ); + break; + case "register": + checkCallHook( + () -> new BridgeEventImpl<>(BridgeEventType.REGISTER, msg, null), + () -> register(socket, id, msg, registry, replies), + () -> JsonRPCHelper.error(id, -32040, "access_denied", socket) + ); + break; + case "unregister": + checkCallHook( + () -> new BridgeEventImpl<>(BridgeEventType.UNREGISTER, msg, null), + () -> unregister(socket, id, msg, registry, replies), + () -> JsonRPCHelper.error(id, -32040, "access_denied", socket) + ); + break; + case "ping": + JsonRPCHelper.response(id, "pong", socket); + break; + default: + JsonRPCHelper.error(id, -32601, "unknown_method", socket); + break; + } + } + + protected void unregister(Consumer socket, Object id, JsonObject msg, Map> registry, Map> replies) { + final JsonObject params = msg.getJsonObject("params", EMPTY); + final String address = params.getString("address"); + + if (address == null) { + JsonRPCHelper.error(id, -32602, "invalid_parameters", socket); + return; + } + + if (checkMatches(false, address)) { + MessageConsumer consumer = registry.remove(address); + if (consumer != null) { + consumer.unregister(); + if (id != null) { + // ack + JsonRPCHelper.response(id, EMPTY, socket); + } + } else { + JsonRPCHelper.error(id, -32044, "unknown_address", socket); + } + } else { + JsonRPCHelper.error(id, -32040, "access_denied", socket); + } + } + + protected void register(Consumer socket, Object id, JsonObject msg, Map> registry, Map> replies) { + final JsonObject params = msg.getJsonObject("params", EMPTY); + final String address = params.getString("address"); + + if (address == null) { + JsonRPCHelper.error(id, -32602, "invalid_parameters", socket); + return; + } + + if (checkMatches(false, address)) { + registry.put(address, eb.consumer(address, res1 -> { + // save a reference to the message so tcp bridged messages can be replied properly + if (res1.replyAddress() != null) { + replies.put(res1.replyAddress(), res1); + } + + final JsonObject responseHeaders = new JsonObject(); + + // clone the headers from / to + for (Map.Entry entry : res1.headers()) { + responseHeaders.put(entry.getKey(), entry.getValue()); + } + + JsonRPCHelper.response( + id, + new JsonObject() + .put("address", res1.address()) + .put("replyAddress", res1.replyAddress()) + .put("headers", responseHeaders) + .put("body", res1.body()), + socket); + })); + checkCallHook( + () -> new BridgeEventImpl<>(BridgeEventType.REGISTERED, msg, null), + () -> { + if (id != null) { + // ack + JsonRPCHelper.response(id, EMPTY, socket); + } + }); + } else { + JsonRPCHelper.error(id, -32040, "access_denied", socket); + } + } + + protected void publish(Consumer socket, Object id, JsonObject msg, Map> registry, Map> replies) { + final JsonObject params = msg.getJsonObject("params", EMPTY); + final String address = params.getString("address"); + + if (address == null) { + JsonRPCHelper.error(id, -32602, "invalid_parameters", socket); + return; + } + + if (checkMatches(true, address)) { + final JsonObject body = params.getJsonObject("body"); + final DeliveryOptions deliveryOptions = parseMsgHeaders(new DeliveryOptions(), params.getJsonObject("headers")); + + eb.publish(address, body, deliveryOptions); + if (id != null) { + // ack + JsonRPCHelper.response(id, EMPTY, socket); + } + } else { + JsonRPCHelper.error(id, -32040, "access_denied", socket); + } + } + + protected void send(Consumer socket, Object id, JsonObject msg, Map> registry, Map> replies) { + final JsonObject params = msg.getJsonObject("params", EMPTY); + final String address = params.getString("address"); + + if (address == null) { + JsonRPCHelper.error(id, -32602, "invalid_parameters", socket); + return; + } + + if (checkMatches(true, address, replies)) { + final JsonObject body = params.getJsonObject("body"); + final DeliveryOptions deliveryOptions = parseMsgHeaders(new DeliveryOptions(), params.getJsonObject("headers")); + + if (id != null) { + // id is not null, it is a request from TCP endpoint that will wait for a response + eb.request(address, body, deliveryOptions, request -> { + if (request.failed()) { + JsonRPCHelper.error(id, (ReplyException) request.cause(), socket); + } else { + final Message response = request.result(); + final JsonObject responseHeaders = new JsonObject(); + + // clone the headers from / to + for (Map.Entry entry : response.headers()) { + responseHeaders.put(entry.getKey(), entry.getValue()); + } + + if (response.replyAddress() != null) { + replies.put(response.replyAddress(), response); + } + + JsonRPCHelper.response( + id, + new JsonObject() + .put("headers", responseHeaders) + .put("id", response.replyAddress()) + .put("body", response.body()), + socket); + } + }); + } else { + // no reply address it might be a response, a failure or a request that does not need a response + if (replies.containsKey(address)) { + // address is registered, it is not a request + final JsonObject error = params.getJsonObject("error"); + if (error == null) { + // No error block, it is a response + replies.get(address).reply(body, deliveryOptions); + } else { + // error block, fail the original response + replies.get(address).fail(error.getInteger("failureCode"), error.getString("message")); + } + } else { + // it is a request that does not expect a response + eb.send(address, body, deliveryOptions); + } + } + // replies are a one time off operation + replies.remove(address); + } else { + JsonRPCHelper.error(id, -32040, "access_denied", socket); + } + } + + protected void checkCallHook(Supplier> eventSupplier) { + checkCallHook(eventSupplier, null, null); + } + + protected void checkCallHook(Supplier> eventSupplier, Runnable okAction) { + checkCallHook(eventSupplier, okAction, null); + } + + protected void checkCallHook(Supplier> eventSupplier, Runnable okAction, Runnable rejectAction) { + if (bridgeEventHandler == null) { + if (okAction != null) { + okAction.run(); + } + } else { + BridgeEvent event = eventSupplier.get(); + bridgeEventHandler.handle(event); + event.future().onComplete(res -> { + if (res.succeeded()) { + if (res.result()) { + if (okAction != null) { + okAction.run(); + } + } else { + if (rejectAction != null) { + rejectAction.run(); + } else { + log.debug("Bridge handler prevented: " + event.toString()); + } + } + } else { + log.error("Failure in bridge event handler", res.cause()); + } + }); + } + } + + protected boolean checkMatches(boolean inbound, String address) { + return checkMatches(inbound, address, null); + } + + protected boolean checkMatches(boolean inbound, String address, Map> replies) { + // special case, when dealing with replies the addresses are not in the inbound/outbound list but on + // the replies registry + if (replies != null && inbound && replies.containsKey(address)) { + return true; + } + + List matches = inbound ? options.getInboundPermitteds() : options.getOutboundPermitteds(); + + for (PermittedOptions matchHolder : matches) { + String matchAddress = matchHolder.getAddress(); + String matchRegex; + if (matchAddress == null) { + matchRegex = matchHolder.getAddressRegex(); + } else { + matchRegex = null; + } + + boolean addressOK; + if (matchAddress == null) { + addressOK = matchRegex == null || regexMatches(matchRegex, address); + } else { + addressOK = matchAddress.equals(address); + } + + if (addressOK) { + return true; + } + } + + return false; + } + + protected boolean regexMatches(String matchRegex, String address) { + Pattern pattern = compiledREs.get(matchRegex); + if (pattern == null) { + pattern = Pattern.compile(matchRegex); + compiledREs.put(matchRegex, pattern); + } + Matcher m = pattern.matcher(address); + return m.matches(); + } + + protected DeliveryOptions parseMsgHeaders(DeliveryOptions options, JsonObject headers) { + if (headers == null) + return options; + + Iterator fnameIter = headers.fieldNames().iterator(); + String fname; + while (fnameIter.hasNext()) { + fname = fnameIter.next(); + if ("timeout".equals(fname)) { + options.setSendTimeout(headers.getLong(fname)); + } else if ("localOnly".equals(fname)) { + options.setLocalOnly(headers.getBoolean(fname)); + } else if ("codecName".equals(fname)) { + options.setCodecName(headers.getString(fname)); + } else { + options.addHeader(fname, headers.getString(fname)); + } + } + + return options; + } +} diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/ParserHandler.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/ParserHandler.java new file mode 100644 index 0000000..4674c9c --- /dev/null +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/ParserHandler.java @@ -0,0 +1,23 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.ext.eventbus.bridge.tcp.impl; + +import io.vertx.core.buffer.Buffer; + +@FunctionalInterface +public interface ParserHandler { + void handle(String contentType, Buffer body); +} diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/ReadableBuffer.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/ReadableBuffer.java new file mode 100644 index 0000000..bcab1a3 --- /dev/null +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/ReadableBuffer.java @@ -0,0 +1,142 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.ext.eventbus.bridge.tcp.impl; + +import io.vertx.core.buffer.Buffer; + +final class ReadableBuffer { + + private static final int MARK_WATERMARK = 4 * 1024; + + private Buffer buffer; + private int offset; + private int mark; + + void append(Buffer chunk) { + // either the buffer is null or all read + if (buffer == null || Math.min(mark, offset) == buffer.length()) { + buffer = chunk; + offset = 0; + return; + } + + // slice the buffer discarding the read bytes + if ( + // the offset (read operations) must be further than the last checkpoint + offset >= mark && + // there must be already read more than water mark + mark > MARK_WATERMARK && + // and there are more bytes to read already + buffer.length() > mark) { + + // clean up when there's too much data + buffer = buffer.getBuffer(mark, buffer.length()); + offset -= mark; + mark = 0; + } + + buffer.appendBuffer(chunk); + } + + int findSTX() { + for (int i = offset; i < buffer.length(); i++) { + byte b = buffer.getByte(i); + switch (b) { + case '\r': + case '\n': + // skip new lines + continue; + case '{': + case '[': + return i; + default: + throw new IllegalStateException("Unexpected value in buffer: (int)" + ((int) b)); + } + } + + return -1; + } + + int findETX(int offset) { + // brace start / end + final byte bs, be; + // brace count + int bc = 0; + + switch (buffer.getByte(offset)) { + case '{': + bs = '{'; + be = '}'; + break; + case '[': + bs = '['; + be = ']'; + break; + default: + throw new IllegalStateException("Message 1st byte isn't valid: " + buffer.getByte(offset)); + } + + for (int i = offset; i < buffer.length(); i++) { + byte b = buffer.getByte(i); + if (b == bs) { + bc++; + } else + if (b == be) { + bc--; + } else { + continue; + } + // validation + if (bc < 0) { + // unbalanced braces + throw new IllegalStateException("Message format is not valid: " + buffer.getString(offset, i) + "..."); + } + if (bc == 0) { + // complete + return i + 1; + } + } + + return -1; + } + + Buffer readBytes(int offset, int count) { + Buffer bytes = null; + if (buffer.length() - offset >= count) { + bytes = buffer.getBuffer(offset, offset + count); + this.offset = offset + count; + } + return bytes; + } + + int readableBytes() { + return buffer.length() - offset; + } + + void mark() { + mark = offset; + } + + void reset() { + offset = mark; + } + + + @Override + public String toString() { + return buffer != null ? buffer.toString() : "null"; + } +} diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/StreamParser.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/StreamParser.java new file mode 100644 index 0000000..c8f0a85 --- /dev/null +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/StreamParser.java @@ -0,0 +1,93 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.ext.eventbus.bridge.tcp.impl; + +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; + +public final class StreamParser implements Handler { + + private static final Logger LOGGER = LoggerFactory.getLogger(StreamParser.class); + + // the callback when a full response message has been decoded + private Handler handler; + private Handler exceptionHandler; + + // a composite buffer to allow buffer concatenation as if it was + // a long stream + private final ReadableBuffer buffer = new ReadableBuffer(); + + public StreamParser handler(Handler handler) { + this.handler = handler; + return this; + } + + public StreamParser exceptionHandler(Handler handler) { + this.exceptionHandler = handler; + return this; + } + + @Override + public void handle(Buffer chunk) { + if (chunk.length() > 0) { + // add the chunk to the buffer + buffer.append(chunk); + + // the minimum messages are "{}" or "[]" + while (buffer.readableBytes() >= 2) { + // setup a rollback point + buffer.mark(); + + final Buffer payload; + + try { + // locate the message boundaries + final int start = buffer.findSTX(); + + // no start found yet + if (start == -1) { + buffer.reset(); + break; + } + + final int end = buffer.findETX(start); + + // no end found yet + if (end == -1) { + buffer.reset(); + break; + } + + payload = buffer.readBytes(start, end - start); + } catch (IllegalStateException ise) { + exceptionHandler.handle(ise); + break; + } + + // payload is found, deliver it to the handler + try { + handler.handle(payload); + } catch (RuntimeException e) { + // these are user exceptions, not protocol exceptions + // we can continue the parsing job + LOGGER.error("Failed to handle payload", e); + } + } + } + } +} diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TCPJsonRPCStreamEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TCPJsonRPCStreamEventBusBridgeImpl.java new file mode 100644 index 0000000..abffbd6 --- /dev/null +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TCPJsonRPCStreamEventBusBridgeImpl.java @@ -0,0 +1,78 @@ +package io.vertx.ext.eventbus.bridge.tcp.impl; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.NetSocket; +import io.vertx.ext.bridge.BridgeEventType; +import io.vertx.ext.bridge.BridgeOptions; +import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent; +import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +public class TCPJsonRPCStreamEventBusBridgeImpl extends JsonRPCStreamEventBusBridgeImpl { + + public TCPJsonRPCStreamEventBusBridgeImpl(Vertx vertx, JsonRPCBridgeOptions options, Handler> bridgeEventHandler) { + super(vertx, options, bridgeEventHandler); + } + + @Override + public void handle(NetSocket socket) { + checkCallHook( + // process the new socket according to the event handler + () -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CREATED, null, socket), + // on success + () -> { + final Map> registry = new ConcurrentHashMap<>(); + final Map> replies = new ConcurrentHashMap<>(); + + socket + .exceptionHandler(t -> { + log.error(t.getMessage(), t); + registry.values().forEach(MessageConsumer::unregister); + registry.clear(); + }) + .endHandler(v -> { + registry.values().forEach(MessageConsumer::unregister); + // normal close, trigger the event + checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CLOSED, null, socket)); + registry.clear(); + }) + .handler( + // create a protocol parser + new StreamParser() + .exceptionHandler(t -> { + log.error(t.getMessage(), t); + }) + .handler(buffer -> { + // TODO: handle content type + + // TODO: body may be an array (batching) + final JsonObject msg = new JsonObject(buffer); + if (this.isInvalid(msg)) { + return; + } + final String method = msg.getString("method"); + final Object id = msg.getValue("id"); + + Consumer writer = payload -> socket.write(payload.toBuffer().appendString("\r\n")); + + dispatch( + writer, + method, + id, + msg, + registry, + replies); + })); + }, + // on failure + socket::close + ); + } +} diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java index ff2b111..7a8ab7e 100644 --- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java @@ -59,10 +59,10 @@ public class TcpEventBusBridgeImpl implements TcpEventBusBridge { private final Map compiledREs = new HashMap<>(); private final BridgeOptions options; - private final Handler bridgeEventHandler; + private final Handler> bridgeEventHandler; - public TcpEventBusBridgeImpl(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions, Handler eventHandler) { + public TcpEventBusBridgeImpl(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions, Handler> eventHandler) { this.eb = vertx.eventBus(); this.options = options != null ? options : new BridgeOptions(); this.bridgeEventHandler = eventHandler; @@ -205,7 +205,7 @@ private void doSendOrPub(NetSocket socket, String address, JsonObject msg, Map new BridgeEventImpl(BridgeEventType.REGISTERED, msg, socket), null, null); + checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.REGISTERED, msg, socket), null, null); } else { sendErrFrame("access_denied", socket); } @@ -252,7 +252,7 @@ private void handler(NetSocket socket) { final String type = msg.getString("type", "message"); final String address = msg.getString("address"); BridgeEventType eventType = parseType(type); - checkCallHook(() -> new BridgeEventImpl(eventType, msg, socket), + checkCallHook(() -> new BridgeEventImpl<>(eventType, msg, socket), () -> { if (eventType != BridgeEventType.SOCKET_PING && address == null) { sendErrFrame("missing_address", socket); @@ -289,13 +289,13 @@ public Future close() { return server.close(); } - private void checkCallHook(Supplier eventSupplier, Runnable okAction, Runnable rejectAction) { + private void checkCallHook(Supplier> eventSupplier, Runnable okAction, Runnable rejectAction) { if (bridgeEventHandler == null) { if (okAction != null) { okAction.run(); } } else { - BridgeEventImpl event = eventSupplier.get(); + BridgeEventImpl event = eventSupplier.get(); bridgeEventHandler.handle(event); event.future().onComplete(res -> { if (res.succeeded()) { diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/WebsocketJsonRPCStreamEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/WebsocketJsonRPCStreamEventBusBridgeImpl.java new file mode 100644 index 0000000..c178db0 --- /dev/null +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/WebsocketJsonRPCStreamEventBusBridgeImpl.java @@ -0,0 +1,101 @@ +package io.vertx.ext.eventbus.bridge.tcp.impl; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; +import io.vertx.core.http.WebSocketBase; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.bridge.BridgeEventType; +import io.vertx.ext.bridge.BridgeOptions; +import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent; +import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +public class WebsocketJsonRPCStreamEventBusBridgeImpl extends JsonRPCStreamEventBusBridgeImpl { + + public WebsocketJsonRPCStreamEventBusBridgeImpl(Vertx vertx, JsonRPCBridgeOptions options, Handler> bridgeEventHandler) { + super(vertx, options, bridgeEventHandler); + } + + @Override + public void handle(WebSocketBase socket) { + checkCallHook( + // process the new socket according to the event handler + () -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CREATED, null, null), + // on success + () -> { + final Map> registry = new ConcurrentHashMap<>(); + final Map> replies = new ConcurrentHashMap<>(); + + Consumer consumer; + if (options.getWebsocketsTextAsFrame()) { + consumer = payload -> socket.writeTextMessage(payload.encode()); + } else { + consumer = payload -> socket.writeBinaryMessage(payload.toBuffer()); + } + + socket + .exceptionHandler(t -> { + log.error(t.getMessage(), t); + registry.values().forEach(MessageConsumer::unregister); + registry.clear(); + }) + .endHandler(v -> { + registry.values().forEach(MessageConsumer::unregister); + // normal close, trigger the event + checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CLOSED, null, null)); + registry.clear(); + }) + .frameHandler(frame -> { + // TODO: this could be an [], in this case, after parsing, we should loop and call for each element the + // code bellow. + + // One idea from vs-jsonrpcstream was the use of content-types, so define how the message was formated + // by default json (like in the spec) but microsoft was suggesting messagepack as alternative. I'm not + // sure if we should implement this. The TCP parser was accounting for it, but is it a good idea? maybe not? + + // not handling CLOSE frames here, endHandler will be invoked on the socket later + // ping frames are automatically handled by websockets so safe to ignore here + if (frame.isClose() || frame.isPing()) { + return; + } + + final JsonObject msg = new JsonObject(frame.binaryData()); + if (this.isInvalid(msg)) { + return; + } + + final String method = msg.getString("method"); + final Object id = msg.getValue("id"); + + // TODO: we should wrap the socket in order to override the "write" method to write a text frame + // TODO: the current WriteStream assumes binary frames which are harder to handle on the browser + // TODO: maybe we could make this configurable (binary/text) + + // if we create a wraper, say an interface: + // interface SocketWriter { write(Buffer buff) } + // then we can create specific implementation wrappers for all kinds of sockets, netSocket, webSocket (binary or text) + + // given that the wraper is at the socket level (it's not that heavy in terms of garbage collection, 1 extra object per connection. + // And a connection is long lasting, not like HTTP + + dispatch( + consumer, + method, + id, + msg, + registry, + replies); + }); + }, + // on failure + socket::close + ); + } + +} diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java index b5e1f3e..3af18c3 100644 --- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java @@ -23,6 +23,12 @@ import io.vertx.core.json.JsonObject; /** + * TODO: refactor this whole thing to be simpler. Avoid the header's parsing, that is probably a bad idea it was very VisualStudio specific + * once we do that, don't rely on line endings as end of message. Instead we need to locate the end of a message. + * To locate the end of the message, we need to count braces. If a message starts with "{" we increase the counter, + * every time we see "}" we decrease. If we reach 0 it's a full message. If we ever go negative we're on a broken state. + * The same for "[" as jsonrpc batches are just an array of messages + * * Simple LV parser * * @author Paulo Lopes @@ -64,6 +70,7 @@ public void handle(Buffer buffer) { if (remainingBytes - 4 >= length) { // we have a complete message try { + // TODO: this is wrong, we can have both JsonObject or JsonArray client.handle(Future.succeededFuture(new JsonObject(_buffer.getString(_offset, _offset + length)))); } catch (DecodeException e) { // bad json @@ -111,4 +118,4 @@ private void append(Buffer newBuffer) { private int bytesRemaining() { return (_buffer.length() - _offset) < 0 ? 0 : (_buffer.length() - _offset); } -} \ No newline at end of file +} diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/JsonRPCHelper.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/JsonRPCHelper.java new file mode 100644 index 0000000..fd62041 --- /dev/null +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/JsonRPCHelper.java @@ -0,0 +1,122 @@ +/* + * Copyright 2015 Red Hat, Inc. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.ext.eventbus.bridge.tcp.impl.protocol; + +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.ReplyException; +import io.vertx.core.json.Json; +import io.vertx.core.json.JsonObject; + +import java.util.function.Consumer; + +/** + * Helper class to format and send frames over a socket + * + * @author Paulo Lopes + */ +public class JsonRPCHelper { + + private JsonRPCHelper() { + } + + // TODO: Should we refactor this helpers to return the buffer with the encoded message and let the caller perform + // the write? This would allow the caller to differentiate from a binary write from a text write? + // The same applies to all methods on this helper class + public static void request(String method, Object id, JsonObject params, MultiMap headers, Consumer handler) { + + final JsonObject payload = new JsonObject().put("jsonrpc", "2.0"); + + if (method == null) { + throw new IllegalStateException("method cannot be null"); + } + + payload.put("method", method); + + if (id != null) { + payload.put("id", id); + } + + if (params != null) { + payload.put("params", params.copy()); + } + + // write + if (headers != null) { + headers.forEach(entry -> { + handler.accept( + Buffer.buffer(entry.getKey()).appendString(": ").appendString(entry.getValue()).appendString("\r\n") + ); + }); + // end of headers + handler.accept(Buffer.buffer("\r\n")); + } + + handler.accept(payload.toBuffer().appendString("\r\n")); + } + + public static void request(String method, Object id, JsonObject params, Consumer handler) { + request(method, id, params, null, handler); + } + + public static void request(String method, Object id, Consumer handler) { + request(method, id, null, null, handler); + } + + public static void request(String method, Consumer handler) { + request(method, null, null, null, handler); + } + + public static void request(String method, JsonObject params, Consumer handler) { + request(method, null, params, null, handler); + } + + public static void response(Object id, Object result, Consumer handler) { + final JsonObject payload = new JsonObject() + .put("jsonrpc", "2.0") + .put("id", id) + .put("result", result); + + handler.accept(payload); + } + + public static void error(Object id, Number code, String message, Consumer handler) { + final JsonObject payload = new JsonObject() + .put("jsonrpc", "2.0") + .put("id", id); + + final JsonObject error = new JsonObject(); + payload.put("error", error); + + if (code != null) { + error.put("code", code); + } + + if (message != null) { + error.put("message", message); + } + + handler.accept(payload); + } + + public static void error(Object id, ReplyException failure, Consumer handler) { + error(id, failure.failureCode(), failure.getMessage(), handler); + } + + public static void error(Object id, String message, Consumer handler) { + error(id, -32000, message, handler); + } +} diff --git a/src/main/resources/http.html b/src/main/resources/http.html new file mode 100644 index 0000000..aeb2be3 --- /dev/null +++ b/src/main/resources/http.html @@ -0,0 +1,20 @@ + + + + HTTP Bridge Example + + + + + diff --git a/src/main/resources/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/jsonrpc.scehma.json b/src/main/resources/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/jsonrpc.scehma.json new file mode 100644 index 0000000..c5ab7ee --- /dev/null +++ b/src/main/resources/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/jsonrpc.scehma.json @@ -0,0 +1,41 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://vertx.io/jsonrpc.schema.json", + "title": "Vert.x Event Bus Bridge JSON-RPC 2.0 Specification", + "description": "JSON-RPC schema to validate messages sent to a Vert.x Event Bus Bridge", + "anyOf": [ + { "$ref": "#/definitions/request" }, + { + "type": "array", + "items": { "$ref": "#/definitions/request" } + } + ], + "definitions": { + "request": { + "type": "object", + "properties": { + "jsonrpc": { + "description": "A String specifying the version of the JSON-RPC protocol. MUST be exactly \"2.0\".", + "const": "2.0" + }, + "method": { + "description": "A String containing the name of the method to be invoked. Method names that begin with the word rpc followed by a period character (U+002E or ASCII 46) are reserved for rpc-internal methods and extensions and MUST NOT be used for anything else.", + "type": "string" + }, + "params": { + "description": "A Structured value that holds the parameter values to be used during the invocation of the method. This member MAY be omitted.", + "type": ["object", "array"] + }, + "id": { + "description": "An identifier established by the Client that MUST contain a String, Number, or NULL value if included. If it is not included it is assumed to be a notification. The value SHOULD normally not be Null and Numbers SHOULD NOT contain fractional parts.", + "type": ["string", "integer", "null"] + } + }, + "required": [ + "jsonrpc", + "method" + ], + "additionalProperties": false + } + } +} diff --git a/src/main/resources/sse.html b/src/main/resources/sse.html new file mode 100644 index 0000000..2806f0e --- /dev/null +++ b/src/main/resources/sse.html @@ -0,0 +1,15 @@ + + + + + + + + diff --git a/src/main/resources/ws.html b/src/main/resources/ws.html new file mode 100644 index 0000000..911219b --- /dev/null +++ b/src/main/resources/ws.html @@ -0,0 +1,36 @@ + + + + Websockets Bridge Example + + + + + diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/HttpJsonRPCStreamEventBusBridgeImplTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/HttpJsonRPCStreamEventBusBridgeImplTest.java new file mode 100644 index 0000000..e952094 --- /dev/null +++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/HttpJsonRPCStreamEventBusBridgeImplTest.java @@ -0,0 +1,404 @@ +/* + * Copyright 2015 Red Hat, Inc. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.ext.eventbus.bridge.tcp; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.Json; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetSocket; +import io.vertx.ext.bridge.BridgeOptions; +import io.vertx.ext.bridge.PermittedOptions; +import io.vertx.ext.eventbus.bridge.tcp.impl.StreamParser; +import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.RunTestOnContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.ext.web.client.WebClient; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper.*; + +@RunWith(VertxUnitRunner.class) +public class HttpJsonRPCStreamEventBusBridgeImplTest { + + @Rule + public RunTestOnContext rule = new RunTestOnContext(); + + private final Handler> eventHandler = event -> event.complete(true); + + @Before + public void before(TestContext should) { + final Async test = should.async(); + final Vertx vertx = rule.vertx(); + + vertx.eventBus().consumer("hello", + (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value")))); + + vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body())); + + vertx.setPeriodic(1000, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi"))); + + vertx.createHttpServer().requestHandler(JsonRPCStreamEventBusBridge.httpSocketHandler(vertx, + new JsonRPCBridgeOptions().addInboundPermitted(new PermittedOptions().setAddress("hello")).addInboundPermitted( + new PermittedOptions().setAddress("echo")).addInboundPermitted( + new PermittedOptions().setAddress("test")).addOutboundPermitted( + new PermittedOptions().setAddress("echo")).addOutboundPermitted( + new PermittedOptions().setAddress("test")).addOutboundPermitted(new PermittedOptions().setAddress("ping")), + eventHandler)).listen(7000, res -> { + should.assertTrue(res.succeeded()); + test.complete(); + }); + } + + @Test(timeout = 10_000L) + public void testSendVoidMessage(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + final WebClient client = WebClient.create(vertx); + final Async test = should.async(); + + vertx.eventBus().consumer("test", (Message msg) -> { + client.close(); + test.complete(); + }); + + request("send", new JsonObject().put("address", "test").put("body", new JsonObject().put("value", "vert.x")), + buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer)); + + } + + @Test(timeout = 10_000L) + public void testNoHandlers(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + final WebClient client = WebClient.create(vertx); + final Async test = should.async(); + + request("send", "#backtrack", + new JsonObject().put("address", "test").put("body", new JsonObject().put("value", "vert.x")), + buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> { + JsonObject frame = handler.bodyAsJsonObject(); + + should.assertTrue(frame.containsKey("error")); + should.assertFalse(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + client.close(); + test.complete(); + }).onFailure(should::fail)); + } + + @Test(timeout = 10_000L) + public void testErrorReply(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + final WebClient client = WebClient.create(vertx); + final Async test = should.async(); + + vertx.eventBus().consumer("test", (Message msg) -> { + msg.fail(0, "oops!"); + }); + + request("send", "#backtrack", + new JsonObject().put("address", "test").put("body", new JsonObject().put("value", "vert.x")), + buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> { + JsonObject frame = handler.bodyAsJsonObject(); + should.assertTrue(frame.containsKey("error")); + should.assertFalse(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + client.close(); + test.complete(); + }).onFailure(should::fail)); + + } + + @Test(timeout = 10_000L) + public void testSendMessageWithReplyBacktrack(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + WebClient client = WebClient.create(vertx); + final Async test = should.async(); + + request("send", "#backtrack", + new JsonObject().put("address", "hello").put("body", new JsonObject().put("value", "vert.x")), + buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> { + JsonObject frame = handler.bodyAsJsonObject(); + + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + JsonObject result = frame.getJsonObject("result"); + + should.assertEquals("Hello vert.x", result.getJsonObject("body").getString("value")); + client.close(); + test.complete(); + }).onFailure(should::fail)); + } + + @Test(timeout = 10_000L) + public void testSendMessageWithReplyBacktrackTimeout(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + WebClient client = WebClient.create(vertx); + final Async test = should.async(); + + // This does not reply and will provoke a timeout + vertx.eventBus().consumer("test", (Message msg) -> { /* Nothing! */ }); + + JsonObject headers = new JsonObject().put("timeout", 100L); + + request("send", "#backtrack", new JsonObject().put("address", "test").put("headers", headers).put("body", + new JsonObject().put("value", "vert.x")), + buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> { + JsonObject frame = handler.bodyAsJsonObject(); + + should.assertTrue(frame.containsKey("error")); + should.assertFalse(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + JsonObject error = frame.getJsonObject("error"); + + should.assertEquals( + "Timed out after waiting 100(ms) for a reply. address: __vertx.reply.1, repliedAddress: test", + error.getString("message")); + should.assertEquals(-1, error.getInteger("code")); + + client.close(); + test.complete(); + }).onFailure(should::fail)); + + } + + @Test(timeout = 10_000L) + public void testSendMessageWithDuplicateReplyID(TestContext should) { + // replies must always return to the same origin + + final Vertx vertx = rule.vertx(); + WebClient client = WebClient.create(vertx); + final Async test = should.async(); + + vertx.eventBus().consumer("third-party-receiver", msg -> should.fail()); + + request("send", "third-party-receiver", + new JsonObject().put("address", "hello").put("body", new JsonObject().put("value", "vert.x")), + buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> { + JsonObject frame = handler.bodyAsJsonObject(); + client.close(); + test.complete(); + }).onFailure(should::fail)); + } + + @Test(timeout = 10_000L) + @Ignore + public void testRegister(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + WebClient client = WebClient.create(vertx); + final Async test = should.async(); + + final AtomicInteger messageCount = new AtomicInteger(0); + + request("register", "#backtrack", new JsonObject().put("address", "echo"), + buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> { + JsonObject frame = handler.bodyAsJsonObject(); + // 2 messages will arrive + // 1) ACK for register message + // 2) MESSAGE for echo + if (messageCount.get() == 0) { + // ACK for register message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time ACK for publish is expected + should.assertTrue(messageCount.compareAndSet(0, 1)); + } + else { + // reply for echo message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + JsonObject result = frame.getJsonObject("result"); + + should.assertEquals("Vert.x", result.getJsonObject("body").getString("value")); + client.close(); + test.complete(); + } + }).onFailure(should::fail)); + + // now try to publish a message so it gets delivered both to the consumer registred on the startup and to this + // remote consumer + + request("publish", "#backtrack", + new JsonObject().put("address", "echo").put("body", new JsonObject().put("value", "Vert.x")), + buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> { + JsonObject frame = handler.bodyAsJsonObject(); + // ACK for publish message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + }).onFailure(should::fail)); + + } + + @Test(timeout = 10_000L) + @Ignore + public void testUnRegister(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + WebClient client = WebClient.create(vertx); + final Async test = should.async(); + + final String address = "test"; + // 4 replies will arrive: + // 1). ACK for register + // 2). ACK for publish + // 3). message published to test + // 4). err of NO_HANDLERS because of consumer for 'test' is unregistered. + final AtomicInteger messageCount = new AtomicInteger(0); + final AtomicInteger messageCount2 = new AtomicInteger(0); + final StreamParser parser = new StreamParser().exceptionHandler(should::fail).handler(body -> { + JsonObject frame = new JsonObject(body); + + if (messageCount.get() == 0) { + // ACK for register message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time ACK for publish is expected + should.assertTrue(messageCount.compareAndSet(0, 1)); + } + else if (messageCount.get() == 1) { + // got message, then unregister the handler + should.assertFalse(frame.containsKey("error")); + JsonObject result = frame.getJsonObject("result"); + should.assertEquals("Vert.x", result.getJsonObject("body").getString("value")); + + request("unregister", "#backtrack", new JsonObject().put("address", address), + buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> { + JsonObject frame2 = handler.bodyAsJsonObject(); + if (messageCount2.get() == 0) { + // ACK for publish message + should.assertFalse(frame2.containsKey("error")); + should.assertTrue(frame2.containsKey("result")); + should.assertEquals("#backtrack", frame2.getValue("id")); + // increment message count so that next time reply for echo message is expected + should.assertTrue(messageCount2.compareAndSet(0, 1)); + } + else { + // ACK for unregister message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time error reply for send message is expected + should.assertTrue(messageCount.compareAndSet(3, 4)); + + request("send", "#backtrack", new JsonObject().put("address", address).put("body", + new JsonObject().put("value", "This will fail anyway!")), buffer1 -> { + }); + } + }).onFailure(should::fail)); + } + else { + // TODO: Check error handling of bridge for consistency + // consumer on 'test' has been unregistered, send message will fail. + should.assertTrue(frame.containsKey("error")); + JsonObject error = frame.getJsonObject("error"); + should.assertEquals(-1, error.getInteger("code")); + should.assertEquals("No handlers for address test", error.getString("message")); + + client.close(); + test.complete(); + } + }); + + request("register", "#backtrack", new JsonObject().put("address", address), + buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> { + JsonObject frame = handler.bodyAsJsonObject(); + // ACK for publish message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time reply for echo message is expected + should.assertTrue(messageCount.compareAndSet(1, 2)); + }).onFailure(should::fail)); + + request("publish", "#backtrack", + new JsonObject().put("address", address).put("body", new JsonObject().put("value", "Vert.x")), buffer -> { + }); + } + + @Test(timeout = 10_000L) + public void testSendPing(TestContext should) { + final Vertx vertx = rule.vertx(); + WebClient client = WebClient.create(vertx); + final Async test = should.async(); + + request("ping", "#backtrack", + buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> { + JsonObject frame = handler.bodyAsJsonObject(); + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + should.assertEquals("pong", frame.getString("result")); + client.close(); + test.complete(); + }).onFailure(should::fail)); + } + + @Test(timeout = 10_000L) + public void testNoAddress(TestContext should) { + final Vertx vertx = rule.vertx(); + + WebClient client = WebClient.create(vertx); + final Async test = should.async(); + final AtomicBoolean errorOnce = new AtomicBoolean(false); + + request("send", "#backtrack", + buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> { + JsonObject frame = handler.bodyAsJsonObject(); + if (!errorOnce.compareAndSet(false, true)) { + should.fail("Client gets error message twice!"); + } + else { + should.assertTrue(frame.containsKey("error")); + should.assertEquals("invalid_parameters", frame.getJsonObject("error").getString("message")); + vertx.setTimer(200, l -> { + client.close(); + test.complete(); + }); + } + }).onFailure(should::fail)); + } + +} diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/InteropWebSocketServer.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/InteropWebSocketServer.java new file mode 100644 index 0000000..3b8b519 --- /dev/null +++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/InteropWebSocketServer.java @@ -0,0 +1,64 @@ +package io.vertx.ext.eventbus.bridge.tcp; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.bridge.PermittedOptions; +import io.vertx.ext.eventbus.bridge.tcp.impl.HttpJsonRPCStreamEventBusBridgeImpl; + +public class InteropWebSocketServer extends AbstractVerticle { + + // To test just run this application from the IDE and then open the browser on http://localhost:8080 + // later we can also automate this with a vert.x web client, I'll show you next week how to bootstrap it. + public static void main(String[] args) { + Vertx.vertx().deployVerticle(new InteropWebSocketServer()); + } + + @Override + public void start(Promise start) { + // just to have some messages flowing around + vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value")))); + vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body())); + vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi"))); + + HttpJsonRPCStreamEventBusBridgeImpl bridge = (HttpJsonRPCStreamEventBusBridgeImpl) JsonRPCStreamEventBusBridge.httpSocketHandler( + vertx, + new JsonRPCBridgeOptions() + .addInboundPermitted(new PermittedOptions().setAddress("hello")) + .addInboundPermitted(new PermittedOptions().setAddress("echo")) + .addInboundPermitted(new PermittedOptions().setAddress("test")) + .addOutboundPermitted(new PermittedOptions().setAddress("echo")) + .addOutboundPermitted(new PermittedOptions().setAddress("test")) + .addOutboundPermitted(new PermittedOptions().setAddress("ping")), + null + ); + + vertx + .createHttpServer() + .requestHandler(req -> { + // this is where any http request will land + // serve the base HTML application + if ("/".equals(req.path())) { + req.response() + .putHeader(HttpHeaders.CONTENT_TYPE, "text/html") + .sendFile("ws.html"); + } else if ("/jsonrpc".equals(req.path())){ + bridge.handle(req); + } else if ("/jsonrpc-sse".equals(req.path())) { + JsonObject params = new JsonObject().put("params", new JsonObject().put("address", "ping")); + bridge.handleSSE(req, (int) (Math.random() * 100_000), params); + } else { + req.response().setStatusCode(404).end("Not Found"); + } + }) + .listen(8080) + .onFailure(start::fail) + .onSuccess(server -> { + System.out.println("Server listening at http://localhost:8080"); + start.complete(); + }); + } +} diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/StreamParserTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/StreamParserTest.java new file mode 100644 index 0000000..8db0ff0 --- /dev/null +++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/StreamParserTest.java @@ -0,0 +1,61 @@ +package io.vertx.ext.eventbus.bridge.tcp; + +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.eventbus.bridge.tcp.impl.StreamParser; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.RunTestOnContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(VertxUnitRunner.class) +public class StreamParserTest { + + @Rule + public RunTestOnContext rule = new RunTestOnContext(); + + @Test(timeout = 30_000) + public void testParseSimple(TestContext should) { + final Async test = should.async(); + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + // extra line feed and carriage return are ignored + should.assertEquals("{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"hi\"}", body.toString()); + test.complete(); + }); + + parser.handle(Buffer.buffer( + "\r\n" + + "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"hi\"}")); + } + + @Test(timeout = 30_000) + public void testParseSimpleWithPreambleFail(TestContext should) { + final Async test = should.async(); + final StreamParser parser = new StreamParser() + .exceptionHandler(err -> test.complete()) + .handler(body -> should.fail("There is something else than JSON in the preamble of the buffer")); + + parser.handle(Buffer.buffer( + "Content-Length: 38\r\n" + + "Content-Type: application/vscode-jsonrpc;charset=utf-8\r\n" + + "\r\n" + + "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"hi\"}")); + } + + @Test(timeout = 30_000) + public void testParseSimpleHeaderless(TestContext should) { + final Async test = should.async(); + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + System.out.println(body.toString()); + test.complete(); + }); + + parser.handle(Buffer.buffer("{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"hi\"}\r\n")); + } +} diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TCPJsonRPCStreamEventBusBridgeImplTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TCPJsonRPCStreamEventBusBridgeImplTest.java new file mode 100644 index 0000000..403dd9e --- /dev/null +++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TCPJsonRPCStreamEventBusBridgeImplTest.java @@ -0,0 +1,677 @@ +/* + * Copyright 2015 Red Hat, Inc. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.ext.eventbus.bridge.tcp; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetSocket; +import io.vertx.ext.bridge.PermittedOptions; +import io.vertx.ext.eventbus.bridge.tcp.impl.StreamParser; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.RunTestOnContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper.*; + +@RunWith(VertxUnitRunner.class) +public class TCPJsonRPCStreamEventBusBridgeImplTest { + + @Rule + public RunTestOnContext rule = new RunTestOnContext(); + + private final Handler> eventHandler = event -> event.complete(true); + + @Before + public void before(TestContext should) { + final Async test = should.async(); + final Vertx vertx = rule.vertx(); + + vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value")))); + + vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body())); + + vertx.setPeriodic(1000, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi"))); + + vertx.createNetServer() + .connectHandler(JsonRPCStreamEventBusBridge.netSocketHandler( + vertx, + new JsonRPCBridgeOptions() + .addInboundPermitted(new PermittedOptions().setAddress("hello")) + .addInboundPermitted(new PermittedOptions().setAddress("echo")) + .addInboundPermitted(new PermittedOptions().setAddress("test")) + .addOutboundPermitted(new PermittedOptions().setAddress("echo")) + .addOutboundPermitted(new PermittedOptions().setAddress("test")) + .addOutboundPermitted(new PermittedOptions().setAddress("ping")), + eventHandler)) + .listen(7000, res -> { + should.assertTrue(res.succeeded()); + test.complete(); + }); + } + + @Test(timeout = 10_000L) + public void testSendVoidMessage(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + + vertx.eventBus().consumer("test", (Message msg) -> { + client.close(); + test.complete(); + }); + + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + request("send", new JsonObject().put("address", "test").put("body", new JsonObject().put("value", "vert.x")), socket::write); + })); + } + + @Test(timeout = 10_000L) + public void testNoHandlers(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + + final StreamParser parser = new StreamParser() + .handler(body -> { + JsonObject frame = new JsonObject(body); + + should.assertTrue(frame.containsKey("error")); + should.assertFalse(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + client.close(); + test.complete(); + }).exceptionHandler(should::fail); + + socket.handler(parser); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", "test") + .put("body", new JsonObject().put("value", "vert.x")), + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testErrorReply(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + + vertx.eventBus().consumer("test", (Message msg) -> { + msg.fail(0, "oops!"); + }); + + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + should.assertTrue(frame.containsKey("error")); + should.assertFalse(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + client.close(); + test.complete(); + }); + + socket.handler(parser); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", "test") + .put("body", new JsonObject().put("value", "vert.x")), + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testSendsFromOtherSideOfBridge(TestContext should) { + final Vertx vertx = rule.vertx(); + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + + final AtomicBoolean ack = new AtomicBoolean(false); + + // 2 replies will arrive: + // 1). acknowledge register + // 2). greeting + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + if (!ack.getAndSet(true)) { + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + } else { + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + JsonObject result = frame.getJsonObject("result"); + + should.assertEquals("hi", result.getJsonObject("body").getString("value")); + client.close(); + test.complete(); + } + }); + + socket.handler(parser); + + request( + "register", + "#backtrack", + new JsonObject() + .put("address", "ping"), + socket::write + ); + })); + + } + + @Test(timeout = 10_000L) + public void testSendMessageWithReplyBacktrack(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + JsonObject result = frame.getJsonObject("result"); + + should.assertEquals("Hello vert.x", result.getJsonObject("body").getString("value")); + client.close(); + test.complete(); + }); + + socket.handler(parser); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", "hello") + .put("body", new JsonObject().put("value", "vert.x")), + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testSendMessageWithReplyBacktrackTimeout(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + + // This does not reply and will provoke a timeout + vertx.eventBus().consumer("test", (Message msg) -> { /* Nothing! */ }); + + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + should.assertTrue(frame.containsKey("error")); + should.assertFalse(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + JsonObject error = frame.getJsonObject("error"); + + should.assertEquals("Timed out after waiting 100(ms) for a reply. address: __vertx.reply.1, repliedAddress: test", error.getString("message")); + should.assertEquals(-1, error.getInteger("code")); + + client.close(); + test.complete(); + }); + + socket.handler(parser); + + JsonObject headers = new JsonObject().put("timeout", 100L); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", "test") + .put("headers", headers) + .put("body", new JsonObject().put("value", "vert.x")), + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testSendMessageWithDuplicateReplyID(TestContext should) { + // replies must always return to the same origin + + final Vertx vertx = rule.vertx(); + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + + vertx.eventBus().consumer("third-party-receiver", msg -> should.fail()); + + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + client.close(); + test.complete(); + }); + + socket.handler(parser); + + + request( + "send", + "third-party-receiver", + new JsonObject() + .put("address", "hello") + .put("body", new JsonObject().put("value", "vert.x")), + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testRegister(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + final AtomicInteger messageCount = new AtomicInteger(0); + + // 3 messages will arrive + // 1) ACK for register message + // 2) ACK for publish message + // 3) MESSAGE for echo + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + if (messageCount.get() == 0) { + // ACK for register message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time ACK for publish is expected + should.assertTrue(messageCount.compareAndSet(0, 1)); + } + else if (messageCount.get() == 1) { + // ACK for publish message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time reply for echo message is expected + should.assertTrue(messageCount.compareAndSet(1, 2)); + } else { + // reply for echo message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + JsonObject result = frame.getJsonObject("result"); + + should.assertEquals("Vert.x", result.getJsonObject("body").getString("value")); + client.close(); + test.complete(); + } + }); + + socket.handler(parser); + + request( + "register", + "#backtrack", + new JsonObject() + .put("address", "echo"), + socket::write + ); + + // now try to publish a message so it gets delivered both to the consumer registred on the startup and to this + // remote consumer + + request( + "publish", + "#backtrack", + new JsonObject() + .put("address", "echo") + .put("body", new JsonObject().put("value", "Vert.x")), + socket::write + ); + })); + + } + + @Test(timeout = 10_000L) + public void testUnRegister(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + + final String address = "test"; + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + + // 4 replies will arrive: + // 1). ACK for register + // 2). ACK for publish + // 3). message published to test + // 4). err of NO_HANDLERS because of consumer for 'test' is unregistered. + final AtomicInteger messageCount = new AtomicInteger(0); + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + if (messageCount.get() == 0) { + // ACK for register message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time ACK for publish is expected + should.assertTrue(messageCount.compareAndSet(0, 1)); + } + else if (messageCount.get() == 1) { + // ACK for publish message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time reply for echo message is expected + should.assertTrue(messageCount.compareAndSet(1, 2)); + } else if (messageCount.get() == 2) { + // got message, then unregister the handler + should.assertFalse(frame.containsKey("error")); + JsonObject result = frame.getJsonObject("result"); + should.assertEquals("Vert.x", result.getJsonObject("body").getString("value")); + + // increment message count so that next time ACK for unregister is expected + should.assertTrue(messageCount.compareAndSet(2, 3)); + + request("unregister", "#backtrack", new JsonObject().put("address", address), socket::write); + } else if (messageCount.get() == 3) { + // ACK for unregister message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time error reply for send message is expected + should.assertTrue(messageCount.compareAndSet(3, 4)); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", address) + .put("body", new JsonObject().put("value", "This will fail anyway!")), + socket::write + ); + } else { + // TODO: Check error handling of bridge for consistency + // consumer on 'test' has been unregistered, send message will fail. + should.assertTrue(frame.containsKey("error")); + JsonObject error = frame.getJsonObject("error"); + should.assertEquals(-1, error.getInteger("code")); + should.assertEquals("No handlers for address test", error.getString("message")); + + client.close(); + test.complete(); + } + }); + + socket.handler(parser); + + request( + "register", + "#backtrack", + new JsonObject() + .put("address", address), + socket::write + ); + + request( + "publish", + "#backtrack", + new JsonObject() + .put("address", address) + .put("body", new JsonObject().put("value", "Vert.x")), + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testReplyFromClient(TestContext should) { + // Send a request from java and get a response from the client + final Vertx vertx = rule.vertx(); + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + final String address = "test"; + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + + final AtomicBoolean ack = new AtomicBoolean(false); + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + if (!ack.getAndSet(true)) { + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + } else { + JsonObject result = frame.getJsonObject("result"); + should.assertEquals("Vert.x", result.getJsonObject("body").getString("value")); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", result.getString("replyAddress")) + .put("body", new JsonObject().put("value", "You got it")), + socket::write + ); + } + }); + + socket.handler(parser); + + request( + "register", + "#backtrack", + new JsonObject() + .put("address", address), + socket::write + ); + + // There is now way to know that the register actually happened, wait a bit before sending. + vertx.setTimer(500L, timerId -> { + vertx.eventBus().request(address, new JsonObject().put("value", "Vert.x"), respMessage -> { + should.assertTrue(respMessage.succeeded()); + should.assertEquals("You got it", respMessage.result().body().getString("value")); + client.close(); + test.complete(); + }); + }); + + })); + + } + + @Test(timeout = 10_000L) + public void testFailFromClient(TestContext should) { + // Send a request from java and get a response from the client + final Vertx vertx = rule.vertx(); + + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + final String address = "test"; + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + + final AtomicBoolean ack = new AtomicBoolean(false); + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + if (!ack.getAndSet(true)) { + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + } else { + JsonObject result = frame.getJsonObject("result"); + should.assertEquals("Vert.x", result.getJsonObject("body").getString("value")); + + request( + "send", + null, + new JsonObject() + .put("address", result.getString("replyAddress")) + .put("error", new JsonObject().put("failureCode", 1234).put("message", "ooops!")), + socket::write + ); + } + }); + + socket.handler(parser); + + request( + "register", + "#backtrack", + new JsonObject() + .put("address", address), + socket::write + ); + + // There is now way to know that the register actually happened, wait a bit before sending. + vertx.setTimer(500L, timerId -> { + vertx.eventBus().request(address, new JsonObject().put("value", "Vert.x"), respMessage -> { + should.assertTrue(respMessage.failed()); + should.assertEquals("ooops!", respMessage.cause().getMessage()); + client.close(); + test.complete(); + }); + }); + })); + } + + @Test(timeout = 10_000L) + public void testSendPing(TestContext should) { + final Vertx vertx = rule.vertx(); + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + // MESSAGE for ping + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + should.assertEquals("pong", frame.getString("result")); + client.close(); + test.complete(); + }); + + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + socket.handler(parser); + request( + "ping", + "#backtrack", + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testNoAddress(TestContext should) { + final Vertx vertx = rule.vertx(); + + NetClient client = vertx.createNetClient(); + final Async test = should.async(); + final AtomicBoolean errorOnce = new AtomicBoolean(false); + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + if (!errorOnce.compareAndSet(false, true)) { + should.fail("Client gets error message twice!"); + } else { + should.assertTrue(frame.containsKey("error")); + should.assertEquals("invalid_parameters", frame.getJsonObject("error").getString("message")); + vertx.setTimer(200, l -> { + client.close(); + test.complete(); + }); + } + }); + client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> { + socket.handler(parser); + request( + "send", + "#backtrack", + socket::write + ); + })); + } + +} diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeEventTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeEventTest.java index 175c3e8..7415c8e 100644 --- a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeEventTest.java +++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeEventTest.java @@ -67,11 +67,11 @@ public void before(TestContext context) { .setSsl(true) .setTrustStoreOptions(sslKeyPairCerts.getServerTrustStore()) .setKeyStoreOptions(sslKeyPairCerts.getServerKeyStore()), - be -> { + (BridgeEvent be) -> { logger.info("Handled a bridge event " + be.getRawMessage()); if (be.socket().isSsl()) { try { - for (Certificate c : be.socket().peerCertificates()) { + for (Certificate c : be.socket().peerCertificates()) { logger.info(((X509Certificate)c).getSubjectDN().toString()); } } catch (SSLPeerUnverifiedException e) { diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/ValidatorTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/ValidatorTest.java new file mode 100644 index 0000000..7e21306 --- /dev/null +++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/ValidatorTest.java @@ -0,0 +1,60 @@ +package io.vertx.ext.eventbus.bridge.tcp; + + +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.junit.RunTestOnContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.json.schema.*; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import static org.junit.Assert.assertTrue; + +@RunWith(VertxUnitRunner.class) + public class ValidatorTest { + + @Rule + public RunTestOnContext rule = new RunTestOnContext(); + + @Test + public void testValidateSingle() { + String path = "io/vertx/ext/eventbus/bridge/tcp/impl/protocol/jsonrpc.scehma.json"; + + Validator validator = Validator.create( + JsonSchema.of(new JsonObject(rule.vertx().fileSystem().readFileBlocking(path))), + new JsonSchemaOptions() + .setDraft(Draft.DRAFT202012) + .setBaseUri("https://vertx.io") + ); + + JsonObject rpc = new JsonObject("{\"jsonrpc\": \"2.0\", \"method\": \"subtract\", \"params\": [42, 23], \"id\": 1}"); + + assertTrue(validator.validate(rpc).getValid()); + } + + @Test + public void testValidateBatch() { + String path = "io/vertx/ext/eventbus/bridge/tcp/impl/protocol/jsonrpc.scehma.json"; + + Validator validator = Validator.create( + JsonSchema.of(new JsonObject(rule.vertx().fileSystem().readFileBlocking(path))), + new JsonSchemaOptions() + .setOutputFormat(OutputFormat.Basic) + .setDraft(Draft.DRAFT202012) + .setBaseUri("https://vertx.io") + ); + + JsonArray rpc = new JsonArray("[\n" + + " {\"jsonrpc\": \"2.0\", \"method\": \"sum\", \"params\": [1,2,4], \"id\": \"1\"},\n" + + " {\"jsonrpc\": \"2.0\", \"method\": \"notify_hello\", \"params\": [7]},\n" + + " {\"jsonrpc\": \"2.0\", \"method\": \"subtract\", \"params\": [42,23], \"id\": \"2\"},\n" + + " {\"jsonrpc\": \"2.0\", \"method\": \"foo.get\", \"params\": {\"name\": \"myself\"}, \"id\": \"5\"},\n" + + " {\"jsonrpc\": \"2.0\", \"method\": \"get_data\", \"id\": \"9\"} \n" + + " ]"); + + assertTrue(validator.validate(rpc).getValid()); + } + + } diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/WebsocketJsonRPCStreamEventBusBridgeImplTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/WebsocketJsonRPCStreamEventBusBridgeImplTest.java new file mode 100644 index 0000000..abe0276 --- /dev/null +++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/WebsocketJsonRPCStreamEventBusBridgeImplTest.java @@ -0,0 +1,682 @@ +/* + * Copyright 2015 Red Hat, Inc. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.ext.eventbus.bridge.tcp; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.WebSocketBase; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.bridge.PermittedOptions; +import io.vertx.ext.eventbus.bridge.tcp.impl.StreamParser; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.RunTestOnContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper.*; + +@RunWith(VertxUnitRunner.class) +public class WebsocketJsonRPCStreamEventBusBridgeImplTest { + + + @Rule + public RunTestOnContext rule = new RunTestOnContext(); + + private final Handler> eventHandler = event -> event.complete(true); + + @Before + public void before(TestContext should) { + final Async test = should.async(); + final Vertx vertx = rule.vertx(); + + vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value")))); + + vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body())); + + vertx.setPeriodic(1000, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi"))); + + final Handler bridge = JsonRPCStreamEventBusBridge.webSocketHandler( + vertx, + new JsonRPCBridgeOptions() + .addInboundPermitted(new PermittedOptions().setAddress("hello")) + .addInboundPermitted(new PermittedOptions().setAddress("echo")) + .addInboundPermitted(new PermittedOptions().setAddress("test")) + .addOutboundPermitted(new PermittedOptions().setAddress("echo")) + .addOutboundPermitted(new PermittedOptions().setAddress("test")) + .addOutboundPermitted(new PermittedOptions().setAddress("ping")), + eventHandler + ); + + vertx + .createHttpServer() + .webSocketHandler(bridge::handle) + .listen(7000, res -> { + should.assertTrue(res.succeeded()); + test.complete(); + }); + } + + @Test(timeout = 10_000L) + public void testSendVoidMessage(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + + vertx.eventBus().consumer("test", (Message msg) -> { + client.close(); + test.complete(); + }); + + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> + request("send", new JsonObject().put("address", "test").put("body", new JsonObject().put("value", "vert.x")), socket::write) + )); + } + + @Test(timeout = 10_000L) + public void testNoHandlers(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> { + + final StreamParser parser = new StreamParser() + .handler(body -> { + JsonObject frame = new JsonObject(body); + + should.assertTrue(frame.containsKey("error")); + should.assertFalse(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + client.close(); + test.complete(); + }).exceptionHandler(should::fail); + + socket.handler(parser); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", "test") + .put("body", new JsonObject().put("value", "vert.x")), + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testErrorReply(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + + vertx.eventBus().consumer("test", (Message msg) -> { + msg.fail(0, "oops!"); + }); + + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> { + + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + should.assertTrue(frame.containsKey("error")); + should.assertFalse(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + client.close(); + test.complete(); + }); + + socket.handler(parser); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", "test") + .put("body", new JsonObject().put("value", "vert.x")), + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testSendsFromOtherSideOfBridge(TestContext should) { + final Vertx vertx = rule.vertx(); + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> { + + final AtomicBoolean ack = new AtomicBoolean(false); + + // 2 replies will arrive: + // 1). acknowledge register + // 2). greeting + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + if (!ack.getAndSet(true)) { + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + } else { + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + JsonObject result = frame.getJsonObject("result"); + + should.assertEquals("hi", result.getJsonObject("body").getString("value")); + client.close(); + test.complete(); + } + }); + + socket.handler(parser); + + request( + "register", + "#backtrack", + new JsonObject() + .put("address", "ping"), + socket::write + ); + })); + + } + + @Test(timeout = 10_000L) + public void testSendMessageWithReplyBacktrack(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> { + + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + JsonObject result = frame.getJsonObject("result"); + + should.assertEquals("Hello vert.x", result.getJsonObject("body").getString("value")); + client.close(); + test.complete(); + }); + + socket.handler(parser); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", "hello") + .put("body", new JsonObject().put("value", "vert.x")), + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testSendMessageWithReplyBacktrackTimeout(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + + // This does not reply and will provoke a timeout + vertx.eventBus().consumer("test", (Message msg) -> { /* Nothing! */ }); + + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> { + + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + should.assertTrue(frame.containsKey("error")); + should.assertFalse(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + JsonObject error = frame.getJsonObject("error"); + + should.assertEquals("Timed out after waiting 100(ms) for a reply. address: __vertx.reply.1, repliedAddress: test", error.getString("message")); + should.assertEquals(-1, error.getInteger("code")); + + client.close(); + test.complete(); + }); + + socket.handler(parser); + + JsonObject headers = new JsonObject().put("timeout", 100L); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", "test") + .put("headers", headers) + .put("body", new JsonObject().put("value", "vert.x")), + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testSendMessageWithDuplicateReplyID(TestContext should) { + // replies must always return to the same origin + + final Vertx vertx = rule.vertx(); + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> { + + vertx.eventBus().consumer("third-party-receiver", msg -> should.fail()); + + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + client.close(); + test.complete(); + }); + + socket.handler(parser); + + + request( + "send", + "third-party-receiver", + new JsonObject() + .put("address", "hello") + .put("body", new JsonObject().put("value", "vert.x")), + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testRegister(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> { + final AtomicInteger messageCount = new AtomicInteger(0); + + // 3 messages will arrive + // 1) ACK for register message + // 2) ACK for publish message + // 3) MESSAGE for echo + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + if (messageCount.get() == 0) { + // ACK for register message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time ACK for publish is expected + should.assertTrue(messageCount.compareAndSet(0, 1)); + } + else if (messageCount.get() == 1) { + // ACK for publish message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time reply for echo message is expected + should.assertTrue(messageCount.compareAndSet(1, 2)); + } else { + // reply for echo message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + JsonObject result = frame.getJsonObject("result"); + + should.assertEquals("Vert.x", result.getJsonObject("body").getString("value")); + client.close(); + test.complete(); + } + }); + + socket.handler(parser); + + request( + "register", + "#backtrack", + new JsonObject() + .put("address", "echo"), + socket::write + ); + + // now try to publish a message so it gets delivered both to the consumer registred on the startup and to this + // remote consumer + + request( + "publish", + "#backtrack", + new JsonObject() + .put("address", "echo") + .put("body", new JsonObject().put("value", "Vert.x")), + socket::write + ); + })); + + } + + @Test(timeout = 10_000L) + public void testUnRegister(TestContext should) { + // Send a request and get a response + final Vertx vertx = rule.vertx(); + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + + final String address = "test"; + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> { + + // 4 replies will arrive: + // 1). ACK for register + // 2). ACK for publish + // 3). message published to test + // 4). err of NO_HANDLERS because of consumer for 'test' is unregistered. + final AtomicInteger messageCount = new AtomicInteger(0); + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + if (messageCount.get() == 0) { + // ACK for register message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time ACK for publish is expected + should.assertTrue(messageCount.compareAndSet(0, 1)); + } + else if (messageCount.get() == 1) { + // ACK for publish message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time reply for echo message is expected + should.assertTrue(messageCount.compareAndSet(1, 2)); + } else if (messageCount.get() == 2) { + // got message, then unregister the handler + should.assertFalse(frame.containsKey("error")); + JsonObject result = frame.getJsonObject("result"); + should.assertEquals("Vert.x", result.getJsonObject("body").getString("value")); + + // increment message count so that next time ACK for unregister is expected + should.assertTrue(messageCount.compareAndSet(2, 3)); + + request("unregister", "#backtrack", new JsonObject().put("address", address), socket::write); + } else if (messageCount.get() == 3) { + // ACK for unregister message + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + // increment message count so that next time error reply for send message is expected + should.assertTrue(messageCount.compareAndSet(3, 4)); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", address) + .put("body", new JsonObject().put("value", "This will fail anyway!")), + socket::write + ); + } else { + // TODO: Check error handling of bridge for consistency + // consumer on 'test' has been unregistered, send message will fail. + should.assertTrue(frame.containsKey("error")); + JsonObject error = frame.getJsonObject("error"); + should.assertEquals(-1, error.getInteger("code")); + should.assertEquals("No handlers for address test", error.getString("message")); + + client.close(); + test.complete(); + } + }); + + socket.handler(parser); + + request( + "register", + "#backtrack", + new JsonObject() + .put("address", address), + socket::write + ); + + request( + "publish", + "#backtrack", + new JsonObject() + .put("address", address) + .put("body", new JsonObject().put("value", "Vert.x")), + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testReplyFromClient(TestContext should) { + // Send a request from java and get a response from the client + final Vertx vertx = rule.vertx(); + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + final String address = "test"; + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> { + + final AtomicBoolean ack = new AtomicBoolean(false); + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + if (!ack.getAndSet(true)) { + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + } else { + JsonObject result = frame.getJsonObject("result"); + should.assertEquals("Vert.x", result.getJsonObject("body").getString("value")); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", result.getString("replyAddress")) + .put("body", new JsonObject().put("value", "You got it")), + socket::write + ); + } + }); + + socket.handler(parser); + + request( + "register", + "#backtrack", + new JsonObject() + .put("address", address), + socket::write + ); + + // There is now way to know that the register actually happened, wait a bit before sending. + vertx.setTimer(500L, timerId -> { + vertx.eventBus().request(address, new JsonObject().put("value", "Vert.x"), respMessage -> { + should.assertTrue(respMessage.succeeded()); + should.assertEquals("You got it", respMessage.result().body().getString("value")); + client.close(); + test.complete(); + }); + }); + + })); + + } + + @Test(timeout = 10_000L) + public void testFailFromClient(TestContext should) { + // Send a request from java and get a response from the client + final Vertx vertx = rule.vertx(); + + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + final String address = "test"; + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> { + + final AtomicBoolean ack = new AtomicBoolean(false); + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + if (!ack.getAndSet(true)) { + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + } else { + JsonObject result = frame.getJsonObject("result"); + should.assertEquals("Vert.x", result.getJsonObject("body").getString("value")); + + request( + "send", + null, + new JsonObject() + .put("address", result.getString("replyAddress")) + .put("error", new JsonObject().put("failureCode", 1234).put("message", "ooops!")), + socket::write + ); + } + }); + + socket.handler(parser); + + request( + "register", + "#backtrack", + new JsonObject() + .put("address", address), + socket::write + ); + + // There is now way to know that the register actually happened, wait a bit before sending. + vertx.setTimer(500L, timerId -> { + vertx.eventBus().request(address, new JsonObject().put("value", "Vert.x"), respMessage -> { + should.assertTrue(respMessage.failed()); + should.assertEquals("ooops!", respMessage.cause().getMessage()); + client.close(); + test.complete(); + }); + }); + })); + } + + @Test(timeout = 10_000L) + public void testSendPing(TestContext should) { + final Vertx vertx = rule.vertx(); + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + // MESSAGE for ping + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + + should.assertFalse(frame.containsKey("error")); + should.assertTrue(frame.containsKey("result")); + should.assertEquals("#backtrack", frame.getValue("id")); + + should.assertEquals("pong", frame.getString("result")); + client.close(); + test.complete(); + }); + + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> { + socket.handler(parser); + request( + "ping", + "#backtrack", + socket::write + ); + })); + } + + @Test(timeout = 10_000L) + public void testNoAddress(TestContext should) { + final Vertx vertx = rule.vertx(); + + HttpClient client = vertx.createHttpClient(); + final Async test = should.async(); + final AtomicBoolean errorOnce = new AtomicBoolean(false); + final StreamParser parser = new StreamParser() + .exceptionHandler(should::fail) + .handler(body -> { + JsonObject frame = new JsonObject(body); + if (!errorOnce.compareAndSet(false, true)) { + should.fail("Client gets error message twice!"); + } else { + should.assertTrue(frame.containsKey("error")); + should.assertEquals("invalid_parameters", frame.getJsonObject("error").getString("message")); + vertx.setTimer(200, l -> { + client.close(); + test.complete(); + }); + } + }); + client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> { + socket.handler(parser); + request( + "send", + "#backtrack", + socket::write + ); + })); + } + +}