Hazelcast Cluster Manager

This is a cluster manager implementation for Vert.x that uses Hazelcast.

It is the default cluster manager used in the Vert.x CLI, but it can be replaced with another implementation as Vert.x cluster managers are pluggable.

This implementation is packaged inside:

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-hazelcast</artifactId>
 <version>4.5.7</version>
</dependency>

In Vert.x a cluster manager is used for various functions including:

  • Discovery and group membership of Vert.x nodes in a cluster

  • Maintaining cluster wide topic subscriber lists (so we know which nodes are interested in which event bus addresses)

  • Distributed Map support

  • Distributed Locks

  • Distributed Counters

Cluster managers do not handle the event bus inter-node transport, this is done directly by Vert.x with TCP connections.

Using this cluster manager

If you are using Vert.x from the command line, the jar corresponding to this cluster manager (it will be named vertx-hazelcast-4.5.7.jar should be in the lib directory of the Vert.x installation.

If you want clustering with this cluster manager in your Vert.x Maven or Gradle project then just add a dependency to the artifact: io.vertx:vertx-hazelcast:4.5.7 in your project.

If the jar is on your classpath as above then Vert.x will automatically detect this and use it as the cluster manager. Please make sure you don’t have any other cluster managers on your classpath or Vert.x might choose the wrong one.

You can also specify the cluster manager programmatically if you are embedding Vert.x by specifying it on the options when you are creating your Vert.x instance, for example:

ClusterManager mgr = new HazelcastClusterManager();

Vertx
  .builder()
  .withClusterManager(mgr)
  .buildClustered()
  .onComplete(res -> {
  if (res.succeeded()) {
    Vertx vertx = res.result();
  } else {
    // failed!
  }
});

Configuring this cluster manager

Configuring with XML

Usually the cluster manager is configured by a file default-cluster.xml which is packaged inside the jar.

If you want to override this configuration you can provide a file called cluster.xml on your classpath and this will be used instead.If you want to embed the cluster.xml file in a fat jar, it must be located at the root of the fat jar.If it’s an external file, the directory containing the file must be added to the classpath.For example, if you are using the launcher class from Vert.x, the classpath enhancement can be done as follows:

# If the cluster.xml is in the current directory:
java -jar ... -cp . -cluster
vertx run MyVerticle -cp . -cluster

# If the cluster.xml is in the conf directory
java -jar ... -cp conf -cluster

Another way to override the configuration is by providing the system property vertx.hazelcast.config with a location:

# Use a cluster configuration located in an external file
java -Dvertx.hazelcast.config=./config/my-cluster-config.xml -jar ... -cluster

# Or use a custom configuration from the classpath
java -Dvertx.hazelcast.config=classpath:my/package/config/my-cluster-config.xml -jar ... -cluster

The vertx.hazelcast.config system property, when present, overrides any cluster.xml on the classpath, but if loading from this system property fails, then loading falls back to either cluster.xml or the Hazelcast default configuration.

Caution
Configuration of Hazelcast the -Dhazelcast.config system property is not supported by Vert.x and should not be used.

The xml file is a Hazelcast configuration file and is described in detail in the documentation on the Hazelcast web-site.

Configuring programmatically

You can specify configuration programmatically if embedding:

Config hazelcastConfig = new Config();

// Now set some stuff on the config (omitted)

ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);

Vertx
  .builder()
  .withClusterManager(mgr)
  .buildClustered()
  .onComplete(res -> {
  if (res.succeeded()) {
    Vertx vertx = res.result();
  } else {
    // failed!
  }
});

It might also be useful to customize an existing XML configuration. For example, you might want to change the cluster name:

Config hazelcastConfig = ConfigUtil.loadConfig();

hazelcastConfig.setClusterName("my-cluster-name");

ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);

Vertx
  .builder()
  .withClusterManager(mgr)
  .buildClustered()
  .onComplete(res -> {
  if (res.succeeded()) {
    Vertx vertx = res.result();
  } else {
    // failed!
  }
});

ConfigUtil#loadConfig loads Hazelcast config XML and transform it into a Config object. The content is read from:

  1. the location denoted by the vertx.hazelcast.config sysprop, if present, or

  2. the cluster.xml file on the classpath, if present, or

  3. the default config file

