RabbitMQ Client for Vert.x

A Vert.x client that lets applications interact with a RabbitMQ broker (AMQP 0.9.1).

This service is experimental and the APIs are likely to change before settling down.

Getting Started

Maven

Add the following dependency to your Maven project:

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-rabbitmq-client</artifactId>
 <version>3.9.16</version>
</dependency>

Gradle

Add the following dependency to your Gradle project:

dependencies {
 implementation 'io.vertx:vertx-rabbitmq-client:3.9.16'
}

Create a client

You can create a client instance from a full AMQP URI:

RabbitMQOptions config = new RabbitMQOptions();
// full AMQP URI; <password> and <host> are placeholders for your broker's values
config.setUri("amqp://xvjvsrrc:<password>@<host>/xvjvsrrc");
RabbitMQClient client = RabbitMQClient.create(vertx, config);
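
To make the URI form easier to relate to the individual options, here is a minimal sketch of how an AMQP URI breaks down into user, password, host, port and virtual host. It uses only java.net.URI from the JDK. The AmqpUriParts class is a hypothetical helper written for this illustration and is not part of the Vert.x client; the fallback to port 5672 and to the "/" virtual host when those parts are absent are assumptions about common AMQP defaults, not a description of what the client does internally.

```java
import java.net.URI;

// Hypothetical helper for illustration only; not part of the Vert.x RabbitMQ client.
final class AmqpUriParts {
  final String user;
  final String password;
  final String host;
  final int port;
  final String virtualHost;

  private AmqpUriParts(String user, String password, String host, int port, String virtualHost) {
    this.user = user;
    this.password = password;
    this.host = host;
    this.port = port;
    this.virtualHost = virtualHost;
  }

  static AmqpUriParts parse(String uri) {
    URI parsed = URI.create(uri);

    // The user info part has the form "user:password"; both pieces are optional.
    String userInfo = parsed.getUserInfo();
    String user = null;
    String password = null;
    if (userInfo != null) {
      int colon = userInfo.indexOf(':');
      user = colon < 0 ? userInfo : userInfo.substring(0, colon);
      password = colon < 0 ? null : userInfo.substring(colon + 1);
    }

    // Assumption: fall back to 5672, the conventional plain AMQP port.
    int port = parsed.getPort() == -1 ? 5672 : parsed.getPort();

    // Assumption: a missing path means the default virtual host "/".
    String path = parsed.getPath();
    String virtualHost = (path == null || path.isEmpty()) ? "/" : path.substring(1);

    return new AmqpUriParts(user, password, parsed.getHost(), port, virtualHost);
  }

  public static void main(String[] args) {
    AmqpUriParts parts = parse("amqp://user1:password1@localhost/vhost1");
    System.out.println(parts.user + " @ " + parts.host + ":" + parts.port
      + " vhost=" + parts.virtualHost);
  }
}
```

Each field corresponds to one of the setters on RabbitMQOptions (setUser, setPassword, setHost, setPort, setVirtualHost), so either style of configuration should describe the same connection.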

You can also specify individual parameters manually:

RabbitMQOptions config = new RabbitMQOptions();
// Each parameter is optional.
// The default value will be used for any parameter that is not set.
config.setUser("user1");
config.setPassword("password1");
config.setHost("localhost");
config.setPort(5672);
config.setVirtualHost("vhost1");
config.setConnectionTimeout(6000); // in milliseconds
config.setRequestedHeartbeat(60); // in seconds
config.setHandshakeTimeout(6000); // in milliseconds
config.setRequestedChannelMax(5);
config.setNetworkRecoveryInterval(500); // in milliseconds
config.setAutomaticRecoveryEnabled(true);

RabbitMQClient client = RabbitMQClient.create(vertx, config);

You can set multiple addresses to connect to a cluster:

RabbitMQOptions config = new RabbitMQOptions();
config.setUser("user1");
config.setPassword("password1");
config.setVirtualHost("vhost1");

config.setAddresses(Arrays.asList(Address.parseAddresses("firstHost,secondHost:5672")));

RabbitMQClient client = RabbitMQClient.create(vertx, config);
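
The address string is a comma-separated list of host or host:port entries. The sketch below shows one way such a list can be split into host and port pairs using only the JDK. HostPort and parseList are hypothetical names introduced for this illustration; they are a stand-in for, not a copy of, Address.parseAddresses. Substituting a caller-supplied default port for entries without one is an assumption of this sketch, and IPv6 literals are not handled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the "host[:port],host[:port]" format.
final class HostPort {
  final String host;
  final int port;

  HostPort(String host, int port) {
    this.host = host;
    this.port = port;
  }

  // Splits a comma-separated address list; entries without a port get defaultPort.
  static List<HostPort> parseList(String addresses, int defaultPort) {
    List<HostPort> result = new ArrayList<>();
    for (String entry : addresses.split(",")) {
      String trimmed = entry.trim();
      if (trimmed.isEmpty()) {
        continue; // skip empty entries such as a trailing comma
      }
      int colon = trimmed.lastIndexOf(':');
      if (colon < 0) {
        result.add(new HostPort(trimmed, defaultPort));
      } else {
        String host = trimmed.substring(0, colon);
        int port = Integer.parseInt(trimmed.substring(colon + 1));
        result.add(new HostPort(host, port));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    for (HostPort hp : parseList("firstHost,secondHost:5673", 5672)) {
      System.out.println(hp.host + ":" + hp.port);
    }
  }
}
```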

Declare exchange with additional config

You can pass additional config parameters to RabbitMQ’s exchangeDeclare method:

JsonObject config = new JsonObject();

config.put("x-dead-letter-exchange", "my.deadletter.exchange");
config.put("alternate-exchange", "my.alternate.exchange");
// ...
client.exchangeDeclare("my.exchange", "fanout", true, false, config, onResult -> {
  if (onResult.succeeded()) {
    System.out.println("Exchange successfully declared with config");
  } else {
    onResult.cause().printStackTrace();
  }
});

Declare queue with additional config

You can pass additional config parameters to RabbitMQ’s queueDeclare method:

JsonObject config = new JsonObject();
config.put("x-message-ttl", 10_000L);

client.queueDeclare("my-queue", true, false, true, config, queueResult -> {
  if (queueResult.succeeded()) {
    System.out.println("Queue declared!");
  } else {
    System.err.println("Queue failed to be declared!");
    queueResult.cause().printStackTrace();
  }
});

Operations

The following are some examples of the operations supported by the RabbitMQClient API. Consult the Javadoc for detailed information on all API methods.

Publish

Publish a message to a queue.

JsonObject message = new JsonObject().put("body", "Hello RabbitMQ, from Vert.x !");
client.basicPublish("", "my.queue", message, pubResult -> {
  if (pubResult.succeeded()) {
    System.out.println("Message published !");
  } else {
    pubResult.cause().printStackTrace();
  }
});

Publish with confirm

Publish a message to a queue and confirm the broker acknowledged it.

JsonObject message = new JsonObject().put("body", "Hello RabbitMQ, from Vert.x !");

// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect(confirmResult -> {
  if (confirmResult.succeeded()) {
    client.basicPublish("", "my.queue", message, pubResult -> {
      if (pubResult.succeeded()) {
        // Check the message got confirmed by the broker.
        client.waitForConfirms(waitResult -> {
          if (waitResult.succeeded()) {
            System.out.println("Message published !");
          } else {
            waitResult.cause().printStackTrace();
          }
        });
      } else {
        pubResult.cause().printStackTrace();
      }
    });
  } else {
    confirmResult.cause().printStackTrace();
  }
});

Consume

Consume messages from a queue.

// Create a stream of messages from a queue
client.basicConsumer("my.queue", rabbitMQConsumerAsyncResult -> {
  if (rabbitMQConsumerAsyncResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer mqConsumer = rabbitMQConsumerAsyncResult.result();
    mqConsumer.handler(message -> {
      System.out.println("Got message: " + message.body().toString());
    });
  } else {
    rabbitMQConsumerAsyncResult.cause().printStackTrace();
  }
});

You can pause or resume the stream at any time. While the stream is paused, you won’t receive any messages.

consumer.pause();
consumer.resume();

A set of options can be specified when creating a consumption stream.

QueueOptions lets you specify:

  • The size of the internal queue, with setMaxInternalQueueSize

  • Whether the stream keeps the most recent messages when the queue size is exceeded, with setKeepMostRecent

QueueOptions options = new QueueOptions()
  .setMaxInternalQueueSize(1000)
  .setKeepMostRecent(true);

client.basicConsumer("my.queue", options, rabbitMQConsumerAsyncResult -> {
  if (rabbitMQConsumerAsyncResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
  } else {
    rabbitMQConsumerAsyncResult.cause().printStackTrace();
  }
});
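
To illustrate how these two options interact, here is a minimal sketch of a bounded buffer with a "keep most recent" switch, written with plain JDK collections. BoundedBuffer is a hypothetical class made up for this example and is not the client's internal implementation. It assumes that, once the buffer is full, keepMostRecent = true discards the oldest buffered item to make room for the new one, while keepMostRecent = false discards the incoming item.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration of a size-limited buffer; not the client's own code.
final class BoundedBuffer<T> {
  private final Deque<T> items = new ArrayDeque<>();
  private final int maxSize;
  private final boolean keepMostRecent;

  BoundedBuffer(int maxSize, boolean keepMostRecent) {
    if (maxSize < 1) {
      throw new IllegalArgumentException("maxSize must be at least 1");
    }
    this.maxSize = maxSize;
    this.keepMostRecent = keepMostRecent;
  }

  // Returns true if the item was stored, false if it was discarded.
  boolean offer(T item) {
    if (items.size() >= maxSize) {
      if (!keepMostRecent) {
        return false; // assumed behaviour: drop the incoming item
      }
      items.pollFirst(); // assumed behaviour: drop the oldest buffered item
    }
    items.addLast(item);
    return true;
  }

  // Copy of the current contents, oldest first.
  List<T> snapshot() {
    return new ArrayList<>(items);
  }

  public static void main(String[] args) {
    BoundedBuffer<Integer> recent = new BoundedBuffer<>(2, true);
    BoundedBuffer<Integer> oldest = new BoundedBuffer<>(2, false);
    for (int i = 1; i <= 3; i++) {
      recent.offer(i);
      oldest.offer(i);
    }
    System.out.println("keepMostRecent=true:  " + recent.snapshot());
    System.out.println("keepMostRecent=false: " + oldest.snapshot());
  }
}
```

Under these assumptions, a consumer that cares about the latest state (for example sensor readings) would likely prefer keepMostRecent = true, while one that must process messages strictly in arrival order may prefer false combined with a larger queue size.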

To stop consuming messages from a queue, cancel the consumer:

rabbitMQConsumer.cancel(cancelResult -> {
  if (cancelResult.succeeded()) {
    System.out.println("Consumption successfully stopped");
  } else {
    System.out.println("Failed to stop consumption");
    cancelResult.cause().printStackTrace();
  }
});

Set an end handler to be notified when the stream will deliver no more messages:

rabbitMQConsumer.endHandler(v -> {
  System.out.println("It is the end of the stream");
});

You can set the exception handler to be notified of any error that may occur when a message is processed:

consumer.exceptionHandler(e -> {
  System.out.println("An exception occurred in the process of message handling");
  e.printStackTrace();
});

Finally, you may want to retrieve the consumer tag:

String consumerTag = consumer.consumerTag();
System.out.println("Consumer tag is: " + consumerTag);

Get

Get a single message from a queue.

client.basicGet("my.queue", true, getResult -> {
  if (getResult.succeeded()) {
    JsonObject msg = getResult.result();
    System.out.println("Got message: " + msg.getString("body"));
  } else {
    getResult.cause().printStackTrace();
  }
});

Consume messages without auto-ack

client.basicConsumer("my.queue", new QueueOptions().setAutoAck(false), consumeResult -> {
  if (consumeResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer consumer = consumeResult.result();

    // Set the handler which messages will be sent to
    consumer.handler(msg -> {
      // body() returns a Buffer, so it cannot be cast to JsonObject;
      // the delivery tag comes from the message envelope
      System.out.println("Got message: " + msg.body().toString());
      // ack
      client.basicAck(msg.envelope().deliveryTag(), false, asyncResult -> {
      });
    });
  } else {
    consumeResult.cause().printStackTrace();
  }
});

Running the tests

You will need to have RabbitMQ installed and running with default ports on localhost for this to work.