Messaging

Vert.x AMQP Client

The Vert.x AMQP Client allows interacting with AMQP 1.0 brokers and routers. It allows:

  • Connecting to an AMQP broker or router - SASL and TLS connections are supported

  • Consuming message from a queue or a topic

  • Sending messages to a queue or a topic

  • Checking acknowledgement for sent messages

The AMQP 1.0 protocol support durable subscriptions, persistence, security, conversations, sophisticated routing…​ More details on the protocol can be found on the AMQP homepage.

The Vert.x AMQP client is based on Vert.x Proton. If you need fine-grain control, we recommend using Vert.x Proton directly.

Using Vert.x AMQP Client

To use the Vert.x AMQP Client, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-amqp-client</artifactId>
 <version>4.5.11</version>
</dependency>
  • Gradle (in your build.gradle file):

compile 'io.vertx:vertx-amqp-client:4.5.11'

Creating an AMQP client

Once you have added the client to your CLASSPATH, you can instantiate an AmqpClient as follows:

AmqpClientOptions options = new AmqpClientOptions()
  .setHost("localhost")
  .setPort(5672)
  .setUsername("user")
  .setPassword("secret");
// Create a client using its own internal Vert.x instance.
AmqpClient client1 = AmqpClient.create(options);

// USe an explicit Vert.x instance.
AmqpClient client2 = AmqpClient.create(vertx, options);

There are two methods to instantiate an AmqpClient. You can pass an explicit Vert.x instance. Use this approach if you are in a Vert.x application, or a Vert.x verticle. Otherwise you can omit passing the Vert.x instance, an internal instance is created and closed when the client is closed.

To instantiate an AmqpClient, you need to pass AmqpClientOptions. These options contains the location of the broker or router, credentials…​ Many aspect of the AMQP client can be configured using these options. Note that you can also use these options to configure the underlying Proton client.

Host, port, username and password can also be configured from system properties or environment variables:

  • Host: system property: amqp-client-host, environment variable: AMQP_CLIENT_HOST` (mandatory)

  • Port: system property: amqp-client-port, environment variable: AMQP_CLIENT_PORT` (defaults to 5672)

  • Username: system property: amqp-client-username, environment variable: AMQP_CLIENT_USERNAME`

  • Password: system property: amqp-client-password, environment variable: AMQP_CLIENT_PASSWORD`

Establishing a connection

Once you have created a client, you need to explicitly connect to the remote server. This is done using the connect method:

client
  .connect()
  .onComplete(ar -> {
  if (ar.failed()) {
    System.out.println("Unable to connect to the broker");
  } else {
    System.out.println("Connection succeeded");
    AmqpConnection connection = ar.result();
  }
});

Once established or failed, the handler is called. Note that the connection is used to create receivers and senders.

Creating a receiver

A receiver is used to receive messages. The AMQP receiver can be retrieved using one of the two following methods:

connection
  .createReceiver("my-queue")
  .onComplete(
  done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver.handler(msg -> {
        // called on every received messages
        System.out.println("Received " + msg.bodyAsString());
      });
    }
  }
);

connection
  .createReceiver("my-queue")
  .onComplete(
  done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver
        .exceptionHandler(t -> {
          // Error thrown.
        })
        .handler(msg -> {
          // Attach the message handler
        });
    }
  }
);

The main difference between these 2 approaches is when the message handler is attached to the receiver. In the first approach, the handler is immediately passed and will start receiving messages immediately. In the second approach, the handler is attached manually after the completion. This give you more control and let you attach other handlers.

The receiver passed in the completion handler can be used as a stream. So, you can pause and resume the reception of messages. The back-pressure protocol is implemented using AMQP credits.

The received messages are instances of AmqpMessage. Instances are immutable, and provide access to most of the metadata supported by AMQP. See the list of properties as references. Note that retrieving a JSON object or a JSON array from the body required the value to be passed as AMQP Data.

You can also create a receiver directly from the client:

client
  .createReceiver("my-queue")
  .onComplete(
  done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver.handler(msg -> {
        // called on every received messages
        System.out.println("Received " + msg.bodyAsString());
      });
    }
  }
);

