
Exchanging generated Protobuf classes on the Event Bus

This document will show you how to exchange messages of types generated by Protocol Buffers on the Event Bus.

What you will build

You will build an application that periodically generates a greeting message. The application consists of:

  • a sender verticle which sends a GreetingRequest

  • a receiver verticle which replies to requests with a GreetingReply

What you need

  • A text editor or an IDE

  • Java 11 or higher

Create a project

Browse to https://start.vertx.io. Click on advanced options to expand the hidden panel and then change the value of the following fields:

  • Group Id: set to io.vertx.howtos

  • Artifact Id: set to protobuf-eventbus-howto

  • Dependencies: add Hazelcast Cluster Manager

  • Package: set to io.vertx.howtos.protobuf.eventbus

When you’re done, click on Generate Project and extract the generated archive content somewhere on your filesystem.

You may alternatively do this from the command line:

curl -G https://start.vertx.io/starter.zip -d "groupId=io.vertx.howtos" -d "artifactId=protobuf-eventbus-howto" -d "packageName=io.vertx.howtos.protobuf.eventbus" -d "vertxDependencies=vertx-hazelcast" -d "jdkVersion=11" -d "buildTool=maven" --output protobuf-eventbus-howto.zip
unzip -d protobuf-eventbus-howto protobuf-eventbus-howto.zip

Before coding, we need to make some adjustments to the build file:

  • configure a custom Vert.x Launcher class (that will be used as entry point when running the executable JAR)

  • add a dependency on Protocol Buffers

  • configure Maven plugins to generate message classes from a .proto file.

Here is the content of the pom.xml file you should be using:

Implementation of the application

Definition of the messages

In src/main/proto/greetings.proto, we define:

  • a GreetingRequest which holds a name, and

  • a GreetingReply which holds a message

greetings.proto
syntax = "proto3";

package greeting;

option java_multiple_files = true;
option java_package = "io.vertx.howtos.protobuf.eventbus";
option java_outer_classname = "GreetingProtos";

message GreetingRequest {
  string name = 1;
}

message GreetingReply {
  string message = 1;
}

Receiver verticle

The receiver verticle registers a consumer on the Event Bus. When a request is received:

  1. the request is printed to the console along with its system hash code

  2. a reply is generated

  3. the reply is printed to the console along with its system hash code

  4. the reply is sent

ReceiverVerticle.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.core.Future;
import io.vertx.core.VerticleBase;

public class ReceiverVerticle extends VerticleBase {

  @Override
  public Future<?> start() {
    return vertx.eventBus().<GreetingRequest>consumer("greetings", msg -> {
      var request = msg.body();
      System.out.printf("Received request = %s (%d)%n", request.getName(), System.identityHashCode(request));
      var greeting = String.format("Hello %s", request.getName());
      var reply = GreetingReply.newBuilder().setMessage(greeting).build();
      System.out.printf("Sending reply = %s (%d)%n", reply.getMessage(), System.identityHashCode(reply));
      msg.reply(reply);
    }).completion();
  }
}
The system hash code (the value returned by System.identityHashCode) is printed so that, when we run the application in a single virtual machine, we can see whether or not the Event Bus duplicates objects.

Sender verticle

The sender verticle schedules a periodic task. Every five seconds:

  1. a request is generated

  2. the request is printed to the console along with its system hash code

  3. the request is sent

  4. the reply is printed to the console along with its system hash code

SenderVerticle.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;

public class SenderVerticle extends AbstractVerticle {

  @Override
  public void start() throws Exception {
    vertx.setPeriodic(5000, l -> {
      var request = GreetingRequest.newBuilder().setName("Jane Doe").build();
      System.out.printf("Sending request = %s (%d)%n", request.getName(), System.identityHashCode(request));
      vertx.eventBus().<GreetingReply>request("greetings", request)
        .map(Message::body)
        .onFailure(Throwable::printStackTrace)
        .onSuccess(reply -> System.out.printf("Received reply = %s (%d)%n", reply.getMessage(), System.identityHashCode(reply)));
    });
  }
}

The EventBus codec

When designing the codec for Protocol Buffer message classes, we can take advantage of their properties:

  • all messages are Serializable in the sense of the Java platform

  • message objects are immutable

Therefore, message objects can be serialized transparently when they are sent over the network and deserialized when they are received. Moreover, because they are immutable, we do not need to duplicate them when they are exchanged locally.

ProtobufCodec.java
package io.vertx.howtos.protobuf.eventbus;

import com.google.protobuf.GeneratedMessageV3;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.impl.SerializableUtils;

public class ProtobufCodec implements MessageCodec<GeneratedMessageV3, GeneratedMessageV3> {

  static final String PROTOS_PACKAGE_NAME = "io.vertx.howtos.protobuf.eventbus.";

  @Override
  public void encodeToWire(Buffer buffer, GeneratedMessageV3 o) {
    var bytes = SerializableUtils.toBytes(o);
    buffer.appendInt(bytes.length);
    buffer.appendBytes(bytes);
  }

  @Override
  public GeneratedMessageV3 decodeFromWire(int pos, Buffer buffer) {
    var length = buffer.getInt(pos);
    pos += 4;
    var bytes = buffer.getBytes(pos, pos + length);
    return (GeneratedMessageV3) SerializableUtils.fromBytes(bytes, CheckedClassNameObjectInputStream::new);
  }

  @Override
  public GeneratedMessageV3 transform(GeneratedMessageV3 o) {
    return o;
  }

  @Override
  public String name() {
    return "ProtobufCodec";
  }

  @Override
  public byte systemCodecID() {
    return -1; // -1 for a user codec
  }

  public boolean appliesTo(String className) {
    return className.startsWith(PROTOS_PACKAGE_NAME);
  }
}
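
The wire format used by encodeToWire and decodeFromWire (a 4-byte length followed by the serialized bytes) can be tried outside Vert.x. The following is only a minimal sketch using the JDK: it assumes plain ObjectOutputStream/ObjectInputStream in place of SerializableUtils and a ByteBuffer in place of the Vert.x Buffer, and the Greeting class is a hypothetical stand-in for a generated message class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

final class LengthPrefixedFraming {

  // Hypothetical stand-in for a generated message class: immutable and Serializable.
  static final class Greeting implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String name;

    Greeting(String name) {
      this.name = name;
    }

    String getName() {
      return name;
    }
  }

  // Serializes the object and prepends its length as a 4-byte big-endian integer.
  static byte[] encode(Serializable o) {
    try {
      var bos = new ByteArrayOutputStream();
      try (var oos = new ObjectOutputStream(bos)) {
        oos.writeObject(o);
      }
      byte[] bytes = bos.toByteArray();
      return ByteBuffer.allocate(4 + bytes.length).putInt(bytes.length).put(bytes).array();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads the length stored at pos, then deserializes exactly that many bytes.
  static Object decode(int pos, byte[] wire) {
    int length = ByteBuffer.wrap(wire).getInt(pos);
    try (var ois = new ObjectInputStream(new ByteArrayInputStream(wire, pos + 4, length))) {
      return ois.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    byte[] wire = encode(new Greeting("Jane Doe"));
    var copy = (Greeting) decode(0, wire);
    System.out.println(copy.getName()); // Jane Doe
  }
}
```

Note that this sketch deserializes with an unrestricted ObjectInputStream, which is acceptable only because the bytes were produced in the same program.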

For safety reasons, we do not want to deserialize just any object on the receiver side. This is why we use a CheckedClassNameObjectInputStream instead of a plain ObjectInputStream.

The implementation only allows the following classes to be deserialized:

  • our message classes, of course

  • Protocol Buffer’s Java implementation classes

  • classes allowed by default by the Vert.x Event Bus (e.g. byte arrays)

CheckedClassNameObjectInputStream.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.core.eventbus.EventBus;

import java.io.*;

class CheckedClassNameObjectInputStream extends ObjectInputStream {

  CheckedClassNameObjectInputStream(InputStream in) throws IOException {
    super(in);
  }

  @Override
  protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
    var name = desc.getName();
    if (name.startsWith("com.google.protobuf.")
      || name.startsWith(ProtobufCodec.PROTOS_PACKAGE_NAME)
      || EventBus.DEFAULT_SERIALIZABLE_CHECKER.apply(name)) {
      return super.resolveClass(desc);
    }
    throw new InvalidClassException("Class not allowed: " + name);
  }
}
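
To see the effect of such a check, the same resolveClass technique can be exercised with JDK classes only. This is a sketch under stated assumptions: PrefixCheckedInputStream and its list of allowed prefixes are hypothetical names introduced for illustration, and the payload is an ArrayList of strings rather than a Protocol Buffers message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class AllowListDemo {

  // Hypothetical allow-list stream: a class is resolved only if its name has an allowed prefix.
  static final class PrefixCheckedInputStream extends ObjectInputStream {
    private final List<String> allowedPrefixes;

    PrefixCheckedInputStream(InputStream in, List<String> allowedPrefixes) throws IOException {
      super(in);
      this.allowedPrefixes = allowedPrefixes;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
      var name = desc.getName();
      if (allowedPrefixes.stream().anyMatch(name::startsWith)) {
        return super.resolveClass(desc);
      }
      throw new InvalidClassException("Class not allowed: " + name);
    }
  }

  static byte[] serialize(Serializable o) {
    try {
      var bos = new ByteArrayOutputStream();
      try (var oos = new ObjectOutputStream(bos)) {
        oos.writeObject(o);
      }
      return bos.toByteArray();
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  // Returns true if the payload deserializes under the allow-list, false if a class is rejected.
  static boolean accepts(byte[] payload, List<String> allowedPrefixes) {
    try (var in = new PrefixCheckedInputStream(new ByteArrayInputStream(payload), allowedPrefixes)) {
      in.readObject();
      return true;
    } catch (InvalidClassException e) {
      return false;
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    var list = new ArrayList<>(List.of("a", "b"));
    byte[] payload = serialize(list);
    System.out.println(accepts(payload, List.of("java.util.")));   // true
    System.out.println(accepts(payload, List.of("com.example."))); // false
  }
}
```

The rejection happens while the class descriptor is being read, before any instance of the disallowed class is created.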

Finally, in a custom Launcher class, we must:

  • register this codec

  • configure the Event Bus so that it uses this codec when the type of the message’s body belongs to our package

CustomLauncher.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.launcher.application.HookContext;
import io.vertx.launcher.application.VertxApplication;
import io.vertx.launcher.application.VertxApplicationHooks;

public class CustomLauncher extends VertxApplication implements VertxApplicationHooks {

  public CustomLauncher(String[] args) {
    super(args);
  }

  public static void main(String[] args) {
    new CustomLauncher(args).launch();
  }

  @Override
  public void afterVertxStarted(HookContext context) {
    var vertx = context.vertx();
    var protobufCodec = new ProtobufCodec();
    vertx.eventBus().registerCodec(protobufCodec);
    vertx.eventBus().codecSelector(body -> {
      return protobufCodec.appliesTo(body.getClass().getName()) ? protobufCodec.name() : null;
    });
  }
}

Running the application

First you must build the application:

./mvnw clean package

Then start the receiver:

java -Djava.net.preferIPv4Stack=true -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar io.vertx.howtos.protobuf.eventbus.ReceiverVerticle -cluster

When it is ready, you will see: INFO: Succeeded in deploying verticle.

Now start the sender in another terminal:

java -Djava.net.preferIPv4Stack=true -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar io.vertx.howtos.protobuf.eventbus.SenderVerticle -cluster

When it is ready, you will see: INFO: Succeeded in deploying verticle.

After some time, you will see in the sender console:

Sending request = Jane Doe (1445840961)
Received reply = Hello Jane Doe (654163465)

And in the receiver console:

Received request = Jane Doe (449456520)
Sending reply = Hello Jane Doe (522259462)

In clustered mode, the system hash code that is printed is not important: objects living in distinct virtual machines are, obviously, different.

What about local mode? To run the sender and the receiver in the same virtual machine, we can use a third verticle whose only purpose is to deploy them.

MainVerticle.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.core.Future;
import io.vertx.core.VerticleBase;

public class MainVerticle extends VerticleBase {

  @Override
  public Future<?> start() {
    return Future.join(
      vertx.deployVerticle(new ReceiverVerticle()),
      vertx.deployVerticle(new SenderVerticle())
    );
  }
}

Open a terminal, build the project again and run the executable JAR.

./mvnw clean package
java -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar

When it is ready, you will see: INFO: Succeeded in deploying verticle.

After some time, you will see in the console:

Sending request = Jane Doe (346056258)
Received request = Jane Doe (346056258)
Sending reply = Hello Jane Doe (1483137857)
Received reply = Hello Jane Doe (1483137857)

Pay attention to the system hash code. Notice that the sender and the receiver print the same value for the request: they share a single object. The same is true of the reply object.
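
The difference between handing over the same object and handing over a copy can be reproduced with the JDK alone. The sketch below is illustrative only: passThrough mimics a transform method that returns its argument, copyBySerialization mimics what a copying codec would do, and both names are hypothetical. It compares references with == rather than relying on printed hash codes, since two distinct objects are not strictly guaranteed to have different identity hash codes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class IdentityDemo {

  // Local delivery without duplication: the very same instance is handed over.
  static <T> T passThrough(T o) {
    return o;
  }

  // Local delivery with duplication: a serialization round trip yields a new object.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T copyBySerialization(T o) {
    try {
      var bos = new ByteArrayOutputStream();
      try (var oos = new ObjectOutputStream(bos)) {
        oos.writeObject(o);
      }
      try (var ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
        return (T) ois.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    var original = new ArrayList<>(List.of("Jane Doe"));
    var same = passThrough(original);
    var copy = copyBySerialization(original);
    System.out.println(same == original);      // true: one object, one identity hash code
    System.out.println(copy == original);      // false: a distinct object
    System.out.println(copy.equals(original)); // true: same content
  }
}
```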

Summary

This document covered:

  1. creating a codec for messages of types generated by Protocol Buffers

  2. registering this codec and configuring the Event Bus to use it by default

  3. sending and receiving message objects locally and across the network