Skip to main content

A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)

This service is experimental and the APIs are likely to change before settling down.

Getting Started

Maven

Add the following dependency to your maven project

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-rabbitmq-client</artifactId>
  <version>3.5.1</version>
</dependency>

Gradle

Add the following dependency to your gradle project

dependencies {
  compile 'io.vertx:vertx-rabbitmq-client:3.5.1'
}

Create a client

You can create a client instance as follows using a full AMQP URI:

var RabbitMQClient = require("vertx-rabbitmq-js/rabbit_mq_client");

// All connection settings are carried by a single, full amqp uri
var config = {
  "uri" : "amqp://xvjvsrrc:[email protected]/xvjvsrrc"
};
var client = RabbitMQClient.create(vertx, config);

Or you can also specify individual parameters manually:

var RabbitMQClient = require("vertx-rabbitmq-js/rabbit_mq_client");

// Each parameter is optional:
// the default value will be used for any parameter that is not set
var config = {
  "user" : "user1",
  "password" : "password1",
  "host" : "localhost",
  "port" : 5672,
  "virtualHost" : "vhost1",
  "connectionTimeout" : 6000,
  "requestedHeartbeat" : 60,
  "handshakeTimeout" : 6000,
  "requestedChannelMax" : 5,
  "networkRecoveryInterval" : 500,
  "automaticRecoveryEnabled" : true
};

var client = RabbitMQClient.create(vertx, config);

Declare exchange with additional config

You can pass additional config parameters to RabbitMQ’s exchangeDeclare method

// Additional parameters handed to exchangeDeclare
var config = {
};

// The parameter names contain hyphens, so they have to be set with bracket
// notation: dot notation would be parsed as a subtraction and is a syntax error
config["x-dead-letter-exchange"] = "my.deadletter.exchange";
config["alternate-exchange"] = "my.alternate.exchange";
// ...
// The handler receives a null error on success, or the failure otherwise
client.exchangeDeclare("my.exchange", "fanout", true, false, config, function (onResult, onResult_err) {
  if (onResult_err == null) {
    console.log("Exchange successfully declared with config");
  } else {
    onResult_err.printStackTrace();
  }
});

Declare queue with additional config

You can pass additional config parameters to RabbitMQ's queueDeclare method

// Additional parameters handed to queueDeclare
var config = {
};
// The parameter name contains hyphens, so it has to be set with bracket
// notation: dot notation would be parsed as a subtraction and is a syntax error
config["x-message-ttl"] = 10000;

// The handler receives a null error on success, or the failure otherwise
client.queueDeclare("my-queue", true, false, true, config, function (queueResult, queueResult_err) {
  if (queueResult_err == null) {
    console.log("Queue declared!");
  } else {
    console.error("Queue failed to be declared!");
    queueResult_err.printStackTrace();
  }
});

Operations

The following are some examples of the operations supported by the RabbitMQService API.

Consult the javadoc/documentation for detailed information on all API methods.

Publish

Publish a message to a queue

// The text to send is carried in the "body" field of the message
var message = {
  "body" : "Hello RabbitMQ, from Vert.x !"
};
client.basicPublish("", "my.queue", message, function (publishResult, publishError) {
  // A non-null error means the publish failed
  if (publishError != null) {
    publishError.printStackTrace();
    return;
  }
  console.log("Message published !");
});

Publish with confirm

Publish a message to a queue and confirm the broker acknowledged it.

// The text to send is carried in the "body" field of the message
var message = {
  "body" : "Hello RabbitMQ, from Vert.x !"
};

// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect(function (selectResult, selectError) {
  if (selectError != null) {
    selectError.printStackTrace();
    return;
  }
  // Confirm mode is on: publish the message
  client.basicPublish("", "my.queue", message, function (publishResult, publishError) {
    if (publishError != null) {
      publishError.printStackTrace();
      return;
    }
    // Check the message got confirmed by the broker.
    client.waitForConfirms(function (waitResult, waitError) {
      if (waitError != null) {
        waitError.printStackTrace();
        return;
      }
      console.log("Message published !");
    });
  });
});

Consume

Consume messages from a queue

// Register the event bus handler that the consumed messages will be sent to
vertx.eventBus().consumer("my.address", function (delivery) {
  // The event bus message body is the json carrying the RabbitMQ message
  console.log("Got message: " + delivery.body().body);
});

// Link the rabbitmq consumer on "my.queue" to the event bus address above
client.basicConsume("my.queue", "my.address", function (consumeResult, consumeError) {
  if (consumeError != null) {
    consumeError.printStackTrace();
    return;
  }
  console.log("RabbitMQ consumer created !");
});

Get

Get a message from a queue

// Fetch a message from "my.queue"; the handler gets the message or an error
client.basicGet("my.queue", true, function (getResult, getError) {
  if (getError != null) {
    getError.printStackTrace();
    return;
  }
  // On success the result is the message itself
  console.log("Got message: " + getResult.body);
});

Consume messages without auto-ack

// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", function (msg) {
  var json = msg.body();
  console.log("Got message: " + json.body);
  // ack: the consumer below is created without auto-ack, so each message is
  // acknowledged explicitly using its delivery tag
  client.basicAck(json.deliveryTag, false, function (asyncResult, asyncResult_err) {
    // Report a failed acknowledgement instead of silently ignoring it
    if (asyncResult_err != null) {
      asyncResult_err.printStackTrace();
    }
  });
});

// Setup the link between rabbitmq consumer and event bus address
// NOTE(review): the third argument (false) presumably disables auto-ack — confirm against the API docs
client.basicConsume("my.queue", "my.address", false, function (consumeResult, consumeResult_err) {
  if (consumeResult_err == null) {
    console.log("RabbitMQ consumer created !");
  } else {
    consumeResult_err.printStackTrace();
  }
});

Running the tests

You will need to have RabbitMQ installed and running with default ports on localhost for this to work. <a href="mailto:[email protected]">Nick Scavelli</a>