In this case, a connection is established automatically. You can retrieve it using connection

By default the messages are automatically acknowledged. You can disable this behavior using setAutoAcknowledgement. Then, you need to explicitly acknowledge the incoming messages using: * accepted * rejected * released

Creating a sender

Senders allows publishing messages to queues and topics. You retrieve a sender as follows:

connection
  .createSender("my-queue")
  .onComplete(done -> {
  if (done.failed()) {
    System.out.println("Unable to create a sender");
  } else {
    AmqpSender result = done.result();
  }
});

Once you have retrieved an AMQP sender, you can create messages. Because AmqpMessage are immutable, the creation uses the AmqpMessageBuilder builder class. The following snippet provides a few examples:

AmqpMessageBuilder builder = AmqpMessage.create();

// Very simple message
AmqpMessage m1 = builder.withBody("hello").build();

// Message overriding the destination
AmqpMessage m2 = builder.withBody("hello").address("another-queue").build();

// Message with a JSON object as body, metadata and TTL
AmqpMessage m3 = builder
  .withJsonObjectAsBody(new JsonObject().put("message", "hello"))
  .subject("subject")
  .ttl(10000)
  .applicationProperties(new JsonObject().put("prop1", "value1"))
  .build();

Once you have the sender and created the message, you can send it using:

  • send - send the message

  • sendWithAck - send the message and monitor its acknowledgment

The simplest way to send a message is the following:

sender.send(AmqpMessage.create().withBody("hello").build());

When sending a message, you can monitor the acknowledgment:

sender
  .sendWithAck(AmqpMessage.create().withBody("hello").build())
  .onComplete(acked -> {
  if (acked.succeeded()) {
    System.out.println("Message accepted");
  } else {
    System.out.println("Message not accepted");
  }
});

Note that message is considered as acknowledged if the delivery is set fo ACCEPTED. Other delivery values are considered as non-acknowledged (details can be found in the passed cause).

The AmqpSender can be used as a write stream. The flow control is implemented using AMQP credits.

You can also create a sender directly from the client:

client
  .createSender("my-queue")
  .onComplete(maybeSender -> {
  //...
});

In this case, a connection is established automatically. You can retrieve it using connection.

Implementing request-reply

To implement a request-reply behavior, you could use a dynamic receiver and an anonymous sender. A dynamic receiver is not associated with an address by the user, but the address it provided by the broker. Anonymous senders are also not associated to a specific address, requiring all messages to contain an address.

The following snippet shows how request-reply can be implemented:

connection
  .createAnonymousSender()
  .onComplete(responseSender -> {
  // You got an anonymous sender, used to send the reply
  // Now register the main receiver:
  connection
    .createReceiver("my-queue")
    .onComplete(done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver.handler(msg -> {
        // You got the message, let's reply.
        responseSender.result().send(AmqpMessage.create()
          .address(msg.replyTo())
          .correlationId(msg.id()) // send the message id as correlation id
          .withBody("my response to your request")
          .build()
        );
      });
    }
  });
});

// On the sender side (sending the initial request and expecting a reply)
connection
  .createDynamicReceiver()
  .onComplete(replyReceiver -> {
  // We got a receiver, the address is provided by the broker
  String replyToAddress = replyReceiver.result().address();

  // Attach the handler receiving the reply
  replyReceiver.result().handler(msg -> {
    System.out.println("Got the reply! " + msg.bodyAsString());
  });

  // Create a sender and send the message:
  connection
    .createSender("my-queue")
    .onComplete(sender -> {
    sender.result().send(AmqpMessage.create()
      .replyTo(replyToAddress)
      .id("my-message-id")
      .withBody("This is my request").build());
  });
});

To reply to a message, send it to the address specified into the reply-to. Also, it’s a good practice to indicate the correlation id using the message id, so the reply receiver can associate the response to the request.

Closing the client

Once you are done with a connection receiver or sender, you should close them using the close method. Closing a connection, closes all created receivers and senders.

Once the client is not used anymore, you must also close it. It would close all opened connections, and as a consequences receivers and senders.