curl -G https://start.vertx.io/starter.zip -d "groupId=io.vertx.howtos" -d "artifactId=protobuf-eventbus-howto" -d "packageName=io.vertx.howtos.protobuf.eventbus" -d "vertxDependencies=vertx-hazelcast" -d "jdkVersion=11" -d "buildTool=maven" --output protobuf-eventbus-howto.zip
unzip -d protobuf-eventbus-howto protobuf-eventbus-howto.zip
Exchanging generated Protobuf classes on the Event Bus
This document will show you how to exchange messages of types generated by Protocol Buffers on the Event Bus.
What you will build
You will build an application that periodically generates a greeting message. The application consists in:
-
a sender verticle which sends a
GreetingRequest
-
a receiver verticle which replies to requests with a
GreetingResponse
Create a project
Browse to https://start.vertx.io. Click on advanced options to expand the hidden panel and then change the value of the following fields:
-
Group Id: set to
io.vertx.howtos
-
Artifact Id: set to
protobuf-eventbus-howto
-
Dependencies: add
Hazelcast Cluster Manager
-
Package: set to
io.vertx.howtos.protobuf.eventbus
When you’re done, click on Generate Project and extract the generated archive content somewhere on your filesystem.
You may alternatively do this from the command line: |
Before coding, we need to make some adjustments to the build file:
-
configure a custom Vert.x
Launcher
class (that will be used as entry point when running the executable JAR) -
add a dependency to Protocol Buffers
-
configure Maven plugins to generate message classes from a
.proto
file.
Here is the content of the pom.xml
file you should be using:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.vertx.howtos</groupId>
<artifactId>protobuf-eventbus-howto</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.verticle>io.vertx.howtos.protobuf.eventbus.MainVerticle</main.verticle>
<launcher.class>io.vertx.howtos.protobuf.eventbus.CustomLauncher</launcher.class>
<vertx.version>4.4.0</vertx.version>
<protobuf.version>3.22.2</protobuf.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-stack-depchain</artifactId>
<version>${vertx.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-hazelcast</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
</extension>
</extensions>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.10.1</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>${launcher.class}</Main-Class>
<Main-Verticle>${main.verticle}</Main-Verticle>
</manifestEntries>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<outputFile>${project.build.directory}/${project.artifactId}-${project.version}-fat.jar
</outputFile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Implementation of the application
Definition of the messages
In src/main/proto/greetings.proto
, we define:
-
a
GreetingRequest
which holds aname
, and -
a
GreetingReply
which holds amessage
syntax = "proto3";
package greeting;
option java_multiple_files = true;
option java_package = "io.vertx.howtos.protobuf.eventbus";
option java_outer_classname = "GreetingProtos";
message GreetingRequest {
string name = 1;
}
message GreetingReply {
string message = 1;
}
Receiver verticle
The receiver verticle registers a consumer on the Event Bus. When a request is received:
-
the request is printed to the console along with its system hash code
-
a reply is generated
-
the reply is printed to the console along with its system hash code
-
the reply is sent
package io.vertx.howtos.protobuf.eventbus;
import io.vertx.core.AbstractVerticle;
public class ReceiverVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
vertx.eventBus().<GreetingRequest>consumer("greetings", msg -> {
var request = msg.body();
System.out.printf("Received request = %s (%d)%n", request.getName(), System.identityHashCode(request));
var greeting = String.format("Hello %s", request.getName());
var reply = GreetingReply.newBuilder().setMessage(greeting).build();
System.out.printf("Sending reply = %s (%d)%n", reply.getMessage(), System.identityHashCode(reply));
msg.reply(reply);
});
}
}
The system hash code is printed so that when we run the application in a single virtual machine, we can see whether objects are duplicated or not by the Event Bus. |
Sender verticle
The sender verticle schedules a periodic task. Every five seconds:
-
a request is generated
-
the request is printed to the console along with its system hash code
-
the request is sent
-
the reply is printed to the console along with its system hash code
package io.vertx.howtos.protobuf.eventbus;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
public class SenderVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
vertx.setPeriodic(5000, l -> {
var request = GreetingRequest.newBuilder().setName("Jane Doe").build();
System.out.printf("Sending request = %s (%d)%n", request.getName(), System.identityHashCode(request));
vertx.eventBus().<GreetingReply>request("greetings", request)
.map(Message::body)
.onFailure(Throwable::printStackTrace)
.onSuccess(reply -> System.out.printf("Received reply = %s (%d)%n", reply.getMessage(), System.identityHashCode(reply)));
});
}
}
The EventBus codec
When designing the codec for Protocol Buffer message classes, we can take advantage of their properties:
-
all messages are
Serializable
in the sense of the Java platform -
message objects are immutable
Therefore, message classes can be serialized/deserialized transparently when sent/received to/from the network. Moreover, we do not need to duplicate message objects when they are exchanged locally.
package io.vertx.howtos.protobuf.eventbus;
import com.google.protobuf.GeneratedMessageV3;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.impl.SerializableUtils;
public class ProtobufCodec implements MessageCodec<GeneratedMessageV3, GeneratedMessageV3> {
static final String PROTOS_PACKAGE_NAME = "io.vertx.howtos.protobuf.eventbus.";
@Override
public void encodeToWire(Buffer buffer, GeneratedMessageV3 o) {
var bytes = SerializableUtils.toBytes(o);
buffer.appendInt(bytes.length);
buffer.appendBytes(bytes);
}
@Override
public GeneratedMessageV3 decodeFromWire(int pos, Buffer buffer) {
var length = buffer.getInt(pos);
pos += 4;
var bytes = buffer.getBytes(pos, pos + length);
return (GeneratedMessageV3) SerializableUtils.fromBytes(bytes, CheckedClassNameObjectInputStream::new);
}
@Override
public GeneratedMessageV3 transform(GeneratedMessageV3 o) {
return o;
}
@Override
public String name() {
return "ProtobufCodec";
}
@Override
public byte systemCodecID() {
return -1; // -1 for a user codec
}
public boolean appliesTo(String className) {
return className.startsWith(PROTOS_PACKAGE_NAME);
}
}
For safety reasons, we do not want to deserialize just any object on the receiver side. This is why we use a CheckedClassNameObjectInputStream
instead of a plain ObjectInputStream
.
The implementation guarantees that only some classes are allowed:
-
our message classes, of course
-
Protocol Buffer’s Java implementation classes
-
classes allowed by default by the Vert.x Event Bus (e.g. byte arrays)
package io.vertx.howtos.protobuf.eventbus;
import io.vertx.core.eventbus.EventBus;
import java.io.*;
class CheckedClassNameObjectInputStream extends ObjectInputStream {
CheckedClassNameObjectInputStream(InputStream in) throws IOException {
super(in);
}
@Override
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
var name = desc.getName();
if (name.startsWith("com.google.protobuf.")
|| name.startsWith(ProtobufCodec.PROTOS_PACKAGE_NAME)
|| EventBus.DEFAULT_SERIALIZABLE_CHECKER.apply(name)) {
return super.resolveClass(desc);
}
throw new InvalidClassException("Class not allowed: " + name);
}
}
Finally, in a custom Launcher
class, we must:
-
register this codec
-
configure the Event Bus so that it uses this codec when the type of the message’s body belongs to our package
package io.vertx.howtos.protobuf.eventbus;
import io.vertx.core.Launcher;
import io.vertx.core.Vertx;
public class CustomLauncher extends Launcher {
public static void main(String[] args) {
new CustomLauncher().dispatch(args);
}
@Override
public void afterStartingVertx(Vertx vertx) {
var protobufCodec = new ProtobufCodec();
vertx.eventBus().registerCodec(protobufCodec);
vertx.eventBus().codecSelector(body -> {
return protobufCodec.appliesTo(body.getClass().getName()) ? protobufCodec.name() : null;
});
}
}
Running the application
First you must build the application:
./mvnw clean package
Then start the receiver:
java -Djava.net.preferIPv4Stack=true -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar run io.vertx.howtos.protobuf.eventbus.ReceiverVerticle -cluster
When it is ready, you will see: INFO: Succeeded in deploying verticle
.
Now start the sender in another terminal:
java -Djava.net.preferIPv4Stack=true -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar run io.vertx.howtos.protobuf.eventbus.SenderVerticle -cluster
When it is ready, you will see: INFO: Succeeded in deploying verticle
.
After some time, you will see in the sender console:
Sending request = Jane Doe (1445840961) Received reply = Hello Jane Doe (654163465)
And in the receiver console:
Received request = Jane Doe (449456520) Sending reply = Hello Jane Doe (522259462)
In clustered mode, the system hash code that is printed is not important: objects living in distinct virtual machines are, obviously, different.
What about local mode? To run the sender and the receiver in the same virtual machine, we can use a third verticle whose only purpose is to deploy them.
package io.vertx.howtos.protobuf.eventbus;
import io.vertx.core.AbstractVerticle;
public class MainVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
vertx.deployVerticle(new ReceiverVerticle());
vertx.deployVerticle(new SenderVerticle());
}
}
Open a terminal, build the project again and run the executable JAR.
./mvnw clean package
java -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar
When it is ready, you will see: INFO: Succeeded in deploying verticle
.
After some time, you will see in the console:
Sending request = Jane Doe (346056258) Received request = Jane Doe (346056258) Sending reply = Hello Jane Doe (1483137857) Received reply = Hello Jane Doe (1483137857)
Pay attention to the system hash code. Notice that the request object is the same in both the sender and receiver. This is also true about the reply object.
Summary
This document covered:
-
creating a codec for messages of types generated by Protocol Buffers
-
registering this codec and configuring the Event Bus to use it by default
-
sending and receiving message objects locally and across the network