Advanced

Advanced Vert.x Guide

This guide document advanced/internal stuff about Vert.x (5.0).

It aims to explain and discuss the following

  • Vert.x design

  • Internal APIs

  • Integration with Netty

You want to read this guide when you want to

  • understand better Vert.x internals

  • integrate Vert.x with thirdparty libraries

  • perform networking with Netty and Vert.x

This is a live guide and you can contribute, just open a PR or an issue in the repo.

Some of the internal Vert.x APIs are exposed in this guide and you should keep in mind that these APIs are subject to be changed when it is needed.

Contexts in Vert.x

The io.vertx.core.Context interface is an essential component of Vert.x.

At a high level contexts can be thought of as controlling the execution of how events (or tasks created by handlers) are executed by the application.

Most events are dispatched through contexts, when the application consumes an event there is most likely a context associated with the dispatch of the event.

Verticle contexts

When an instance of a verticle is deployed, Vert.x creates and associates a context with this instance. You can access this context in your verticle using the context field of AbstractVerticle:

public class MyVerticle extends AbstractVerticle {
  public void start() {
    JsonObject config = context.config();
  }
}

When MyVerticle is deployed, Vert.x emits a start event, the start method is called by a thread of the Verticle context:

  • by default the context is always an event-loop context, the caller thread is an event-loop

  • when the verticle is deployed as a worker the caller thread is one of the worker pool of Vert.x

Ad hoc contexts

Using Vert.x APIs without using Verticle is supported since Vert.x 3 and leads to the interesting question of which context is used.

When a Vert.x API is called, Vert.x associates the current thread with an ad hoc event-loop context, Vertx#getOrCreateContext() creates a context the first time it is called for a non vertx thread and then returns this context on subsequent calls.

Consequently, callbacks on asynchronous Vert.x API happen on the same context:

public class Main {
  public static void main(String[] args) {
    WebClient client = WebClient.create(vertx);

    for (int i = 0;i < 4;i++) {
      client
        .get(8080, "myserver.mycompany.com", "/some-uri")
        .send()
        .onSuccess(ar -> {
         // All callbacks are on the same context
        });
    }
  }
}

This behavior differs from previous major versions, Vert.x 3 would create a different context for each HTTP request.

While Vert.x does encourage confining code in context, such behavior avoids potential data races.

Propagation of contexts

Most Vert.x APIs are aware of contexts.

Asynchronous operations executed within a context will call back the application with the same context.

Likewise, event handlers are also dispatched on the same context.

public class MyVerticle extends AbstractVerticle {
  public void start() {
    Future<HttpServer> future = vertx.createHttpServer()
      .requestHandler(request -> {
        // Executed in the verticle context
      })
      .listen(8080, "localhost");

    future.onComplete(ar -> {
       // Executed in the verticle context
    });
  }
}

Dealing with contexts

Most application don’t need tight interactions with a context but sometimes it can be useful to access them, e.g your application uses another library that performs a callback on its own thread and you want to execute code in the original context.

Above we have seen a verticle can access its context through the context field but that implies to use a verticle and to have a reference on the verticle which might not always be handy.

You can get the current context with getOrCreateContext():

Context context = vertx.getOrCreateContext();

You can also use the static method Vertx.currentContext():

Context context = Vertx.currentContext();

The later might return null if the current thread is not associated with a context, whereas the former will create one if needed and thus never returns null.

After you obtained a context, you can use it to run code in this context:

public void integrateWithExternalSystem(Handler<Event> handler) {
  // Capture the current context
  Context context = vertx.getOrCreateContext();

  // Run the event handler on the application context
  externalSystem.onEvent(event -> {
    context.runOnContext(v -> handler.handle(event));
  });
}

In practice, many Vert.x APIs and thirdparty libraries are implemented this way.

Event-loop context

An event loop context uses an event loop to run code: handlers are executed directly on the IO threads, as a consequence:

  • A handler will always be executed with the same thread

  • A handler must never block the thread, otherwise it will create starvation for all the IO tasks associated with that event loop.

This behavior allows for a greatly simplified threading model by guaranteeing that associated handlers will always be executed on the same thread, thus removing the need for synchronization and other locking mechanisms.

This is the type of context that is the default and most commonly used type of context. A verticle deployed without the worker flag will always be deployed with an event loop context.

Worker context

