<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
<version>5.1.3</version>
</dependency> Vert.x MQTT
Using Vert.x MQTT
This component had officially released in the Vert.x stack, just following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml):
-
Gradle (in your
build.gradlefile):
compile "io.vertx:vertx-mqtt:5.1.3" Vert.x MQTT server
This component provides a server which is able to handle connections, communication and messages exchange with remote MQTT clients. Its API provides a bunch of events related to raw protocol messages received by clients and exposes some features in order to send messages to them.
At the moment of writing, it supports all MQTT version 5.0 features except AUTH message which is yet to be implemented.
It’s not a fully featured MQTT broker but can be used for building something like that or for protocol translation.
| this module has the tech preview status, this means the API can change between versions. |
Handling client connection/disconnection
This example shows how it’s possible to handle the connection request from a remote MQTT client. First, an MqttServer instance is created and the endpointHandler method is used to specify the handler called when a remote client sends a CONNECT message for connecting to the server itself. The MqttEndpoint instance, provided as parameter to the handler, brings all main information related to the CONNECT message like client identifier, username/password, "will" information, clean session flag, protocol version and, "keep alive" timeout and CONNECT message properties (for MQTT version 5.0). Inside that handler, the endpoint instance provides the accept method for replying to the remote client with the corresponding CONNACK message : in this way, the connection is established. Finally, the server is started using the listen method with the default behavior (on localhost and default MQTT port 1883). The same method allows to specify an handler in order to check if the server is started properly or not.
MqttServer mqttServer = MqttServer.create(vertx);
mqttServer.endpointHandler(endpoint -> {
// shows main connect info
System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
if (endpoint.auth() != null) {
System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");
}
System.out.println("[properties = " + endpoint.connectProperties() + "]");
if (endpoint.will() != null) {
System.out.println("[will topic = " + endpoint.will().getWillTopic() + " msg = " + new String(endpoint.will().getWillMessageBytes()) +
" QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
}
System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");
// accept connection from the remote client
endpoint.accept(false);
})
.listen()
.onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("MQTT server is listening on port " + ar.result().actualPort());
} else {
System.out.println("Error on starting the server");
ar.cause().printStackTrace();
}
}); The same endpoint instance provides the disconnectMessageHandler for specifying the handler called when the remote client sends a DISCONNECT message in order to disconnect from the server; this handler takes MqttDisconnectMessage as a parameter.
endpoint.disconnectMessageHandler(disconnectMessage -> {
System.out.println("Received disconnect from client, reason code = " + disconnectMessage.code());
}); If MQTT version 5.0 or newer is used server can send DISCONNECT message to client with the reason code and properties using disconnect.
Handling client connection/disconnection with SSL/TLS support
The server has the support for accepting connection requests through the SSL/TLS protocol for authentication and encryption. In order to do that, the MqttServerOptions class provides the setSsl method for setting the usage of SSL/TLS (passing 'true' as value) and some other useful methods for providing server certificate and related private key (as Java key store reference, PEM or PFX format). In the following example, the setKeyCertOptions method is used in order to pass the certificates in PEM format. This method requires an instance of the possible implementations of the KeyCertOptions interface and in this case the PemKeyCertOptions class is used in order to provide the path for the server certificate and the private key with the correspondent setCertPath and setKeyPath methods. The MQTT server is started passing the Vert.x instance as usual and the above MQTT options instance to the creation method.
MqttServerOptions options = new MqttServerOptions()
.setPort(8883)
.setKeyCertOptions(new PemKeyCertOptions()
.setKeyPath("./src/test/resources/tls/server-key.pem")
.setCertPath("./src/test/resources/tls/server-cert.pem"))
.setSsl(true);
MqttServer mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(endpoint -> {
// shows main connect info
System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
if (endpoint.auth() != null) {
System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");
}
if (endpoint.will() != null) {
System.out.println("[will topic = " + endpoint.will().getWillTopic() + " msg = " + new String(endpoint.will().getWillMessageBytes()) +
" QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
}
System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");
// accept connection from the remote client
endpoint.accept(false);
})
.listen()
.onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("MQTT server is listening on port " + ar.result().actualPort());
} else {
System.out.println("Error on starting the server");
ar.cause().printStackTrace();
}
}); Handling client connections via WebSocket
If you want to support connections via WebSockets, you can enable this via MqttServerOptions, too. By passing true to setUseWebSocket, it will listen for websocket connections on the path /mqtt.
As with other setup configurations, the resulting endpoint connections and related disconnection are managed the same way as regular connections.
DeploymentOptions options = new DeploymentOptions().setInstances(10);
vertx.deployVerticle("com.mycompany.MyVerticle", options); Handling client subscription/unsubscription request
After a connection is established between client and server, the client can send a subscription request for a topic using the SUBSCRIBE message. The MqttEndpoint interface allows to specify a handler for the incoming subscription request using the subscribeHandler method. Such handler receives an instance of the MqttSubscribeMessage interface which brings the list of topics with the related subscription options as desired by the client. Subscription options include QoS level and related flags and for MQTT version 5.0 also additional flags, such as noLocal and retainAsPublished. Finally, the endpoint instance provides the subscribeAcknowledge method for replying to the client with the related SUBACK message containing the reason code (which is either QoS level or error code - separate per each topic or pattern) and message properties.
endpoint.subscribeHandler(subscribe -> {
List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>();
for (MqttTopicSubscription s: subscribe.topicSubscriptions()) {
System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService()));
}
// ack the subscriptions request
endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
}); In the same way, it’s possible to use the unsubscribeHandler method on the endpoint in order to specify the handler called when the client sends an UNSUBSCRIBE message. This handler receives an instance of the MqttUnsubscribeMessage interface as parameter with the list of topics to unsubscribe. Finally, the endpoint instance provides the unsubscribeAcknowledge and unsubscribeAcknowledge methods for replying to the client with the related UNSUBACK message - either simply acknowledging all unsubscriptions, or specifying the reasons per each topic and the properties in the UNSUBSCRIBE request (supported in MQTT v 5.0 or later).
endpoint.unsubscribeHandler(unsubscribe -> {
for (String t: unsubscribe.topics()) {
System.out.println("Unsubscription for " + t);
}
// ack the subscriptions request
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
}); Handling client published message
In order to handle incoming messages published by the remote client, the MqttEndpoint interface provides the publishHandler method for specifying the handler called when the client sends a PUBLISH message. This handler receives an instance of the MqttPublishMessage interface as parameter with the payload, the QoS level, the duplicate and retain flags, message properties.
If the QoS level is 0 (AT_MOST_ONCE), there is no need from the endpoint to reply the client.
If the QoS level is 1 (AT_LEAST_ONCE), the endpoind needs to reply with a PUBACK message using the available publishAcknowledge or publishAcknowledge method.
If the QoS level is 2 (EXACTLY_ONCE), the endpoint needs to reply with a PUBREC message using the available publishReceived or publishReceived method; in this case the same endpoint should handle the PUBREL message received from the client as well (the remote client sends it after receiving the PUBREC from the endpoint) and it can do that specifying the handler through the publishReleaseHandler or publishReleaseMessageHandler method - depending on whether the server needs access to MQTT version 5.0 extended capabilities (reason code, message properties). In order to close the QoS level 2 delivery, the endpoint can use the publishComplete or publishComplete method for sending the PUBCOMP message to the client.
endpoint.publishHandler(message -> {
System.out.println("Just received message [" + message.payload().toString(Charset.defaultCharset()) + "] with QoS [" + message.qosLevel() + "]");
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
endpoint.publishAcknowledge(message.messageId());
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
endpoint.publishReceived(message.messageId());
}
}).publishReleaseHandler(messageId -> {
endpoint.publishComplete(messageId);
}); Publish message to the client
The endpoint can publish a message to the remote client (sending a PUBLISH message) using the publish method which takes the following input parameters : the topic to publish, the payload, the QoS level, the duplicate and retain flags. If you’re using MQTT version 5.0 or newer and you’d like to specify message properties you can use publish method instead which takes message ID and message properties in addition to the previously described method.
If the QoS level is 0 (AT_MOST_ONCE), the endpoint won’t be receiving any feedback from the client.
If the QoS level is 1 (AT_LEAST_ONCE), the endpoint needs to handle the PUBACK message received from the client in order to receive final acknowledge of delivery. It’s possible using the publishAcknowledgeHandler or publishAcknowledgeMessageHandler method specifying such a handler.
If the QoS level is 2 (EXACTLY_ONCE), the endpoint needs to handle the PUBREC message received from the client. The publishReceivedHandler and publishReceivedMessageHandler methods allow to specify the handler for that. Inside that handler, the endpoint can use the publishRelease or publishRelease method for replying to the client with the PUBREL message. The last step is to handle the PUBCOMP message received from the client as final acknowledge for the published message; it’s possible using the publishCompletionHandler or publishCompletionMessageHandler for specifying the handler called when the final PUBCOMP message is received.
endpoint.publish("my_topic",
Buffer.buffer("Hello from the Vert.x MQTT server"),
MqttQoS.EXACTLY_ONCE,
false,
false);
// specifing handlers for handling QoS 1 and 2
endpoint.publishAcknowledgeHandler(messageId -> {
System.out.println("Received ack for message = " + messageId);
}).publishReceivedHandler(messageId -> {
endpoint.publishRelease(messageId);
}).publishCompletionHandler(messageId -> {
System.out.println("Received ack for message = " + messageId);
}); Be notified by client keep alive
The underlying MQTT keep alive mechanism is handled by the server internally. When the CONNECT message is received, the server takes care of the keep alive timeout specified inside that message in order to check if the client doesn’t send messages in such timeout. At same time, for every PINGREQ received, the server replies with the related PINGRESP.
Even if there is no need for the high level application to handle that, the MqttEndpoint interface provides the pingHandler method for specifying an handler called when a PINGREQ message is received from the client. It’s just a notification to the application that the client isn’t sending meaningful messages but only pings for keeping alive; in any case the PINGRESP is automatically sent by the server internally as described above.
endpoint.pingHandler(v -> {
System.out.println("Ping received from client");
}); Closing the server
The MqttServer interface provides the close method that can be used for closing the server; it stops to listen for incoming connections and closes all the active connections with remote clients. This method is asynchronous and one overload provides the possibility to specify a complention handler that will be called when the server is really closed.
mqttServer.close().onComplete(v -> {
System.out.println("MQTT server closed");
}); Handling client auth packet/Sending AUTH packet to remote client(Only in MQTT version 5)
After a connection is established between client and server, the client can send an auth packet to server using the AUTH message. The MqttEndpoint interface allows to specify a handler for the incoming auth packet using the authenticationExchangeHandler method. Such handler receives an instance of the MqttAuthenticationExchangeMessage interface which brings the reason code, the authentication method and data. The server could continue to send AUTH packet using the authenticationExchange for authentication or just passed it.
endpoint.authenticationExchange(MqttAuthenticationExchangeMessage.create(MqttAuthenticateReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES));
// handling auth from client
endpoint.authenticationExchangeHandler(auth -> {
System.out.println("AUTH packet received from client. code: " + auth.reasonCode());
}); Automatic clean-up in verticles
If you’re creating MQTT servers from inside verticles, those servers will be automatically closed when the verticle is undeployed.
Scaling : sharing MQTT servers
The handlers related to the MQTT server are always executed in the same event loop thread. It means that on a system with more cores, only one instance is deployed so only one core is used. In order to use more cores, it’s possible to deploy more instances of the MQTT server.
It’s possible to do that programmatically:
for (int i = 0; i < 10; i++) {
MqttServer mqttServer = MqttServer.create(vertx);
mqttServer.endpointHandler(endpoint -> {
// handling endpoint
})
.listen()
.onComplete(ar -> {
// handling start listening
});
} or using a verticle specifying the number of instances:
DeploymentOptions options = new DeploymentOptions().setInstances(10);
vertx.deployVerticle("com.mycompany.MyVerticle", options); What’s really happen is that even only MQTT server is deployed but as incoming connections arrive, Vert.x distributes them in a round-robin fashion to any of the connect handlers executed on different cores.
Vert.x MQTT client
This component provides an MQTT client which is compliant with the 3.1.1 and 5.0 specs. Its API provides a bunch of methods for connecting/disconnecting to a broker, publishing messages (with all three different levels of QoS) and subscribing to topics. MQTT 5.0 features such as user properties, reason codes, subscription options, subscription identifiers, topic aliases and automatic server redirect are supported in addition to the 3.1.1 baseline.
| This module has the tech preview status, this means the API can change between versions. |
Connect/Disconnect
The client gives you opportunity to connect to a server and disconnect from it. Also, you could specify things like the host and port of a server you would like to connect to passing instance of MqttClientOptions as a param through constructor.
This example shows how you could connect to a server and disconnect from it using Vert.x MQTT client and calling connect and disconnect methods.
MqttClient client = MqttClient.create(vertx);
client.connect(1883, "mqtt.eclipse.org").onComplete(s -> {
client.disconnect();
}); The default address of the server provided by MqttClientOptions is localhost:1883 and localhost:8883 if you are using SSL/TSL. |
Subscribe to a topic
Now, lest go deeper and take look at this example:
client.publishHandler(s -> {
System.out.println("There are new message in topic: " + s.topicName());
System.out.println("Content(as string) of the message: " + s.payload().toString());
System.out.println("QoS: " + s.qosLevel());
})
.subscribe("rpi2/temp", 2); Here we have the example of usage of subscribe method. In order to receive messages from rpi2/temp topic we call subscribe method. Although, to handle received messages from server you need to provide a handler, which will be called each time you have a new messages in the topics you subscribe on. As this example shows, handler could be provided via publishHandler method.
Publishing message to a topic
If you would like to publish some message into topic then publish should be called. Let’s take a look at the example:
client.publish("temperature",
Buffer.buffer("hello"),
MqttQoS.AT_LEAST_ONCE,
false,
false); In the example, we send message to topic with name "temperature".
Handling server auth request/Sending AUTH packet to server(Only in MQTT version 5)
After a connection is established between client and server, the client can send an auth request to server using the authenticationExchange for authentication. The Server may return an AUTH packet. The MqttClient interface allows to specify a handler for the incoming auth packet using the authenticationExchangeHandler method. Such handler receives an instance of the MqttAuthenticationExchangeMessage interface which brings the reason code, the authentication method and data.
client.authenticationExchange(MqttAuthenticateReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES);
client.authenticationExchangeHandler(auth -> {
//The handler will be called time to time by default
System.out.println("We have just received AUTH packet: " + auth.reasonCode());
}); Keep connection with server alive
In order to keep connection with server you should time to time send something to server otherwise server will close the connection. The right way to keep connection alive is a ping method.
By default, your client keep connections with server automatically. That means that you don’t need to call ping in order to keep connections with server. The MqttClient will do it for you. |
If you want to disable this feature then you should call setAutoKeepAlive with false as argument:
options.setAutoKeepAlive(false); Be notified when
-
publish is completed
You could provide handler by calling
publishCompletionHandler. The handler will be called each time publish is completed. This one is pretty useful because you could see the packetId of just received PUBACK or PUBCOMP packet.client.publishCompletionHandler(id -> { System.out.println("Id of just received PUBACK or PUBCOMP packet is " + id); }); // The line of code below will trigger publishCompletionHandler (QoS 2) client.publish("hello", Buffer.buffer("hello"), MqttQoS.EXACTLY_ONCE, false, false); // The line of code below will trigger publishCompletionHandler (QoS is 1) client.publish("hello", Buffer.buffer("hello"), MqttQoS.AT_LEAST_ONCE, false, false); // The line of code below does not trigger because QoS value is 0 client.publish("hello", Buffer.buffer("hello"), MqttQoS.AT_LEAST_ONCE, false, false);The handler WILL NOT BE CALLED if sent publish packet with QoS=0. -
subscribe completed
client.subscribeCompletionHandler(mqttSubAckMessage -> { System.out.println("Id of just received SUBACK packet is " + mqttSubAckMessage.messageId()); for (int s : mqttSubAckMessage.grantedQoSLevels()) { if (s == 0x80) { System.out.println("Failure"); } else { System.out.println("Success. Maximum QoS is " + s); } } }); client.subscribe("temp", 1); client.subscribe("temp2", 2); -
unsubscribe completed
client .unsubscribeCompletionHandler(id -> { System.out.println("Id of just received UNSUBACK packet is " + id); }); client.subscribe("temp", 1); client.unsubscribe("temp"); -
unsubscribe sent
client.subscribe("temp", 1); client.unsubscribe("temp").onSuccess(id -> { System.out.println("Id of just sent UNSUBSCRIBE packet is " + id); }); -
PINGRESP received
client.pingResponseHandler(s -> { //The handler will be called time to time by default System.out.println("We have just received PINGRESP packet"); });
Connecting using TLS
You can connect to an MQTT server using TLS by configuring the client TCP options, make sure to set:
-
the SSL flag
-
the server certificate or the trust all flag
-
the hostname verification algorithm to
"HTTPS"if you want to verify the server identity otherwise""
MqttClientOptions options = new MqttClientOptions();
options
.setSsl(true)
.setTrustOptions(new PemTrustOptions().addCertPath("/path/to/server.crt"))
// Algo can be the empty string "" or "HTTPS" to verify the server hostname
.setHostnameVerificationAlgorithm(algo);
MqttClient client = MqttClient.create(vertx, options);
client.connect(1883, "mqtt.eclipse.org").onComplete(s -> {
client.disconnect();
}); | More details on the TLS client config can be found here |
Use proxy protocol
MqttServer mqttServer = MqttServer
.create(vertx, new MqttServerOptions()
// set true to use proxy protocol
.setUseProxyProtocol(true));
mqttServer.endpointHandler(endpoint -> {
// remote address is origin real address, not proxy's address
System.out.println(endpoint.remoteAddress());
endpoint.accept(false);
})
.listen()
.onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("MQTT server is listening on port " + ar.result().actualPort());
} else {
System.out.println("Error on starting the server");
ar.cause().printStackTrace();
}
}); If your servers are behind haproxy or nginx and you want to get the client’s original ip and port, then you need to set setUseProxyProtocol to true
To enable this feature, you need to add dependency netty-codec-haproxy, but it is not introduced by default, so you need to manually add it |
-
Maven (in your
pom.xml):
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-haproxy</artifactId>
<version>5.1.3</version>
</dependency> -
Gradle (in your
build.gradlefile):
compile "io.netty:netty-codec-haproxy:5.1.3" MQTT 5.0 client features
| this section is new and currently in tech preview, in other words the API is subject to be change until it reaches maturity |
The client targets MQTT 3.1.1 by default. To switch the wire protocol to MQTT 5.0, set the protocol version on MqttClientOptions before creating the client:
MqttClientOptions options = new MqttClientOptions();
options.setVersion(io.netty.handler.codec.mqtt.MqttVersion.MQTT_5.protocolLevel()); // 5
MqttClient client = MqttClient.create(vertx, options); Only the values 4 (MQTT 3.1.1) and 5 (MQTT 5.0) are accepted by setVersion.
CONNECT properties
When using MQTT 5.0, the following CONNECT properties can be configured on MqttClientOptions:
-
setSessionExpireInterval— Session Expiry Interval (seconds) -
setReceiveMaximum— maximum number of in-flight QoS 1/2 PUBLISH packets the client is willing to receive -
setMaximumPacketSize— maximum packet size the client will accept from the server -
setTopicAliasMaximum— highest topic alias value the client will accept from the server -
setRequestResponseInformation/setRequestProblemInformation
User properties can be attached to the CONNECT packet by using the connect overload that takes a Map<String, String> of user properties.
The result of the CONNECT is delivered as a MqttConnAckMessage which, on MQTT 5.0, exposes the full set of server-advertised properties (Receive Maximum, Maximum QoS, Retain Available, Maximum Packet Size, Assigned Client Identifier, Topic Alias Maximum, Reason String, Wildcard / Shared Subscription Available, Subscription Identifiers Available, Server Keep Alive, Response Information, Server Reference, Authentication Method/Data and user properties).
Session Expiry Interval
The MQTT 5.0 Session Expiry Interval (in seconds) replaces the 3.1.1 clean session flag and tells the broker how long session state (subscriptions, queued QoS 1/2 messages) must survive after the network connection is closed. It is set on MqttClientOptions before connecting:
MqttClientOptions options = new MqttClientOptions();
options.setVersion(io.netty.handler.codec.mqtt.MqttVersion.MQTT_5.protocolLevel());
options.setCleanSession(false); // required to keep a persistent session
options.setSessionExpireInterval(3600L); // keep the session for one hour after disconnect
MqttClient client = MqttClient.create(vertx, options); Semantics follow MQTT 5.0 §3.1.2.11:
-
null(the default) or0L— the session ends as soon as the network connection ends (equivalent to a clean session). -
a positive value — the session persists for that many seconds after the connection is closed.
-
0xFFFFFFFFL(4_294_967_295L) — the session never expires.
Valid values are in the range 0L..0xFFFFFFFFL; values outside this range cause setSessionExpireInterval to throw IllegalArgumentException. The property is only put on the wire when the protocol version is 5; on MQTT 3.1.1 the value is silently ignored and session persistence is driven exclusively by setCleanSession.
The broker may override the requested value: when present, the Session Expiry Interval returned in the CONNACK is the authoritative one for the lifetime of the session and can be inspected via sessionExpiryInterval.
Receive Maximum
Receive Maximum (MQTT 5.0 §3.1.2.11.3) is the per-direction flow-control window for QoS 1 and QoS 2 PUBLISH packets: it is the maximum number of in-flight messages (sent but not yet fully acknowledged) the peer is willing to accept. Each side advertises its own limit and must honour the limit advertised by the other.
Client side — set on MqttClientOptions before connecting:
MqttClientOptions options = new MqttClientOptions();
options.setVersion(io.netty.handler.codec.mqtt.MqttVersion.MQTT_5.protocolLevel());
options.setReceiveMaximum(20); // accept at most 20 concurrent in-flight QoS 1/2 PUBLISHes from the server
MqttClient client = MqttClient.create(vertx, options); -
The value is a positive 16-bit integer; valid range is
1..65535. Values outside0..0xFFFFare rejected bysetReceiveMaximumwithIllegalArgumentException. -
null(the default) omits the property from the CONNECT, which per spec means "no limit" (65535). -
The property is only put on the wire when the protocol version is
5; on MQTT 3.1.1 the value is ignored.
Server side — the value the broker advertises in its CONNACK is enforced automatically when calling publish for QoS 1 or 2:
-
if the number of in-flight messages already equals the server’s Receive Maximum, the returned
Futurefails with anMqttExceptionwhose code isMQTT_INFLIGHT_QUEUE_FULL; -
QoS 0 PUBLISH packets are never gated by this limit (they have no acknowledgement);
-
if the CONNACK omits the property, the client treats it as unlimited (
Integer.MAX_VALUEinternally); -
the negotiated value can be inspected via
receiveMaximum.
this is independent from (and applied in addition to) the existing client-local cap configured via setMaxInflightQueue; whichever limit is hit first triggers the MQTT_INFLIGHT_QUEUE_FULL failure. |
Last Will and Testament (LWT)
Will message configuration has been moved to a dedicated MqttClientWillOptions object that supports both 3.1.1 fields (topic, payload, QoS, retain) and the MQTT 5.0 will properties: Will Delay Interval, Payload Format Indicator, Content Type, Response Topic, Correlation Data and user properties. The will options can be set via setWillOptions; the legacy setWillTopic / setWillMessage / setWillQoS / setWillRetain setters are still available for 3.1.1 compatibility.
The standalone willFlag setter has been removed on the client side — the presence of a will is now derived from willTopic and willPayload. In JSON, the will is serialized as a nested willOptions object. |
Publishing with properties
When using MQTT 5.0, outgoing PUBLISH packets can carry additional properties (Payload Format Indicator, Message Expiry Interval, Content Type, Response Topic, Correlation Data, user properties, Topic Alias). Use publish to pass an MqttProperties instance alongside the payload. The Subscription Identifier property is not valid on a client-originated PUBLISH — see the dedicated section below.
The acknowledgement flow now exposes the full typed messages including reason code and properties:
-
publishAckMessageHandler— PUBACK (QoS 1) -
publishRecMessageHandler— PUBREC (QoS 2) -
publishCompMessageHandler— PUBCOMP (QoS 2)
These fire alongside the existing publishCompletionHandler so that 3.1.1 code keeps working. On the inbound side, the client can also acknowledge an incoming PUBLISH with reason code and properties via publishAcknowledge, publishReceived, publishRelease and publishComplete.
Subscriptions
Two MQTT 5.0 specific overloads of subscribe are available:
-
subscribe— same QoS map as the 3.1.1 API plus a properties object (for example a Subscription Identifier). -
subscribe— for fine-grained subscription options. EachMqttTopicSubscriptioncarries anMqttSubscriptionOptionthat encodes QoS plus the v5 flags (No Local,Retain As Published,Retain Handling).
unsubscribe has a matching unsubscribe overload that accepts properties. The SUBACK and UNSUBACK responses expose per-topic reason codes and properties via MqttSubAckMessage and MqttUnsubAckMessage; for UNSUBACK the dedicated unsubscribeCompletionMessageHandler delivers the full typed message.
Subscription Identifier
The Subscription Identifier (MQTT 5.0 §3.8.2.1.2) is a positive integer that the client attaches to a SUBSCRIBE request; the broker then echoes it back on every PUBLISH that matches that subscription (§3.3.2.3.8), so the client can route incoming messages to the correct handler without re-parsing the topic.
It is set as a property on the SUBSCRIBE packet, not on the PUBLISH:
MqttProperties subProps = new MqttProperties();
subProps.add(new MqttProperties.IntegerProperty(
MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value(),
42));
client.subscribe(java.util.Collections.singletonMap("sensors/+/temperature", 1), subProps); The same subProps can be passed to the list-based overload subscribe when you need to combine the identifier with per-topic v5 subscription options (No Local, Retain As Published, Retain Handling).
On the receiving side, read the identifier from the incoming PUBLISH:
client.publishHandler(msg -> {
MqttProperties.MqttProperty<?> p = msg.properties()
.getProperty(MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value());
if (p != null) {
int id = (Integer) p.value(); // 42
// route based on id
}
}); If the broker matches the PUBLISH against more than one subscription, the property is repeated — use properties().getProperties(SUBSCRIPTION_IDENTIFIER.value()) to obtain the full list.
Preconditions enforced by the client (an attempt to subscribe with a Subscription Identifier that violates them fails the returned Future with MqttException):
-
the protocol version must be
5(MqttClientOptions.setVersion(5)) — otherwiseMQTT_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED; -
the broker must not have advertised
SUBSCRIPTION_IDENTIFIER_AVAILABLE=0in the CONNACK; the negotiated value is exposed bysubscriptionIdentifierAvailableand, when missing, defaults to "available" per spec.
Topic alias
Topic aliases (MQTT 5.0 §3.3.2.3.4) are managed automatically in both directions.
Server → client: when an incoming PUBLISH carries a Topic Alias property, the alias-to-topic mapping is cached and subsequent packets that reuse the same alias with an empty topic name are transparently re-expanded before being delivered to the publishHandler. The maximum number of aliases the broker may use is controlled by setTopicAliasMaximum; an alias value of 0 or one above the negotiated maximum results in a DISCONNECT with reason code TOPIC_ALIAS_INVALID.
Client → server: when the broker advertises a non-zero Topic Alias Maximum in its CONNACK, the client transparently assigns aliases to outgoing PUBLISH packets. The first PUBLISH for a given topic carries the full topic name plus a newly allocated Topic Alias property; subsequent PUBLISH packets on the same topic are sent with an empty topic name and just the alias. If the alias pool advertised by the broker is exhausted, further topics are published with their full name and no alias. This is fully internal — applications keep calling publish with the real topic name and pay no attention to aliases.
Request / Response interaction
MQTT 5.0 §4.10 standardises a request/response pattern on top of regular PUBLISH packets, using three new properties: Response Topic, Correlation Data and the optional Response Information hint the broker may advertise to help the client pick a topic prefix. The pattern is fully supported in this client.
Step 1 — (optional) ask the broker for Response Information
If you want the broker to suggest a topic prefix to use as the Response Topic (typically a unique per-client path that is already covered by the broker’s ACL for that client), set Request Response Information = true on the CONNECT:
MqttClientOptions options = new MqttClientOptions();
options.setVersion(io.netty.handler.codec.mqtt.MqttVersion.MQTT_5.protocolLevel());
options.setRequestResponseInformation(true);
MqttClient client = MqttClient.create(vertx, options);
client.connect(1883, "broker.example.com")
.onSuccess(connack -> {
String prefix = connack.responseInformation(); // null if the broker didn't return any
// e.g. "$share/responses/{clientId}/"
}); See setRequestResponseInformation and responseInformation. The broker is free to ignore the request — treat null as "no hint, pick your own response topic".
Step 2 — requester: PUBLISH with Response Topic + Correlation Data
The requester subscribes to whatever topic it will use to receive replies, then publishes the request with the two properties set:
// 1. subscribe to the reply channel
String replyTopic = "clients/" + client.clientId() + "/replies";
client.subscribe(replyTopic, 1);
// 2. publish the request
byte[] correlationId = java.util.UUID.randomUUID().toString().getBytes();
MqttProperties reqProps = new MqttProperties();
reqProps.add(new MqttProperties.StringProperty(
MqttProperties.MqttPropertyType.RESPONSE_TOPIC.value(), replyTopic));
reqProps.add(new MqttProperties.BinaryProperty(
MqttProperties.MqttPropertyType.CORRELATION_DATA.value(), correlationId));
client.publish("requests/temperature",
Buffer.buffer("{\"room\":\"kitchen\"}"),
MqttQoS.AT_LEAST_ONCE, false, false, reqProps); Correlation Data is opaque to MQTT — its only purpose is to let the requester pair a reply back to the originating request when several requests are outstanding at the same time.
Step 3 — responder: read Response Topic + Correlation Data, publish back
On the responder side the two properties arrive on the incoming PUBLISH:
client.publishHandler(msg -> {
if (!"requests/temperature".equals(msg.topicName())) return;
String respTopic = ((MqttProperties.StringProperty) msg.properties()
.getProperty(MqttProperties.MqttPropertyType.RESPONSE_TOPIC.value())).value();
byte[] corr = ((MqttProperties.BinaryProperty) msg.properties()
.getProperty(MqttProperties.MqttPropertyType.CORRELATION_DATA.value())).value();
MqttProperties respProps = new MqttProperties();
respProps.add(new MqttProperties.BinaryProperty(
MqttProperties.MqttPropertyType.CORRELATION_DATA.value(), corr));
client.publish(respTopic,
Buffer.buffer("{\"temp\":21.5}"),
MqttQoS.AT_LEAST_ONCE, false, false, respProps);
}); A responder that cannot or will not honour the request (missing Response Topic, unsupported payload, etc.) simply does not publish a reply — there is no protocol-level negative ack defined for this pattern.
Step 4 — requester: correlate the reply
Back on the requester, the same publishHandler callback receives the reply on replyTopic. Match the Correlation Data against the value used in the request to route the response to the right caller (e.g. complete a Promise<Buffer> kept in a Map<ByteBuffer, Promise<Buffer>>).
Last Will and request/response
A will message can also act as a "I’m gone" reply: MqttClientWillOptions exposes setResponseTopic(…) and setCorrelationData(…) so the broker, on abnormal disconnect, will publish a will that any pending requester can correlate.
Server-initiated DISCONNECT
A handler can be registered to be notified when the server sends a DISCONNECT packet (rather than the client closing the connection itself):
client.disconnectMessageHandler(msg -> {
System.out.println("server disconnected, reason=" + msg.code() + " props=" + msg.properties());
}); See disconnectMessageHandler. The handler fires before closeHandler and only for server-initiated disconnects. The client can also actively send a DISCONNECT with reason code and properties via disconnect.
Enhanced Authentication (AUTH)
MQTT 5.0 §4.12 introduces an Enhanced Authentication flow built on a new AUTH control packet, designed to support multi-round challenge/response schemes (e.g. SCRAM, Kerberos, mutual proofs) that cannot fit in the single CONNECT exchange used by 3.1.1. The client side fully supports this flow, both during the initial connection (§4.12.1) and for re-authentication on an already established connection (§4.12.2).
Configuring the CONNECT
The first chunk of authentication data, plus the name of the mechanism, are placed on the CONNECT packet via MqttClientOptions:
-
setAuthenticationMethod— UTF-8 name of the mechanism (MQTT 5.0 §3.1.2.11.9). When set, the broker is expected to drive the rest of the exchange with AUTH packets and must fail the connection withBad authentication methodif it does not support it. -
setAuthenticationData— opaque binary payload required by the chosen mechanism (MQTT 5.0 §3.1.2.11.10).
The values the broker returns in the CONNACK can be inspected on the resulting MqttConnAckMessage via authenticationMethod() and authenticationData() — for mechanisms such as SCRAM, the final server signature (v=…) is typically delivered there.
Handling AUTH packets from the broker
Register a handler to be notified when the broker sends an AUTH packet — both during the initial handshake (before CONNACK, while the client is still in CONNECTING state) and during re-authentication on a live connection:
client.authenticationExchangeHandler(msg -> {
// msg.reasonCode() — CONTINUE_AUTHENTICATION / SUCCESS / RE_AUTHENTICATE
// msg.authenticationMethod() — echoed by the broker, must match the negotiated method
// msg.authenticationData() — opaque payload for the mechanism (e.g. SCRAM server-first-message)
}); Sending AUTH packets
The client replies to (or initiates) an AUTH exchange through authenticationExchange. It is valid in two phases:
-
Initial enhanced authentication (MQTT 5.0 §4.12.1) — call it from inside the
authenticationExchangeHandlerwhileclient.connect(…)is still in flight. Reply withCONTINUE_AUTHENTICATION(0x18) until the broker terminates the exchange with a CONNACK. The returnedFuturesucceeds as soon as the AUTH frame is on the wire; the overall outcome is the one delivered byclient.connect(…). -
Re-authentication (MQTT 5.0 §4.12.2) — once the connection is established, call it with
RE_AUTHENTICATE(0x19) to restart the authentication exchange without tearing down the underlying TCP connection.
The reason code is supplied via MqttAuthenticateReasonCode (SUCCESS, CONTINUE_AUTHENTICATION, RE_AUTHENTICATE); the MqttProperties parameter must include the AUTHENTICATION_METHOD (echoed unchanged for the whole exchange) and, when required by the mechanism, the AUTHENTICATION_DATA payload.
Example: SCRAM-SHA-256
The following snippet shows how to drive a full SCRAM-SHA-256 handshake (RFC 5802) against an MQTT 5.0 broker that advertises SCRAM-SHA-256 (for example EMQX with the scram authenticator). The crypto primitives come from the JDK — no extra dependency is required.
Step 1 — generate the client nonce, build client-first-message and place it on the CONNECT:
String clientNonce = randomBase64(24); // securely random, RFC 5802 §5.1
String clientFirstMessageBare = "n=" + username + ",r=" + clientNonce;
String clientFirstMessage = "n,," + clientFirstMessageBare; // "n,," is the GS2 header
MqttClientOptions options = new MqttClientOptions();
options.setVersion(io.netty.handler.codec.mqtt.MqttVersion.MQTT_5.protocolLevel());
options.setAuthenticationMethod("SCRAM-SHA-256");
options.setAuthenticationData(Buffer.buffer(clientFirstMessage.getBytes(StandardCharsets.UTF_8)));
MqttClient client = MqttClient.create(vertx, options); Step 2 — when the broker replies with server-first-message, compute the client proof and send client-final-message back through an AUTH packet:
client.authenticationExchangeHandler(msg -> {
// server-first-message: r=<server-nonce>,s=<base64-salt>,i=<iterations>
String serverFirst = new String(msg.authenticationData().getBytes(), StandardCharsets.UTF_8);
Map<String,String> f = parseScram(serverFirst);
String serverNonce = f.get("r");
byte[] salt = Base64.getDecoder().decode(f.get("s"));
int iterations = Integer.parseInt(f.get("i"));
// RFC 5802 §5.1: server MUST extend the client nonce — fail closed otherwise.
if (!serverNonce.startsWith(clientNonce)) throw new SecurityException("bad nonce");
String clientFinalNoProof = "c=biws,r=" + serverNonce; // biws = base64("n,,")
String authMessage = clientFirstMessageBare + "," + serverFirst + "," + clientFinalNoProof;
byte[] saltedPwd = pbkdf2(password, salt, iterations); // PBKDF2-HMAC-SHA-256
byte[] clientKey = hmacSha256(saltedPwd, "Client Key".getBytes());
byte[] storedKey = sha256(clientKey);
byte[] clientSig = hmacSha256(storedKey, authMessage.getBytes());
byte[] clientProof = xor(clientKey, clientSig);
String clientFinal = clientFinalNoProof
+ ",p=" + Base64.getEncoder().encodeToString(clientProof);
MqttProperties props = new MqttProperties();
props.add(new MqttProperties.StringProperty(
MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value(), "SCRAM-SHA-256"));
props.add(new MqttProperties.BinaryProperty(
MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value(),
clientFinal.getBytes(StandardCharsets.UTF_8)));
client.authenticationExchange(MqttAuthenticateReasonCode.CONTINUE_AUTHENTICATION, props);
});
client.connect(1883, "broker.example.com").onSuccess(connack -> {
// authenticated; connack.authenticationData() may carry the SCRAM server-final-message (v=...)
}); A complete, runnable version including the SCRAM helper methods (pbkdf2, hmacSha256, sha256, xor, nonce generation and SASL-name escaping) is available in public class VertxMqttClientAUTHExamples {
private static final String AUTH_METHOD = "SCRAM-SHA-256"; private static final String GS2_HEADER = "n,,";
public static void main(String[] args) { final String host = args.length > 0 ? args[0] : "localhost"; final int port = args.length > 1 ? Integer.parseInt(args[1]) : 1883; final String username = args.length > 2 ? args[2] : "user"; final String password = args.length > 3 ? args[3] : "public";
final String clientNonce = generateNonce(); final String clientFirstMessageBare = "n=" + saslName(username) + ",r=" + clientNonce; final byte[] clientFirstMessage = (GS2_HEADER + clientFirstMessageBare).getBytes(StandardCharsets.UTF_8);
MqttClientOptions option = new MqttClientOptions(); option.setVersion(5); option.setAuthenticationMethod(AUTH_METHOD); option.setAuthenticationData(Buffer.buffer(clientFirstMessage));
Vertx vertx = Vertx.vertx(); MqttClient client = MqttClient.create(vertx, option);
client.authenticationExchangeHandler(msg -> {
try {
String serverFirstMessage = new String(msg.authenticationData().getBytes(), StandardCharsets.UTF_8);
Map<String, String> fields = parseScramMessage(serverFirstMessage);
String serverNonce = fields.get("r");
byte[] salt = Base64.getDecoder().decode(fields.get("s"));
int iterations = Integer.parseInt(fields.get("i")); // RFC 5802 §5.1: server MUST extend the client nonce — fail closed otherwise.
if (serverNonce == null || !serverNonce.startsWith(clientNonce)) {
throw new SecurityException("Invalid server nonce");
} String channelBinding = Base64.getEncoder().encodeToString(GS2_HEADER.getBytes(StandardCharsets.UTF_8)); String clientFinalNoProof = "c=" + channelBinding + ",r=" + serverNonce; String authMessage = clientFirstMessageBare + "," + serverFirstMessage + "," + clientFinalNoProof;
byte[] saltedPassword = pbkdf2(password, salt, iterations);
byte[] clientKey = hmacSha256(saltedPassword, "Client Key".getBytes(StandardCharsets.UTF_8));
byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey);
byte[] clientSignature = hmacSha256(storedKey, authMessage.getBytes(StandardCharsets.UTF_8));
byte[] clientProof = xor(clientKey, clientSignature); String clientFinalMessage = clientFinalNoProof + ",p=" + Base64.getEncoder().encodeToString(clientProof);
MqttProperties props = new MqttProperties(); props.add(new MqttProperties.StringProperty(AUTHENTICATION_METHOD.value(), AUTH_METHOD)); props.add(new MqttProperties.BinaryProperty(AUTHENTICATION_DATA.value(), clientFinalMessage.getBytes(StandardCharsets.UTF_8)));
client.authenticationExchange(MqttAuthenticateReasonCode.CONTINUE_AUTHENTICATION, props);
} catch (Exception e) {
e.printStackTrace();
}
}); client.connect(port, host).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Connected & authenticated as '" + username + "'");
client.publish("temperature", Buffer.buffer("hello"), MqttQoS.AT_LEAST_ONCE, false, false)
.onComplete(p -> client.disconnect().onComplete(d -> vertx.close()));
} else {
System.err.println("Connect failed: " + ar.cause());
vertx.close();
}
});
} private static String generateNonce() {
byte[] bytes = new byte[24];
new SecureRandom().nextBytes(bytes);
// RFC 5802 §5.1: nonce is "printable" — strip base64 chars disallowed inside SCRAM attrs.
return Base64.getEncoder().encodeToString(bytes).replace("=", "").replace(",", "");
} private static String saslName(String name) {
return name.replace("=", "=3D").replace(",", "=2C");
} private static Map<String, String> parseScramMessage(String msg) {
Map<String, String> out = new HashMap<>();
for (String token : msg.split(",")) {
int eq = token.indexOf('=');
if (eq > 0) {
out.put(token.substring(0, eq), token.substring(eq + 1));
}
}
return out;
} private static byte[] pbkdf2(String password, byte[] salt, int iterations) throws Exception {
SecretKeyFactory skf = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
PBEKeySpec spec = new PBEKeySpec(password.toCharArray(), salt, iterations, 256);
try {
return skf.generateSecret(spec).getEncoded();
} finally {
spec.clearPassword();
}
} private static byte[] hmacSha256(byte[] key, byte[] data) throws Exception {
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(key, "HmacSHA256"));
return mac.doFinal(data);
} private static byte[] xor(byte[] a, byte[] b) {
byte[] out = new byte[a.length];
for (int i = 0; i < a.length; i++) {
out[i] = (byte) (a[i] ^ b[i]);
}
return out;
}
}. Server-side AUTH
On the server side AUTH packets are accepted at the wire level (so a broker can be probed without dropping the TCP connection) but MqttEndpointImpl does not currently surface them to user code: {@code handleAuth} is a stub. Implementing an enhanced-authentication endpoint therefore requires extending the endpoint API; the client-side types are already in place so a future server port will not change this section’s API surface.
Automatic server redirect
When the broker returns a CONNACK or DISCONNECT with a Server Reference property (MQTT 5.0 §3.2.2.3.18 / §3.14.2.3.4), the client can transparently reconnect to one of the servers listed there. This behavior is enabled by default and can be toggled with setAutoServerRedirect. When several references are present in the comma-separated list, one is picked at random.