A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)

This service is experimental and the APIs are likely to change before settling down.

Getting Started

Maven

Add the following dependency to your Maven project:

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-rabbitmq-client</artifactId>
  <version>3.4.2</version>
</dependency>

Gradle

Add the following dependency to your Gradle project:

dependencies {
  compile 'io.vertx:vertx-rabbitmq-client:3.4.2'
}

Create a client

You can create a client instance from a full AMQP URI:

var config = json {
  obj()
}
// Full AMQP URI
config.put("uri", "amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc")
var client = RabbitMQClient.create(vertx, config)
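
An AMQP URI carries the same settings as the individual parameters in a single string of the form amqp://user:password@host:port/vhost. As a rough sketch, the helper below assembles such a URI from its parts using only the JVM standard library. `buildAmqpUri` and `encodeComponent` are hypothetical names for illustration and are not part of the client; the credentials and virtual host are percent-encoded so that reserved characters such as `@` or `/` survive.

```kotlin
import java.net.URLEncoder

// Hypothetical helper, not part of vertx-rabbitmq-client.
// Percent-encodes one URI component. URLEncoder emits '+' for a space,
// so that is mapped to %20 afterwards (a literal '+' is already %2B by then).
fun encodeComponent(value: String): String =
  URLEncoder.encode(value, "UTF-8").replace("+", "%20")

// Hypothetical helper: assembles amqp://user:password@host:port/vhost
fun buildAmqpUri(user: String, password: String, host: String, port: Int, virtualHost: String): String =
  "amqp://${encodeComponent(user)}:${encodeComponent(password)}@$host:$port/${encodeComponent(virtualHost)}"

fun main() {
  // prints amqp://user1:p%40ss%20word@localhost:5672/vhost1
  println(buildAmqpUri("user1", "p@ss word", "localhost", 5672, "vhost1"))
}
```

The resulting string can be stored under the "uri" key of the config shown above.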

Alternatively, you can specify individual parameters:

var config = json {
  obj()
}
// Each parameter is optional
// The default value will be used if a parameter is not set
config.put("user", "user1")
config.put("password", "password1")
config.put("host", "localhost")
config.put("port", 5672)
config.put("virtualHost", "vhost1")
config.put("connectionTimeout", 60)
config.put("requestedHeartbeat", 60)
config.put("handshakeTimeout", 60)
config.put("requestedChannelMax", 5)
config.put("networkRecoveryInterval", 5)
config.put("automaticRecoveryEnabled", true)

var client = RabbitMQClient.create(vertx, config)
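
Creating the client does not by itself open a connection; the client generally has to be started before the operations in this document can be used. The following is a minimal sketch, assuming the `start` method of `RabbitMQClient` and a broker reachable on localhost with default settings (both the host and the port here are illustrative).

```kotlin
import io.vertx.core.Vertx
import io.vertx.core.json.JsonObject
import io.vertx.rabbitmq.RabbitMQClient

fun main() {
  val vertx = Vertx.vertx()
  // Assumed broker location; every omitted parameter falls back to its default
  val config = JsonObject()
    .put("host", "localhost")
    .put("port", 5672)
  val client = RabbitMQClient.create(vertx, config)

  // Open the connection and channel before calling any other operation
  client.start { startResult ->
    if (startResult.succeeded()) {
      println("RabbitMQ client started")
    } else {
      startResult.cause().printStackTrace()
      // Release Vert.x resources so the JVM can exit
      vertx.close()
    }
  }
}
```

Publishing, consuming and the other operations would then be issued from inside the success branch, or at any later point once the start handler has reported success.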

Declare exchange with additional config

You can pass additional config parameters to RabbitMQ’s exchangeDeclare method:

var config = mutableMapOf<String, Any?>()

config["x-dead-letter-exchange"] = "my.deadletter.exchange"
config["alternate-exchange"] = "my.alternate.exchange"
// ...
client.exchangeDeclare("my.exchange", "fanout", true, false, config, { onResult ->
  if (onResult.succeeded()) {
    println("Exchange successfully declared with config")
  } else {
    onResult.cause().printStackTrace()
  }
})

Operations

The following are some examples of the operations supported by the RabbitMQService API.

Consult the javadoc/documentation for detailed information on all API methods.

Publish

Publish a message to a queue

var message = json {
  obj("body" to "Hello RabbitMQ, from Vert.x !")
}
client.basicPublish("", "my.queue", message, { pubResult ->
  if (pubResult.succeeded()) {
    println("Message published !")
  } else {
    pubResult.cause().printStackTrace()
  }
})

Publish with confirm

Publish a message to a queue and confirm the broker acknowledged it.

var message = json {
  obj("body" to "Hello RabbitMQ, from Vert.x !")
}

// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect({ confirmResult ->
  if (confirmResult.succeeded()) {
    client.basicPublish("", "my.queue", message, { pubResult ->
      if (pubResult.succeeded()) {
        // Check the message got confirmed by the broker.
        client.waitForConfirms({ waitResult ->
          if (waitResult.succeeded()) {
            println("Message published !")
          } else {
            waitResult.cause().printStackTrace()
          }
        })
      } else {
        pubResult.cause().printStackTrace()
      }
    })
  } else {
    confirmResult.cause().printStackTrace()
  }
})

Consume

Consume messages from a queue

// Create the event bus handler which messages will be sent to
// (JsonObject is io.vertx.core.json.JsonObject)
vertx.eventBus().consumer<JsonObject>("my.address", { msg ->
  val json = msg.body()
  println("Got message: ${json.getString("body")}")
})

// Setup the link between rabbitmq consumer and event bus address
client.basicConsume("my.queue", "my.address", { consumeResult ->
  if (consumeResult.succeeded()) {
    println("RabbitMQ consumer created !")
  } else {
    consumeResult.cause().printStackTrace()
  }
})

Get

Get a single message from a queue

client.basicGet("my.queue", true, { getResult ->
  if (getResult.succeeded()) {
    var msg = getResult.result()
    println("Got message: ${msg.getString("body")}")
  } else {
    getResult.cause().printStackTrace()
  }
})

Consume messages without auto-ack

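// Create the event bus handler which messages will be sent to
// (JsonObject is io.vertx.core.json.JsonObject)
vertx.eventBus().consumer<JsonObject>("my.address", { msg ->
  val json = msg.body()
  println("Got message: ${json.getString("body")}")
  // Acknowledge this delivery only (multiple = false)
  client.basicAck(json.getLong("deliveryTag"), false, { asyncResult ->
    if (asyncResult.failed()) {
      asyncResult.cause().printStackTrace()
    }
  })
})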

// Setup the link between rabbitmq consumer and event bus address
client.basicConsume("my.queue", "my.address", false, { consumeResult ->
  if (consumeResult.succeeded()) {
    println("RabbitMQ consumer created !")
  } else {
    consumeResult.cause().printStackTrace()
  }
})
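
With auto-ack disabled, the application decides the fate of each delivery. As a hedged sketch of one possible policy, the code below acknowledges a message only after processing succeeds and negatively acknowledges it otherwise. It assumes the client's `basicNack` method with (deliveryTag, multiple, requeue) arguments; `process` and `consumeWithManualAck` are hypothetical names introduced for illustration.

```kotlin
import io.vertx.core.Vertx
import io.vertx.core.json.JsonObject
import io.vertx.rabbitmq.RabbitMQClient

// Hypothetical processing step; replace with real work
fun process(body: String) {
  require(body.isNotBlank()) { "empty message body" }
}

// Hypothetical helper: expects an already started client
fun consumeWithManualAck(vertx: Vertx, client: RabbitMQClient) {
  vertx.eventBus().consumer<JsonObject>("my.address") { msg ->
    val json = msg.body()
    val deliveryTag = json.getLong("deliveryTag")
    try {
      process(json.getString("body"))
      // Success: acknowledge this delivery only (multiple = false)
      client.basicAck(deliveryTag, false) { ackResult ->
        if (ackResult.failed()) ackResult.cause().printStackTrace()
      }
    } catch (e: Exception) {
      // Failure: reject and requeue (requeue = true). A message that always
      // fails would be redelivered repeatedly under this policy.
      client.basicNack(deliveryTag, false, true) { nackResult ->
        if (nackResult.failed()) nackResult.cause().printStackTrace()
      }
    }
  }

  // Link the queue to the event bus address with auto-ack turned off
  client.basicConsume("my.queue", "my.address", false) { consumeResult ->
    if (consumeResult.failed()) consumeResult.cause().printStackTrace()
  }
}
```

Passing false for requeue instead would drop the message, or route it to a dead-letter exchange if one is configured on the queue.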

Running the tests

You will need to have RabbitMQ installed and running on localhost with the default ports for the tests to work.

Nick Scavelli <nscavell@redhat.com>