Worker contexts are assigned to verticles deployed with the worker option enabled. The worker context is differentiated from standard event loop contexts in that workers are executed on a separate worker thread pool.

This separation from event loop threads allows worker contexts to execute the types of blocking operations that will block the event loop: blocking such thread will not impact the application other than blocking one thread.

Just as is the case with the event loop context, worker contexts ensure that handlers are only executed on one thread at any given time. That is, handlers executed on a worker context will always be executed sequentially - one after the other - but different actions may be executed on different threads.

Context exception handler

An exception handler can be set on a context, to catch any unchecked exception thrown by task running on a context.

When no exception handler is set, the Vertx exception handler is called instead

context.exceptionHandler(throwable -> {
  // Any exception thrown by this context
});

vertx.exceptionHandler(throwable -> {
  // Any exception uncaught exception thrown on context
});

When no handler is set whatsoever, the exception is logged as an error with the message Unhandled exception

You can report an exception on a context with reportException

context.reportException(new Exception());

Firing events

runOnContext is the most common way to execute a piece of code on a context. Although it is very suited for integrating external libraries with Vert.x, it is not always the best fit to integrate code executing at the event-loop level (such as Netty events) with application code.

There are internal methods that achieve similar behaviors depending on the situation

  • ContextInternal#dispatch(E, Handler<E>)

  • ContextInternal#execute(E, Handler<E>)

  • ContextInternal#emit(E, Handler<E>)

Dispatch

dispatch assumes the caller thread is the context thread, it associates the current thread of execution with the context:

assertNull(Vertx.currentContext());
context.dispatch(event, evt -> {
  assertSame(context, Vertx.currentContext());
});

The handler is also monitored by the blocked thread checker.

Finally, any exception thrown by the handler is reported to the context:

context.exceptionHandler(err -> {
  // Should receive the exception thrown below
});
context.dispatch(event, evt -> {
  throw new RuntimeException();
});

Execute

execute executes a task on the context, when the caller thread is already a context thread, the task is executed directly, otherwise a task is scheduled for execution.

no context associated is done

Emit

emit is a combination of execute and dispatch

default void emit(E event, Handler<E> eventHandler) {
  execute(v -> dispatch(argument, task));
}

emit can be used from any thread to fire an event to a handler:

  • from any thread, it behaves like runOnContext

  • from a context thread, it runs the event handler with the context thread local association, the block thread checker and reports failures on the context

In most situations, the emit method is the way to go to have an application process an event. The main purpose of dispatch and execute methods is to give more control to the code to achieve very specific things.

Context aware futures

Until Vert.x 4, Future were statically created object with no specific relationship to a context. Vert.x 4 provides a future based API which need to respect the same semantics as Vert.x 3: any callback on a future should predictably run on the same context.

Vert.x 4 API creates futures bound to the caller context that run callbacks on the context:

Promise<String> promise = context.promise();

Future<String> future = promise.future();

future.onSuccess(handler);

Any callback is emitted on the context that created the promise, the code above is pretty much like:

Promise<String> promise = Promise.promise();

Future<String> future = promise.future();

future.onSuccess(result -> context.emit(result, handler));

In addition, the API allows to create succeeded and failed futures:

Future<String> succeeded = context.succeededFuture("OK usa");
Future<String> failed = context.failedFuture("Oh sorry");

Contexts and tracing

Since Vert.x 4, Vert.x integrates with popular distributing tracing systems.

Tracing libraries usually rely on thread local storage to propagate tracing data, e.g a trace received when processing an HTTP request should be propagated throughout the HTTP client.

Vert.x integrates tracing in a similar fashion but relies on contexts instead of thread local. Contexts are indeed propagated by Vert.x APIs and therefore offers a reliable storage for implementing tracing.

Since all HTTP requests processed by a given server use the same context that created the HTTP server, the server context is duplicated for each HTTP request, to grant unicity to each HTTP request.

public class MyVerticle extends AbstractVerticle {
  public void start() {
    vertx.createHttpServer()
      .requestHandler(request -> {
        // Executed in a duplicate verticle context
      })
      .listen(8080, "localhost");
  }
}

Duplication shares most of the characteristics of the original context and provides a specific local storage.

vertx.createHttpServer()
  .requestHandler(request -> {
    JsonObject specificRequestData = getRequestData(request);
    Context context = vertx.getOrCreateContext();
    context.putLocal("my-stuff", specificRequestData);
    processRequest(request);
  })
  .listen(8080, "localhost");

