Vert.x-redis

Vert.x-redis is redis client to be used with Vert.x.

This module allows data to be saved, retrieved, searched for, and deleted in a Redis. Redis is an open source, advanced key-value store. It is often referred to as a data structure server since keys can contain strings, hashes, lists, sets and sorted sets. To use this module you must have a Redis server instance running on your network.

Redis has a rich API and it can be organized in the following groups:

  • Cluster - Commands related to cluster management, note that using most of these commands you will need a redis server with version >=3.0.0

  • Connection - Commands that allow you to switch DBs, connect, disconnect and authenticate to a server.

  • Hashes - Commands that allow operations on hashes.

  • HyperLogLog - Commands to approximating the number of distinct elements in a multiset, a HyperLogLog.

  • Keys - Commands to work with Keys.

  • List - Commands to work with Lists.

  • Pub/Sub - Commands to create queues and pub/sub clients.

  • Scripting - Commands to run Lua Scripts in redis.

  • Server - Commands to manage and get server configurations.

  • Sets - Commands to work with un ordered sets.

  • Sorted Sets - Commands to work with sorted sets.

  • Strings - Commands to work with Strings.

  • Transactions - Commands to handle transaction lifecycle.

  • Streams - Commands to handle streaming.

Using Vert.x-Redis

To use the Vert.x Redis client, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-redis-client</artifactId>
<version>4.2.1</version>
</dependency>
  • Gradle (in your build.gradle file):

compile 'io.vertx:vertx-redis-client:4.2.1'

Connecting to Redis

The Redis client can operate in 4 distinct modes:

  • Simple client (probably what most users need).

  • Sentinel (when working with Redis in High Availability mode).

  • Cluster (when working with Redis in Clustered mode).

  • Replication (single shard, one node write, multiple read).

The connection mode is selected by the factory method on the Redis interface. Regardless of the mode the client can be configured using a RedisOptions data object. By default, some configuration values are initialized with the following values:

  • netClientOptions: default is TcpKeepAlive: true, TcpNoDelay: true

  • endpoint: default is redis://localhost:6379

  • masterName: default is mymaster

  • role default is MASTER

  • useReplicas default is NEVER

In order to obtain a connection use the following code:

Redis.createClient(vertx)
  .connect()
  .onSuccess(conn -> {
    // use the connection
  });

In the configuration contains a password and/or a select database, these 2 commands will be executed automatically once a successful connection is established to the server.

Redis.createClient(
  vertx,
  // The client handles REDIS URLs. The select database as per spec is the
  // numerical path of the URL and the password is the password field of
  // the URL authority
  "redis://:[email protected]:6379/1")
  .connect()
  .onSuccess(conn -> {
    // use the connection
  });

Connection String

The client will recognize addresses that follow the expression:

redis://[:[email protected]]host[:port][/db-number]

Or

unix://[:[email protected]]/domain/docker.sock[?select=db-number]

When specifying a password or a database those commands are always executed on connection start.

Running commands

Given that the redis client is connected to the server, all commands are now possible to execute using this module. The module offers a clean API for executing commands without the need to hand write the command itself, for example if one wants to get a value of a key it can be done as:

RedisAPI redis = RedisAPI.api(client);

redis
  .get("mykey")
  .onSuccess(value -> {
    // do something...
  });

The response object is a generic type that allow converting from the basic redis types to your language types. For example, if your response is of type INTEGER then you can get the value as any numeric primitive type int, long, etc…​

Or you can perform more complex tasks such as handling responses as iterators:

if (response.type() == ResponseType.MULTI) {
  for (Response item : response) {
    // do something with item...
  }
}

High Availability mode

To work with high availability mode the connection creation is quite similar:

Redis.createClient(
  vertx,
  new RedisOptions()
    .setType(RedisClientType.SENTINEL)
    .addConnectionString("redis://127.0.0.1:5000")
    .addConnectionString("redis://127.0.0.1:5001")
    .addConnectionString("redis://127.0.0.1:5002")
    .setMasterName("sentinel7000")
    .setRole(RedisRole.MASTER))
  .connect()
  .onSuccess(conn -> {
    conn.send(Request.cmd(Command.INFO))
      .onSuccess(info -> {
        // do something...
      });
  });

