
This component provides a Kafka client for sending messages to and reading messages from an Apache Kafka cluster.

As a consumer, the API provides methods for subscribing to a topic partition and receiving messages asynchronously, or for reading them as a stream (with the possibility to pause and resume the stream).

As a producer, the API provides methods for sending messages to a topic partition, as if writing to a stream.

Warning
This module has tech preview status; this means the API can change between versions.

Using the Vert.x Kafka client

To use the Vert.x Kafka client, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-kafka-client</artifactId>
  <version>3.4.1</version>
</dependency>
  • Gradle (in your build.gradle file):

compile io.vertx:vertx-kafka-client:3.4.1

Creating Kafka clients

Creating consumers and producers is quite similar to how it works with the native Kafka client library.

They need to be configured with a set of properties as described in the official Apache Kafka documentation, for the consumer and for the producer.

To achieve that, a map containing those properties can be passed to one of the static creation methods exposed by KafkaConsumer and KafkaProducer:

// creating the consumer using map config
var config = mutableMapOf<String, Any?>()
config["bootstrap.servers"] = "localhost:9092"
config["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
config["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
config["group.id"] = "my_group"
config["auto.offset.reset"] = "earliest"
config["enable.auto.commit"] = "false"

// use consumer for interacting with Apache Kafka
var consumer = KafkaConsumer.create<Any, Any>(vertx, config)

In the above example, a KafkaConsumer instance is created using a map to specify the list of Kafka nodes to connect to (just one here) and the deserializers to use for extracting the key and value from each received message.

Likewise a producer can be created

// creating the producer using map and class types for key and value serializers/deserializers
var config = mutableMapOf<String, Any?>()
config["bootstrap.servers"] = "localhost:9092"
config["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
config["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
config["acks"] = "1"

// use producer for interacting with Apache Kafka
var producer = KafkaProducer.create<Any, Any>(vertx, config)

Another way is to use a Properties instance instead of the map.

// creating the consumer using properties config
var config = java.util.Properties()
config.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
config.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer::class.java)
config.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer::class.java)
config.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "my_group")
config.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
config.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

// use consumer for interacting with Apache Kafka
var consumer = KafkaConsumer.create<Any, Any>(vertx, config)

More advanced creation methods allow you to specify the class types of the key and the value used for sending messages, or provided by received messages; this is a way of setting the key and value serializers/deserializers instead of using the related properties:

// creating the producer using map and class types for key and value serializers/deserializers
var config = java.util.Properties()
config.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
config.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, "1")

// use producer for interacting with Apache Kafka
var producer = KafkaProducer.create(vertx, config, String::class.java, String::class.java)

Here the KafkaProducer instance is created using a Properties instance to specify the list of Kafka nodes to connect to (just one here) and the acknowledgment mode; the key and value serializers are specified as parameters of KafkaProducer.create.

Receiving messages from a topic joining a consumer group

In order to start receiving messages from Kafka topics, the consumer can use the subscribe method to subscribe to a set of topics as part of a consumer group (specified by the properties on creation).

You need to register a handler for incoming messages using handler:

// register the handler for incoming messages
consumer.handler({ record ->
  println("Processing key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})

// subscribe to several topics
var topics = java.util.HashSet<String>()
topics.add("topic1")
topics.add("topic2")
topics.add("topic3")
consumer.subscribe(topics)

// or just subscribe to a single topic
consumer.subscribe("a-single-topic")

A handler can also be passed during subscription to be notified when the operation completes and whether it succeeded:

// register the handler for incoming messages
consumer.handler({ record ->
  println("Processing key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})

// subscribe to several topics
var topics = java.util.HashSet<String>()
topics.add("topic1")
topics.add("topic2")
topics.add("topic3")
consumer.subscribe(topics, { ar ->
  if (ar.succeeded()) {
    println("subscribed")
  } else {
    println("Could not subscribe ${ar.cause().getMessage()}")
  }
})

// or just subscribe to a single topic
consumer.subscribe("a-single-topic", { ar ->
  if (ar.succeeded()) {
    println("subscribed")
  } else {
    println("Could not subscribe ${ar.cause().getMessage()}")
  }
})

When using consumer groups, the Kafka cluster assigns partitions to the consumer taking into account the other connected consumers in the same consumer group, so that partitions can be spread across them.

The Kafka cluster handles partition re-balancing when a consumer leaves the group (its assigned partitions are freed to be assigned to other consumers) or a new consumer joins the group (and needs partitions to read from).

You can register handlers on a KafkaConsumer to be notified of the partitions revocations and assignments by the Kafka cluster using partitionsRevokedHandler and partitionsAssignedHandler.

// register the handler for incoming messages
consumer.handler({ record ->
  println("Processing key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})

// registering handlers for assigned and revoked partitions
consumer.partitionsAssignedHandler({ topicPartitions ->

  println("Partitions assigned")
  for (topicPartition in topicPartitions) {
    println("${topicPartition.topic} ${topicPartition.partition}")
  }
})

consumer.partitionsRevokedHandler({ topicPartitions ->

  println("Partitions revoked")
  for (topicPartition in topicPartitions) {
    println("${topicPartition.topic} ${topicPartition.partition}")
  }
})

// subscribes to the topic
consumer.subscribe("test", { ar ->

  if (ar.succeeded()) {
    println("Consumer subscribed")
  }
})

After joining a consumer group, a consumer can leave the group, and thus stop receiving messages, using unsubscribe:

// consumer is already member of a consumer group

// unsubscribing request
consumer.unsubscribe()

You can add a handler to be notified of the result:

// consumer is already member of a consumer group

// unsubscribing request
consumer.unsubscribe({ ar ->

  if (ar.succeeded()) {
    println("Consumer unsubscribed")
  }
})

Receiving messages from a topic requesting specific partitions

Besides being part of a consumer group for receiving messages from a topic, a consumer can ask for a specific topic partition. When the consumer is not part of a consumer group, the overall application cannot rely on the re-balancing feature.

You can use assign in order to ask for specific partitions.

// register the handler for incoming messages
consumer.handler({ record ->
  println("key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})

// build the set of partitions to be assigned
var topicPartitions = java.util.HashSet<TopicPartition>()
topicPartitions.add(TopicPartition(
  topic = "test",
  partition = 0))

// requesting to be assigned the specific partition
consumer.assign(topicPartitions, { done ->

  if (done.succeeded()) {
    println("Partition assigned")

    // requesting the assigned partitions
    consumer.assignment({ done1 ->

      if (done1.succeeded()) {

        for (topicPartition in done1.result()) {
          println("${topicPartition.topic} ${topicPartition.partition}")
        }
      }
    })
  }
})

Calling assignment provides the list of the currently assigned partitions.

Getting topic partition information

You can call partitionsFor to get information about the partitions for a specified topic:

// asking partitions information about specific topic
consumer.partitionsFor("test", { ar ->

  if (ar.succeeded()) {

    for (partitionInfo in ar.result()) {
      println(partitionInfo)
    }
  }
})

In addition, listTopics provides all available topics with their related partitions:

// asking information about available topics and related partitions
consumer.listTopics({ ar ->

  if (ar.succeeded()) {

    var map = ar.result()
    for ((topic, partitions) in map) {
      println("topic = ${topic}")
      println("partitions = ${partitions}")
    }

  }
})

Manual offset commit

In Apache Kafka, the consumer is in charge of handling the offset of the last read message.

By default, the commit operation is executed automatically every time a batch of messages is read from a topic partition; for this, the configuration parameter enable.auto.commit must be set to true when the consumer is created.
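For example, a consumer configuration keeping automatic offset commit enabled could look like the following sketch (auto.commit.interval.ms is the standard Kafka consumer property controlling how often offsets are auto-committed):

```kotlin
// sketch: consumer configuration with automatic offset commit enabled
var config = mutableMapOf<String, Any?>()
config["bootstrap.servers"] = "localhost:9092"
config["group.id"] = "my_group"
config["enable.auto.commit"] = "true"
// optional: auto-commit frequency, in milliseconds
config["auto.commit.interval.ms"] = "5000"
```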

Manual offset commit can instead be achieved with commit. It can be used to achieve at-least-once delivery, making sure that read messages are processed before committing the offset:

// consumer is processing read messages

// committing offset of the last read message
consumer.commit({ ar ->

  if (ar.succeeded()) {
    println("Last read message offset committed")
  }
})

Seeking in a topic partition

Apache Kafka can retain messages for a long period of time and the consumer can seek inside a topic partition and obtain arbitrary access to the messages.

You can use seek to change the offset and read from a specific position:

var topicPartition = TopicPartition(
  topic = "test",
  partition = 0)

// seek to a specific offset
consumer.seek(topicPartition, 10L, { done ->

  if (done.succeeded()) {
    println("Seeking done")
  }
})

When the consumer needs to re-read the stream from the beginning, it can use seekToBeginning

var topicPartition = TopicPartition(
  topic = "test",
  partition = 0)

// seek to the beginning of the partition
consumer.seekToBeginning(java.util.Collections.singleton(topicPartition), { done ->

  if (done.succeeded()) {
    println("Seeking done")
  }
})

Finally, seekToEnd can be used to move to the end of the partition:

var topicPartition = TopicPartition(
  topic = "test",
  partition = 0)

// seek to the end of the partition
consumer.seekToEnd(java.util.Collections.singleton(topicPartition), { done ->

  if (done.succeeded()) {
    println("Seeking done")
  }
})

Offset lookup

You can use the beginningOffsets API introduced in Kafka 0.10.1.1 to get the first offset for a given partition. In contrast to seekToBeginning, it does not change the consumer’s offset.

var topicPartitions = java.util.HashSet<TopicPartition>()
var topicPartition = TopicPartition(
  topic = "test",
  partition = 0)
topicPartitions.add(topicPartition)

consumer.beginningOffsets(topicPartitions, { done ->
  if (done.succeeded()) {
    var results = done.result()
    for ((topic, beginningOffset) in results) {
      println("Beginning offset for topic=${topic.topic}, partition=${topic.partition}, beginningOffset=${beginningOffset}")
    }

  }
})

// Convenience method for single-partition lookup
consumer.beginningOffsets(topicPartition, { done ->
  if (done.succeeded()) {
    var beginningOffset = done.result()
    println("Beginning offset for topic=${topicPartition.topic}, partition=${topicPartition.partition}, beginningOffset=${beginningOffset}")
  }
})

You can use the endOffsets API introduced in Kafka 0.10.1.1 to get the last offset for a given partition. In contrast to seekToEnd, it does not change the consumer’s offset.

var topicPartitions = java.util.HashSet<TopicPartition>()
var topicPartition = TopicPartition(
  topic = "test",
  partition = 0)
topicPartitions.add(topicPartition)

consumer.endOffsets(topicPartitions, { done ->
  if (done.succeeded()) {
    var results = done.result()
    for ((topic, endOffset) in results) {
      println("End offset for topic=${topic.topic}, partition=${topic.partition}, endOffset=${endOffset}")
    }

  }
})

// Convenience method for single-partition lookup
consumer.endOffsets(topicPartition, { done ->
  if (done.succeeded()) {
    var endOffset = done.result()
    println("End offset for topic=${topicPartition.topic}, partition=${topicPartition.partition}, endOffset=${endOffset}")
  }
})

You can use the offsetsForTimes API introduced in Kafka 0.10.1.1 to look up an offset by timestamp: the search parameter is an epoch timestamp, and the call returns the lowest offset whose ingestion timestamp is greater than or equal to the given timestamp.

Code not translatable
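As a Kotlin sketch, assuming offsetsForTimes has a single-partition overload analogous to the beginningOffsets convenience method shown above, such a lookup could look like this:

```kotlin
var topicPartition = TopicPartition(
  topic = "test",
  partition = 0)

// look up the offset of the first message ingested at or after this timestamp
var timestamp = System.currentTimeMillis() - 60000

consumer.offsetsForTimes(topicPartition, timestamp, { done ->
  if (done.succeeded()) {
    var offsetAndTimestamp = done.result()
    println("Offset=${offsetAndTimestamp.offset}, timestamp=${offsetAndTimestamp.timestamp}")
  }
})
```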

Message flow control

A consumer can control the incoming message flow and pause/resume the read operation from a topic: for example, it can pause the message flow when it needs more time to process the current messages, and then resume to continue processing.

To achieve that, you can use pause and resume:

var topicPartition = TopicPartition(
  topic = "test",
  partition = 0)

// registering the handler for incoming messages
consumer.handler({ record ->
  println("key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")

  // e.g. pause/resume on partition 0, after reading messages up to offset 5
  if ((record.partition() == 0) && (record.offset() == 5L)) {

    // pause the read operations
    consumer.pause(topicPartition, { ar ->

      if (ar.succeeded()) {

        println("Paused")

        // resume read operation after a specific time
        vertx.setTimer(5000, { timeId ->

          // resume read operations
          consumer.resume(topicPartition)
        })
      }
    })
  }
})

Closing a consumer

Call close to close the consumer. Closing the consumer closes any open connections and releases all consumer resources.

The close is actually asynchronous and might not complete until some time after the call has returned. If you want to be notified when the actual close has completed then you can pass in a handler.

This handler will then be called when the close has fully completed.

consumer.close({ res ->
  if (res.succeeded()) {
    println("Consumer is now closed")
  } else {
    println("close failed")
  }
})

Sending messages to a topic

You can use write to send messages (records) to a topic.

The simplest way to send a message is to specify only the destination topic and the related value, omitting its key or partition; in this case, messages are sent in a round-robin fashion across all the partitions of the topic:

for (i in 0 until 5) {

  // only topic and message value are specified, round robin on destination partitions
  var record = KafkaProducerRecord.create<Any, String>("test", "message_${i}")

  producer.write(record)

}

When writing a message, you can also receive its metadata, such as the topic, the destination partition and the assigned offset:

for (i in 0 until 5) {

  // only topic and message value are specified, round robin on destination partitions
  var record = KafkaProducerRecord.create<Any, String>("test", "message_${i}")

  producer.write(record, { done ->

    if (done.succeeded()) {

      var recordMetadata = done.result()
      println("Message ${record.value()} written on topic=${recordMetadata.topic}, partition=${recordMetadata.partition}, offset=${recordMetadata.offset}")
    }

  })

}

When you need to assign a partition to a message, you can specify its partition identifier or its key:

for (i in 0 until 10) {

  // a destination partition is specified
  var record = KafkaProducerRecord.create<Any, String>("test", null, "message_${i}", 0)

  producer.write(record)

}

Since the producer identifies the destination partition using key hashing, you can use keys to guarantee that all messages with the same key are sent to the same partition and retain their order:

for (i in 0 until 10) {

  // i.e. defining different keys for odd and even messages
  var key = i % 2

  // a key is specified, all messages with same key will be sent to the same partition
  var record = KafkaProducerRecord.create("test", key.toString(), "message_${i}")

  producer.write(record)

}
Sharing a producer

Sometimes you want to share the same producer between several verticles or contexts.

Calling KafkaProducer.createShared returns a producer that can be shared safely:

// Create a shared producer identified by 'the-producer'
var producer1 = KafkaProducer.createShared<Any, Any>(vertx, "the-producer", config)

// Sometimes later you can close it
producer1.close()

The same resources (thread, connection) will be shared between the producers returned by this method.

When you are done with the producer, just close it; when all shared producers are closed, the resources will be released for you.

Note
The shared producer is created on the first createShared call and its configuration is defined at that moment; every further createShared call for the same producer must use the same configuration.

Closing a producer

Call close to close the producer. Closing the producer closes any open connections and releases all producer resources.

The close is actually asynchronous and might not complete until some time after the call has returned. If you want to be notified when the actual close has completed then you can pass in a handler.

This handler will then be called when the close has fully completed.

producer.close({ res ->
  if (res.succeeded()) {
    println("Producer is now closed")
  } else {
    println("close failed")
  }
})

Getting topic partition information

You can call partitionsFor to get information about the partitions for a specified topic:

// asking partitions information about specific topic
producer.partitionsFor("test", { ar ->

  if (ar.succeeded()) {

    for (partitionInfo in ar.result()) {
      println(partitionInfo)
    }
  }
})

Handling errors

Error handling (e.g. timeouts) between a Kafka client (consumer or producer) and the Kafka cluster is done using exceptionHandler:

// setting handler for errors
consumer.exceptionHandler({ e ->
  println("Error = ${e.getMessage()}")
})

Automatic clean-up in verticles

If you’re creating consumers and producers from inside verticles, they will be automatically closed when the verticle is undeployed.
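As a sketch (the class and topic names here are illustrative), a consumer created in a verticle's start() method needs no explicit close():

```kotlin
// hypothetical verticle: the consumer it creates is closed automatically on undeploy
class MyConsumerVerticle : io.vertx.core.AbstractVerticle() {

  override fun start() {
    var config = mutableMapOf<String, Any?>()
    config["bootstrap.servers"] = "localhost:9092"
    config["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    config["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    config["group.id"] = "my_group"

    // bound to this verticle's lifecycle: no close() needed in stop()
    var consumer = KafkaConsumer.create<Any, Any>(vertx, config)
    consumer.handler({ record -> println("value=${record.value()}") })
    consumer.subscribe("test")
  }
}
```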

Using Vert.x serializers/deserializers

The Vert.x Kafka client comes out of the box with serializers and deserializers for Vert.x buffers, JSON objects and JSON arrays.

In a consumer you can use buffers

// Creating a consumer able to deserialize to buffers
var config = mutableMapOf<String, Any?>()
config["bootstrap.servers"] = "localhost:9092"
config["key.deserializer"] = "io.vertx.kafka.client.serialization.BufferDeserializer"
config["value.deserializer"] = "io.vertx.kafka.client.serialization.BufferDeserializer"
config["group.id"] = "my_group"
config["auto.offset.reset"] = "earliest"
config["enable.auto.commit"] = "false"

// Creating a consumer able to deserialize to json object
config = mutableMapOf<String, Any?>()
config["bootstrap.servers"] = "localhost:9092"
config["key.deserializer"] = "io.vertx.kafka.client.serialization.JsonObjectDeserializer"
config["value.deserializer"] = "io.vertx.kafka.client.serialization.JsonObjectDeserializer"
config["group.id"] = "my_group"
config["auto.offset.reset"] = "earliest"
config["enable.auto.commit"] = "false"

// Creating a consumer able to deserialize to json array
config = mutableMapOf<String, Any?>()
config["bootstrap.servers"] = "localhost:9092"
config["key.deserializer"] = "io.vertx.kafka.client.serialization.JsonArrayDeserializer"
config["value.deserializer"] = "io.vertx.kafka.client.serialization.JsonArrayDeserializer"
config["group.id"] = "my_group"
config["auto.offset.reset"] = "earliest"
config["enable.auto.commit"] = "false"

Or in a producer

// Creating a producer able to serialize to buffers
var config = mutableMapOf<String, Any?>()
config["bootstrap.servers"] = "localhost:9092"
config["key.serializer"] = "io.vertx.kafka.client.serialization.BufferSerializer"
config["value.serializer"] = "io.vertx.kafka.client.serialization.BufferSerializer"
config["acks"] = "1"

// Creating a producer able to serialize to json object
config = mutableMapOf<String, Any?>()
config["bootstrap.servers"] = "localhost:9092"
config["key.serializer"] = "io.vertx.kafka.client.serialization.JsonObjectSerializer"
config["value.serializer"] = "io.vertx.kafka.client.serialization.JsonObjectSerializer"
config["acks"] = "1"

// Creating a producer able to serialize to json array
config = mutableMapOf<String, Any?>()
config["bootstrap.servers"] = "localhost:9092"
config["key.serializer"] = "io.vertx.kafka.client.serialization.JsonArraySerializer"
config["value.serializer"] = "io.vertx.kafka.client.serialization.JsonArraySerializer"
config["acks"] = "1"

You can also specify the serializers/deserializers at creation time:

In a consumer

Code not translatable
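A Kotlin sketch, mirroring the producer variant shown earlier and assuming an equivalent KafkaConsumer.create overload taking key and value class types:

```kotlin
// creating the consumer using properties and class types for key and value deserializers
var config = java.util.Properties()
config.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
config.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "my_group")

// use Vert.x buffers for both key and value
var consumer = KafkaConsumer.create(vertx, config,
  io.vertx.core.buffer.Buffer::class.java, io.vertx.core.buffer.Buffer::class.java)
```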

Or in a producer

Code not translatable
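A Kotlin sketch for the producer side, again assuming the class-typed create overload shown earlier:

```kotlin
// creating the producer using properties and class types for key and value serializers
var config = java.util.Properties()
config.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
config.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, "1")

// use String keys and Vert.x JsonObject values
var producer = KafkaProducer.create(vertx, config,
  String::class.java, io.vertx.core.json.JsonObject::class.java)
```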

Stream implementation and native Kafka objects

When you want to operate on native Kafka records, you can use a stream-oriented implementation that handles native Kafka objects.

KafkaReadStream should be used for reading topic partitions; it is a read stream of ConsumerRecord objects.

KafkaWriteStream should be used for writing to topics; it is a write stream of ProducerRecord objects.

The API exposed by these interfaces is mostly the same as the polyglot version.
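As an illustrative sketch (it assumes KafkaWriteStream.create accepts the same Properties-based configuration and class types as KafkaProducer.create), writing a native ProducerRecord could look like this:

```kotlin
// create a write stream working directly with native Kafka objects
var config = java.util.Properties()
config.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
config.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, "1")

var stream = KafkaWriteStream.create(vertx, config, String::class.java, String::class.java)

// write a native ProducerRecord: topic, key, value
stream.write(org.apache.kafka.clients.producer.ProducerRecord("test", "the-key", "the-value"))
```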