vertx-lang-kotlin-coroutines
The vertx-lang-kotlin-coroutines module integrates Kotlin coroutines for performing asynchronous operations and processing events. The resulting programming model looks like sequential code, yet it does not block kernel threads.
Introduction
One of the key advantages of Vert.x over many legacy application platforms is that it is almost entirely non-blocking (of kernel threads). This allows Vert.x-based applications to handle a lot of concurrency (e.g., many connections and messages) using a very small number of kernel threads, which in turn unlocks great scalability.
The non-blocking nature of Vert.x leads to asynchronous APIs. Asynchronous APIs can take various forms, including callbacks, promises, fibers or reactive extensions. Vert.x uses the callback style for the core APIs, but it also supports other models like RxJava.
In some cases, programming with asynchronous APIs can be more challenging than writing classic, sequential code, in particular when several operations need to be done in sequence. Error propagation is also often more complex with asynchronous APIs.
vertx-lang-kotlin-coroutines uses coroutines. Coroutines are very lightweight threads that do not correspond to underlying kernel threads: when a coroutine needs to "block", it is suspended and frees its current kernel thread so that another coroutine can process events.
vertx-lang-kotlin-coroutines uses kotlinx.coroutines to implement the coroutines.
Running a coroutine from a Vert.x context
Having imported io.vertx.kotlin.coroutines.VertxCoroutine, the GlobalScope.launch method runs a block of code as a coroutine in the "Global" application scope (bound to the lifetime of the application):
val vertx = Vertx.vertx()
GlobalScope.launch(vertx.dispatcher()) {
  val timerId = awaitEvent<Long> { handler ->
    vertx.setTimer(1000, handler)
  }
  println("Event fired from timer with id $timerId")
}
The vertx.dispatcher() method returns a coroutine dispatcher that executes coroutines using the Vert.x event loop.
The awaitEvent function suspends the execution of the coroutine until the timer fires and resumes the coroutine with the value that was given to the handler.
More details are given in the next sections on handlers, events and streams of events.
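To make the suspend-and-resume behavior described above more concrete, here is a minimal sketch of how a handler-style callback can be bridged into a suspending call. It uses only the Kotlin standard library, not Vert.x. FakeTimer, awaitCallback and demoAwaitCallback are hypothetical names invented for this illustration, and this is not how Vert.x itself implements awaitEvent.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical stand-in for a timer service: it stores handlers and fires them later.
class FakeTimer {
  private val pending = ArrayDeque<Pair<Long, (Long) -> Unit>>()
  private var nextId = 0L

  fun setTimer(handler: (Long) -> Unit): Long {
    val id = nextId++
    pending.addLast(id to handler)
    return id
  }

  // Simulates the event loop firing every scheduled timer.
  fun fireAll() {
    while (pending.isNotEmpty()) {
      val (id, handler) = pending.removeFirst()
      handler(id)
    }
  }
}

// Suspends the caller until the registered handler is invoked with a value.
suspend fun <T> awaitCallback(register: ((T) -> Unit) -> Unit): T =
  suspendCoroutine { cont -> register { value -> cont.resume(value) } }

fun demoAwaitCallback(): List<String> {
  val log = mutableListOf<String>()
  val timer = FakeTimer()
  val body: suspend () -> Unit = {
    log.add("before")
    val id = awaitCallback<Long> { h -> timer.setTimer(h) }
    log.add("fired $id")
  }
  // Runs the coroutine on the current thread until its first suspension point.
  body.startCoroutine(Continuation(EmptyCoroutineContext) { })
  log.add("suspended") // the thread is free again while the coroutine waits
  timer.fireAll()      // invoking the handler resumes the coroutine
  return log
}
```

Calling demoAwaitCallback() records the order in which the steps ran: the coroutine starts, the calling thread continues while the coroutine is suspended, and the coroutine resumes once the handler is invoked.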
Extending CoroutineVerticle
You can deploy your code as instances of io.vertx.kotlin.coroutines.CoroutineVerticle, a specialized type of verticle for Kotlin coroutines. The CoroutineVerticle class implements the kotlinx.coroutines.CoroutineScope interface, so all coroutine builder methods are bound to the verticle context by default. You should override the suspending start() and (optionally) the suspending stop() methods of the verticle:
class MyVerticle : CoroutineVerticle() {
override suspend fun start() {
// ...
}
override suspend fun stop() {
// ...
}
}
All code examples below assume they run inside a CoroutineVerticle implementation, but you can replace all <builder> { .. } calls with GlobalScope.<builder> { .. } to use the application scope instead.
Getting one-shot asynchronous results
Many asynchronous operations in Vert.x take a Handler<AsyncResult<T>> as the last argument. An example would be executing an object retrieval using the Vert.x Mongo client, or sending an event bus message then waiting for a reply.
Waiting for such a result is achieved with the awaitResult method, which returns the value or throws an exception.
The coroutine is suspended until the event is processed, and no kernel thread is blocked.
The method takes a block that performs the asynchronous operation; the handler to complete is passed to that block at run time.
Here is an example:
suspend fun awaitResultExample() {
val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
consumer.handler { message ->
println("Consumer received: ${message.body()}")
message.reply("pong")
}
// Send a message and wait for a reply
val reply = awaitResult<Message<String>> { h ->
vertx.eventBus().request<String>("a.b.c", "ping").onComplete(h)
}
println("Reply received: ${reply.body()}")
}
When the block produces a failure, the caller can handle it as an exception using the usual try/catch constructs:
suspend fun awaitResultFailureExample() {
val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
consumer.handler { message ->
// The consumer will get a failure
message.fail(0, "it failed!!!")
}
// Send a message and wait for a reply
try {
awaitResult<Message<String>> { h ->
vertx.eventBus().request<String>("a.b.c", "ping").onComplete(h)
}
} catch (e: ReplyException) {
// Handle specific reply exception here
println("Reply failure: ${e.message}")
}
}
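The "returns the value or throws" behavior can be illustrated without Vert.x. The following is a minimal sketch using only the Kotlin standard library, in which a callback reporting a Result is turned into a suspending call that either returns or throws. fetchGreeting, awaitOutcome and demoAwaitOutcome are hypothetical names for this illustration and are not part of the Vert.x API.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback API: reports either a value or a failure to the handler.
fun fetchGreeting(name: String, handler: (Result<String>) -> Unit) {
  if (name.isBlank()) {
    handler(Result.failure(IllegalArgumentException("name is blank")))
  } else {
    handler(Result.success("Hello, $name"))
  }
}

// Resumes with the value on success, or rethrows the failure in the coroutine.
suspend fun <T> awaitOutcome(register: ((Result<T>) -> Unit) -> Unit): T =
  suspendCoroutine { cont -> register { outcome -> cont.resumeWith(outcome) } }

fun demoAwaitOutcome(): List<String> {
  val log = mutableListOf<String>()
  val body: suspend () -> Unit = {
    log.add(awaitOutcome<String> { h -> fetchGreeting("Vert.x", h) })
    try {
      awaitOutcome<String> { h -> fetchGreeting(" ", h) }
    } catch (e: IllegalArgumentException) {
      // The failure reported to the handler surfaces as a regular exception
      log.add("failed: ${e.message}")
    }
    log.add("done")
  }
  body.startCoroutine(Continuation(EmptyCoroutineContext) { })
  return log
}
```

The design point is that the failure path needs no separate callback: the code after the suspending call only runs on success, and errors go through ordinary try/catch.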
Getting one-shot events
Processing a one-shot event (and not the next occurrences, if any) is achieved using the awaitEvent function:
suspend fun awaitEventExample() {
  val id = awaitEvent<Long> { h -> vertx.setTimer(2000L, h) }
  println("Timer fired after about 2s with id=$id")
}
Getting one-shot worker results
Processing a blocking computation is achieved using the awaitBlocking function:
suspend fun awaitBlockingExample() {
awaitBlocking {
Thread.sleep(1000)
"some-string"
}
}
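As a rough illustration of the idea behind offloading blocking work, the sketch below runs a blocking block on a separate thread and resumes the coroutine with its result. It uses only the Kotlin and Java standard libraries. offload and demoOffload are hypothetical names, and unlike Vert.x this simplified version resumes the coroutine on the executor thread rather than on a Vert.x context.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Runs the blocking block on the executor and resumes with its value or its exception.
suspend fun <T> offload(executor: ExecutorService, block: () -> T): T =
  suspendCoroutine { cont ->
    executor.execute { cont.resumeWith(runCatching(block)) }
  }

fun demoOffload(): String {
  val executor = Executors.newSingleThreadExecutor()
  val done = CountDownLatch(1)
  var result = ""
  val body: suspend () -> Unit = {
    result = offload(executor) {
      Thread.sleep(50) // blocking call, kept off the calling thread
      "some-string"
    }
  }
  // The completion callback releases the latch once the coroutine has finished.
  body.startCoroutine(Continuation(EmptyCoroutineContext) { done.countDown() })
  done.await() // only the demo blocks here, to wait for the result
  executor.shutdown()
  return result
}
```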
Streams of events
In many places in Vert.x APIs, streams of events are processed through handlers. Examples include event bus message consumers and HTTP server requests.
The ReceiveChannelHandler class allows receiving events through the (suspendable) receive method:
suspend fun streamExample() {
  val adapter = vertx.receiveChannelHandler<Message<Int>>()
  vertx.eventBus().localConsumer<Int>("a.b.c").handler(adapter)
  // Send 15 messages
  for (i in 0 until 15) vertx.eventBus().send("a.b.c", i)
  // Receive the first 10 messages
  for (i in 0 until 10) {
    val message = adapter.receive()
    println("Received: ${message.body()}")
  }
}
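The handler-to-receive adaptation shown above can be approximated in a few lines of plain Kotlin. This is a minimal sketch under the assumption that everything runs on a single thread (as on an event loop), so it uses no synchronization. SimpleReceiveAdapter and demoReceiveAdapter are hypothetical names and do not reflect the actual ReceiveChannelHandler implementation.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical adapter: buffers events pushed by a handler and hands them to receive().
class SimpleReceiveAdapter<T> {
  private val buffer = ArrayDeque<T>()
  private var waiter: Continuation<T>? = null

  // Handler side: called once per incoming event.
  fun handle(event: T) {
    val w = waiter
    if (w != null) {
      waiter = null
      w.resume(event)
    } else {
      buffer.addLast(event)
    }
  }

  // Coroutine side: returns a buffered event, or suspends until one arrives.
  suspend fun receive(): T =
    if (buffer.isNotEmpty()) buffer.removeFirst()
    else suspendCoroutine { cont -> waiter = cont }
}

fun demoReceiveAdapter(): List<Int> {
  val adapter = SimpleReceiveAdapter<Int>()
  val received = mutableListOf<Int>()
  adapter.handle(0)
  adapter.handle(1)
  val body: suspend () -> Unit = {
    repeat(3) { received.add(adapter.receive()) }
  }
  body.startCoroutine(Continuation(EmptyCoroutineContext) { })
  adapter.handle(2) // resumes the suspended receive()
  adapter.handle(3) // nobody is waiting any more: stays buffered
  return received
}
```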
Awaiting the completion of Vert.x asynchronous results
Vert.x 4 provides futures. The coAwait() extension method on Vert.x Future instances suspends the coroutine until the future has completed, then returns the value or throws the failure of the corresponding AsyncResult<T> object.
suspend fun awaitingFuture(anotherFuture: Future<String>) {
  // Getting a future
  val httpServerFuture = vertx.createHttpServer()
    .requestHandler { req -> req.response().end("Hello!") }
    .listen(8000)
  val httpServer = httpServerFuture.coAwait()
  println("HTTP server port: ${httpServer.actualPort()}")
  // It also works for composite futures
  try {
    // coAwait() throws if the composite future fails
    Future.all(httpServerFuture, anotherFuture).coAwait()
    println("The server is now running!")
  } catch (e: Exception) {
    e.printStackTrace()
  }
}
Channels
Channels are similar to Java BlockingQueue except that they suspend the coroutine instead of blocking the thread:
- sending a value to a full channel suspends the coroutine
- receiving a value from an empty channel also suspends the coroutine
Vert.x ReadStream and WriteStream can be adapted to channels with the toReceiveChannel and toSendChannel extension methods.
These adapters take care of managing the back-pressure and the stream termination:
- ReadStream<T> is adapted to a ReceiveChannel<T>
- WriteStream<T> is adapted to a SendChannel<T>
Receiving data
Channels can be really useful when you need to handle a stream of correlated values:
suspend fun handleTemperatureStream() {
  val stream = vertx.eventBus().consumer<Double>("temperature")
  val channel = stream.toReceiveChannel(vertx)
  // Start from the infinities so that the first value always replaces them
  // (Double.MIN_VALUE is the smallest positive double, not the most negative one)
  var min = Double.POSITIVE_INFINITY
  var max = Double.NEGATIVE_INFINITY
  // Iterate until the stream is closed
  // Non-blocking
  for (msg in channel) {
    val temperature = msg.body()
    min = minOf(min, temperature)
    max = maxOf(max, temperature)
  }
  // The stream is now closed
}
It can also be useful for parsing protocols. We will build a non-blocking HTTP request parser to show the power of channels.
We will rely on the RecordParser to slice the stream of buffers into a stream of buffers delimited by \r\n.
Here is the initial version of the parser, which handles only the HTTP request-line:
vertx.createNetServer().connectHandler { socket ->
// The record parser provides a stream of buffers delimited by \r\n
val stream = RecordParser.newDelimited("\r\n", socket)
// Convert the stream to a Kotlin channel
val channel = stream.toReceiveChannel(vertx)
// Run the coroutine
launch {
// Receive the request-line
// Non-blocking
val line = channel.receive().toString().split(" ")
val method = line[0]
val uri = line[1]
println("Received HTTP request ($method, $uri)")
// Still need to parse headers and body...
}
}
Parsing the request-line is as simple as calling receive on the channel.
The next step parses HTTP headers by receiving chunks until we get an empty one:
// Receive HTTP headers
val headers = HashMap<String, String>()
while (true) {
// Non-blocking
val header = channel.receive().toString()
// Done with parsing headers
if (header.isEmpty()) {
break
}
val pos = header.indexOf(':')
headers[header.substring(0, pos).lowercase()] = header.substring(pos + 1).trim()
}
println("Received HTTP request ($method, $uri) with headers ${headers.keys}")
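The header loop above can be tried out in isolation on plain strings. The following is a minimal sketch, assuming the header lines are already available as a list. parseHeaders is a hypothetical helper, and its choice to skip lines without a colon is an assumption of this sketch, not something the parser above does.

```kotlin
// Parses header lines up to the first empty line.
// Names are lower-cased with the locale-independent lowercase().
// Lines without a ':' are skipped (an assumption made for this sketch).
fun parseHeaders(lines: List<String>): Map<String, String> {
  val headers = LinkedHashMap<String, String>()
  for (line in lines) {
    if (line.isEmpty()) break // an empty line ends the header section
    val pos = line.indexOf(':')
    if (pos <= 0) continue
    headers[line.substring(0, pos).trim().lowercase()] = line.substring(pos + 1).trim()
  }
  return headers
}
```

For example, parseHeaders(listOf("Host: example.com", "CONTENT-LENGTH: 5", "")) yields a map with the keys host and content-length, so later lookups such as headers["content-length"] work regardless of how the client capitalized the name.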
Finally, we complete the parser by handling optional request bodies:
// Receive the request body
val transferEncoding = headers["transfer-encoding"]
val contentLength = headers["content-length"]
val body: Buffer?
if (transferEncoding == "chunked") {
// Handle chunked encoding, e.g
// 5\r\n
// HELLO\r\n
// 0\r\n
// \r\n
body = Buffer.buffer()
while (true) {
// Parse length chunk
// Non-blocking
val len = channel.receive().toString().toInt(16)
if (len == 0) {
break
}
// The stream is flipped to parse a chunk of the exact size
stream.fixedSizeMode(len + 2)
// Receive the chunk and append it
// Non-blocking
val chunk = channel.receive()
body.appendBuffer(chunk, 0, chunk.length() - 2)
// The stream is flipped back to the \r\n delimiter to parse the next chunk
stream.delimitedMode("\r\n")
}
} else if (contentLength != null) {
// The stream is flipped to parse a body of the exact size
stream.fixedSizeMode(contentLength.toInt())
// Non-blocking
body = channel.receive()
} else {
body = null
}
val bodySize = body?.length() ?: 0
println("Received HTTP request ($method, $uri) with headers ${headers.keys} and body with size $bodySize")
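The chunked format handled above (a hexadecimal size line, the chunk data, a trailing \r\n, and a final zero-size chunk) can also be exercised on an in-memory string. This is a minimal synchronous sketch, assuming ASCII content, no chunk extensions and no trailers. decodeChunked is a hypothetical helper and not part of Vert.x.

```kotlin
// Decodes a chunked body held fully in memory.
// Assumes ASCII data, so character offsets match byte counts.
fun decodeChunked(raw: String): String {
  val body = StringBuilder()
  var pos = 0
  while (true) {
    // Read the chunk-size line (hexadecimal), terminated by \r\n
    val lineEnd = raw.indexOf("\r\n", pos)
    require(lineEnd >= 0) { "missing chunk-size line" }
    val len = raw.substring(pos, lineEnd).trim().toInt(16)
    pos = lineEnd + 2
    if (len == 0) break // the zero-size chunk ends the body
    require(pos + len + 2 <= raw.length) { "truncated chunk" }
    body.append(raw, pos, pos + len)
    pos += len + 2 // skip the chunk data and its trailing \r\n
  }
  return body.toString()
}
```

The streaming parser above does the same work incrementally: it switches the RecordParser to fixed-size mode for len + 2 bytes instead of indexing into a complete string.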
Sending data
Using a channel to send data is quite straightforward:
suspend fun sendChannel(httpResponse : HttpServerResponse) {
val channel = httpResponse.toSendChannel(vertx)
while (true) {
val buffer = readBuffer()
// Broadcast the temperature
// Non-blocking but could be suspended
channel.send(buffer)
// Wait for one second
awaitEvent<Long> { vertx.setTimer(1000, it) }
}
}
Both SendChannel#send and WriteStream#write are non-blocking operations. However, unlike WriteStream#write, SendChannel#send can suspend the execution when the channel is full. The equivalent without a channel has to check the write queue and register a drain handler itself, and would look like:
fun sendWithoutChannel(httpResponse: HttpServerResponse) {
  // Check whether we can write to the stream
  if (httpResponse.writeQueueFull()) {
    // We cannot write now, so resume when the write queue has drained
    httpResponse.drainHandler { sendWithoutChannel(httpResponse) }
  } else {
    val buffer = readBuffer()
    // Non-blocking write
    httpResponse.write(buffer)
    // Wait for one second before sending the next buffer
    vertx.setTimer(1000) { sendWithoutChannel(httpResponse) }
  }
}
Delay, cancellation and timeouts
The Vert.x dispatcher fully supports the coroutine delay function through Vert.x timers:
launch {
// Set a one-second Vertx timer
delay(1000)
}
Timers support cancellation:
val job = launch {
// Set a one-second Vertx timer
while (true) {
delay(1000)
// Do something periodically
}
}
// Some time later
job.cancel()
Cancellation is cooperative.
You can also schedule a timeout with the withTimeout function:
launch {
try {
val id = withTimeout(1000) {
awaitEvent { anAsyncMethod(it) }
}
} catch (e: TimeoutCancellationException) {
// Cancelled
}
}
Coroutine builders
Vert.x works with all coroutine builders, as long as an instance of CoroutineScope is available: launch, async, produce, and so on. A couple of important things to remember:
- runBlocking doesn't need a CoroutineScope and must not be used from a Vert.x event loop thread.
- To avoid memory leaks, always use coroutineScope {..} to define a child scope. In this way, if a coroutine fails inside the scope, all the others defined inside that scope are cancelled too.
Vert.x also provides a coroutine builder which returns an io.vertx.core.Future instance:
// Can be called on any thread
val future1: Future<String> = vertxFuture(vertx) {
computeSomethingWithSuspendingFunction()
}
// Can be called only when running on a Vert.x context
val future2: Future<String> = vertxFuture {
computeSomethingWithSuspendingFunction()
}
Coroutine interoperability
The Vert.x integration has been designed to be fully interoperable with Kotlin coroutines:
- kotlinx.coroutines.sync.Mutex operations are executed on the event loop thread when using the Vert.x dispatcher
RxJava interoperability
The module vertx-lang-kotlin-coroutines does not have specific integration with RxJava. However, Kotlin coroutines provide integration with RxJava, which works nicely with vertx-lang-kotlin-coroutines.
You can read more about it in Coroutines for reactive streams.
Vert.x API extensions for coroutines
Event bus
The Vert.x EventBus and MessageConsumer objects are extended with support for coroutines inside a coroutineEventBus scope function:
val bus = vertx.eventBus()
coroutineEventBus {
bus.coConsumer<String>("some-address") {
computeSomethingWithSuspendingFunction()
it.reply("done")
}
}
The scope function is not necessary if the surrounding type implements |
Vert.x Web
The Vert.x Web Router and Route objects are extended with support for coroutines inside a coroutineRouter scope function:
val router = Router.router(vertx)
coroutineRouter {
// Route.coRespond is similar to Route.respond but using a suspending function
router.get("/my-resource").coRespond {
// similar to Route.respond but using a suspending function
val response = computeSomethingWithSuspendingFunction()
response // sent by Vert.x to the client
}
// Router.coErrorHandler is similar to Router.errorHandler but using a suspending function
router.coErrorHandler(404) { rc ->
val html = computeHtmlPageWithSuspendingFunction()
rc.response().setStatusCode(404).putHeader(CONTENT_TYPE, TEXT_HTML).end(html)
}
}
The scope function is not necessary if the surrounding type implements |