Later the application can use it:

Context context = vertx.getOrCreateContext();
JsonObject specificRequestData = context.getLocal("my-stuff");

ContextInternal#duplicate() duplicates the current context, it can be used to scope an activity associated with a trace

public void startProcessing(Request request) {
  Context duplicate = context.duplicate();
  request.setContext(duplicate);
}

Close hooks

Close hooks is an internal feature of Vert.x useful for creating components that are notified when a Verticle or a Vertx instance is closed. It can be used for implementing automatic clean-up in verticles feature, like for a Vert.x HTTP server.

The contract for receving a close notification is defined by the io.vertx.core.Closeable interface and its close(Promise<Void> closePromise) method:

@Override
public void close(Promise<Void> completion) {
  // Do cleanup, the method will complete the future
   doClose(completion);
}

The method ContextInternal#addCloseHook registers a Closeable instance to be notified when the context closes:

context.addCloseHook(closeable);

A context created by a Verticle deployment calls the hook when the verticle instance is stopped.

Otherwise, the hook is called when the Vertx instance is closed.

Context#removeCloseHook unregisters the close hook and shall be used when the resource is closed before the close hook is called.

context.removeCloseHook(closeable);

Hooks are implemented with weak references to avoid leaks, nevertheless unregistering hooks should be done.

Adding a hook on a duplicate context, adds the hook to the original context.

Likewise VertxInternal expose also the same operations to receive notifications when a Vertx instance is closed.

Integrating Netty

Netty is one of the dependencies of Vert.x. In fact, Netty powers the networking services of Vert.x. Vert.x Core provides the basic network services one can expect from such library:

  • TCP

  • HTTP

  • UDP

  • DNS

These are built with various components from Netty. The Netty community has implemented a wide range of components and this chapter explains how to integrate such components in Vert.x.

In this chapter we will build a TIME prococol client and server. The Netty documentation provides client/server implementations of this simple protocol, we will focus on the integration of these components.

Netty integration points

The main purpose of this chapter is to explain some of the Vert.x’s internal interfaces. Such interfaces are extensions that exposes low level methods to interact with Netty that are useful for components that re-use Netty directly.

Most users don’t need to deal with this extension and thus such methods are isolated in an extension interface.

Bootstrapping clients

ContextInternal extends io.vertx.core.Context and exposes various Netty integration points like VertxInternal.

Usually contexts are obtained from the Vertx#getOrCreateContext() method that returns the current execution context or create a new one if necessary: when called in a Verticle, getOrCreateContext() returns the context of this Verticle, when used in a non Vert.x thread like a main or a unit test, it creates a new one and returns it.

Context context = vertx.getOrCreateContext();

// Cast to access extra methods
Internals contextInternal = (Internals) context;

Contexts are always associated with a Netty event loop and thus using this context ensures our components re-use the same event loop if one existed before or use a new one.

The ContextInternal#nettyEventLoop() method returns this particular event loop and we can use it on Bootstrap (for client) or ServerBoostrap (for server):

ContextInternal contextInt = (ContextInternal) context; (1)
EventLoop eventLoop = contextInt.nettyEventLoop();

Bootstrap bootstrap = new Bootstrap(); (2)
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(eventLoop);
1 get the event-loop associated with this context
2 create a bootstrap for the client

Bootstrapping servers

VertxInternal extends io.vertx.core.Vertx, among all VertxInternal#getAcceptorEventLoopGroup() returns an EventLoopGroup for accepting connections on a server, it’s typical usage is on a ServerBootstrap:

ContextInternal contextInt = (ContextInternal) context; (1)
EventLoop eventLoop = contextInt.nettyEventLoop();

VertxInternal vertxInt = contextInt.owner(); (2)
EventLoopGroup acceptorGroup = vertxInt.getAcceptorEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap(); (3)
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(acceptorGroup, eventLoop);
1 get the event-loop associated with this context
2 get the acceptor event-loop group of Vertx
3 create the boostrap for the server

Handling events

Now that we are more intimate with ContextInternal, let’s look at how we can use it to handle Netty events such as network events, channel life cycle, etc…​

The ContextInternal#emit methods is used to emit events to the application as it ensures:

  • the context concurrency: reuse the current event-loop thread or execute on a worker

  • the thread local association of the current context with the dispatch thread

  • any uncaught exception thrown is reported on the context, such exception is either logged or passed to the Context#exceptionHandler

