Vert.x Camel Bridge
Apache Camel (http://camel.apache.org) is an open source Java framework that focuses on making integration easier and more accessible to developers. This bridge lets Vert.x applications interact with Camel endpoints:
- the application can send messages to Camel.
- the application can receive messages from Camel.
The bridge relies on the Vert.x event bus and associates an event bus address with a Camel endpoint.
Caution: This component is not polyglot, as it requires some classes from Camel that can only be used in Java.
Using vertx-camel-bridge
To use the Vert.x Camel Bridge, add the following dependency to the dependencies section of your build descriptor:
- Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-camel-bridge</artifactId>
<version>4.5.10</version>
</dependency>
- Gradle (in your build.gradle file):
implementation 'io.vertx:vertx-camel-bridge:4.5.10'
Bridge configuration
Before being used, the bridge needs to be configured and started:
CamelContext camel = new DefaultCamelContext();
CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
).start();
The bridge requires a CamelContext and looks up endpoints from that context. The bridge must be started before it is used. Be aware that the start method is asynchronous. You can use start to be notified when the bridge has been started.
Inbound mapping
Inbound mapping associates a Camel endpoint to an event bus address. Messages received on this endpoint are transformed to event bus messages.
Endpoint endpoint = camel.getEndpoint("direct:foo");
CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        // Endpoint given by URI
        .addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
        // Endpoint given as an object
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address"))
        // Do not copy Camel headers to the event bus message
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .withoutHeadersCopy())
        // Use publish instead of send
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .usePublish())
        // Convert the body to the given type
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .withBodyType(String.class))
);
The snippet above shows different ways to configure an inbound mapping:
- you can configure the Camel endpoint using either the Endpoint object or its URI
- you can disable the header copy (by default, Camel message headers are copied to the event bus message)
- you can use publish instead of send to broadcast the message to all event bus consumers
- you can configure the type of the event bus message body. If not set, the Camel message payload is used. If set, the bridge looks in the Camel context for a converter between the Camel message payload and the desired type.
Note: org.fusesource.hawtbuf.Buffer instances are automatically converted to Buffer.
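The difference between send and publish can be illustrated with a small, self-contained model. The ToyEventBus class below is hypothetical and is not part of Vert.x or the bridge. It only mimics the delivery semantics described above, assuming round-robin selection among consumers for send.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy stand-in for an event bus (hypothetical, NOT the Vert.x implementation). */
final class ToyEventBus {
  private final Map<String, List<Consumer<String>>> consumers = new HashMap<>();
  private final Map<String, Integer> sendCount = new HashMap<>();

  /** Registers a handler on an address. */
  void consumer(String address, Consumer<String> handler) {
    consumers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
  }

  /** Point-to-point: a single registered consumer receives the message (round-robin). */
  void send(String address, String body) {
    List<Consumer<String>> handlers =
        consumers.getOrDefault(address, Collections.emptyList());
    if (handlers.isEmpty()) {
      return;
    }
    int index = sendCount.merge(address, 1, Integer::sum) - 1;
    handlers.get(index % handlers.size()).accept(body);
  }

  /** Broadcast: every registered consumer receives the message. */
  void publish(String address, String body) {
    for (Consumer<String> handler :
        consumers.getOrDefault(address, Collections.emptyList())) {
      handler.accept(body);
    }
  }

  public static void main(String[] args) {
    ToyEventBus bus = new ToyEventBus();
    List<String> received = new ArrayList<>();
    bus.consumer("eventbus-address", body -> received.add("A:" + body));
    bus.consumer("eventbus-address", body -> received.add("B:" + body));

    bus.send("eventbus-address", "m1");    // delivered to one consumer only
    bus.publish("eventbus-address", "m2"); // delivered to both consumers
    System.out.println(received);          // prints [A:m1, A:m2, B:m2]
  }
}
```

In this model, an inbound mapping configured with usePublish() would correspond to the publish call, and the default would correspond to send.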
If send is used (rather than publish) and the Camel exchange expects a reply (an In-Out exchange), the Vert.x code is expected to reply to the sent message. When the reply arrives, it is propagated to the exchange:
Endpoint endpoint = camel.getEndpoint("direct:stuff");

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addInboundMapping(new InboundMapping().setAddress("test-reply").setEndpoint(endpoint)));

// The consumer listens on the address used by the inbound mapping
vertx.eventBus().consumer("test-reply", message -> {
  message.reply("How are you ?");
});

camel.start();
bridge.start();

ProducerTemplate template = camel.createProducerTemplate();
Future<Object> future = template.asyncRequestBody(endpoint, "hello");
String response = template.extractFutureBody(future, String.class);
You can also configure the reply timeout using setTimeout.
Outbound mapping
Outbound mapping associates an event bus address to a Camel endpoint. Messages received on this event bus address are transformed to Camel messages and sent to the endpoint.
Endpoint endpoint = camel.getEndpoint("stream:out");
CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint)
            .withoutHeadersCopy())
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
);
The snippet above shows different ways to configure an outbound mapping.
You can connect your outbound mapping to a Camel route:
camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:start")
        .transform(constant("OK"));
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("direct:start")));

camel.start();
bridge.start();

vertx.eventBus().request("test", "hello").onComplete(reply -> {
  // Reply from the route (here it's "OK")
});
If you register a reply handler when you send the message on the event bus, the bridge configures the Camel exchange to expect a response (it uses the request-reply pattern from the EIPs). The response is passed in the reply body. If the route fails, you get a reply failure (recipient failure), with the message as cause:
camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:my-route")
        .to("http://localhost:8080");
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route")));

camel.start();
bridge.start();

vertx.eventBus().request("camel-route", "hello").onComplete(reply -> {
  if (reply.succeeded()) {
    // The route completed: the response is in the reply body
    Object theResponse = reply.result().body();
  } else {
    // The route failed: the cause describes the failure
    Throwable theCause = reply.cause();
  }
});
If the processing you apply is blocking, you must set blocking to true. This avoids executing the processing on the event loop thread:
camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:my-route")
        .process(new Processor() {
          @Override
          public void process(Exchange exchange) throws Exception {
            // Do something blocking...
          }
        })
        .to("http://localhost:8080");
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route").setBlocking(true)));

camel.start();
bridge.start();

vertx.eventBus().request("camel-route", "hello").onComplete(reply -> {
  if (reply.succeeded()) {
    Object theResponse = reply.result().body();
  } else {
    Throwable theCause = reply.cause();
  }
});
By default, the bridge uses the default worker thread pool. This can be customized using the setWorkerExecutor method.
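The reason for moving blocking work off the event loop can be sketched with plain java.util.concurrent classes. The BlockingOffloadSketch class, its handle method, and the thread names "event-loop" and "worker" are all hypothetical. This is a conceptual model of the behavior described above, not the bridge's actual implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Conceptual model only (hypothetical, NOT the bridge's code). */
final class BlockingOffloadSketch {

  /**
   * Runs the processing step on the worker pool when blocking is true,
   * otherwise on the single event-loop thread. The result is then handed
   * back to the event-loop thread, where a reply would be delivered.
   */
  static CompletableFuture<String> handle(String body, boolean blocking,
                                          ExecutorService eventLoop,
                                          ExecutorService workerPool) {
    ExecutorService target = blocking ? workerPool : eventLoop;
    return CompletableFuture
        // Stand-in for the (possibly blocking) Camel processing
        .supplyAsync(() -> body + " handled on " + Thread.currentThread().getName(), target)
        // Hop back to the event loop before completing
        .thenApplyAsync(result -> result, eventLoop);
  }

  public static void main(String[] args) {
    ExecutorService eventLoop =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
    ExecutorService workerPool =
        Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
    try {
      // prints "hello handled on worker"
      System.out.println(handle("hello", true, eventLoop, workerPool).join());
      // prints "hello handled on event-loop"
      System.out.println(handle("hello", false, eventLoop, workerPool).join());
    } finally {
      eventLoop.shutdown();
      workerPool.shutdown();
    }
  }
}
```

In this model, a long-running step executed with blocking set to false would occupy the only event-loop thread and delay every other message, which is the situation that setBlocking(true) is meant to avoid.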
Stopping the bridge
Don’t forget to stop the bridge using the stop method. The stop method is asynchronous. You can use stop to be notified when the bridge has been stopped.