Skip to main content

The Vert.x AMQP Client allows interacting with AMQP 1.0 brokers and routers. It allows:

  • Connecting to an AMQP broker or router - SASL and TLS connections are supported

  • Consuming message from a queue or a topic

  • Sending messages to a queue or a topic

  • Checking acknowledgement for sent messages

The AMQP 1.0 protocol support durable subscriptions, persistence, security, conversations, sophisticated routing…​ More details on the protocol can be found on the AMQP homepage.

The Vert.x AMQP client is based on Vert.x Proton. If you need fine-grain control, we recommend using Vert.x Proton directly.

Using Vert.x AMQP Client

To use the Vert.x AMQP Client, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-amqp-client</artifactId>
 <version>3.8.0</version>
</dependency>
  • Gradle (in your build.gradle file):

compile 'io.vertx:vertx-amqp-client:3.8.0'

Creating an AMQP client

Once you have added the client to your CLASSPATH, you can instantiate an AmqpClient as follows:

require 'vertx-amqp-client/amqp_client'
options = {
  'host' => "localhost",
  'port' => 5672,
  'username' => "user",
  'password' => "secret"
}
# Create a client using its own internal Vert.x instance.
client1 = VertxAmqpClient::AmqpClient.create(options)

# USe an explicit Vert.x instance.
client2 = VertxAmqpClient::AmqpClient.create(vertx, options)

There are two methods to instantiate an AmqpClient. You can pass an explicit Vert.x instance. Use this approach if you are in a Vert.x application, or a Vert.x verticle. Otherwise you can omit passing the Vert.x instance, an internal instance is created and closed when the client is closed.

To instantiate an AmqpClient, you need to pass AmqpClientOptions. These options contains the location of the broker or router, credentials…​ Many aspect of the AMQP client can be configured using these options. Note that you can also use these options to configure the underlying Proton client.

Host, port, username and password can also be configured from system properties or environment variables:

  • Host: system property: amqp-client-host, environment variable: AMQP_CLIENT_HOST` (mandatory)

  • Port: system property: amqp-client-port, environment variable: AMQP_CLIENT_PORT` (defaults to 5672)

  • Username: system property: amqp-client-username, environment variable: AMQP_CLIENT_USERNAME`

  • Password: system property: amqp-client-password, environment variable: AMQP_CLIENT_PASSWORD`

Establishing a connection

Once you have created a client, you need to explicitly connect to the remote server. This is done using the connect method:

client.connect() { |ar_err,ar|
  if (ar_err != nil)
    puts "Unable to connect to the broker"
  else
    puts "Connection succeeded"
    connection = ar
  end
}

Once established or failed, the handler is called. Note that the connection is used to create receivers and senders.

Creating a receiver

A receiver is used to receive messages. The AMQP receiver can be retrieved using one of the two following methods:

connection.create_receiver("my-queue", lambda { |msg|
  # called on every received messages
  puts "Received #{msg.body_as_string()}"
}) { |done_err,done|
  if (done_err != nil)
    puts "Unable to create receiver"
  else
    receiver = done
  end
}


connection.create_receiver("my-queue") { |done_err,done|
  if (done_err != nil)
    puts "Unable to create receiver"
  else
    receiver = done
    receiver.exception_handler() { |t|
      # Error thrown.
    }.handler() { |msg|
      # Attach the message handler
    }
  end
}

The main difference between these 2 approaches is when the message handler is attached to the receiver. In the first approach, the handler is immediately passed and will start receiving messages immediately. In the second approach, the handler is attached manually after the completion. This give you more control and let you attach other handlers.

The receiver passed in the completion handler can be used as a stream. So, you can pause and resume the reception of messages. The back-pressure protocol is implemented using AMQP credits.

The received messages are instances of AmqpMessage. Instances are immutable, and provide access to most of the metadata supported by AMQP. See the list of properties as references. Note that retrieving a JSON object or a JSON array from the body required the value to be passed as AMQP Data.

You can also create a receiver directly from the client:

client.create_receiver("my-queue", lambda { |msg|
  # called on every received messages
  puts "Received #{msg.body_as_string()}"
}) { |done_err,done|
  if (done_err != nil)
    puts "Unable to create receiver"
  else
    receiver = done
  end
}