Here is a short example showing a server bootstrap

Handler<Channel> bindHandler = ch -> {
};

bootstrap.childHandler(new ChannelInitializer<Channel>() {
  @Override
  protected void initChannel(Channel ch) {
    context.emit(ch, bindHandler);
  }
});

Promise<Void> bindPromise = context.promise();

bootstrap.bind(socketAddress).addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) throws Exception {
    if (future.isSuccess()) {
      // Signal application with bind success
      bindPromise.complete();
    } else {
      // Signal application with bind error
      bindPromise.fail(future.cause());
    }
  }
});

return bindPromise.future();

The typical usage of emit is to dispatch one or multiple events to the same handler, like an event handler.

When it comes to a future, the ContextInternal#promise method creates a promise that will behave with listeners like the emit method.

The server

The original server example can be found here.

The Vert.x TIME server exposes a simple API:

  • a static method to create a TimeServer

  • two methods: listen to bind a server and close to unbind

  • the requestHandler for setting a handler for handling requests

public interface TimeServer {

  /**
   * @return a new time server
   */
  static TimeServer create(Vertx vertx) {
    return new TimeServerImpl(vertx);
  }

  /**
   * Set the handler to be called when a time request happens. The handler should complete
   * the future with the time value.
   *
   * @param handler the handler to be called
   * @return this object
   */
  TimeServer requestHandler(Handler<Promise<Long>> handler);

  /**
   * Start and bind the time server.
   *
   * @param port the server port
   * @param host the server host
   * @return the future completed when the socket is bound
   */
  Future<Void> listen(int port, String host);

  /**
   * Close the time server.
   */
  void close();

}

A TIME server serving the current JVM time is then straighforward to implement:

Vertx vertx = Vertx.vertx();

// Create the time server
TimeServer server = TimeServer.create(vertx);
server.requestHandler(time -> {
  time.complete(System.currentTimeMillis());
});

// Start the server
server.listen(8037, "0.0.0.0")
    .onSuccess(v -> System.out.println("Server started"))
    .onFailure(err -> err.printStackTrace());

Let’s study now the server implementation.

The server bootstrap

First let’s have a look at the ServerBootstrap creation and configuration

EventLoopGroup acceptorGroup = vertx.getAcceptorEventLoopGroup(); (1)
EventLoop eventLoop = context.nettyEventLoop(); (2)
bootstrap = new ServerBootstrap(); (3)
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(acceptorGroup, eventLoop);
bootstrap.childHandler(new ChannelInitializer<Channel>() {
  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline(); (4)
    TimeServerHandler handler = new TimeServerHandler(context, requestHandler);
    pipeline.addLast(handler);
  }
});
1 VertxInternal returns the event loop group to use as acceptor group
2 ContextInternal returns the event loop to use as child group
3 create and configure the Netty’s ServerBootstrap
4 configure the channel with the TimeServerHandler initialized with the server requestHandler

The creation of the ServerBootstrap is quite straightforward and is very similar to the original version. The main difference is that we reuse the event-loop provided by the Verticle and Vert.x. This ensures that our server shares the same resources of our application.

Notice that the TimeServerHandler is initialized with the server requestHandler, this handler will be used when serving TIME requests.

The server bind

Now let’s have a look at the bind operation, again it’s very and does not differ much from the original example:

Promise<Void> promise = context.promise(); (1)

ChannelFuture bindFuture = bootstrap.bind(host, port);
bindFuture.addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) {
    (2)
    if (future.isSuccess()) {
      channel = future.channel();
      promise.complete();
    } else {
      promise.fail(future.cause());
    }
  }
});

return promise.future(); (3)
1 create a promise bound to the server context
2 complete or succeed the result promise
3 return the future result

The most important part is the creation of the context promise to make the application aware of the bind result.

The server handler

Now let’s finish our server with the TimeServerHandler, which is an adaptation of the Netty’s original TimeServerHandler:

Promise<Long> result = Promise.promise(); (1)

context.emit(result, requestHandler); (2)

