The vertx-lang-kotlin-coroutines module integrates Kotlin coroutines for performing asynchronous operations and processing events. The result is a programming model that looks like sequential code, yet does not block kernel threads.

Introduction

One of the key advantages of Vert.x over many legacy application platforms is that it is almost entirely non-blocking (of kernel threads). This allows Vert.x-based applications to handle a lot of concurrency (e.g., many connections and messages) using a very small number of kernel threads, which in turn unlocks great scalability.

The non-blocking nature of Vert.x leads to asynchronous APIs. Asynchronous APIs can take various forms including callbacks, promises, fibers or reactive extensions. Vert.x uses the callback style for the core APIs but it also supports other models like RxJava 1 and 2.

In some cases, programming using asynchronous APIs can be more challenging than using a classic, sequential style of code, in particular when several operations need to be done in sequence. Also, error propagation is often more complex when using asynchronous APIs.

vertx-lang-kotlin-coroutines uses coroutines. Coroutines are very lightweight threads that do not correspond to underlying kernel threads, so that when a coroutine needs to "block" it gets suspended and frees its current kernel thread so that another coroutine can process events.

vertx-lang-kotlin-coroutines uses kotlinx.coroutines to implement coroutines.

Note
vertx-lang-kotlin-coroutines currently only works with Kotlin, and coroutines are an experimental feature of Kotlin 1.1.

Running a coroutine from a Vert.x context

Once io.vertx.kotlin.coroutines.VertxCoroutine is imported, the launch function runs a block of code as a coroutine that can be suspended:

val vertx = Vertx.vertx()
vertx.deployVerticle(ExampleVerticle())

launch(vertx.dispatcher()) {
  val timerId = awaitEvent<Long> { handler ->
    vertx.setTimer(1000, handler)
  }
  println("Event fired from timer with id ${timerId}")
}

vertx.dispatcher() returns a coroutine dispatcher that runs coroutines on the Vert.x event loop.

The awaitEvent function suspends the execution of the coroutine until the timer fires and returns the value that was given to the handler. More details are given in the next sections on handlers, events and streams of events.

Extending CoroutineVerticle

You can deploy your code as instances of io.vertx.kotlin.coroutines.CoroutineVerticle, a specialized type of verticle for Kotlin coroutines. You should override the start() and (optionally) the stop() methods of the verticle:

class MyVerticle : CoroutineVerticle() {
  override suspend fun start() {
    // ...
  }
}
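
As a slightly fuller illustration, the sketch below starts an HTTP server from start() and suspends until the server is listening. It assumes the handler-based Vert.x API used throughout this document together with the awaitResult function described in the next section. The class name HttpVerticle and port 8080 are arbitrary choices made for the example.

```kotlin
import io.vertx.core.http.HttpServer
import io.vertx.kotlin.coroutines.CoroutineVerticle
import io.vertx.kotlin.coroutines.awaitResult

// Hypothetical verticle name, for illustration only
class HttpVerticle : CoroutineVerticle() {

  private lateinit var server: HttpServer

  override suspend fun start() {
    // Suspends (without blocking the event loop) until listen() completes;
    // a bind failure is rethrown here and fails the deployment
    server = awaitResult<HttpServer> { h ->
      vertx.createHttpServer()
        .requestHandler { req -> req.response().end("Hello from a coroutine verticle") }
        .listen(8080, h) // port 8080 is an assumption
    }
    println("HTTP server listening on port ${server.actualPort()}")
  }
}
```

Such a verticle would be deployed like any other, for instance with vertx.deployVerticle(HttpVerticle()).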

Getting one-shot asynchronous results

Many asynchronous operations in Vert.x take a Handler<AsyncResult<T>> as the last argument. An example would be executing an object retrieval using the Vert.x Mongo client, or sending an event bus message then awaiting a reply.

This can be achieved by using the awaitResult method, which returns the value or throws an exception.

The coroutine is suspended until the event is processed, and no kernel thread is blocked.

The method is called with a block that performs the asynchronous operation; at run-time, the handler is passed to that block.

Here is an example:

suspend fun awaitResultExample() {
  val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
  consumer.handler { message ->
    println("Consumer received: ${message.body()}")
    message.reply("pong")
  }

  // Send a message and wait for a reply
  val reply = awaitResult<Message<String>> { h ->
    vertx.eventBus().send("a.b.c", "ping", h)
  }
  println("Reply received: ${reply.body()}")
}

When the block produces a failure, the caller can handle it as an exception using the usual try/catch constructs:

suspend fun awaitResultFailureExample() {
  val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
  consumer.handler { message ->
    // The consumer will get a failure
    message.fail(0, "it failed!!!")
  }

  // Send a message and wait for a reply
  try {
    val reply: Message<String> = awaitResult<Message<String>> { h ->
      vertx.eventBus().send("a.b.c", "ping", h)
    }
  } catch(e: ReplyException) {
    // Handle specific reply exception here
    println("Reply failure: ${e.message}")
  }
}
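
The suspension mechanism behind awaitResult can be illustrated without Vert.x at all. The following is a minimal sketch, not the module's actual implementation: ResultHandler, awaitResultSketch and runSketch are hypothetical names, Kotlin's Result type stands in for AsyncResult, and a recent Kotlin release is assumed, in which the kotlin.coroutines package is stable (in Kotlin 1.1 the same primitives lived under an experimental package).

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical stand-in for Handler<AsyncResult<T>>: a callback receiving a success or a failure
typealias ResultHandler<T> = (Result<T>) -> Unit

// Sketch only: suspend the coroutine, hand a handler to the block,
// and resume with the value (or the exception) once the handler is called
suspend fun <T> awaitResultSketch(block: (ResultHandler<T>) -> Unit): T =
  suspendCoroutine { cont ->
    block { result -> cont.resumeWith(result) }
  }

// Demo helper: starts a suspending body on the calling thread, without any dispatcher
fun runSketch(body: suspend () -> Unit) {
  body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { it.getOrThrow() })
}

fun main() {
  // Plays the role of an asynchronous operation that completes later
  var pending: ResultHandler<String>? = null

  runSketch {
    try {
      val reply = awaitResultSketch<String> { h -> pending = h }
      println("Reply received: $reply")
    } catch (e: IllegalStateException) {
      println("Reply failure: ${e.message}")
    }
  }

  // The coroutine is suspended at this point and the thread is free to do other work
  println("Coroutine suspended, thread is free")

  // Completing the operation resumes the coroutine right after awaitResultSketch
  pending?.invoke(Result.success("pong"))
}
```

Passing Result.failure(IllegalStateException("it failed!!!")) to the pending handler instead would resume the coroutine inside the catch branch, which mirrors how a failed reply surfaces as an exception in the example above.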

Getting one-shot events

Processing a one-shot event (and not the next occurrences, if any) is achieved using the awaitEvent function:

suspend fun awaitEventExample() {
  val id = awaitEvent<Long> { h -> vertx.setTimer(2000L, h) }
  println("This should be fired after 2s by a timer with id=$id")
}

Streams of events

In many places in Vert.x APIs, streams of events are processed through handlers. Examples include event bus message consumers and HTTP server requests.

The ReceiveChannelHandler class allows receiving events through the (suspendable) receive method:

suspend fun streamExample() {
  val adapter = vertx.receiveChannelHandler<Message<Int>>()
  vertx.eventBus().localConsumer<Int>("a.b.c").handler(adapter)

  // Send 15 messages
  for (i in 0 until 15) vertx.eventBus().send("a.b.c", i)

  // Receive the first 10 messages
  for (i in 0 until 10) {
    val message = adapter.receive()
    println("Received: ${message.body()}")
  }
}

Handlers

Calling launch allows running Vert.x handlers on a coroutine, as in:

vertx.createHttpServer().requestHandler { req ->
  launch(context.dispatcher()) {
    val timerID = awaitEvent<Long> { h -> vertx.setTimer(2000, h) }
    req.response().end("Hello, this is timerID $timerID after 2 seconds!")
  }
}.listen(8081)

Awaiting the completion of Vert.x futures

The await extension method on Vert.x future objects suspends the coroutine until the future has completed, at which point the method returns the future's result (in the example below, the HttpServer).

suspend fun awaitingFuture() {
  val httpServerFuture = Future.future<HttpServer>()
  vertx.createHttpServer()
    .requestHandler { req -> req.response().end("Hello!") }
    .listen(8000, httpServerFuture)

  val httpServer = httpServerFuture.await()
  println("HTTP server port: ${httpServer.actualPort()}")

  val result = CompositeFuture.all(httpServerFuture, httpServerFuture).await()
  if (result.succeeded()) {
    println("The server is now running!")
  } else {
    result.cause().printStackTrace()
  }
}

Channels

Channels provide a way to transfer a stream of values. Vert.x ReadStream and WriteStream can be adapted to channels with the toChannel extension method.

These adapters take care of managing back-pressure and stream termination:

  • ReadStream<T> is adapted to a ReceiveChannel<T>

  • WriteStream<T> is adapted to a SendChannel<T>

Receiving data

Channels can be really useful when you need to handle a stream of correlated values:

suspend fun handleTemperatureStream() {
  val stream = vertx.eventBus().consumer<Double>("temperature")
  val channel = stream.toChannel(vertx)

  var min = Double.MAX_VALUE
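  // Double.MIN_VALUE is the smallest positive double, so start from the most negative value instead
  var max = -Double.MAX_VALUE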

  // Iterate until the stream is closed
  // Non-blocking
  for (msg in channel) {
    val temperature = msg.body()
    min = Math.min(min, temperature)
    max = Math.max(max, temperature)
  }

  // The stream is now closed
}

Channels can also be useful for parsing protocols. We will build a non-blocking HTTP request parser to show the power of channels.

We will rely on the RecordParser to slice the stream of buffers into a stream of buffers delimited by \r\n.

Here is the initial version of the parser, which handles only the HTTP request-line:

val server = vertx.createNetServer().connectHandler { socket ->

  // The record parser provides a stream of buffers delimited by \r\n
  val stream = RecordParser.newDelimited("\r\n", socket)

  // Convert the stream to a Kotlin channel
  val channel = stream.toChannel(vertx)

  // Run the coroutine
  launch(vertx.dispatcher()) {

    // Receive the request-line
    // Non-blocking
    val line = channel.receive().toString().split(" ")
    val method = line[0]
    val uri = line[1]

    println("Received HTTP request ($method, $uri)")

    // Still need to parse headers and body...
  }
}

Parsing the request-line is as simple as calling receive on the channel.

The next step parses the HTTP headers by receiving chunks until we get an empty one:

// Receive HTTP headers
val headers = HashMap<String, String>()
while (true) {

  // Non-blocking
  val header = channel.receive().toString()

  // Done with parsing headers
  if (header.isEmpty()) {
    break
  }

  val pos = header.indexOf(':')
  headers[header.substring(0, pos).toLowerCase()] = header.substring(pos + 1).trim()
}

println("Received HTTP request ($method, $uri) with headers ${headers.keys}")

Finally, we complete the parser by handling optional request bodies:

// Receive the request body
val transferEncoding = headers["transfer-encoding"]
val contentLength = headers["content-length"]

val body : Buffer?
if (transferEncoding == "chunked") {

  // Handle chunked encoding, e.g.
  // 5\r\n
  // HELLO\r\n
  // 0\r\n
  // \r\n

  body = Buffer.buffer()
  while (true) {

    // Parse length chunk
    // Non-blocking
    val len = channel.receive().toString().toInt(16)
    if (len == 0) {
      break
    }

    // The stream is flipped to parse a chunk of the exact size
    stream.fixedSizeMode(len + 2)

    // Receive the chunk and append it
    // Non-blocking
    val chunk = channel.receive()
    body.appendBuffer(chunk, 0, chunk.length() - 2)

    // The stream is flipped back to the \r\n delimiter to parse the next chunk
    stream.delimitedMode("\r\n")
  }
} else if (contentLength != null) {

  // The stream is flipped to parse a body of the exact size
  stream.fixedSizeMode(contentLength.toInt())

  // Non-blocking
  body = channel.receive()
} else {
  body = null
}

println("Received HTTP request ($method, $uri) with headers ${headers.keys} and body with size ${body?.length() ?: 0}")
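
The per-line parsing steps used above (request-line, header line, chunk size) can also be isolated into small pure functions, which makes them easy to check independently of the channel. The following is a sketch under stated assumptions: RequestLine, parseRequestLine, parseHeader and parseChunkSize are hypothetical names that do not appear in Vert.x, malformed input is reported as null rather than by throwing, and lowercase() assumes a recent Kotlin release (the snippets above use the older toLowerCase()).

```kotlin
// Hypothetical helper type, for illustration only
data class RequestLine(val method: String, val uri: String, val version: String)

// Splits "GET /path HTTP/1.1" into its three parts; returns null when malformed
fun parseRequestLine(line: String): RequestLine? {
  val parts = line.split(" ")
  return if (parts.size == 3) RequestLine(parts[0], parts[1], parts[2]) else null
}

// Splits "Name: value" into a lower-cased name and a trimmed value;
// returns null when there is no colon or the name is empty
fun parseHeader(line: String): Pair<String, String>? {
  val pos = line.indexOf(':')
  if (pos <= 0) return null
  return line.substring(0, pos).lowercase() to line.substring(pos + 1).trim()
}

// Parses the hexadecimal chunk-size line of chunked encoding; returns null when it is not valid hex
fun parseChunkSize(line: String): Int? = line.trim().toIntOrNull(16)

fun main() {
  println(parseRequestLine("GET /index.html HTTP/1.1")) // RequestLine(method=GET, uri=/index.html, version=HTTP/1.1)
  println(parseHeader("Content-Length: 5"))             // (content-length, 5)
  println(parseChunkSize("1a"))                         // 26
}
```

Returning null for malformed lines lets the coroutine decide how to react (for example by closing the socket) instead of failing with an index or number format exception in the middle of the parsing loop.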

Sending data

Using a channel to send data is quite straightforward:

suspend fun sendChannel() {
  val stream = vertx.eventBus().publisher<Double>("temperature")
  val channel = stream.toChannel(vertx)

  while (true) {
    val temperature = readTemperatureSensor()

    // Broadcast the temperature
    // Non-blocking but could be suspended
    channel.send(temperature)

    // Wait for one second
    awaitEvent<Long> { vertx.setTimer(1000, it)  }
  }
}

Both SendChannel#send and WriteStream#write are non-blocking operations. However, unlike WriteStream#write, SendChannel#send can suspend the execution of the loop when the channel is full. The equivalent without a channel would look like:

// Assumes `stream` is the publisher obtained above, used directly as a WriteStream
fun broadcastTemperature() {

  // Check we can write in the stream
  if (stream.writeQueueFull()) {

    // We can't write so we set a drain handler to be called when we can write again
    stream.drainHandler { broadcastTemperature() }
  } else {

    // Read temperature
    val temperature = readTemperatureSensor()

    // Write it to the stream
    stream.write(temperature)

    // Wait for one second
    vertx.setTimer(1000) {
      broadcastTemperature()
    }
  }
}

RxJava integration

The vertx-lang-kotlin-coroutines module does not have a specific integration with RxJava. However, Kotlin coroutines provide their own integration with RxJava, which works nicely with vertx-lang-kotlin-coroutines.

You can read about it in the Guide to reactive streams with coroutines.