What is important to notice is that in this mode, an extra connection is established to the server(s) and behind the scenes the client will listen for events from the sentinel. When the sentinel notifies that we switched masters, then an exception is send to the client and you can decide what to do next.

Cluster mode

To work with cluster the connection creation is quite similar:

final RedisOptions options = new RedisOptions()
  .addConnectionString("redis://127.0.0.1:7000")
  .addConnectionString("redis://127.0.0.1:7001")
  .addConnectionString("redis://127.0.0.1:7002")
  .addConnectionString("redis://127.0.0.1:7003")
  .addConnectionString("redis://127.0.0.1:7004")
  .addConnectionString("redis://127.0.0.1:7005");

In this case the configuration requires one of more members of the cluster to be known. This list will be used to ask the cluster for the current configuration, which means if any of the listed members is not available it will be skipped.

In cluster mode a connection is established to each node and special care is needed when executing commands. It is recommended to read redis manual in order to understand how clustering works. The client operating in this mode will do a best effort to identify which slot is used by the executed command in order to execute it on the right node. There could be cases where this isn’t possible to identify and in that case as a best effort the command will be run on a random node.

Replication Mode

Working with replication is transparent to the client. Acquiring a connection is an expensive operation. The client will loop the provided endpoints until the master node is found. Once the master node is identified (this is the node where all write commands will be executed) a best effort is done to connect to all replica nodes (the read nodes).

With all node knowledge the client will now filter operations that perform read or writes to the right node type. Note that the useReplica configuration affects this choice. Just like with clustering, when the configuration states that the use of replica nodes is ALWAYS then any read operation will be performed on a replica node, SHARED will randomly share the read between master and replicas and finally NEVER means that replicas are never to be used.

The recommended usage of this mode, given the connection acquisition cost, is to re-use the connection as long as the application may need it.

Redis.createClient(
  vertx,
  new RedisOptions()
    .setType(RedisClientType.REPLICATION)
    .addConnectionString("redis://localhost:7000")
    .setMaxPoolSize(4)
    .setMaxPoolWaiting(16))
  .connect()
  .onSuccess(conn -> {
    // this is a replication client.
    // write operations will end up in the master node
    conn.send(Request.cmd(Command.SET).arg("key").arg("value"));
    // and read operations will end up in the replica nodes if available
    conn.send(Request.cmd(Command.GET).arg("key"));
  });

Pub/Sub mode

Redis supports queues and pub/sub mode, when operated in this mode once a connection invokes a subscriber mode then it cannot be used for running other commands than the command to leave that mode.

To start a subscriber one would do:

Redis.createClient(vertx, new RedisOptions())
  .connect()
  .onSuccess(conn -> {
    conn.handler(message -> {
      // do whatever you need to do with your message
    });
  });

And from another place in the code publish messages to the queue:

redis.send(Request.cmd(Command.PUBLISH).arg("channel1").arg("Hello World!"))
  .onSuccess(res -> {
    // published!
  });
Note
It is important to remember that the commands SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE and PUNSUBSCRIBE are void. This means that the result in case of success is null not a instance of response. All messages are then routed through the handler on the client.

Domain Sockets

Most of the examples shown connecting to a TCP sockets, however it is also possible to use Redis connecting to a UNIX domain docket:

Redis.createClient(vertx, "unix:///tmp/redis.sock")
  .connect()
  .onSuccess(conn -> {
    // so something...
  });

Be aware that HA and cluster modes report server addresses always on TCP addresses not domain sockets. So the combination is not possible. Not because of this client but how Redis works.

Connection Pooling

All client variations are backed by a connection pool. By default the configuration sets the pool size to 1, which means that it operates just like a single connection. There are 4 tunnables for the pool:

  • maxPoolSize the max number of connections on the pool (default 6)

  • maxPoolWaiting the max waiting handlers to get a connection on a queue (default 24)

  • poolCleanerInterval the interval when connections will be clean default is -1 (disabled)

  • poolRecycleTimeout the timeout to keep an open connection on the pool waiting and then close (default 15_000)

Pooling is quite useful to avoid custom connection management, for example you can just use as:

Redis.createClient(vertx, "redis://localhost:7006")
  .send(Request.cmd(Command.PING))
  .onSuccess(res -> {
    // Should have received a pong...
  });

It is important to observe that no connection was acquired or returned, it’s all handled by the pool. However there might be some scalability issues when more than 1 concurrent request attempts to get a connection from the pool, in order to overcome this we need to tune the pool. A common configuration is to set the maximum size of the pool to the number of available CPU cores and allow requests to get a connection from the pool to queue:

Redis.createClient(
  vertx,
  new RedisOptions()
    .setConnectionString("redis://localhost:7006")
    // allow at max 8 connections to redis
    .setMaxPoolSize(8)
    // allow 32 connection requests to queue waiting
    // for a connection to be available.
    .setMaxWaitingHandlers(32))
  .send(Request.cmd(Command.PING))
  .onSuccess(res -> {
    // Should have received a pong...
  });
Note
Pooling is not compatible with SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE or PUNSUBSCRIBE because these commands will modify the way the connection operates and the connection cannot be reused.

Implementing Reconnect on Error

While the connection pool is quite useful, for performance, a connection should not be auto managed but controlled by you. In this case you will need to handle connection recovery, error handling and reconnect.

A typical scenario is that a user will want to reconnect to the server whenever an error occurs. The automatic reconnect is not part of the redis client as it will force a behaviour that might not match the user expectations, for example:

  1. What should happen to current in-flight requests?

  2. Should the exception handler be invoked or not?

  3. What if the retry will also fail?

  4. Should the previous state (db, authentication, subscriptions) be restored?

  5. Etc…​

In order to give the user full flexibility, this decision should not be performed by the client. However a simple reconnect with backoff timeout could be implemented as follows:

class RedisVerticle extends AbstractVerticle {

  private static final int MAX_RECONNECT_RETRIES = 16;

  private final RedisOptions options = new RedisOptions();
  private RedisConnection client;

  @Override
  public void start() {
    createRedisClient()
      .onSuccess(conn -> {
        // connected to redis!
      });
  }

  /**
   * Will create a redis client and setup a reconnect handler when there is
   * an exception in the connection.
   */
  private Future<RedisConnection> createRedisClient() {
    Promise<RedisConnection> promise = Promise.promise();

    Redis.createClient(vertx, options)
      .connect()
      .onSuccess(conn -> {
        // make sure the client is reconnected on error
        conn.exceptionHandler(e -> {
          // attempt to reconnect,
          // if there is an unrecoverable error
          attemptReconnect(0);
        });
        // allow further processing
        promise.complete(conn);
      })
      .onFailure(t -> {
        promise.fail(t);
      });

    return promise.future();
  }

  /**
   * Attempt to reconnect up to MAX_RECONNECT_RETRIES
   */
  private void attemptReconnect(int retry) {
    if (retry > MAX_RECONNECT_RETRIES) {
      // we should stop now, as there's nothing we can do.
    } else {
      // retry with backoff up to 10240 ms
      long backoff = (long) (Math.pow(2, Math.min(retry, 10)) * 10);

      vertx.setTimer(backoff, timer -> {
        createRedisClient()
          .onFailure(t -> attemptReconnect(retry + 1));
      });
    }
  }
}

In this example the client object will be replaced on reconnect and the application will retry up to 16 times with a backoff up to 1280ms. By discarding the client we ensure that all old inflight responses are lost and all new ones will be on the new connection.

It is important to note that, the reconnect will create a new connection object, so these object references should not be cached and evaluated every time.

Protocol Parser

This client supports both RESP2 and RESP3 protocols, at the connection handshake time the client will automatically detect which version is supported by the server and use it.

The parser internally creates an "infinite" readable buffer from all the chunks received from the server, in order to avoid creating too much garbage in terms of memory collection, a tunnable watermark value is configurable at JVM startup time. The system property io.vertx.redis.parser.watermark defines how much data is keept in this readable buffer before it gets discarded. By default this value is 512Kb. This means that each connection to the server will use at least this amount of memory. As the client works in pipeline mode, keeping the number of connections low provides best results, which means 512Kb * nconn memory will be used. If the application will require a large number of connections, then reducing the watermark value to a smaller value or even disable it entirely is advisable.