Vert.x-Stomp

STOMP is the Simple (or Streaming) Text Orientated Messaging Protocol. STOMP provides an interoperable wire format so that STOMP clients can communicate with any STOMP message broker to provide easy and widespread messaging interoperability among many languages, platforms and brokers. Get more details about STOMP on https://stomp.github.io/index.html.

Vertx-Stomp is an implementation of a STOMP server and client. You can use the STOMP server with other clients and use the STOMP client with other servers. The server and the client supports the version 1.0, 1.1 and 1.2 of the STOMP protocol (see https://stomp.github.io/stomp-specification-1.2.html). The STOMP server can also be used as a bridge with the vert.x event bus, or directly with web sockets (using StompJS).

Using vertx-stomp

To use the Vert.x Stomp server and client, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-stomp</artifactId>
 <version>4.1.8</version>
</dependency>
  • Gradle (in your build.gradle file):

compile 'io.vertx:vertx-stomp:4.1.8'

STOMP server

Creating a STOMP server

The simplest way to create an STOMP server, using all default options is as follows:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx))
    .listen();

This creates a STOMP server listening on localhost:61613 that is compliant with the STOMP specification.

You can configure the port and host in the listen method:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx))
    .listen(1234, "0.0.0.0");

If you pass -1 as port, the TCP server would not be started. This is useful when using the websocket bridge. To be notified when the server is ready, use a handler as follows:

StompServer server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx))
    .listen(ar -> {
      if (ar.failed()) {
        System.out.println("Failing to start the STOMP server : " + ar.cause().getMessage());
      } else {
        System.out.println("Ready to receive STOMP frames");
      }
    });

The handler receive a reference on the StompServer.

You can also configure the host and port in StompServerOptions:

Future<StompServer> server = StompServer.create(vertx, new StompServerOptions().setPort(1234).setHost("0.0.0.0"))
    .handler(StompServerHandler.create(vertx))
    .listen();

Closing a STOMP server

STOMP servers are closed as follows:

server.close(ar -> {
  if (ar.succeeded()) {
    System.out.println("The STOMP server has been closed");
  } else {
    System.out.println("The STOMP server failed to close : " + ar.cause().getMessage());
  }
});

Configuration

The StompServerOptions let you configure some aspects of the STOMP server.

First, the STOMP server is based on a NetServer, so you can configure the underlying NetServer from the StompServerOptions. Alternatively you can also pass the NetServer you want to use:

Future<StompServer> server = StompServer.create(vertx, netServer)
    .handler(StompServerHandler.create(vertx))
    .listen();

The StompServerOptions let you configure:

  • the host and port of the STOMP server - defaults to 0.0.0.0:61613.

  • whether or not the STOMP server is secured - defaults to false

  • the max STOMP frame body - default to 10 Mb

  • the maximum number of headers accepted in a STOMP frame - defaults to 1000

  • the max length of a header line in a STOMP frame - defaults to 10240

  • the STOMP heartbeat time - default to 1000, 1000

  • the supported STOMP protocol versions (1.0, 1.1 and 1.2 by default)

  • the maximum number of frame allowed in a transaction (defaults to 1000)

  • the size of the transaction chunk - defaults to 1000 (see setTransactionChunkSize)

  • the maximum number of subscriptions a client can handle - defaults to 1000

The STOMP heartbeat is configured using a JSON object as follows:

Future<StompServer> server = StompServer.create(vertx, new StompServerOptions().setHeartbeat(
    new JsonObject().put("x", 1000).put("y", 1000)))
    .handler(StompServerHandler.create(vertx))
    .listen();

Enabling security requires an additional AuthenticationProvider handling the authentication requests:

Future<StompServer> server = StompServer.create(vertx, new StompServerOptions().setSecured(true))
    .handler(StompServerHandler.create(vertx).authProvider(provider))
    .listen();

More information about AuthenticationProvider is available here.

If a frame exceeds one of the size limits, the frame is rejected and the client receives an ERROR frame. As the specification requires, the client connection is closed immediately after having sent the error. The same behavior happens with the other thresholds.

