- Maven
-
<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-eventbus-bridge-grpc</artifactId> <version>5.1.0</version> </dependency> - Gradle
-
compile 'io.vertx:vertx-eventbus-bridge-grpc:5.1.0'
Vert.x gRPC EventBus Bridge
Vert.x gRPC EventBus Bridge is a gRPC bridge to Vert.x EventBus.
To use this project, add the following dependency to the dependencies section of your build descriptor:
The gRPC EventBus Bridge is built on top of gRPC, meaning that any application that can create gRPC clients can interact with a remote Vert.x instance via its event bus. The main use case for the gRPC bridge is for applications that need a standardized, high-performance, and language-agnostic way to communicate with a Vert.x EventBus.
gRPC provides a modern, open-source RPC framework that uses HTTP/2 for transport, Protocol Buffers as the interface description language.
The bridge also supports gRPC-Web, allowing web clients to communicate with the gRPC server.
Creating a gRPC EventBus Bridge Server
You can create a gRPC EventBus Bridge server like this.
GrpcBridgeOptions options = new GrpcBridgeOptions()
.addInboundPermitted(new PermittedOptions().setAddress("hello"))
.addInboundPermitted(new PermittedOptions().setAddress("echo"))
.addOutboundPermitted(new PermittedOptions().setAddress("news"));
// Create the bridge
GrpcEventBusBridge bridge = GrpcEventBusBridge.create(vertx, options);
// Create the gRPC server
GrpcServer grpcServer = GrpcServer.server(vertx).addService(bridge);
// Add the server
grpcServer.addService(bridge);
HttpServer server = vertx
.createHttpServer(new HttpServerOptions()
.setSsl(true)
.setKeyCertOptions(new JksOptions()
.setPath("/path/to/keycert.jks")
.setPassword("the-password"))
)
.requestHandler(grpcServer);
// Start the bridge
server.listen(443).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("gRPC EventBus Bridge started");
} else {
System.err.println("Failed to start gRPC EventBus Bridge: " + ar.cause());
}
}); Creating the bridge involves a few steps:
-
Creating the bridge with the proper inbound/outbound permissions
-
Creating the gRPC server
-
Adding the bridge to the gRPC server
-
Creating the HTTP server hosting the gRPC server
| You can refer to the Vert.x gRPC documentation and Vert.x Core documentation to learn more about Vert.x gRPC and HTTP servers. |
Creating a Vert.x gRPC Client
Here’s how to create a gRPC client to connect to the bridge:
GrpcClient client = GrpcClient.client(vertx);
SocketAddress socketAddress = SocketAddress.inetSocketAddress(7000, "localhost");
EventBusBridgeGrpcClient bridgeClient = EventBusBridgeGrpcClient.create(client, socketAddress); Interacting with the bridge
This section describes the interaction patterns with the gRPC bridge using the Vert.x gRPC Client.
Those patterns do not depend on the Vert.x gRPC Client. You can apply them to your own usage of gRPC with the EventBus rpc and messages defined by eventbus.proto. |
Sending Messages
To send a message to an address:
JsonObject message = new JsonObject().put("value", "Hello from gRPC client");
// Convert to Protobuf Struct
JsonValue messageBody = JsonValue.newBuilder().setText(message.encode()).build();
// Create the request
SendOp request = SendOp.newBuilder()
.setAddress("hello")
.setBody(messageBody)
.build();
// Send the message
grpcClient.send(request).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Message sent successfully");
} else {
System.err.println("Failed to send message: " + ar.cause());
}
}); Request-Response Pattern
To send a request and receive a response:
JsonObject message = new JsonObject().put("value", "Hello from gRPC client");
// Convert to JsonValue
JsonValue messageBody = JsonValue.newBuilder().setText(message.encode()).build();
// Create the request with timeout
RequestOp request = RequestOp.newBuilder()
.setAddress("hello")
.setBody(messageBody)
.setReplyBodyFormatValue(JsonValueFormat.text_VALUE) // Ask for encoded strings
.setTimeout(Duration.newBuilder().setSeconds(10).build()) // 10 seconds timeout
.build();
// Send the request
grpcClient.request(request).onComplete(ar -> {
if (ar.succeeded()) {
EventBusMessage response = ar.result();
// Convert Protobuf Struct to JsonObject
Object responseBody = Json.decodeValue(response.getBody().getText());
System.out.println("Received response: " + responseBody);
} else {
System.err.println("Request failed: " + ar.cause());
}
}); Publishing Messages
To publish a message to all subscribers:
JsonObject message = new JsonObject().put("value", "Broadcast message");
// Convert to JsonValue
JsonValue messageBody = JsonValue.newBuilder().setText(message.encode()).build();
// Create the request
PublishOp request = PublishOp.newBuilder()
.setAddress("news")
.setBody(messageBody)
.build();
// Publish the message
grpcClient.publish(request).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Message published successfully");
} else {
System.err.println("Failed to publish message: " + ar.cause());
}
}); Subscribing to Messages
To subscribe to messages from an address:
SubscribeOp request = SubscribeOp.newBuilder()
.setAddress("news")
.setMessageBodyFormatValue(JsonValueFormat.text_VALUE) // Ask for encoded strings
.build();
// Subscribe to the address
grpcClient.subscribe(request).onComplete(ar -> {
if (ar.succeeded()) {
// Get the stream
ReadStream<EventBusMessage> stream = ar.result();
// Set a handler for incoming messages
stream.handler(message -> {
// Store the consumer ID for later unsubscribing
String consumerId = message.getConsumerId();
// Convert Protobuf Struct to JsonObject
Object messageBody = Json.decodeValue(message.getBody().getText());
System.out.println("Received message: " + messageBody);
});
// Handle errors
stream.exceptionHandler(err -> {
System.err.println("Stream error: " + err.getMessage());
});
} else {
System.err.println("Failed to subscribe: " + ar.cause());
}
}); Unsubscribing from Messages
To unsubscribe from an address:
UnsubscribeOp request = UnsubscribeOp.newBuilder()
.setConsumerId(consumerId) // The consumer ID received in the subscription
.build();
// Unsubscribe
grpcClient.unsubscribe(request).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Unsubscribed successfully");
} else {
System.err.println("Failed to unsubscribe: " + ar.cause());
}
}); Replying to message
When an EventBusMessage carries a reply address, the sender of this message expects a reply.
The reply address can be used to send the reply.
streamOfMessages.handler(message -> {
String replyAddress = message.getReplyAddress();
// Reply to the sender
SendOp reply = SendOp.newBuilder()
.setAddress(replyAddress)
.setBody(message.getBody())
.build();
// Echo the message
grpcClient.send(reply);
}); Health Check
To perform a health check:
grpcClient.ping(Empty.getDefaultInstance()).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Bridge is healthy");
} else {
System.err.println("Bridge health check failed: " + ar.cause());
}
}); Bridge events
The GrpcEventBusBridge supports a custom bridge event handler that allows to log bridge actions or perform custom access control:
GrpcEventBusBridge bridgeService = GrpcEventBusBridge.builder(vertx)
.with(bridgeOptions)
.withEventHandler(bridgeEvent -> {
// Advanced bridge event handling
switch (bridgeEvent.type()) {
case SOCKET_CREATED:
System.out.println("New gRPC client connected");
break;
case SOCKET_CLOSED:
System.out.println("gRPC client disconnected");
break;
case SEND:
System.out.println("Message sent to: " + bridgeEvent.getRawMessage().getString("address"));
break;
case PUBLISH:
System.out.println("Message published to: " + bridgeEvent.getRawMessage().getString("address"));
break;
case RECEIVE:
System.out.println("Message received from: " + bridgeEvent.getRawMessage().getString("address"));
break;
case REGISTER:
System.out.println("Client registered for: " + bridgeEvent.getRawMessage().getString("address"));
break;
case UNREGISTER:
System.out.println("Client unregistered from: " + bridgeEvent.getRawMessage().getString("address"));
break;
}
// Always complete the event to allow it to proceed
bridgeEvent.complete(true);
})
.build(); Protocol Description
The gRPC EventBus Bridge uses Protocol Buffers to define the service and message structures. The service definition includes the following operations:
-
Publish: Publishes a message to an address (one-way communication) -
Send: Sends a message to an address (point-to-point) -
Request: Implements the request-reply pattern -
Subscribe: Subscribes to messages from an address (returns a stream of messages) -
Unsubscribe: Unsubscribes from an address -
Ping: Health check
The message structures are defined as follows:
JsonValue
The bridge handles Json values which can be one of object, array, number, string, true/false or null, the JsonValue message type is usef for this mater.
A JsonValue wraps either a string, a byte array or a google.protobuf.Value.
You can create a JsonValue from a string:
JsonValue jsonValue = JsonValue.newBuilder().setText("4").build();
JsonObject json = new JsonObject().put("name", "Julien");
jsonValue = JsonValue.newBuilder().setText(json.encode()).build(); or from a byte array:
JsonValue jsonValue = JsonValue.newBuilder().setBinary(ByteString.copyFromUtf8("4")).build();
JsonObject json = new JsonObject().put("name", "Julien");
jsonValue = JsonValue.newBuilder().setBinary(ByteString.copyFromUtf8(json.encode())).build(); or from google.protobuf.Value:
JsonValue jsonValue = JsonValue.newBuilder().setProto(
Value.newBuilder().setNumberValue(4)).build();
jsonValue = JsonValue.newBuilder().setProto(
Value.newBuilder().setStructValue(Struct
.newBuilder()
.putFields("name", Value.newBuilder().setStringValue("Julien").build()))
).build(); You can send a JsonValue using any of these formats.
When you interact with the bridge, you can specify the format you want as a client as part of the bridge interactions.
PublishOp
Used for publishing messages to an address:
message PublishOp {
// The address the message was sent to
string address = 1;
// Message headers
map<string, string> headers = 2;
// Message payload
JsonValue body = 3;
} SendOp
Used for sending messages to an address (point-to-point):
message SendOp {
// The address the message was sent to
string address = 1;
// Message headers
map<string, string> headers = 2;
// Message payload
JsonValue body = 3;
// Timeout in milliseconds
google.protobuf.Duration timeout = 4;
} RequestOp
Used for request-reply pattern:
message RequestOp {
// The address the message was sent to
string address = 1;
// Message headers
map<string, string> headers = 2;
// Message payload
JsonValue body = 3;
// The desired format of the reply body
JsonValueFormat reply_body_format = 4;
// Timeout in milliseconds
google.protobuf.Duration timeout = 5;
} SubscribeOp
Used for subscribing to an address:
message SubscribeOp {
// The address to subscribe to
string address = 1;
// Message headers
map<string, string> headers = 2;
// The desired format of the message body sent by the bridge
JsonValueFormat message_body_format = 3;
} UnsubscribeOp
Used for unsubscribing from an address:
message UnsubscribeOp {
// The consumer ID for subscription management
string consumer_id = 1;
} EventBusMessage
Used for receiving messages from the EventBus:
message EventBusMessage {
// The address the message was sent to
string address = 1;
// The consumer ID for subscription management
string consumer_id = 2;
// Optional reply address
// Presence means that the message is expecting a reply at this reply address
// This reply address shall be used in Send rpc or Request rpc
string reply_address = 3;
// Message headers
map<string, string> headers = 4;
// Message payload
JsonValue body = 5;
// Optional status for error responses
google.rpc.Status status = 6;
}