public class MyVerticle extends AbstractVerticle {
public void start() {
JsonObject config = context.config();
}
}
Advanced Vert.x Guide
This guide document advanced/internal stuff about Vert.x (5.0).
It aims to explain and discuss the following
-
Vert.x design
-
Internal APIs
-
Integration with Netty
You want to read this guide when you want to
-
understand better Vert.x internals
-
integrate Vert.x with thirdparty libraries
-
perform networking with Netty and Vert.x
This is a live guide and you can contribute, just open a PR or an issue in the repo.
Some of the internal Vert.x APIs are exposed in this guide and you should keep in mind that these APIs are subject to be changed when it is needed. |
Contexts in Vert.x
The io.vertx.core.Context
interface is an essential component of Vert.x.
At a high level contexts can be thought of as controlling the execution of how events (or tasks created by handlers) are executed by the application.
Most events are dispatched through contexts, when the application consumes an event there is most likely a context associated with the dispatch of the event.
Verticle contexts
When an instance of a verticle is deployed, Vert.x creates and associates a context with this instance. You can access this context in your verticle using the context
field of AbstractVerticle
:
When MyVerticle
is deployed, Vert.x emits a start event, the start
method is called by a thread of the Verticle context:
-
by default the context is always an event-loop context, the caller thread is an event-loop
-
when the verticle is deployed as a worker the caller thread is one of the worker pool of Vert.x
Ad hoc contexts
Using Vert.x APIs without using Verticle is supported since Vert.x 3 and leads to the interesting question of which context is used.
When a Vert.x API is called, Vert.x associates the current thread with an ad hoc event-loop context, Vertx#getOrCreateContext()
creates a context the first time it is called for a non vertx thread and then returns this context on subsequent calls.
Consequently, callbacks on asynchronous Vert.x API happen on the same context:
public class Main {
public static void main(String[] args) {
WebClient client = WebClient.create(vertx);
for (int i = 0;i < 4;i++) {
client
.get(8080, "myserver.mycompany.com", "/some-uri")
.send()
.onSuccess(ar -> {
// All callbacks are on the same context
});
}
}
}
This behavior differs from previous major versions, Vert.x 3 would create a different context for each HTTP request.
While Vert.x does encourage confining code in context, such behavior avoids potential data races.
Propagation of contexts
Most Vert.x APIs are aware of contexts.
Asynchronous operations executed within a context will call back the application with the same context.
Likewise, event handlers are also dispatched on the same context.
public class MyVerticle extends AbstractVerticle {
public void start() {
Future<HttpServer> future = vertx.createHttpServer()
.requestHandler(request -> {
// Executed in the verticle context
})
.listen(8080, "localhost");
future.onComplete(ar -> {
// Executed in the verticle context
});
}
}
Dealing with contexts
Most application don’t need tight interactions with a context but sometimes it can be useful to access them, e.g your application uses another library that performs a callback on its own thread and you want to execute code in the original context.
Above we have seen a verticle can access its context through the context
field but that implies to use a verticle and to have a reference on the verticle which might not always be handy.
You can get the current context with getOrCreateContext()
:
Context context = vertx.getOrCreateContext();
You can also use the static method Vertx.currentContext()
:
Context context = Vertx.currentContext();
The later might return null if the current thread is not associated with a context, whereas the former will create one if needed and thus never returns null.
After you obtained a context, you can use it to run code in this context:
public void integrateWithExternalSystem(Handler<Event> handler) {
// Capture the current context
Context context = vertx.getOrCreateContext();
// Run the event handler on the application context
externalSystem.onEvent(event -> {
context.runOnContext(v -> handler.handle(event));
});
}
In practice, many Vert.x APIs and thirdparty libraries are implemented this way.
Event-loop context
An event loop context uses an event loop to run code: handlers are executed directly on the IO threads, as a consequence:
-
A handler will always be executed with the same thread
-
A handler must never block the thread, otherwise it will create starvation for all the IO tasks associated with that event loop.
This behavior allows for a greatly simplified threading model by guaranteeing that associated handlers will always be executed on the same thread, thus removing the need for synchronization and other locking mechanisms.
This is the type of context that is the default and most commonly used type of context. A verticle deployed without the worker flag will always be deployed with an event loop context.
Worker context
Worker contexts are assigned to verticles deployed with the worker option enabled. The worker context is differentiated from standard event loop contexts in that workers are executed on a separate worker thread pool.
This separation from event loop threads allows worker contexts to execute the types of blocking operations that will block the event loop: blocking such thread will not impact the application other than blocking one thread.
Just as is the case with the event loop context, worker contexts ensure that handlers are only executed on one thread at any given time. That is, handlers executed on a worker context will always be executed sequentially - one after the other - but different actions may be executed on different threads.
Context exception handler
An exception handler can be set on a context, to catch any unchecked exception thrown by task running on a context.
When no exception handler is set, the Vertx
exception handler is called instead
context.exceptionHandler(throwable -> {
// Any exception thrown by this context
});
vertx.exceptionHandler(throwable -> {
// Any exception uncaught exception thrown on context
});
When no handler is set whatsoever, the exception is logged as an error with the message Unhandled exception
You can report an exception on a context with reportException
context.reportException(new Exception());
Firing events
runOnContext
is the most common way to execute a piece of code on a context. Although it is very suited for integrating external libraries with Vert.x, it is not always the best fit to integrate code executing at the event-loop level (such as Netty events) with application code.
There are internal methods that achieve similar behaviors depending on the situation
-
ContextInternal#dispatch(E, Handler<E>)
-
ContextInternal#execute(E, Handler<E>)
-
ContextInternal#emit(E, Handler<E>)
Dispatch
dispatch
assumes the caller thread is the context thread, it associates the current thread of execution with the context:
assertNull(Vertx.currentContext());
context.dispatch(event, evt -> {
assertSame(context, Vertx.currentContext());
});
The handler is also monitored by the blocked thread checker.
Finally, any exception thrown by the handler is reported to the context:
context.exceptionHandler(err -> {
// Should receive the exception thrown below
});
context.dispatch(event, evt -> {
throw new RuntimeException();
});
Execute
execute
executes a task on the context, when the caller thread is already a context thread, the task is executed directly, otherwise a task is scheduled for execution.
no context associated is done |
Emit
emit
is a combination of execute
and dispatch
default void emit(E event, Handler<E> eventHandler) {
execute(v -> dispatch(argument, task));
}
emit
can be used from any thread to fire an event to a handler:
-
from any thread, it behaves like
runOnContext
-
from a context thread, it runs the event handler with the context thread local association, the block thread checker and reports failures on the context
In most situations, the emit
method is the way to go to have an application process an event. The main purpose of dispatch
and execute
methods is to give more control to the code to achieve very specific things.
Context aware futures
Until Vert.x 4, Future
were statically created object with no specific relationship to a context. Vert.x 4 provides a future based API which need to respect the same semantics as Vert.x 3: any callback on a future should predictably run on the same context.
Vert.x 4 API creates futures bound to the caller context that run callbacks on the context:
Promise<String> promise = context.promise();
Future<String> future = promise.future();
future.onSuccess(handler);
Any callback is emitted on the context that created the promise, the code above is pretty much like:
Promise<String> promise = Promise.promise();
Future<String> future = promise.future();
future.onSuccess(result -> context.emit(result, handler));
In addition, the API allows to create succeeded and failed futures:
Future<String> succeeded = context.succeededFuture("OK usa");
Future<String> failed = context.failedFuture("Oh sorry");
Contexts and tracing
Since Vert.x 4, Vert.x integrates with popular distributing tracing systems.
Tracing libraries usually rely on thread local storage to propagate tracing data, e.g a trace received when processing an HTTP request should be propagated throughout the HTTP client.
Vert.x integrates tracing in a similar fashion but relies on contexts instead of thread local. Contexts are indeed propagated by Vert.x APIs and therefore offers a reliable storage for implementing tracing.
Since all HTTP requests processed by a given server use the same context that created the HTTP server, the server context is duplicated for each HTTP request, to grant unicity to each HTTP request.
public class MyVerticle extends AbstractVerticle {
public void start() {
vertx.createHttpServer()
.requestHandler(request -> {
// Executed in a duplicate verticle context
})
.listen(8080, "localhost");
}
}
Duplication shares most of the characteristics of the original context and provides a specific local storage.
vertx.createHttpServer()
.requestHandler(request -> {
JsonObject specificRequestData = getRequestData(request);
Context context = vertx.getOrCreateContext();
context.putLocal("my-stuff", specificRequestData);
processRequest(request);
})
.listen(8080, "localhost");
Later the application can use it:
Context context = vertx.getOrCreateContext();
JsonObject specificRequestData = context.getLocal("my-stuff");
ContextInternal#duplicate()
duplicates the current context, it can be used to scope an activity associated with a trace
public void startProcessing(Request request) {
Context duplicate = context.duplicate();
request.setContext(duplicate);
}
Close hooks
Close hooks is an internal feature of Vert.x useful for creating components that are notified when a Verticle
or a Vertx
instance is closed. It can be used for implementing automatic clean-up in verticles feature, like for a Vert.x HTTP server.
The contract for receving a close notification is defined by the io.vertx.core.Closeable
interface and its close(Promise<Void> closePromise)
method:
@Override
public void close(Promise<Void> completion) {
// Do cleanup, the method will complete the future
doClose(completion);
}
The method ContextInternal#addCloseHook
registers a Closeable
instance to be notified when the context closes:
context.addCloseHook(closeable);
A context created by a Verticle deployment calls the hook when the verticle instance is stopped.
Otherwise, the hook is called when the Vertx instance is closed.
Context#removeCloseHook
unregisters the close hook and shall be used when the resource is closed before the close hook is called.
context.removeCloseHook(closeable);
Hooks are implemented with weak references to avoid leaks, nevertheless unregistering hooks should be done.
Adding a hook on a duplicate context, adds the hook to the original context.
Likewise VertxInternal
expose also the same operations to receive notifications when a Vertx
instance is closed.
Integrating Netty
Netty is one of the dependencies of Vert.x. In fact, Netty powers the networking services of Vert.x. Vert.x Core provides the basic network services one can expect from such library:
-
TCP
-
HTTP
-
UDP
-
DNS
These are built with various components from Netty. The Netty community has implemented a wide range of components and this chapter explains how to integrate such components in Vert.x.
In this chapter we will build a TIME prococol client and server. The Netty documentation provides client/server implementations of this simple protocol, we will focus on the integration of these components.
Netty integration points
The main purpose of this chapter is to explain some of the Vert.x’s internal interfaces. Such interfaces are extensions that exposes low level methods to interact with Netty that are useful for components that re-use Netty directly.
Most users don’t need to deal with this extension and thus such methods are isolated in an extension interface. |
Bootstrapping clients
ContextInternal
extends io.vertx.core.Context
and exposes various Netty integration points like VertxInternal
.
Usually contexts are obtained from the Vertx#getOrCreateContext()
method that returns the current execution context or create a new one if necessary: when called in a Verticle, getOrCreateContext()
returns the context of this Verticle, when used in a non Vert.x thread like a main
or a unit test, it creates a new one and returns it.
Context context = vertx.getOrCreateContext();
// Cast to access extra methods
Internals contextInternal = (Internals) context;
Contexts are always associated with a Netty event loop and thus using this context ensures our components re-use the same event loop if one existed before or use a new one.
The ContextInternal#nettyEventLoop()
method returns this particular event loop and we can use it on Bootstrap
(for client) or ServerBoostrap
(for server):
ContextInternal contextInt = (ContextInternal) context; (1)
EventLoop eventLoop = contextInt.nettyEventLoop();
Bootstrap bootstrap = new Bootstrap(); (2)
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(eventLoop);
1 | get the event-loop associated with this context |
2 | create a bootstrap for the client |
Bootstrapping servers
VertxInternal
extends io.vertx.core.Vertx
, among all VertxInternal#getAcceptorEventLoopGroup()
returns an EventLoopGroup
for accepting connections on a server, it’s typical usage is on a ServerBootstrap
:
ContextInternal contextInt = (ContextInternal) context; (1)
EventLoop eventLoop = contextInt.nettyEventLoop();
VertxInternal vertxInt = contextInt.owner(); (2)
EventLoopGroup acceptorGroup = vertxInt.getAcceptorEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap(); (3)
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(acceptorGroup, eventLoop);
1 | get the event-loop associated with this context |
2 | get the acceptor event-loop group of Vertx |
3 | create the boostrap for the server |
Handling events
Now that we are more intimate with ContextInternal
, let’s look at how we can use it to handle Netty events such as network events, channel life cycle, etc…
The ContextInternal#emit
methods is used to emit events to the application as it ensures:
-
the context concurrency: reuse the current event-loop thread or execute on a worker
-
the thread local association of the current context with the dispatch thread
-
any uncaught exception thrown is reported on the context, such exception is either logged or passed to the
Context#exceptionHandler
Here is a short example showing a server bootstrap
Handler<Channel> bindHandler = ch -> {
};
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
context.emit(ch, bindHandler);
}
});
Promise<Void> bindPromise = context.promise();
bootstrap.bind(socketAddress).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Signal application with bind success
bindPromise.complete();
} else {
// Signal application with bind error
bindPromise.fail(future.cause());
}
}
});
return bindPromise.future();
The typical usage of emit
is to dispatch one or multiple events to the same handler, like an event handler.
When it comes to a future, the ContextInternal#promise
method creates a promise that will behave with listeners like the emit
method.
The server
The original server example can be found here.
The Vert.x TIME server exposes a simple API:
-
a static method to create a
TimeServer
-
two methods:
listen
to bind a server andclose
to unbind -
the
requestHandler
for setting a handler for handling requests
public interface TimeServer {
/**
* @return a new time server
*/
static TimeServer create(Vertx vertx) {
return new TimeServerImpl(vertx);
}
/**
* Set the handler to be called when a time request happens. The handler should complete
* the future with the time value.
*
* @param handler the handler to be called
* @return this object
*/
TimeServer requestHandler(Handler<Promise<Long>> handler);
/**
* Start and bind the time server.
*
* @param port the server port
* @param host the server host
* @return the future completed when the socket is bound
*/
Future<Void> listen(int port, String host);
/**
* Close the time server.
*/
void close();
}
A TIME server serving the current JVM time is then straighforward to implement:
Vertx vertx = Vertx.vertx();
// Create the time server
TimeServer server = TimeServer.create(vertx);
server.requestHandler(time -> {
time.complete(System.currentTimeMillis());
});
// Start the server
server.listen(8037, "0.0.0.0")
.onSuccess(v -> System.out.println("Server started"))
.onFailure(err -> err.printStackTrace());
Let’s study now the server implementation.
The server bootstrap
First let’s have a look at the ServerBootstrap
creation and configuration
EventLoopGroup acceptorGroup = vertx.getAcceptorEventLoopGroup(); (1)
EventLoop eventLoop = context.nettyEventLoop(); (2)
bootstrap = new ServerBootstrap(); (3)
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(acceptorGroup, eventLoop);
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); (4)
TimeServerHandler handler = new TimeServerHandler(context, requestHandler);
pipeline.addLast(handler);
}
});
1 | VertxInternal returns the event loop group to use as acceptor group |
2 | ContextInternal returns the event loop to use as child group |
3 | create and configure the Netty’s ServerBootstrap |
4 | configure the channel with the TimeServerHandler initialized with the server requestHandler |
The creation of the ServerBootstrap
is quite straightforward and is very similar to the original version. The main difference is that we reuse the event-loop provided by the Verticle and Vert.x. This ensures that our server shares the same resources of our application.
Notice that the TimeServerHandler
is initialized with the server requestHandler
, this handler will be used when serving TIME requests.
The server bind
Now let’s have a look at the bind operation, again it’s very and does not differ much from the original example:
Promise<Void> promise = context.promise(); (1)
ChannelFuture bindFuture = bootstrap.bind(host, port);
bindFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
(2)
if (future.isSuccess()) {
channel = future.channel();
promise.complete();
} else {
promise.fail(future.cause());
}
}
});
return promise.future(); (3)
1 | create a promise bound to the server context |
2 | complete or succeed the result promise |
3 | return the future result |
The most important part is the creation of the context promise to make the application aware of the bind result.
The server handler
Now let’s finish our server with the TimeServerHandler
, which is an adaptation of the Netty’s original TimeServerHandler
:
Promise<Long> result = Promise.promise(); (1)
context.emit(result, requestHandler); (2)
result.future().onComplete(ar -> { (3)
if (ar.succeeded()) { (4)
ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int) (ar.result() / 1000L + 2208988800L));
ChannelFuture f = ctx.writeAndFlush(time);
f.addListener((ChannelFutureListener) channelFuture -> ctx.close());
} else { (5)
ctx.close();
}
});
1 | create a new blank promise that will be resolved by the requestHandler |
2 | let the context emit the event to the requestHandler with emit |
3 | the future handler is called when the requestHandler implementation completes the associated promise |
4 | write the current TIME to the channel and close it after |
5 | the application failed we simply close the socket |
emit
is used when a TIME request event happens, the promise to be completed is passed to the requestHandler
. When this promise is completed, the handler will either write the time result to the channel or close it.
The client
The original client example can be found here.
The Vert.x time client exposes a simple API:
-
a static method for creating a
TimeClient
-
the client
getTime
method for retrieving a time value from a server
public interface TimeClient {
/**
* @return a new time client
*/
static TimeClient create(Vertx vertx) {
return new TimeClientImpl(vertx);
}
/**
* Fetch the current time from a server.
*
* @param port the server port
* @param host the server host name
* @return the result future
*/
Future<Long> getTime(int port, String host);
}
The TIME client is straightforward to use:
Vertx vertx = Vertx.vertx();
// Create the time client
TimeClient server = TimeClient.create(vertx);
// Fetch the time
server.getTime(8037, "localhost").onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Time is " + new Date(ar.result()));
} else {
ar.cause().printStackTrace();
}
});
Let’s study now the client implementation.
The client bootstrap
First let’s have a look at the Bootstrap
creation and configuration
EventLoop eventLoop = context.nettyEventLoop(); (1)
// Create and configure the Netty bootstrap
Bootstrap bootstrap = new Bootstrap(); (2)
bootstrap.group(eventLoop);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline(); (3)
pipeline.addLast(new TimeClientHandler(result));
}
});
return bootstrap;
1 | ContextInternal returns the event loop to use as child group |
2 | create and configure the Netty’s Bootstrap |
3 | configure the channel with the TimeServerHandler initialized with the server resultHandler |
The creation of the Bootstrap
is quite straightforward and is very similar to the original version. The main difference is that we reuse the event-loop provided by the Verticle. This ensures that our client reuses the same event-loop than our verticle.
Like in the server example we use the ContextInternal
to obtain Netty’s EventLoop
to set on the Bootstrap
.
Notice that the TimeServerHandler
is initialized with the client resultHandler
, this handler will be called with the TIME request result.
The client connect
The bootstrap setup is very similar to the original example, in case of a failure the application callback uses a promise that holds the overall result.
ChannelFuture connectFuture = bootstrap.connect(host, port); (1)
connectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
result.fail(future.cause()); // 2
}
}
});
1 | connect to the server |
2 | upon connect error we fail the promise |
We only care of propagating a connect failure to the application, when the bootstrap connects successfully, the TimeServerHandler
will handle the network response to the application.
The client handler
Now let’s complete our client with the TimeServerHandler
, which is an adaptation of the Netty’s original TimeClientHandler
:
ByteBuf m = (ByteBuf) msg;
long currentTimeMillis;
try {
currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; (1)
resultPromise.complete(currentTimeMillis); (2)
resultPromise = null; (3)
ctx.close(); (4)
} finally {
m.release();
}
1 | decode the time response from the server |
2 | complete the resultPromise with the response |
3 | set the resultPromise to null |
4 | close the channel |
Again here, we complete the resultPromise
when a TIME response event happens.
Using Netty TCP codecs
In the previous section we have examined how Vert.x and Netty can share resources and the propagation of Netty events to a Vert.x application. In this section we will study the integration of existing Netty codecs.
Netty codecs are great for encapsulating and reusing network protocols encoder and decoder. The base Netty distribution provides a few codecs for popular protocols such as HTTP, Redis, Memcached or MQTT.
Client and server can be built on top of these codecs with Vert.x, e.g the Vert.x HTTP components reuses Netty’s HTTP codec, akin for Vert.x the MQTT protocols.
Vert.x TCP client/server can be customized to reuse Netty codecs. In fact the channel of a NetSocket
can be used to customize the pipeline and read/write arbitrary messages.
There is a lot of value in reusing NetSocket
this way
-
extend the Vert.x ecosystem, your clients/servers will be fully integrated with this ecosystem, i.e you can mix and match your middleware with existing Vert.x middleware, filesystem, etc…
-
build on top of
NetSocket
features-
SSL/TLS
-
Domain sockets
-
Client Socks/HTTP proxy handling
-
Server verticle scaling
-
Metrics
-
SNI handling
-
In this chapter we will write a client, but the same techniques can be applied for writing a server on top of Netty’s codec the same way.
everything achieved in this chapter can be also achieved using the techniques shown in the Integrating Netty chapter |
The Memcached client
As example we will build in this chapter a simple Memcached client on top of Netty’s Memcached binary codec.
Memcached is a popular free and open source, high-performance, distributed memory object caching system.
There are two versions of the protocol, text and binary. In this section we will build a client for the binary protocol described in this document.
The client is very straightforward to use:
Vertx vertx = Vertx.vertx();
MemcachedClient.connect(vertx, 11211, "localhost")
.compose(client -> {
System.out.println("connected");
// Put a value
return client.set("foo", "bar").compose(v -> {
System.out.println("Put successful");
// Now retrieve the same value
return client.get("foo");
});
}).onSuccess(res -> {
System.out.println("Get successful " + res + "");
}).onFailure(err -> err.printStackTrace());
You can easily start a Memcached server with Docker to try this example:
> docker run --rm --name my-memcache -p 11211:11211 -d memcached
Anatomy of the Memcached client
The client provides a simple API for connecting to a Memcached server and get/set entries.
public interface MemcachedClient {
/**
* Connect to memcached, the {@code completionHandler} will get the {@link MemcachedClient} instance.
*/
static Future<MemcachedClient> connect(Vertx vertx, int port, String host) {
return MemcachedClientImpl.connect(vertx, port, host, null);
}
/**
* Connect to memcached, the {@code completionHandler} will get the {@link MemcachedClient} instance.
*/
static Future<MemcachedClient> connect(Vertx vertx, int port, String host, NetClientOptions options) {
return MemcachedClientImpl.connect(vertx, port, host, options);
}
/**
* Get a cached entry.
*
* @param key the entry key
* @return the result future
*/
Future<@Nullable String> get(String key);
/**
* Set a cached entry.
*
* @param key the entry key
* @param value the entry value
* @return the result future
*/
Future<Void> set(String key, String value);
}
The Memcached codec
The Memcached codec provided by Netty takes care of encoding and decoding Netty ByteBuf
from and to Memcached request and response.
Our client will only require to use Memcached objects:
-
write
FullBinaryMemcacheRequest
to the pipeline-
has a
key
property: aByteBuf
to provide the cached entry key -
has a
opCode
property: an enum indicating the operation,GET
andSET
-
has a
extras
property: aBytebuf
to provide extra information, only used in Memcached SET requests -
has a
content
property: aBytebuf
to provide the cached entry value, only used in Memcached SET requests
-
-
read
FullBinaryMemcacheResponse
from the pipeline-
has a
status
property: a value equals to0
when the operation went successful -
has a
content
property: aBytebuf
to provide the cached entry value, only used in Memcached GET responses
-
Memcached provides a richer protocol than GET or SET , but we don’t cover it in this section, as the goal is just to be a demonstration and not a complete client |
Connecting to the server
Let’s look first at the client connect implementation:
NetClient tcpClient = options != null ? vertx.createNetClient(options) : vertx.createNetClient();
// Connect to the memcached instance
Future<NetSocket> connect = tcpClient.connect(port, host);
return connect.map(so -> {
// Create the client
MemcachedClientImpl memcachedClient = new MemcachedClientImpl((VertxInternal) vertx, (NetSocketInternal) so);
// Initialize the client: configure the pipeline and set the handlers
memcachedClient.init();
return memcachedClient;
});
The connect
implementation creates a Vert.x NetClient
to connect to the actual Memcached server. When the connect is a success
-
the Vert.x
NetSocket
is casted toNetSocketInternal
-
the Memcached client is created and initialized
The NetSocketInternal
is an advanced interface that gives access to a few extra methods that we need to build the client:
-
channelHandlerContext()
returns the context of theNetSocket
Netty’s handler -
writeMessage(Object, Handler<AsyncResult<Void>>)
writes an object to the pipeline -
messsageHandler(Handler<Object>)
sets and handler for processing pipeline messages
The Memcached client init
method uses some of them to
-
initialize the
NetSocket
with the Memcached codec -
sets a message handler to process the Memcached responses
ChannelPipeline pipeline = so.channelHandlerContext().pipeline();
// Add the memcached message aggregator
pipeline.addFirst("aggregator", new BinaryMemcacheObjectAggregator(Integer.MAX_VALUE));
// Add the memcached decoder
pipeline.addFirst("memcached", new BinaryMemcacheClientCodec());
// Set the message handler to process memcached message
so.messageHandler(this::processResponse);
Request / response correlation
The Memcached protocol is a pipelined protocol, the responses are received in the same order than the requests are sent.
Therefore the client needs to maintain an inflight
FIFO queue which is a simple Java ConcurrentLinkedQueue
. When a request is sent to the Memcached server, the response handler is added to the queue. When the response is received the handler is dequeued and can process the response.
Sending Memcached request messages
The client has a writeRequest
method that sends a request to the pipeline:
-
write the request message
-
when the write is successful, add the response handler to the
inflight
queue so the responses can be processed
return so.writeMessage(request).compose(v -> {
// The message has been encoded successfully and sent
// Create a response promise and add it to the inflight queue, so it can be resolved by the server ack
Promise<FullBinaryMemcacheResponse> promise = vertx.promise();
inflight.add(promise);
//
return promise.future();
});
Processing Memcached response messages
The client has a processResponse
method that is called each time the Memcached codec decodes a response:
-
dequeue the response handler
-
release the Netty message since the response messages are pooled, this method must be called otherwise a memory leak will happen
FullBinaryMemcacheResponse response = (FullBinaryMemcacheResponse) msg;
try {
// Get the handler that will process the response
Promise<FullBinaryMemcacheResponse> handler = inflight.poll();
// Handle the message
handler.complete(response);
} finally {
// Release the referenced counted message
response.release();
}
Sending Memcached GET requests
Memcached GET is fairly straightforward
-
create a
FullBinaryMemcacheRequest
-
set the
key
property -
set the
opCode
property toBinaryMemcacheOpcodes.GET
-
-
call
writeRequest
passing the request and providing the response handler
ByteBuf keyBuf = Unpooled.copiedBuffer(key, StandardCharsets.UTF_8);
// Create the memcached request
FullBinaryMemcacheRequest request = new DefaultFullBinaryMemcacheRequest(keyBuf, Unpooled.EMPTY_BUFFER);
// Set the memcached operation opcode to perform a GET
request.setOpcode(BinaryMemcacheOpcodes.GET);
// Execute the request and process the response
return writeRequest(request).map(response -> processGetResponse(response));
Processing Memcached GET responses
Memcached GET responses are processed by processGetResponse
short status = response.status();
switch (status) {
case 0:
// Succesfull get
return response.content().toString(StandardCharsets.UTF_8);
case 1:
// Empty response -> null
return null;
default:
// Memcached error
throw new MemcachedError(status);
}
The status
property of the response indicates whether the response is successful or not. We need to pay special attention when the status
is 1
as the client handles it as a Java null
value.
Sending Memcached SET requests
Memcached SET is straightforward too
-
create a
FullBinaryMemcacheRequest
-
set the
key
property -
set the
opCode
property toBinaryMemcacheOpcodes.SET
-
set the
extras
property to a the value0xDEADBEEF_00001C20
-
0xDEADBEEF
must be used per the protocol -
00001C20
is the expiration time set to 2 hours
-
-
set the
value
property
-
-
call
writeRequest
passing the request and providing the response handler
ByteBuf keyBuf = Unpooled.copiedBuffer(key, StandardCharsets.UTF_8);
// Create the memcached request
FullBinaryMemcacheRequest request = new DefaultFullBinaryMemcacheRequest(keyBuf, Unpooled.EMPTY_BUFFER);
// Set the memcached operation opcode to perform a GET
request.setOpcode(BinaryMemcacheOpcodes.GET);
// Execute the request and process the response
return writeRequest(request).map(response -> processGetResponse(response));
Processing Memcached SET responses
Memcached SET responses are processed by processSetResponse
short status = response.status();
if (status == 0) {
// Succesfull get
return null;
} else {
// Memcached error
throw new MemcachedError(status);
}