Discovery options

Hazelcast supports several different discovery options. The default configuration uses multicast so you must have multicast enabled on your network for this to work.

For full documentation on how to configure discovery differently please consult the Hazelcast documentation.

Changing local and public address with system properties

Sometimes, cluster nodes must bind to an address that is not reachable by other members. For example, this may happen when nodes are not in the same network area, or on certain clouds with specific firewall configurations.

The bind address and public address (the address advertised to other members) can be set with system properties:

-Dhazelcast.local.localAddress=172.16.5.131 -Dhazelcast.local.publicAddress=104.198.78.81

Using an existing Hazelcast cluster

You can pass an existing HazelcastInstance in the cluster manager to reuse an existing cluster:

ClusterManager mgr = new HazelcastClusterManager(hazelcastInstance);
Vertx
  .builder()
  .withClusterManager(mgr)
  .buildClustered()
  .onComplete(res -> {
  if (res.succeeded()) {
    Vertx vertx = res.result();
  } else {
    // failed!
  }
});

In this case, Vert.x is not the cluster owner and so do not shutdown the cluster on close.

Notice that the custom Hazelcast instance need to be configured with:

<member-attributes>
 <attribute name="__vertx.nodeId">unique-identifier</attribute>
</member-attributes>

<multimap name="__vertx.subs">
 <backup-count>1</backup-count>
 <value-collection-type>SET</value-collection-type>
</multimap>

<map name="__vertx.haInfo">
 <backup-count>1</backup-count>
</map>

<map name="__vertx.nodeInfo">
 <backup-count>1</backup-count>
</map>

<cp-subsystem>
 <cp-member-count>0</cp-member-count>
 <semaphores>
   <semaphore>
     <name>__vertx.*</name>
     <jdk-compatible>false</jdk-compatible>
     <initial-permits>1</initial-permits>
   </semaphore>
 </semaphores>
</cp-subsystem>
Caution
The __vertx.nodeId is used by Vert.x as identifier of the node in the cluster. Make sure to configure unique values across members.
Important
Hazelcast clients or smart clients are not supported.
Important
Make sure Hazelcast is started before and shut down after Vert.x. Also, the Hazelcast shutdown hook should be disabled (see xml example, or via system property).

Changing timeout for failed nodes

By default, a node will be removed from the cluster if Hazelcast didn’t receive a heartbeat for 300 seconds. To change this value hazelcast.max.no.heartbeat.seconds system property such as in:

-Dhazelcast.max.no.heartbeat.seconds=5

Afterwards a node will be removed from the cluster after 5 seconds without a heartbeat.

Trouble shooting clustering

If the default multicast configuration is not working here are some common causes:

Multicast not enabled on the machine.

It is quite common in particular on OSX machines for multicast to be disabled by default. Please google for information on how to enable this.

Using wrong network interface

If you have more than one network interface on your machine (and this can also be the case if you are running VPN software on your machine), then Hazelcast may be using the wrong one.

To tell Hazelcast to use a specific interface you can provide the IP address of the interface in the interfaces element of the configuration. Make sure you set the enabled attribute to true. For example:

<interfaces enabled="true">
 <interface>192.168.1.20</interface>
</interfaces>

Using a VPN

This is a variation of the above case. VPN software often works by creating a virtual network interface which often doesn’t support multicast. If you have a VPN running and you do not specify the correct interface to use in both the hazelcast configuration and to Vert.x then the VPN interface may be chosen instead of the correct interface.

So, if you have a VPN running you may have to configure both the Hazelcast and Vert.x to use the correct interface as described in the previous section.

When multicast is not available

In some cases you may not be able to use multicast as it might not be available in your environment. In that case you should configure another transport, e.g. TCP to use TCP sockets, or AWS when running on Amazon EC2.

For more information on available Hazelcast transports and how to configure them please consult the Hazelcast documentation.

Enabling logging

When trouble-shooting clustering issues with Hazelcast it’s often useful to get some logging output from Hazelcast to see if it’s forming a cluster properly. You can do this (when using the default JUL logging) by adding a file called vertx-default-jul-logging.properties on your classpath. This is a standard java.util.logging (JUL) configuration file. Inside it set:

com.hazelcast.level=INFO

and also

java.util.logging.ConsoleHandler.level=INFO
java.util.logging.FileHandler.level=INFO

Hazelcast logging

The logging backend used by Hazelcast is jdk by default (understand JUL). If you want to redirect the logging to another library, you need to set the hazelcast.logging.type system property such as in:

-Dhazelcast.logging.type=slf4j

See the hazelcast documentation for more details.

Using a different Hazelcast version

You may want to use a different version of Hazelcast. The default version is 4.2.8. To do so, you need to:

  • put the version you want in the application classpath

  • if you are running a fat jar, configure your build manager to use the right version

In this later case, you would need in Maven:

<dependency>
 <groupId>com.hazelcast</groupId>
 <artifactId>hazelcast</artifactId>
 <version>ENTER_YOUR_VERSION_HERE</version>
</dependency>
<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-hazelcast</artifactId>
 <version>4.5.7</version>
</dependency>

Depending on the version, you may need to exclude some transitive dependencies.

On Gradle, you can achieve the same overloading using:

dependencies {
compile ("io.vertx:vertx-hazelcast:4.5.7"){
  exclude group: 'com.hazelcast', module: 'hazelcast'
}
compile "com.hazelcast:hazelcast:ENTER_YOUR_VERSION_HERE"
}

Configuring for Kubernetes

Caution

The beginning of this section is relevant only when using the default Hazelcast version, which is 4.2.8.

Since version 5.0, Hazelcast includes hazelcast-kubernetes and does not require an additional dependency. For details about running Hazelcast 5 on Kubernetes, check out the documentation.

On Kubernetes, Hazelcast should be configured to use the Hazelcast Kubernetes plugin.

First, add the io.vertx:vertx-hazelcast:${vertx.version} and com.hazelcast:hazelcast-kubernetes:${hazelcast-kubernetes.version} dependencies to your project. With Maven it looks like:

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-hazelcast</artifactId>
 <version>${vertx.version}</version>
</dependency>
<dependency>
 <groupId>com.hazelcast</groupId>
 <artifactId>hazelcast-kubernetes</artifactId>
 <version>${hazelcast-kubernetes.version}</version>
</dependency>
Note
If you use a different version of the Hazelcast core library, make sure to use a compatible version of the Kubernetes discovery plugin.

The second step is to configure the discovery plugin inside your Hazelcast configuration, by either providing a custom cluster.xml file or programmatically, as described in Configuring this cluster manager.

The plugin provides two discovery modes: Kubernetes API and DNS Lookup. Please refer to the plugin project page for pros and cons of both modes.

In this document, we will use DNS Lookup discovery. The following properties have to be changed / added:

<hazelcast>
 <properties>
   <property name="hazelcast.discovery.enabled">true</property> (1)
 </properties>

 <network>
   <join>
     <multicast enabled="false"/> (2)
     <tcp-ip enabled="false" />

     <discovery-strategies>
       <discovery-strategy enabled="true"> (3)
           class="com.hazelcast.kubernetes.HazelcastKubernetesDiscoveryStrategy">
         <properties>
           <property name="service-dns">MY-SERVICE-DNS-NAME</property> (4)
         </properties>
       </discovery-strategy>
     </discovery-strategies>
   </join>
 </network>
</hazelcast>
  1. Activate Discovery SPI

  2. Deactivate other discoveries

  3. Activate the Kubernetes plugin

  4. Service DNS, usually in the form of MY-SERVICE-NAME.MY-NAMESPACE.svc.cluster.local but depends on the Kubernetes distribution

The MY-SERVICE-DNS-NAME value must be a headless Kubernetes service name that will be used by Hazelcast to identify all cluster members. A headless service can be created with:

apiVersion: v1
kind: Service
metadata:
 namespace: MY-NAMESPACE
 name: MY-SERVICE-NAME
spec:
 selector:
   component: MY-SERVICE-NAME (1)
 clusterIP: None
 ports:
 - name: hz-port-name
   port: 5701
   protocol: TCP
  1. Cluster members selected by label