result.future().onComplete(ar -> { (3)
  if (ar.succeeded()) {  (4)
    ByteBuf time = ctx.alloc().buffer(4);
    time.writeInt((int) (ar.result() / 1000L + 2208988800L));
    ChannelFuture f = ctx.writeAndFlush(time);
    f.addListener((ChannelFutureListener) channelFuture -> ctx.close());
  } else {  (5)
    ctx.close();
  }
});
1 create a new blank promise that will be resolved by the requestHandler
2 let the context emit the event to the requestHandler with emit
3 the future handler is called when the requestHandler implementation completes the associated promise
4 write the current TIME to the channel and close it after
5 the application failed we simply close the socket

emit is used when a TIME request event happens, the promise to be completed is passed to the requestHandler. When this promise is completed, the handler will either write the time result to the channel or close it.

The client

The original client example can be found here.

The Vert.x time client exposes a simple API:

  • a static method for creating a TimeClient

  • the client getTime method for retrieving a time value from a server

public interface TimeClient {

  /**
   * @return a new time client
   */
  static TimeClient create(Vertx vertx) {
    return new TimeClientImpl(vertx);
  }

  /**
   * Fetch the current time from a server.
   *
   * @param port the server port
   * @param host the server host name
   * @return the result future
   */
  Future<Long> getTime(int port, String host);

}

The TIME client is straightforward to use:

Vertx vertx = Vertx.vertx();

// Create the time client
TimeClient server = TimeClient.create(vertx);

// Fetch the time
server.getTime(8037, "localhost").onComplete(ar -> {
  if (ar.succeeded()) {
    System.out.println("Time is " + new Date(ar.result()));
  } else {
    ar.cause().printStackTrace();
  }
});

Let’s study now the client implementation.

The client bootstrap

First let’s have a look at the Bootstrap creation and configuration

EventLoop eventLoop = context.nettyEventLoop();  (1)

// Create and configure the Netty bootstrap
Bootstrap bootstrap = new Bootstrap(); (2)
bootstrap.group(eventLoop);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<Channel>() {
  @Override
  protected void initChannel(Channel ch) {
    ChannelPipeline pipeline = ch.pipeline(); (3)
    pipeline.addLast(new TimeClientHandler(result));
  }
});

return bootstrap;
1 ContextInternal returns the event loop to use as child group
2 create and configure the Netty’s Bootstrap
3 configure the channel with the TimeServerHandler initialized with the server resultHandler

The creation of the Bootstrap is quite straightforward and is very similar to the original version. The main difference is that we reuse the event-loop provided by the Verticle. This ensures that our client reuses the same event-loop than our verticle.

Like in the server example we use the ContextInternal to obtain Netty’s EventLoop to set on the Bootstrap.

Notice that the TimeServerHandler is initialized with the client resultHandler, this handler will be called with the TIME request result.

The client connect

The bootstrap setup is very similar to the original example, in case of a failure the application callback uses a promise that holds the overall result.

ChannelFuture connectFuture = bootstrap.connect(host, port); (1)
connectFuture.addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) throws Exception {
    if (!future.isSuccess()) {
      result.fail(future.cause()); // 2
    }
  }
});
1 connect to the server
2 upon connect error we fail the promise

We only care of propagating a connect failure to the application, when the bootstrap connects successfully, the TimeServerHandler will handle the network response to the application.

The client handler

Now let’s complete our client with the TimeServerHandler, which is an adaptation of the Netty’s original TimeClientHandler:

ByteBuf m = (ByteBuf) msg;
long currentTimeMillis;
try {
  currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; (1)
  resultPromise.complete(currentTimeMillis);  (2)
  resultPromise = null; (3)
  ctx.close(); (4)
} finally {
  m.release();
}
1 decode the time response from the server
2 complete the resultPromise with the response
3 set the resultPromise to null
4 close the channel

Again here, we complete the resultPromise when a TIME response event happens.

Using Netty TCP codecs

In the previous section we have examined how Vert.x and Netty can share resources and the propagation of Netty events to a Vert.x application. In this section we will study the integration of existing Netty codecs.

Netty codecs are great for encapsulating and reusing network protocols encoder and decoder. The base Netty distribution provides a few codecs for popular protocols such as HTTP, Redis, Memcached or MQTT.

Client and server can be built on top of these codecs with Vert.x, e.g the Vert.x HTTP components reuses Netty’s HTTP codec, akin for Vert.x the MQTT protocols.

Vert.x TCP client/server can be customized to reuse Netty codecs. In fact the channel of a NetSocket can be used to customize the pipeline and read/write arbitrary messages.

