Skip to main content

A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)

This service is experimental and the APIs are likely to change before settling down.

Getting Started

Maven

Add the following dependency to your maven project

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-rabbitmq-client-scala_2.12</artifactId>
  <version>3.4.1</version>
</dependency>

Gradle

Add the following dependency to your gradle project

dependencies {
  compile 'io.vertx:vertx-rabbitmq-client-scala_2.12:3.4.1'
}

Create a client

You can create a client instance as follows using a full amqp uri:

var config = new io.vertx.core.json.JsonObject()
// full amqp uri
config.put("uri", "amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc")
var client = RabbitMQClient.create(vertx, config)

Or you can also specify individual parameters manually:

var config = new io.vertx.core.json.JsonObject()
// Each parameter is optional
// The default parameter with be used if the parameter is not set
config.put("user", "user1")
config.put("password", "password1")
config.put("host", "localhost")
config.put("port", 5672)
config.put("virtualHost", "vhost1")
config.put("connectionTimeout", 60)
config.put("requestedHeartbeat", 60)
config.put("handshakeTimeout", 60)
config.put("requestedChannelMax", 5)
config.put("networkRecoveryInterval", 5)
config.put("automaticRecoveryEnabled", true)

var client = RabbitMQClient.create(vertx, config)

Declare exchange with additional config

You can pass additional config parameters to RabbitMQ’s exchangeDeclare method

var config = Map()

config + ("x-dead-letter-exchange" -> "my.deadletter.exchange")
config + ("alternate-exchange" -> "my.alternate.exchange")
// ...
client.exchangeDeclareFuture("my.exchange", "fanout", true, false, config).onComplete{
  case Success(result) => {
    println("Exchange successfully declared with config")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Operations

The following are some examples of the operations supported by the RabbitMQService API.

Consult the javadoc/documentation for detailed information on all API methods.

Publish

Publish a message to a queue

var message = new io.vertx.core.json.JsonObject().put("body", "Hello RabbitMQ, from Vert.x !")
client.basicPublishFuture("", "my.queue", message).onComplete{
  case Success(result) => {
    println("Message published !")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Consume

Consume messages from a queue

// Create the event bus handler which messages will be sent to
// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", (msg: io.vertx.scala.core.eventbus.Message<java.lang.Object>) => {
  var json = msg.body()
  println(s"Got message: ${json.getValue("body")}")
})

// Setup the link between rabbitmq consumer and event bus address
client.basicConsumeFuture("my.queue", "my.address").onComplete{
  case Success(result) => {
    println("RabbitMQ consumer created !")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Get

Will get a message from a queue

client.basicGetFuture("my.queue", true).onComplete{
  case Success(result) => {
    var msg = result
    println(s"Got message: ${msg.getValue("body")}")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Consume messages without auto-ack

// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", (msg: io.vertx.scala.core.eventbus.Message<java.lang.Object>) => {
  var json = msg.body()
  println(s"Got message: ${json.getValue("body")}")
  // ack
  client.basicAckFuture(json.getValue("deliveryTag"), false).onComplete{
    case Success(result) => println("Success")
    case Failure(cause) => println("Failure")
  }
})

// Setup the link between rabbitmq consumer and event bus address
client.basicConsumeFuture("my.queue", "my.address", false).onComplete{
  case Success(result) => {
    println("RabbitMQ consumer created !")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Running the tests

You will need to have RabbitMQ installed and running with default ports on localhost for this to work. <a href="mailto:nscavell@redhat.com">Nick Scavelli</a>