Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b588523
Initial import [broken]
pmlopes Apr 21, 2020
0e3d01c
Fix testSendPing for JsonRPCStreamEventBusBridge
lucifer4j Mar 24, 2022
a50cf41
Fix testRegister
lucifer4j Apr 2, 2022
6d00160
Fix testUnregister
lucifer4j Apr 2, 2022
282ea38
Fix testNoAddress
lucifer4j Apr 2, 2022
d475d9c
Fix testSendsFromOtherSideOfBridge
lucifer4j Apr 3, 2022
63d0b96
Fix testReplyFromClient
lucifer4j Apr 3, 2022
0a38e69
Fix testFailFromClient
lucifer4j Apr 3, 2022
923173e
Adding a PoC on using websockets
pmlopes Jun 2, 2022
a3c6e50
Update parser to not rely on line endings but message boundaries
pmlopes Jun 2, 2022
d150083
interim checkin
lucifer4j Jun 22, 2022
aaea6dd
try generifying on transport
lucifer4j Jun 14, 2022
f6031f5
Prevent crash when trying to parse CLOSE frames as JSON
lucifer4j Jun 23, 2022
fe5591c
Change WriteStream to Consumer in JSON helpers
lucifer4j Jun 23, 2022
f9090fe
Add option to configure binary or text mode for Websockets bridge
lucifer4j Jun 23, 2022
1b9adb9
Ignore ping frames
lucifer4j Jun 23, 2022
45e2a3d
Add json schema to validate requests
lucifer4j Jun 27, 2022
d97df7e
Validate incoming requests using vertx-json-schema
lucifer4j Jun 30, 2022
62e9822
paritally working http transport
lucifer4j Jun 30, 2022
73e6165
Read json schema from classpath
lucifer4j Jul 7, 2022
d85a7b3
Add changes to address feedback
lucifer4j Jul 12, 2022
7e54a83
Override BridgeOptions methods in JsonRPCBridgeOptions to change retu…
lucifer4j Jul 13, 2022
09eafb1
Handle register requests in http transport bridge
lucifer4j Jul 13, 2022
53b887f
Remove endHandler from HttpRequest handler
lucifer4j Jul 13, 2022
0f2d5f7
Make InteropWebSocketServer example use http transport
lucifer4j Jul 13, 2022
2080059
Remove endHandler from HttpRequest handler
lucifer4j Jul 13, 2022
7c0ec0e
Make InteropWebSocketServer example use http transport
lucifer4j Jul 13, 2022
ec257d0
Add tests for each transport
lucifer4j Jul 20, 2022
21d44a7
Add converter for JsonRPCBridgeOptions
lucifer4j Jul 20, 2022
82de124
Add test for json validator
lucifer4j Jul 20, 2022
c82d513
Add demos
lucifer4j Aug 3, 2022
1c75ffc
Use Consumer<JsonObject> instead of Consumer<Buffer> to write
lucifer4j Aug 3, 2022
af47fa7
fix demos
lucifer4j Aug 3, 2022
22c4663
fix demos
lucifer4j Aug 3, 2022
9d96974
Simplify handleSSE
lucifer4j Aug 3, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</parent>

<artifactId>vertx-tcp-eventbus-bridge</artifactId>
<version>4.3.2-SNAPSHOT</version>
<version>999-SNAPSHOT</version>

<name>Vert.x TCP EventBus Bridge</name>

Expand Down Expand Up @@ -54,7 +54,11 @@
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-bridge-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-json-schema</artifactId>
</dependency>

<dependency>
Expand All @@ -73,6 +77,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion src/client/nodejs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
"scripts": {
"test": "mocha ./test/index.js"
}
}
}
60 changes: 60 additions & 0 deletions src/main/java/examples/HttpBridgeExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package examples;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.WebSocketBase;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge;

public class HttpBridgeExample extends AbstractVerticle {

public static void main(String[] args) {
Vertx.vertx().deployVerticle(new HttpBridgeExample());
}

@Override
public void start(Promise<Void> start) {
vertx.eventBus().consumer("hello", (Message<JsonObject> msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
vertx.eventBus().consumer("echo", (Message<JsonObject> msg) -> msg.reply(msg.body()));
vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));

JsonRPCBridgeOptions options = new JsonRPCBridgeOptions()
.addInboundPermitted(new PermittedOptions().setAddress("hello"))
.addInboundPermitted(new PermittedOptions().setAddress("echo"))
.addInboundPermitted(new PermittedOptions().setAddress("ping"))
.addOutboundPermitted(new PermittedOptions().setAddress("echo"))
.addOutboundPermitted(new PermittedOptions().setAddress("hello"))
.addOutboundPermitted(new PermittedOptions().setAddress("ping"));

Handler<HttpServerRequest> bridge = JsonRPCStreamEventBusBridge.httpSocketHandler(vertx, options, null);

vertx
.createHttpServer()
.requestHandler(req -> {
if ("/".equals(req.path())) {
req.response()
.putHeader(HttpHeaders.CONTENT_TYPE, "text/html")
.sendFile("http.html");
} else if ("/jsonrpc".equals(req.path())){
bridge.handle(req);
} else {
req.response().setStatusCode(404).end("Not Found");
}
})
.listen(8080)
.onFailure(start::fail)
.onSuccess(server -> {
System.out.println("Server listening at http://localhost:8080");
start.complete();
});

}
}
66 changes: 66 additions & 0 deletions src/main/java/examples/HttpSSEBridgeExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package examples;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
import io.vertx.ext.eventbus.bridge.tcp.impl.HttpJsonRPCStreamEventBusBridgeImpl;