There is a lot of value in reusing NetSocket this way

  • extend the Vert.x ecosystem, your clients/servers will be fully integrated with this ecosystem, i.e you can mix and match your middleware with existing Vert.x middleware, filesystem, etc…​

  • build on top of NetSocket features

    • SSL/TLS

    • Domain sockets

    • Client Socks/HTTP proxy handling

    • Server verticle scaling

    • Metrics

    • SNI handling

In this chapter we will write a client, but the same techniques can be applied for writing a server on top of Netty’s codec the same way.

everything achieved in this chapter can be also achieved using the techniques shown in the Integrating Netty chapter

The Memcached client

As example we will build in this chapter a simple Memcached client on top of Netty’s Memcached binary codec.

Memcached is a popular free and open source, high-performance, distributed memory object caching system.

There are two versions of the protocol, text and binary. In this section we will build a client for the binary protocol described in this document.

The client is very straightforward to use:

Vertx vertx = Vertx.vertx();

MemcachedClient.connect(vertx, 11211, "localhost")
    .compose(client -> {
      System.out.println("connected");

      // Put a value
      return client.set("foo", "bar").compose(v -> {
        System.out.println("Put successful");

        // Now retrieve the same value
        return client.get("foo");
      });
    }).onSuccess(res -> {
      System.out.println("Get successful " + res + "");
    }).onFailure(err -> err.printStackTrace());

You can easily start a Memcached server with Docker to try this example:

> docker run --rm --name my-memcache -p 11211:11211 -d memcached

Anatomy of the Memcached client

The client provides a simple API for connecting to a Memcached server and get/set entries.

public interface MemcachedClient {

  /**
   * Connect to memcached, the {@code completionHandler} will get the {@link MemcachedClient} instance.
   */
  static Future<MemcachedClient> connect(Vertx vertx, int port, String host) {
    return MemcachedClientImpl.connect(vertx, port, host, null);
  }

  /**
   * Connect to memcached, the {@code completionHandler} will get the {@link MemcachedClient} instance.
   */
  static Future<MemcachedClient> connect(Vertx vertx, int port, String host, NetClientOptions options) {
    return MemcachedClientImpl.connect(vertx, port, host, options);
  }

  /**
   * Get a cached entry.
   *
   * @param key the entry key
   * @return the result future
   */
  Future<@Nullable String> get(String key);

  /**
   * Set a cached entry.
   *
   * @param key the entry key
   * @param value the entry value
   * @return the result future
   */
  Future<Void> set(String key, String value);

}

The Memcached codec

The Memcached codec provided by Netty takes care of encoding and decoding Netty ByteBuf from and to Memcached request and response.

Our client will only require to use Memcached objects:

  • write FullBinaryMemcacheRequest to the pipeline

    • has a key property: a ByteBuf to provide the cached entry key

    • has a opCode property: an enum indicating the operation, GET and SET

    • has a extras property: a Bytebuf to provide extra information, only used in Memcached SET requests

    • has a content property: a Bytebuf to provide the cached entry value, only used in Memcached SET requests

  • read FullBinaryMemcacheResponse from the pipeline

    • has a status property: a value equals to 0 when the operation went successful

    • has a content property: a Bytebuf to provide the cached entry value, only used in Memcached GET responses

Memcached provides a richer protocol than GET or SET, but we don’t cover it in this section, as the goal is just to be a demonstration and not a complete client

Connecting to the server

Let’s look first at the client connect implementation:

NetClient tcpClient = options != null ? vertx.createNetClient(options) : vertx.createNetClient();

// Connect to the memcached instance
Future<NetSocket> connect = tcpClient.connect(port, host);
return connect.map(so -> {
  // Create the client
  MemcachedClientImpl memcachedClient = new MemcachedClientImpl((VertxInternal) vertx, (NetSocketInternal) so);

  // Initialize the client: configure the pipeline and set the handlers
  memcachedClient.init();

  return memcachedClient;
});

The connect implementation creates a Vert.x NetClient to connect to the actual Memcached server. When the connect is a success

  • the Vert.x NetSocket is casted to NetSocketInternal

  • the Memcached client is created and initialized

The NetSocketInternal is an advanced interface that gives access to a few extra methods that we need to build the client:

  • channelHandlerContext() returns the context of the NetSocket Netty’s handler

  • writeMessage(Object, Handler<AsyncResult<Void>>) writes an object to the pipeline

  • messsageHandler(Handler<Object>) sets and handler for processing pipeline messages