In this case, a connection is established automatically. You can retrieve it using connection

By default the messages are automatically acknowledged. You can disable this behavior using autoAcknowledgement. Then, you need to explicitly acknowledge the incoming messages using: * accepted * rejected * released

Creating a sender

Senders allows publishing messages to queues and topics. You retrieve a sender as follows:

connection.create_sender("my-queue") { |done_err,done|
  if (done_err != nil)
    puts "Unable to create a sender"
  else
    result = done
  end
}

Once you have retrieved an AMQP sender, you can create messages. Because AmqpMessage are immutable, the creation uses the AmqpMessageBuilder builder class. The following snippet provides a few examples:

require 'vertx-amqp-client/amqp_message'
# Retrieve a builder
builder = VertxAmqpClient::AmqpMessage.create()

# Very simple message
m1 = builder.with_body("hello").build()

# Message overriding the destination
m2 = builder.with_body("hello").address("another-queue").build()

# Message with a JSON object as body, metadata and TTL
m3 = builder.with_json_object_as_body({
  'message' => "hello"
}).subject("subject").ttl(10000).application_properties({
  'prop1' => "value1"
}).build()

Once you have the sender and created the message, you can send it using:

  • send - send the message

  • sendWithAck - send the message and monitor its acknowledgment

The simplest way to send a message is the following:

require 'vertx-amqp-client/amqp_message'
sender.send(VertxAmqpClient::AmqpMessage.create().with_body("hello").build())

When sending a message, you can monitor the acknowledgment:

require 'vertx-amqp-client/amqp_message'
sender.send_with_ack(VertxAmqpClient::AmqpMessage.create().with_body("hello").build()) { |acked_err,acked|
  if (acked_err == nil)
    puts "Message accepted"
  else
    puts "Message not accepted"
  end
}

Note that message is considered as acknowledged if the delivery is set fo ACCEPTED. Other delivery values are considered as non-acknowledged (details can be found in the passed cause).

The AmqpSender can be used as a write stream. The flow control is implemented using AMQP credits.

You can also create a sender directly from the client:

client.create_sender("my-queue") { |maybeSender_err,maybeSender|
  #...
}

In this case, a connection is established automatically. You can retrieve it using connection.

Implementing request-reply

To implement a request-reply behavior, you could use a dynamic receiver and an anonymous sender. A dynamic receiver is not associated with an address by the user, but the address it provided by the broker. Anonymous senders are also not associated to a specific address, requiring all messages to contain an address.

The following snippet shows how request-reply can be implemented:

require 'vertx-amqp-client/amqp_message'
# On the receiver side (receiving the initial request and replying)
connection.create_anonymous_sender() { |responseSender_err,responseSender|
  # You got an anonymous sender, used to send the reply
  # Now register the main receiver:
  connection.create_receiver("my-queue", lambda { |msg|
    # You got the message, let's reply.
    responseSender.send(VertxAmqpClient::AmqpMessage.create().address(msg.reply_to()).correlation_id(msg.id()).with_body("my response to your request").build())
  }) { |done_err,done|
    # We are done, for the receiver side
  }
}

# On the sender side (sending the initial request and expecting a reply)
connection.create_dynamic_receiver() { |replyReceiver_err,replyReceiver|
  # We got a receiver, the address is provided by the broker
  replyToAddress = replyReceiver.address()

  # Attach the handler receiving the reply
  replyReceiver.handler() { |msg|
    puts "Got the reply! #{msg.body_as_string()}"
  }

  # Create a sender and send the message:
  connection.create_sender("my-queue") { |sender_err,sender|
    sender.send(VertxAmqpClient::AmqpMessage.create().reply_to(replyToAddress).id("my-message-id").with_body("This is my request").build())
  }
}

To reply to a message, send it to the address specified into the reply-to. Also, it’s a good practice to indicate the correlation id using the message id, so the reply receiver can associate the response to the request.

Closing the client

Once you are done with a connection receiver or sender, you should close them using the close method. Closing a connection, closes all created receivers and senders.

Once the client is not used anymore, you must also close it. It would close all opened connections, and as a consequences receivers and senders.