Event bus bridges

Vert.x Camel Bridge

Apache Camel (http://camel.apache.org) is an open source Java framework that focuses on making integration easier and more accessible to developers. This bridge lets Vert.x applications interact with Camel endpoints:

  • the application can send messages to Camel.

  • the application can receive message from Camel.

The bridge relies on the Vert.x event bus and associate an event bus address to a Camel endpoint.

This component is not polyglot as it requires some classes from Camel that can only be used in Java.

Using vertx-camel-bridge

To use the Vert.x Camel Bridge, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-camel-bridge</artifactId>
 <version>4.5.11</version>
</dependency>
  • Gradle (in your build.gradle file):

compile 'io.vertx:vertx-camel-bridge:4.5.11'

Bridge configuration

Before being used, the bridge needs to be configured and started:

CamelContext camel = new DefaultCamelContext();
CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
).start();

The bridge requires a CamelContext. It will find the endpoint from the context. The bridge needs to be started before being used. Be aware the the start method is asynchronous. You can use start to be notified when the bridge has been started.

Inbound mapping

Inbound mapping associates a Camel endpoint to an event bus address. Messages received on this endpoint are transformed to event bus messages.

Endpoint endpoint = camel.getEndpoint("direct:foo");

CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address"))
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .withoutHeadersCopy())
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .usePublish())
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .withBodyType(String.class))
);

The snippet above shows different ways to configure an inbound mapping:

  • you can configure the Camel endpoint either using the Endpoint object or its uri

  • you can disables the header copy (Camel message headers are copied to the event bus message)

  • you can uses publish instead of send to broadcast the message to all event bus consumers

  • you can configures the type of the event bus message body. If not set it uses the Camel message payload. If sets, it looks in the Camel context for a converter between the Camel message payload and the desired type.

Note: org.fusesource.hawtbuf.Buffer are automatically converted to Buffer.

If send is used (so not publish), and when the Camel exchange expect a reply (In Out exchange), the Vert.x code expect as reply to the sent message. When the reply arrives it is propagated to the exchange:

Endpoint endpoint = camel.getEndpoint("direct:stuff");

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addInboundMapping(new InboundMapping().setAddress("test-reply").setEndpoint(endpoint)));

vertx.eventBus().consumer("with-reply", message -> {
  message.reply("How are you ?");
});

camel.start();
bridge.start();

ProducerTemplate template = camel.createProducerTemplate();
Future<Object> future = template.asyncRequestBody(endpoint, "hello");
String response = template.extractFutureBody(future, String.class);

You can also configure the reply timeout using setTimeout.

Outbound mapping

Outbound mapping associates an event bus address to a Camel endpoint. Messages received on this event bus address are transformed to Camel messages and sent to the endpoint.

Endpoint endpoint = camel.getEndpoint("stream:out");

CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint)
            .withoutHeadersCopy())
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
);

The snippet above shows different ways to configure an outbound mapping.

You can connect your outbound mapping to a Camel route:

camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:start")
        .transform(constant("OK"));
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("direct:start")));

camel.start();
bridge.start();


vertx.eventBus().request("test", "hello").onComplete(reply -> {
  // Reply from the route (here it's "OK")
});

If when you send the message on the event bus you register a reply handler, it configures the Camel exchange to expect a response (it uses the request-reply pattern of the EIP). The response is passed in the reply body. If the route fails, you get a reply failure (recipient failure), with the message as cause:

camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:my-route")
        .to("http://localhost:8080");
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route")));

camel.start();
bridge.start();

vertx.eventBus().request("camel-route", "hello").onComplete(reply -> {
  if (reply.succeeded()) {
    Object theResponse = reply.result().body();
  } else {
    Throwable theCause = reply.cause();
  }
});

If the processing you apply is blocking, you*must** set blocking to true. This avoid executing the processing on the event loop thread:

camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:my-route")
      .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
          // Do something blocking...
        }
      })
      .to("http://localhost:8080");
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
  .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route").setBlocking(true)));

camel.start();
bridge.start();

vertx.eventBus().request("camel-route", "hello").onComplete(reply -> {
  if (reply.succeeded()) {
    Object theResponse = reply.result().body();
  } else {
    Throwable theCause = reply.cause();
  }
});

By default it uses the default worker thread pool, this is customizable using the setWorkerExecutor method.

Stopping the bridge

Don’t forget to stop the bridge using the stop method. The stop method is asynchronous. You can use stop to be notified when the bridge has been stopped.

Exchanging custom object

If you want to send and receive custom objects, you need to register a codec on the event bus:

vertx.eventBus().registerDefaultCodec(Person.class, codec);