<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>3.9.16</version>
</dependency>
RabbitMQ Client for Vert.x
A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1).
This service is experimental and the APIs are likely to change before settling down.
Getting Started
Maven
Add the dependency shown at the top of this page to the pom.xml of your Maven project.
Gradle
Add the following dependency to your Gradle project:
dependencies {
  // 'compile' was removed in Gradle 7; keep 'compile' only on Gradle versions older than 3.4
  implementation 'io.vertx:vertx-rabbitmq-client:3.9.16'
}
Create a client
You can create a client instance from a full AMQP URI:
RabbitMQOptions config = new RabbitMQOptions();
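// Full AMQP URI, in the form amqp://user:password@host/vhost
// (the password and host of the original example were obscured on this page; substitute your own)
config.setUri("amqp://xvjvsrrc:<password>@<host>/xvjvsrrc");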
RabbitMQClient client = RabbitMQClient.create(vertx, config);
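To make the URI form concrete, the following stand-alone sketch splits an AMQP URI into the same fields that can also be set individually on RabbitMQOptions. It is only an illustration built on java.net.URI, not the client's own parsing code: AmqpUriParts is a hypothetical helper, and the fallback to the conventional ports 5672 (amqp) and 5671 (amqps) is an assumption of the sketch.

```java
import java.net.URI;

/** Hypothetical helper (not part of the client): splits an AMQP URI into its parts. */
final class AmqpUriParts {
  final String user;
  final String password;
  final String host;
  final int port;
  final String virtualHost;

  private AmqpUriParts(String user, String password, String host, int port, String virtualHost) {
    this.user = user;
    this.password = password;
    this.host = host;
    this.port = port;
    this.virtualHost = virtualHost;
  }

  static AmqpUriParts parse(String uri) {
    URI u = URI.create(uri);
    String scheme = u.getScheme();
    boolean tls = "amqps".equalsIgnoreCase(scheme);
    if (!tls && !"amqp".equalsIgnoreCase(scheme)) {
      throw new IllegalArgumentException("Not an AMQP URI: " + uri);
    }
    // The user info is "user:password", "user", or absent
    String user = null;
    String password = null;
    String userInfo = u.getUserInfo();
    if (userInfo != null) {
      int colon = userInfo.indexOf(':');
      user = colon < 0 ? userInfo : userInfo.substring(0, colon);
      password = colon < 0 ? null : userInfo.substring(colon + 1);
    }
    // Assumption of this sketch: use the conventional AMQP ports when none is given
    int port = u.getPort() != -1 ? u.getPort() : (tls ? 5671 : 5672);
    // The path is "/vhost"; an empty path means no virtual host was specified
    String path = u.getPath();
    String vhost = (path == null || path.length() <= 1) ? null : path.substring(1);
    return new AmqpUriParts(user, password, u.getHost(), port, vhost);
  }

  public static void main(String[] args) {
    AmqpUriParts p = parse("amqp://user1:password1@localhost:5672/vhost1");
    System.out.println(p.user + "@" + p.host + ":" + p.port + " vhost=" + p.virtualHost);
  }
}
```

The values in the main method reuse the user, host, port and virtual host from the individual-parameter example in this section.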
Alternatively, you can specify individual parameters:
RabbitMQOptions config = new RabbitMQOptions();
// Each parameter is optional
// The default value is used for any parameter that is not set
config.setUser("user1");
config.setPassword("password1");
config.setHost("localhost");
config.setPort(5672);
config.setVirtualHost("vhost1");
config.setConnectionTimeout(6000); // in milliseconds
config.setRequestedHeartbeat(60); // in seconds
config.setHandshakeTimeout(6000); // in milliseconds
config.setRequestedChannelMax(5);
config.setNetworkRecoveryInterval(500); // in milliseconds
config.setAutomaticRecoveryEnabled(true);
RabbitMQClient client = RabbitMQClient.create(vertx, config);
You can set multiple addresses to connect to a cluster:
RabbitMQOptions config = new RabbitMQOptions();
config.setUser("user1");
config.setPassword("password1");
config.setVirtualHost("vhost1");
config.setAddresses(Arrays.asList(Address.parseAddresses("firstHost,secondHost:5672")));
RabbitMQClient client = RabbitMQClient.create(vertx, config);
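The address string used above is a comma-separated list of host or host:port entries. The sketch below shows one way such a string can be broken down. BrokerAddresses and HostPort are hypothetical names, the default port of 5672 for entries without a port is an assumption, and IPv6 literals are not handled; in real code, Address.parseAddresses does this work.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the client): parses "host1,host2:5672" style lists. */
final class BrokerAddresses {
  // Assumption of this sketch: entries without a port use the standard AMQP port
  static final int DEFAULT_PORT = 5672;

  static final class HostPort {
    final String host;
    final int port;

    HostPort(String host, int port) {
      this.host = host;
      this.port = port;
    }

    @Override
    public String toString() {
      return host + ":" + port;
    }
  }

  static List<HostPort> parse(String addresses) {
    List<HostPort> result = new ArrayList<>();
    for (String entry : addresses.split(",")) {
      String trimmed = entry.trim();
      if (trimmed.isEmpty()) {
        continue; // tolerate stray commas
      }
      int colon = trimmed.lastIndexOf(':');
      if (colon < 0) {
        result.add(new HostPort(trimmed, DEFAULT_PORT));
      } else {
        int port = Integer.parseInt(trimmed.substring(colon + 1));
        result.add(new HostPort(trimmed.substring(0, colon), port));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(parse("firstHost,secondHost:5672"));
  }
}
```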
Declare exchange with additional config
You can pass additional config parameters to RabbitMQ’s exchangeDeclare method:
JsonObject config = new JsonObject();
config.put("x-dead-letter-exchange", "my.deadletter.exchange");
config.put("alternate-exchange", "my.alternate.exchange");
// ...
client.exchangeDeclare("my.exchange", "fanout", true, false, config, onResult -> {
  if (onResult.succeeded()) {
    System.out.println("Exchange successfully declared with config");
  } else {
    onResult.cause().printStackTrace();
  }
});
Declare queue with additional config
You can pass additional config parameters to RabbitMQ’s queueDeclare method:
JsonObject config = new JsonObject();
config.put("x-message-ttl", 10_000L);
client.queueDeclare("my-queue", true, false, true, config, queueResult -> {
  if (queueResult.succeeded()) {
    System.out.println("Queue declared!");
  } else {
    System.err.println("Queue failed to be declared!");
    queueResult.cause().printStackTrace();
  }
});
Operations
The following are some examples of the operations supported by the RabbitMQClient API. Consult the javadoc/documentation for detailed information on all API methods.
Publish
Publish a message to a queue.
JsonObject message = new JsonObject().put("body", "Hello RabbitMQ, from Vert.x !");
client.basicPublish("", "my.queue", message, pubResult -> {
  if (pubResult.succeeded()) {
    System.out.println("Message published !");
  } else {
    pubResult.cause().printStackTrace();
  }
});
Publish with confirm
Publish a message to a queue and confirm the broker acknowledged it.
JsonObject message = new JsonObject().put("body", "Hello RabbitMQ, from Vert.x !");
// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect(confirmResult -> {
  if (confirmResult.succeeded()) {
    client.basicPublish("", "my.queue", message, pubResult -> {
      if (pubResult.succeeded()) {
        // Check the message got confirmed by the broker.
        client.waitForConfirms(waitResult -> {
          if (waitResult.succeeded()) {
            System.out.println("Message published !");
          } else {
            waitResult.cause().printStackTrace();
          }
        });
      } else {
        pubResult.cause().printStackTrace();
      }
    });
  } else {
    confirmResult.cause().printStackTrace();
  }
});
Consume
Consume messages from a queue.
// Create a stream of messages from a queue
client.basicConsumer("my.queue", rabbitMQConsumerAsyncResult -> {
  if (rabbitMQConsumerAsyncResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer mqConsumer = rabbitMQConsumerAsyncResult.result();
    mqConsumer.handler(message -> {
      System.out.println("Got message: " + message.body().toString());
    });
  } else {
    rabbitMQConsumerAsyncResult.cause().printStackTrace();
  }
});
You can pause or resume the stream at any time. While the stream is paused, you won’t receive any messages.
consumer.pause();
consumer.resume();
Several options are available when creating a consumption stream. QueueOptions lets you specify:
- The size of the internal queue, with setMaxInternalQueueSize
- Whether the stream should keep the most recent messages when the internal queue size is exceeded, with setKeepMostRecent
QueueOptions options = new QueueOptions()
  .setMaxInternalQueueSize(1000)
  .setKeepMostRecent(true);
client.basicConsumer("my.queue", options, rabbitMQConsumerAsyncResult -> {
  if (rabbitMQConsumerAsyncResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
  } else {
    rabbitMQConsumerAsyncResult.cause().printStackTrace();
  }
});
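The interplay of the two options can be pictured with a small stand-alone buffer. This is a sketch under an assumption about overflow behaviour inferred from the option names: when the buffer is full, the oldest message is dropped if keepMostRecent is true, and the incoming message is dropped otherwise. BoundedMessageBuffer is a hypothetical class, not part of the client, so check the QueueOptions javadoc for the exact semantics.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration of a bounded internal queue (not the client's implementation). */
final class BoundedMessageBuffer<T> {
  private final Deque<T> queue = new ArrayDeque<>();
  private final int maxSize;
  private final boolean keepMostRecent;

  BoundedMessageBuffer(int maxSize, boolean keepMostRecent) {
    if (maxSize < 1) {
      throw new IllegalArgumentException("maxSize must be at least 1");
    }
    this.maxSize = maxSize;
    this.keepMostRecent = keepMostRecent;
  }

  /** Adds a message; returns the message that was discarded, or null if none was. */
  T offer(T message) {
    if (queue.size() < maxSize) {
      queue.addLast(message);
      return null;
    }
    if (keepMostRecent) {
      // Full: evict the oldest message to make room for the new one
      T oldest = queue.pollFirst();
      queue.addLast(message);
      return oldest;
    }
    // Full: keep what is already buffered and discard the incoming message
    return message;
  }

  /** Removes and returns the oldest buffered message, or null if the buffer is empty. */
  T poll() {
    return queue.pollFirst();
  }

  /** Returns a copy of the buffered messages, oldest first. */
  List<T> snapshot() {
    return new ArrayList<>(queue);
  }

  public static void main(String[] args) {
    BoundedMessageBuffer<String> buffer = new BoundedMessageBuffer<>(2, true);
    buffer.offer("m1");
    buffer.offer("m2");
    buffer.offer("m3"); // evicts "m1" because keepMostRecent is true
    System.out.println(buffer.snapshot());
  }
}
```

Under this assumption, keeping the most recent messages suits consumers that only care about fresh state, while the opposite setting preserves the order of whatever was buffered first.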
To stop consuming messages from a queue, cancel the consumer:
rabbitMQConsumer.cancel(cancelResult -> {
  if (cancelResult.succeeded()) {
    System.out.println("Consumption successfully stopped");
  } else {
    System.out.println("Failed to stop consumption");
    cancelResult.cause().printStackTrace();
  }
});
You can get notified by the end handler when the queue won’t process any more messages:
rabbitMQConsumer.endHandler(v -> {
  System.out.println("It is the end of the stream");
});
You can set the exception handler to be notified of any error that may occur when a message is processed:
consumer.exceptionHandler(e -> {
  System.out.println("An exception occurred in the process of message handling");
  e.printStackTrace();
});
Finally, you may want to retrieve the consumer tag associated with the consumer:
String consumerTag = consumer.consumerTag();
System.out.println("Consumer tag is: " + consumerTag);
Get
Get a single message from a queue.
client.basicGet("my.queue", true, getResult -> {
  if (getResult.succeeded()) {
    JsonObject msg = getResult.result();
    System.out.println("Got message: " + msg.getString("body"));
  } else {
    getResult.cause().printStackTrace();
  }
});
Consume messages without auto-ack
client.basicConsumer("my.queue", new QueueOptions().setAutoAck(false), consumeResult -> {
  if (consumeResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer consumer = consumeResult.result();
    // Set the handler which messages will be sent to
    consumer.handler(msg -> {
      // body() returns a Buffer, as in the Consume example above
      System.out.println("Got message: " + msg.body().toString());
      // ack this single delivery; the delivery tag is carried by the message envelope
      client.basicAck(msg.envelope().deliveryTag(), false, asyncResult -> {
      });
    });
  } else {
    consumeResult.cause().printStackTrace();
  }
});
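The second argument to basicAck above is the AMQP multiple flag: false acknowledges only the given delivery tag, while true acknowledges every outstanding delivery up to and including it. The sketch below models that bookkeeping in plain Java so the difference is easy to see. UnackedDeliveries is a hypothetical class used only for illustration and does not talk to a broker.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical model of outstanding delivery tags on one channel (illustration only). */
final class UnackedDeliveries {
  private final NavigableSet<Long> outstanding = new TreeSet<>();

  /** Records a delivery that has been received but not yet acknowledged. */
  void delivered(long deliveryTag) {
    outstanding.add(deliveryTag);
  }

  /** Applies an ack and returns how many deliveries it settled. */
  int ack(long deliveryTag, boolean multiple) {
    if (multiple) {
      // Settle every outstanding tag up to and including deliveryTag
      NavigableSet<Long> upTo = outstanding.headSet(deliveryTag, true);
      int settled = upTo.size();
      upTo.clear(); // headSet is a view, so this removes from the backing set
      return settled;
    }
    // Settle exactly one delivery
    return outstanding.remove(deliveryTag) ? 1 : 0;
  }

  /** Returns a copy of the tags that are still waiting for an ack. */
  NavigableSet<Long> outstanding() {
    return new TreeSet<>(outstanding);
  }

  public static void main(String[] args) {
    UnackedDeliveries deliveries = new UnackedDeliveries();
    for (long tag = 1; tag <= 4; tag++) {
      deliveries.delivered(tag);
    }
    deliveries.ack(2, false); // settles only tag 2
    deliveries.ack(3, true);  // settles tags 1 and 3
    System.out.println(deliveries.outstanding());
  }
}
```

Acknowledging with multiple set to true reduces the number of acks sent when messages are processed in order, at the cost of acknowledging earlier deliveries that may still be in progress.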
Running the tests
You will need to have RabbitMQ installed and running with default ports on localhost for this to work.