import java.util.function.Consumer;

public class HttpSSEBridgeExample extends AbstractVerticle {

public static void main(String[] args) {
Vertx.vertx().deployVerticle(new HttpSSEBridgeExample());
}

@Override
public void start(Promise<Void> start) {
// just to have some messages flowing around
vertx.eventBus().consumer("hello", (Message<JsonObject> msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
vertx.eventBus().consumer("echo", (Message<JsonObject> msg) -> msg.reply(msg.body()));
vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));

// use explicit class because SSE method is not on the interface currently
HttpJsonRPCStreamEventBusBridgeImpl bridge = new HttpJsonRPCStreamEventBusBridgeImpl(
vertx,
new JsonRPCBridgeOptions()
.addInboundPermitted(new PermittedOptions().setAddress("hello"))
.addInboundPermitted(new PermittedOptions().setAddress("echo"))
.addInboundPermitted(new PermittedOptions().setAddress("test"))
.addOutboundPermitted(new PermittedOptions().setAddress("echo"))
.addOutboundPermitted(new PermittedOptions().setAddress("test"))
.addOutboundPermitted(new PermittedOptions().setAddress("ping")),
event -> event.complete(true)
);

vertx
.createHttpServer()
.requestHandler(req -> {
// this is where any http request will land
// serve the base HTML application
if ("/".equals(req.path())) {
req.response()
.putHeader(HttpHeaders.CONTENT_TYPE, "text/html")
.sendFile("sse.html");
} else if ("/jsonrpc".equals(req.path())) {
bridge.handle(req);
} else if ("/jsonrpc-sse".equals(req.path())) {
bridge.handleSSE(req, (int) (Math.random() * 100_000), new JsonObject().put("address", "ping"));
} else {
req.response().setStatusCode(404).end("Not Found");
}
})
.listen(8080)
.onFailure(start::fail)
.onSuccess(server -> {
System.out.println("Server listening at http://localhost:8080");
start.complete();
});
}
}
70 changes: 70 additions & 0 deletions src/main/java/examples/WebsocketBridgeExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package examples;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.WebSocketBase;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge;

public class WebsocketBridgeExample extends AbstractVerticle {

public static void main(String[] args) {
Vertx.vertx().deployVerticle(new WebsocketBridgeExample());
}

@Override
public void start(Promise<Void> start) {
vertx.eventBus().consumer("hello", (Message<JsonObject> msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
vertx.eventBus().consumer("echo", (Message<JsonObject> msg) -> msg.reply(msg.body()));
vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));

JsonRPCBridgeOptions options = new JsonRPCBridgeOptions()
.addInboundPermitted(new PermittedOptions().setAddress("hello"))
.addInboundPermitted(new PermittedOptions().setAddress("echo"))
.addInboundPermitted(new PermittedOptions().setAddress("ping"))
.addOutboundPermitted(new PermittedOptions().setAddress("echo"))
.addOutboundPermitted(new PermittedOptions().setAddress("hello"))
.addOutboundPermitted(new PermittedOptions().setAddress("ping"))
// if set to false, then websockets messages are received on frontend as binary frames
.setWebsocketsTextAsFrame(true);

Handler<WebSocketBase> bridge = JsonRPCStreamEventBusBridge.webSocketHandler(vertx, options, null);

vertx
.createHttpServer()
.requestHandler(req -> {
// this is where any http request will land
if ("/jsonrpc".equals(req.path())) {
// we switch from HTTP to WebSocket
req.toWebSocket()
.onFailure(err -> {
err.printStackTrace();
req.response().setStatusCode(500).end(err.getMessage());
})
.onSuccess(bridge::handle);
} else {
// serve the base HTML application
if ("/".equals(req.path())) {
req.response()
.putHeader(HttpHeaders.CONTENT_TYPE, "text/html")
.sendFile("ws.html");
} else {
// 404 all the rest
req.response().setStatusCode(404).end("Not Found");
}
}
})
.listen(8080)
.onFailure(start::fail)
.onSuccess(server -> {
System.out.println("Server listening at http://localhost:8080");
start.complete();
});
}
}
10 changes: 7 additions & 3 deletions src/main/java/io/vertx/ext/eventbus/bridge/tcp/BridgeEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetSocket;
import io.vertx.ext.bridge.BaseBridgeEvent;

/**
* Represents an event that occurs on the event bus bridge.
Expand All @@ -31,7 +32,7 @@
* @author <a href="http://tfox.org">Tim Fox</a>
*/
@VertxGen
public interface BridgeEvent extends io.vertx.ext.bridge.BaseBridgeEvent {
public interface BridgeEvent<T> extends BaseBridgeEvent {

/**
* Get the raw JSON message for the event. This will be null for SOCKET_CREATED or SOCKET_CLOSED events as there is
Expand All @@ -41,13 +42,16 @@ public interface BridgeEvent extends io.vertx.ext.bridge.BaseBridgeEvent {
* @return this reference, so it can be used fluently
*/
@Fluent
BridgeEvent setRawMessage(JsonObject message);
BridgeEvent<T> setRawMessage(JsonObject message);

// TODO: this will cause problems with WebSockets as they don't share any common base interface
// this will be a breaking change to users, as the return type is now generic, is this OK?

/**
* Get the SockJSSocket instance corresponding to the event
*
* @return the SockJSSocket instance
*/
@CacheReturn
NetSocket socket();
T socket();
}
Loading