The Memcached client init method uses some of them to

  • initialize the NetSocket with the Memcached codec

  • sets a message handler to process the Memcached responses

ChannelPipeline pipeline = so.channelHandlerContext().pipeline();

// Add the memcached message aggregator
pipeline.addFirst("aggregator", new BinaryMemcacheObjectAggregator(Integer.MAX_VALUE));

// Add the memcached decoder
pipeline.addFirst("memcached", new BinaryMemcacheClientCodec());

// Set the message handler to process memcached message
so.messageHandler(this::processResponse);

Request / response correlation

The Memcached protocol is a pipelined protocol, the responses are received in the same order than the requests are sent.

Therefore the client needs to maintain an inflight FIFO queue which is a simple Java ConcurrentLinkedQueue. When a request is sent to the Memcached server, the response handler is added to the queue. When the response is received the handler is dequeued and can process the response.

Sending Memcached request messages

The client has a writeRequest method that sends a request to the pipeline:

  • write the request message

  • when the write is successful, add the response handler to the inflight queue so the responses can be processed

return so.writeMessage(request).compose(v -> {

  // The message has been encoded successfully and sent
  // Create a response promise and add it to the inflight queue, so it can be resolved by the server ack
  Promise<FullBinaryMemcacheResponse> promise = vertx.promise();
  inflight.add(promise);

  //
  return promise.future();
});

Processing Memcached response messages

The client has a processResponse method that is called each time the Memcached codec decodes a response:

  • dequeue the response handler

  • release the Netty message since the response messages are pooled, this method must be called otherwise a memory leak will happen

FullBinaryMemcacheResponse response = (FullBinaryMemcacheResponse) msg;

try {
  // Get the handler that will process the response
  Promise<FullBinaryMemcacheResponse> handler = inflight.poll();

  // Handle the message
  handler.complete(response);
} finally {
  // Release the referenced counted message
  response.release();
}

Sending Memcached GET requests

Memcached GET is fairly straightforward

  • create a FullBinaryMemcacheRequest

    • set the key property

    • set the opCode property to BinaryMemcacheOpcodes.GET

  • call writeRequest passing the request and providing the response handler

ByteBuf keyBuf = Unpooled.copiedBuffer(key, StandardCharsets.UTF_8);

// Create the memcached request
FullBinaryMemcacheRequest request = new DefaultFullBinaryMemcacheRequest(keyBuf, Unpooled.EMPTY_BUFFER);

// Set the memcached operation opcode to perform a GET
request.setOpcode(BinaryMemcacheOpcodes.GET);

// Execute the request and process the response
return writeRequest(request).map(response -> processGetResponse(response));

Processing Memcached GET responses

Memcached GET responses are processed by processGetResponse

short status = response.status();
switch (status) {

  case 0:
    // Succesfull get
    return response.content().toString(StandardCharsets.UTF_8);

  case 1:
    // Empty response -> null
    return null;

  default:
    // Memcached error
    throw new MemcachedError(status);
}

The status property of the response indicates whether the response is successful or not. We need to pay special attention when the status is 1 as the client handles it as a Java null value.

Sending Memcached SET requests

Memcached SET is straightforward too

  • create a FullBinaryMemcacheRequest

    • set the key property

    • set the opCode property to BinaryMemcacheOpcodes.SET

    • set the extras property to a the value 0xDEADBEEF_00001C20

      • 0xDEADBEEF must be used per the protocol

      • 00001C20 is the expiration time set to 2 hours

    • set the value property

  • call writeRequest passing the request and providing the response handler

ByteBuf keyBuf = Unpooled.copiedBuffer(key, StandardCharsets.UTF_8);

// Create the memcached request
FullBinaryMemcacheRequest request = new DefaultFullBinaryMemcacheRequest(keyBuf, Unpooled.EMPTY_BUFFER);

// Set the memcached operation opcode to perform a GET
request.setOpcode(BinaryMemcacheOpcodes.GET);

// Execute the request and process the response
return writeRequest(request).map(response -> processGetResponse(response));

Processing Memcached SET responses

Memcached SET responses are processed by processSetResponse

short status = response.status();
if (status == 0) {
  // Succesfull get
  return null;
} else {
  // Memcached error
  throw new MemcachedError(status);
}