Skip to main content

A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)

This service is experimental and the APIs are likely to change before settling down.

Getting Started

Maven

Add the following dependency to your maven project

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-rabbitmq-client-scala_2.12</artifactId>
  <version>3.5.0</version>
</dependency>

Gradle

Add the following dependency to your gradle project

dependencies {
  compile 'io.vertx:vertx-rabbitmq-client-scala_2.12:3.5.0'
}

Create a client

You can create a client instance as follows using a full amqp uri:

var config = RabbitMQOptions()
// full amqp uri
config.setUri("amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc")
var client = RabbitMQClient.create(vertx, config)

Or you can also specify individual parameters manually:

var config = RabbitMQOptions()
// Each parameter is optional
// The default parameter with be used if the parameter is not set
config.setUser("user1")
config.setPassword("password1")
config.setHost("localhost")
config.setPort(5672)
config.setVirtualHost("vhost1")
config.setConnectionTimeout(6000)
config.setRequestedHeartbeat(60)
config.setHandshakeTimeout(6000)
config.setRequestedChannelMax(5)
config.setNetworkRecoveryInterval(500)
config.setAutomaticRecoveryEnabled(true)

var client = RabbitMQClient.create(vertx, config)

Declare exchange with additional config

You can pass additional config parameters to RabbitMQ’s exchangeDeclare method

var config = Map()

config + ("x-dead-letter-exchange" -> "my.deadletter.exchange")
config + ("alternate-exchange" -> "my.alternate.exchange")
// ...
client.exchangeDeclareFuture("my.exchange", "fanout", true, false, config).onComplete{
  case Success(result) => {
    println("Exchange successfully declared with config")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Operations

The following are some examples of the operations supported by the RabbitMQService API.

Consult the javadoc/documentation for detailed information on all API methods.

Publish

Publish a message to a queue

var message = new io.vertx.core.json.JsonObject().put("body", "Hello RabbitMQ, from Vert.x !")
client.basicPublishFuture("", "my.queue", message).onComplete{
  case Success(result) => {
    println("Message published !")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Publish with confirm

Publish a message to a queue and confirm the broker acknowledged it.

var message = new io.vertx.core.json.JsonObject().put("body", "Hello RabbitMQ, from Vert.x !")

// Put the channel in confirm mode. This can be done once at init.
client.confirmSelectFuture().onComplete{
  case Success(result) => {
    client.basicPublishFuture("", "my.queue", message).onComplete{
      case Success(result) => {
        // Check the message got confirmed by the broker.
        client.waitForConfirmsFuture().onComplete{
          case Success(result) => {
            println("Message published !")}
          case Failure(cause) => {
            println(s"$cause")
          }
        }
      }
      case Failure(cause) => {
        println(s"$cause")
      }
    }
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Consume

Consume messages from a queue

// Create the event bus handler which messages will be sent to
// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", (msg: io.vertx.scala.core.eventbus.Message<java.lang.Object>) => {
  var json = msg.body()
  println(s"Got message: ${json.getValue("body")}")
})

// Setup the link between rabbitmq consumer and event bus address
client.basicConsumeFuture("my.queue", "my.address").onComplete{
  case Success(result) => {
    println("RabbitMQ consumer created !")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Get

Will get a message from a queue

client.basicGetFuture("my.queue", true).onComplete{
  case Success(result) => {
    var msg = result
    println(s"Got message: ${msg.getValue("body")}")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Consume messages without auto-ack

// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", (msg: io.vertx.scala.core.eventbus.Message<java.lang.Object>) => {
  var json = msg.body()
  println(s"Got message: ${json.getValue("body")}")
  // ack
  client.basicAckFuture(json.getValue("deliveryTag"), false).onComplete{
    case Success(result) => println("Success")
    case Failure(cause) => println("Failure")
  }
})

// Setup the link between rabbitmq consumer and event bus address
client.basicConsumeFuture("my.queue", "my.address", false).onComplete{
  case Success(result) => {
    println("RabbitMQ consumer created !")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Running the tests

You will need to have RabbitMQ installed and running with default ports on localhost for this to work. <a href="mailto:nscavell@redhat.com">Nick Scavelli</a>