Eventually, attach the component label to all deployments that should be part of the cluster:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
 namespace: MY-NAMESPACE
spec:
 template:
   metadata:
     labels:
       component: MY-SERVICE-NAME

Further configuration details are available on the Hazelcast Kubernetes Plugin page.

Rolling updates

During rolling updates, it is recommended to replace pods one by one.

To do so, we must configure Kubernetes to:

  • never start more than one new pod at once

  • forbid more than one unavailable pod during the process

spec:
 strategy:
   type: Rolling
   rollingParams:
     updatePeriodSeconds: 10
     intervalSeconds: 20
     timeoutSeconds: 600
     maxUnavailable: 1 (1)
     maxSurge: 1 (2)
  1. the maximum number of pods that can be unavailable during the update process

  2. the maximum number of pods that can be created over the desired number of pods

Also, the pod readiness probe must take the cluster state into account. Please refer to the cluster administration section for details on how to implement a readiness probe with Vert.x Health Checks.

Cluster administration

The Hazelcast cluster manager works by turning Vert.x nodes into members of a Hazelcast cluster. As a consequence, Vert.x cluster manager administration should follow the Hazelcast management guidelines.

First, let’s take a step back and introduce data partitioning and split-brain syndrome.

Data partitioning

Each Vert.x node holds pieces of the clustering data: eventbus subscriptions, async map entries, clustered counters…​ etc.

When a member joins or leaves the cluster, Hazelcast migrates data partitions. In other words, it moves data around to accomodate the new cluster topology. This process may take some time, depending on the amount of clustered data and number of nodes.

Split-brain syndrome

In a perfect world, there would be no network equipment failures. Reality is, though, that sooner or later your cluster will be divided into smaller groups, unable to see each others.

Hazelcast is capable of merging the nodes back into a single cluster. But just as with data partition migrations, this process may take some time. Before the cluster is fully functional again, some eventbus consumers might not be able to get messages. Or high-availability may not be able to redeploy a failing verticle.

Note

It is difficult (if possible at all) to make a difference between a network partition and:

  • long GC pauses (leading to missed heartbeats checks),

  • many nodes being killed forcefully, at-once, because you are deploying a new version of your application

Recommendations

Considering the common clustering issues discussed above, it is recommended to stick to the following good practices.

Graceful shutdown

Avoid stopping members forcefully (e.g, kill -9 a node).

Of course process crashes are inevitable, but a graceful shutdown helps to get the remaining nodes in a stable state faster.

One node after the other

When rolling a new version of your app, scaling-up or down your cluster, add or remove nodes one after the other.

Stopping nodes one by one prevents the cluster from thinking a network partition occured. Adding them one by one allows for clean, incremental data partition migrations.

The cluster safety can be verified with Vert.x Health Checks:

Handler<Promise<Status>> procedure = ClusterHealthCheck.createProcedure(vertx);
HealthChecks checks = HealthChecks.create(vertx).register("cluster-health", procedure);

After creation, the health check can be exposed over HTTP with a Vert.x Web router handler:

Router router = Router.router(vertx);
router.get("/readiness").handler(HealthCheckHandler.createWithHealthChecks(checks));

Using Lite Members

To minimize the time a Vert.x cluster spends accomodating a new topology, you may use external data nodes and mark Vert.x nodes as Lite Members.

Lite Members participate in a Hazelcast cluster like regular members, but they do not own any data partition. Therefore, Hazelcast does not need to migrate partitions when such members are added or removed.

Important
You must start the external data nodes beforehand as Hazelcast won’t create a cluster with Lite Members only.

To start an external node, you can use the Hazelcast distribution start script, or proceed programmatically.

Vert.x nodes can be marked as Lite Members in the XML configuration:

<lite-member enabled="true"/>

You can also do it programmatically:

Config hazelcastConfig = ConfigUtil.loadConfig()
  .setLiteMember(true);

ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);

Vertx
  .builder()
  .withClusterManager(mgr)
  .buildClustered()
  .onComplete(res -> {
  if (res.succeeded()) {
    Vertx vertx = res.result();
  } else {
    // failed!
  }
});