Subscriptions

The default STOMP server handles subscription destination as opaque Strings. So it does not promote a structure and it not hierarchic. By default the STOMP server follow a topic semantic (so messages are dispatched to all subscribers).

Type of destinations

By default, the STOMP server manages destinations as topics. So messages are dispatched to all subscribers. You can configure the server to use queues, or mix both types:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .destinationFactory((v, name) -> {
          if (name.startsWith("/queue")) {
            return Destination.queue(vertx, name);
          } else {
            return Destination.topic(vertx, name);
          }
        }))
    .listen();

In the last example, all destination starting with /queue are queues while others are topics. The destination is created when the first subscription on this destination is received.

A server can decide to reject the destination creation by returning null:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .destinationFactory((v, name) -> {
          if (name.startsWith("/forbidden")) {
            return null;
          } else if (name.startsWith("/queue")) {
            return Destination.queue(vertx, name);
          } else {
            return Destination.topic(vertx, name);
          }
        }))
    .listen();

In this case, the subscriber received an ERROR frame.

Queues dispatches messages using a round-robin strategies.

Providing your own type of destination

On purpose the STOMP server does not implement any advanced feature. IF you need more advanced dispatching policy, you can implement your own type of destination by providing a DestinationFactory returning your own Destination object.

Acknowledgment

By default, the STOMP server does nothing when a message is not acknowledged. You can customize this by providing your own Destination implementation.

The custom destination should call the

onAck and onNack method in order to let the StompServerHandler customizes the behavior:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .onAckHandler(acknowledgement -> {
          // Action to execute when the frames (one in `client-individual` mode, several
          // in `client` mode are acknowledged.
        })
        .onNackHandler(acknowledgement -> {
          // Action to execute when the frames (1 in `client-individual` mode, several in
          // `client` mode are not acknowledged.
        }))
    .listen();

Customizing the STOMP server

In addition to the handlers seen above, you can configure almost all aspects of the STOMP server, such as the actions made when specific frames are received, the ping to sent to the client (to implement the heartbeat). Here are some examples:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
            .closeHandler(connection -> {
              // client connection closed
            })
            .beginHandler(frame -> {
              // transaction starts
            })
            .commitHandler(frame -> {
                  // transaction committed
                }
            )
        //...
    ).listen();

Be aware that changing the default behavior may break the compliance with the STOMP specification. So, please look at the default implementations.

STOMP client

STOMP clients connect to STOMP server and can send and receive frames.

Creating a STOMP client

You create a StompClient instance with default options as follows:

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    // use the connection
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

The previous snippet creates a STOMP client connecting to "0.0.0.0:61613". Once connected, you get a StompClientConnection that let you interact with the server. You can configure the host and port as follows:

StompClient.create(vertx)
  .connect(61613, "0.0.0.0")
  .onSuccess(connection -> {
    // use the connection
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

To catch connection errors due to authentication issues, or whatever error frames sent by the server during the connection negotiation, you can register a error handler on the Stomp Client. All connections created with the client inherit of the error handler (but can have their own):

StompClient.create(vertx)
  .errorFrameHandler(frame -> {
    // Received the ERROR frame
  })
  .connect(61613, "0.0.0.0")
  .onSuccess(connection -> {
    // use the connection
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

You can also configure the host and port in the StompClientOptions:

StompClient
  .create(vertx, new StompClientOptions().setHost("localhost").setPort(1234))
  .connect()
  .onSuccess(connection -> {
    // use the connection
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

Closing a STOMP client

You can close a STOMP client:

StompClient client = StompClient
  .create(vertx, new StompClientOptions().setHost("localhost").setPort(1234));

client
  .connect()
  .onSuccess(connection -> {
    // use the connection
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

client.close();

However, this way would not notify the server of the disconnection. To cleanly close the connection, you should use the disconnect method:

StompClient
  .create(vertx, new StompClientOptions().setHost("localhost").setPort(1234))
  .connect()
  .onSuccess(connection -> {
    // use the connection
    connection.disconnect();
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

If the heartbeat is enabled and if the client did not detect server activity after the configured timeout, the connection is automatically closed.

Handling errors

On the StompClientConnection, you can register an error handler receiving ERROR frames sent by the server. Notice that the server closes the connection with the client after having sent such frame:

StompClient
  .create(vertx, new StompClientOptions().setHost("localhost").setPort(1234))
  .connect()
  .onSuccess(connection -> {
    // use the connection
    connection
      .errorHandler(frame ->
        System.out.println("ERROR frame received : " + frame));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

The client can also be notified when a connection drop has been detected. Connection failures are detected using the STOMP heartbeat mechanism. When the server has not sent a message in the heartbeat time window, the connection is closed and the connectionDroppedHandler is called (if set). To configure a connectionDroppedHandler, call connectionDroppedHandler. The handler can for instance tries to reconnect to the server:

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {

    connection.connectionDroppedHandler(con -> {
      // The connection has been lost
      // You can reconnect or switch to another server.
    });

    connection.send("/queue", Buffer.buffer("Hello"))
      .onSuccess(frame -> System.out.println("Message processed by the server")
      );
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

Configuration

You can configure various aspect by passing a StompClientOptions when creating the StompClient. As the STOMP client relies on a NetClient, you can configure the underlying Net Client from the StompClientOptions. Alternatively, you can pass the NetClient you want to use in the connect method:

StompClient.create(vertx)
  .connect(netClient)
  .onSuccess(connection -> {
    // use the connection
    connection
      .errorHandler(frame ->
        System.out.println("ERROR frame received : " + frame));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

The StompClientOptions let you configure:

  • the host and port ot the STOMP server

  • the login and passcode to connect to the server

  • whether or not the content-length header should be added to the frame if not set explicitly. (enabled by default)

  • whether or not the STOMP command should be used instead of the CONNECT command (disabled by default)

  • whether or not the host header should be ignored in the CONNECT frame (disabled by default)

  • the heartbeat configuration (1000, 1000 by default)

Subscribing to destinations

To subscribe to a destination, use:

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    // use the connection
    connection.subscribe("/queue", frame ->
      System.out.println("Just received a frame from /queue : " + frame));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

To unsubscribe, use:

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    // use the connection
    connection.subscribe("/queue", frame ->
      System.out.println("Just received a frame from /queue : " + frame));

    // ....

    connection.unsubscribe("/queue");
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

Sending messages

To send a message, use:

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    Map<String, String> headers = new HashMap<>();
    headers.put("header1", "value1");
    connection.send("/queue", headers, Buffer.buffer("Hello"));
    // No headers:
    connection.send("/queue", Buffer.buffer("World"));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

In Java and Groovy, you can use the Headers class to ease the header creation.

Acknowledgements

Clients can send ACK and NACK frames:

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    connection.subscribe("/queue", frame -> {
      connection.ack(frame.getAck());
      // OR
      connection.nack(frame.getAck());
    });
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

Transactions

Clients can also create transactions. ACK, NACK and SEND frames sent in the transaction will be delivery only when the transaction is committed.

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    Map<String, String> headers = new HashMap<>();
    headers.put("transaction", "my-transaction");
    connection.beginTX("my-transaction");
    connection.send("/queue", headers, Buffer.buffer("Hello"));
    connection.send("/queue", headers, Buffer.buffer("World"));
    connection.send("/queue", headers, Buffer.buffer("!!!"));
    connection.commit("my-transaction");
    // OR
    connection.abort("my-transaction");
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

Receipt

Each sent commands can have a receipt handler, notified when the server has processed the message:

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    connection
      .send("/queue", Buffer.buffer("Hello"))
      .onSuccess(frame ->
        System.out.println("Message processed by the server"));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

Using the STOMP server as a bridge to the vert.x Event Bus

The STOMP server can be used as a bridge to the vert.x Event Bus. The bridge is bi-directional meaning the STOMP frames are translated to Event Bus messages and Event Bus messages are translated to STOMP frames.

To enable the bridge you need to configure the inbound and outbound addresses. Inbound addresses are STOMP destination that are transferred to the event bus. The STOMP destination is used as the event bus address. Outbound addresses are event bus addresses that are transferred to STOMP.

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .bridge(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("/toBus"))
            .addOutboundPermitted(new PermittedOptions().setAddress("/toStomp"))
        )
    )
    .listen();

By default, the bridge use a publish/subscribe delivery (topic). You can configure it to use a point to point delivery where only one STOMP client or Event Bus consumer is invoked:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
            .bridge(new BridgeOptions()
                    .addInboundPermitted(new PermittedOptions().setAddress("/toBus"))
                    .addOutboundPermitted(new PermittedOptions().setAddress("/toStomp"))
                    .setPointToPoint(true)
            )
    )
    .listen();

The permitted options can also be expressed as a "regex" or with a match. A match is a structure that the message payload must meet. For instance, in the next examples, the payload must contains the field "foo" set to "bar". Structure match only supports JSON object.

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .bridge(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("/toBus")
                .setMatch(new JsonObject().put("foo", "bar")))
            .addOutboundPermitted(new PermittedOptions().setAddress("/toStomp"))
            .setPointToPoint(true)
        )
    )
    .listen();

Using the STOMP server with web sockets

If you want to connect a JavaScript client (node.js or a browser) directly with the STOMP server, you can use a web socket. The STOMP protocol has been adapted to work over web sockets in StompJS. The JavaScript connects directly to the STOMP server and send STOMP frames on the web socket. It also receives the STOMP frame directly on the web socket.

To configure the server to use StompJS, you need to:

  1. Enable the web socket bridge and configure the path of the listening web socket (/stomp by default).

  2. Import StompJS in your application (as a script on an HTML page, or as an npm module (https://www.npmjs.com/package/stompjs).

  3. Connect to the server

To achieve the first step, you would need a HTTP server, and pass the webSocketHandler result to webSocketHandler:

StompServer server = StompServer.create(vertx, new StompServerOptions()
    .setPort(-1) // Disable the TCP port, optional
    .setWebsocketBridge(true) // Enable the web socket support
    .setWebsocketPath("/stomp")) // Configure the web socket path, /stomp by default
    .handler(StompServerHandler.create(vertx));

Future<HttpServer> http = vertx.createHttpServer(
    new HttpServerOptions().setWebSocketSubProtocols(Arrays.asList("v10.stomp", "v11.stomp"))
)
    .webSocketHandler(server.webSocketHandler())
    .listen(8080);

Don’t forget to declare the supported sub-protocols. Without this, the connection will be rejected.

Then follow the instructions from the StompJS documentation to connect to the server. Here is a simple example:

var url = "ws://localhost:8080/stomp";
var client = Stomp.client(url);
var callback = function(frame) {
  console.log(frame);
};

client.connect({}, function() {
var subscription = client.subscribe("foo", callback);
});

Registering received and writing frame handlers

STOMP clients, client’s connections and server handlers support registering a received Frame handler that would be notified every time a frame is received from the wire. It lets you log the frames, or implement custom behavior. The handler is already called for PING frames, and illegal / unknown frames:

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx).receivedFrameHandler(sf -> {
      System.out.println(sf.frame());
    }))
    .listen();

StompClient client = StompClient.create(vertx).receivedFrameHandler(frame -> System.out.println(frame));

The handler is called before the frame is processed, so you can also modify the frame.

Frames not using a valid STOMP command use the UNKNOWN command. The original command is written in the headers using the Frame.STOMP_FRAME_COMMAND key.

You can also register a handler to be notified when a frame is going to be sent (written to the wire):

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx))
    .writingFrameHandler(sf -> {
      System.out.println(sf.frame());
    })
    .listen();

StompClient client = StompClient.create(vertx).writingFrameHandler(frame -> {
  System.out.println(frame);
});