This component provides a Kafka client for reading and sending messages from/to an Apache Kafka cluster.
As a consumer, the API provides methods for subscribing to a topic partition, receiving messages asynchronously or reading them as a stream (with the ability to pause and resume the stream).
As a producer, the API provides methods for sending messages to a topic partition, much like writing to a stream.
Warning: this module has tech preview status, which means the API can change between versions.
To use this component, add the following dependency to the dependencies section of your build descriptor:
Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId>
<version>${maven.version}</version>
</dependency>
Gradle (in your build.gradle file):
compile io.vertx:vertx-kafka-client:${maven.version}
Creating consumers and producers is quite similar to how it works with the native Kafka client library.
They need to be configured with a set of properties as described in the official Apache Kafka documentation, for the consumer and for the producer.
To achieve that, you can configure a map with such properties and pass it to one of the static creation methods exposed by KafkaConsumer and KafkaProducer.
// creating the consumer using map config
var config = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id" -> "my_group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> "false"
)

// use consumer for interacting with Apache Kafka
var consumer = KafkaConsumer.create(vertx, config)
In the above example, a KafkaConsumer instance is created using a map to specify the Kafka nodes to connect to (just one here) and the deserializers to use for extracting the key and the value from each received message.
Likewise, a producer can be created:
// creating the producer using map config
var config = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
  "acks" -> "1"
)

// use producer for interacting with Apache Kafka
var producer = KafkaProducer.create(vertx, config)
In order to start receiving messages from Kafka topics, the consumer can use the subscribe method to subscribe to a set of topics, being part of a consumer group (specified by the properties on creation).
It's also possible to use the subscribe method to subscribe to more topics specified by a Java regex.
You also need to register a handler for incoming messages using handler.
// register the handler for incoming messages
consumer.handler((record: io.vertx.scala.kafka.client.consumer.KafkaConsumerRecord[String, String]) => {
println(s"Processing key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})
// subscribe to several topics with list
var topics = new java.util.HashSet[String]()
topics.add("topic1")
topics.add("topic2")
topics.add("topic3")
consumer.subscribe(topics)
// or using a Java regex
var pattern = java.util.regex.Pattern.compile("topic\\d")
consumer.subscribe(pattern)
// or just subscribe to a single topic
consumer.subscribe("a-single-topic")
The handler can be registered before or after the call to subscribe(); messages won't be consumed until both methods have been called. This allows you to call subscribe(), then seek() and finally handler() in order to only consume messages starting from a particular offset, for example.
A handler can also be passed during subscription, to be notified of the subscription result when the operation completes.
// register the handler for incoming messages
consumer.handler((record: io.vertx.scala.kafka.client.consumer.KafkaConsumerRecord[String, String]) => {
println(s"Processing key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})
// subscribe to several topics
var topics = new java.util.HashSet[String]()
topics.add("topic1")
topics.add("topic2")
topics.add("topic3")
consumer.subscribeFuture(topics).onComplete{
case Success(result) => {
println("subscribed")
}
case Failure(cause) => {
println(s"$cause")
}
}
// or just subscribe to a single topic
consumer.subscribeFuture("a-single-topic").onComplete{
case Success(result) => {
println("subscribed")
}
case Failure(cause) => {
println(s"$cause")
}
}
When using consumer groups, the Kafka cluster assigns partitions to the consumer, taking into account the other connected consumers in the same consumer group, so that partitions can be spread across them.
The Kafka cluster handles partition re-balancing when a consumer leaves the group (its assigned partitions become free to be assigned to other consumers) or when a new consumer joins the group (and wants partitions to read from).
You can register handlers on a KafkaConsumer to be notified of the partition revocations and assignments by the Kafka cluster, using partitionsRevokedHandler and partitionsAssignedHandler.
// register the handler for incoming messages
consumer.handler((record: io.vertx.scala.kafka.client.consumer.KafkaConsumerRecord[String, String]) => {
println(s"Processing key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})
// registering handlers for assigned and revoked partitions
consumer.partitionsAssignedHandler((topicPartitions: java.util.Set[io.vertx.scala.kafka.client.common.TopicPartition]) => {
  println("Partitions assigned")
  topicPartitions.foreach(topicPartition => {
    println(s"${topicPartition.getTopic()} ${topicPartition.getPartition()}")
  })
})
consumer.partitionsRevokedHandler((topicPartitions: java.util.Set[io.vertx.scala.kafka.client.common.TopicPartition]) => {
  println("Partitions revoked")
  topicPartitions.foreach(topicPartition => {
    println(s"${topicPartition.getTopic()} ${topicPartition.getPartition()}")
  })
})
// subscribes to the topic
consumer.subscribeFuture("test").onComplete{
case Success(result) => {
println("Consumer subscribed")
}
case Failure(cause) => println("Failure")
}
After joining a consumer group to receive messages, a consumer can leave the consumer group, so as to stop receiving messages, using unsubscribe:
// consumer is already member of a consumer group
// unsubscribing request
consumer.unsubscribe()
You can add a handler to be notified of the result:
// consumer is already member of a consumer group
// unsubscribing request
consumer.unsubscribeFuture().onComplete{
case Success(result) => {
println("Consumer unsubscribed")
}
case Failure(cause) => println("Failure")
}
Besides being part of a consumer group for receiving messages from a topic, a consumer can ask for a specific topic partition. When the consumer is not part of a consumer group, the overall application cannot rely on the re-balancing feature.
You can use assign in order to ask for specific partitions:
// register the handler for incoming messages
consumer.handler((record: io.vertx.scala.kafka.client.consumer.KafkaConsumerRecord[String, String]) => {
println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})
// the topic partition to read from
var topicPartitions = new java.util.HashSet[TopicPartition]()
topicPartitions.add(TopicPartition()
.setTopic("test")
.setPartition(0)
)
// requesting to be assigned the specific partition
consumer.assignFuture(topicPartitions).onComplete{
case Success(result) => {
println("Partition assigned")
// requesting the assigned partitions
consumer.assignmentFuture().onComplete{
case Success(result) => {
result.foreach(topicPartition => {
println(s"${todo-renderDataObjectMemberSelect} ${todo-renderDataObjectMemberSelect}")
})
}
case Failure(cause) => println("Failure")
}
}
case Failure(cause) => println("Failure")
}
As with subscribe(), the handler can be registered before or after the call to assign(); messages won't be consumed until both methods have been called. This allows you to call assign(), then seek() and finally handler() in order to only consume messages starting from a particular offset, for example.
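For example, here is a minimal sketch of that ordering, reusing the consumer from the examples above: it assigns a single partition, seeks to offset 10, and only then registers the handler, so the first record observed is the one at offset 10.
var topicPartition = TopicPartition()
  .setTopic("test")
  .setPartition(0)
var topicPartitions = new java.util.HashSet[TopicPartition]()
topicPartitions.add(topicPartition)

consumer.assignFuture(topicPartitions).onComplete{
  case Success(result) => {
    // the partition is assigned: move the offset before attaching the handler
    consumer.seekFuture(topicPartition, 10).onComplete{
      case Success(result) => {
        // only now start consuming, beginning at offset 10
        consumer.handler((record: io.vertx.scala.kafka.client.consumer.KafkaConsumerRecord[String, String]) => {
          println(s"key=${record.key()},value=${record.value()},offset=${record.offset()}")
        })
      }
      case Failure(cause) => println("Failure")
    }
  }
  case Failure(cause) => println("Failure")
}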
Calling assignment provides the list of the currently assigned partitions.
Other than using the internal polling mechanism to receive messages from Kafka, the client can subscribe to a topic without registering a message handler, and then fetch messages on demand using the poll method.
In this way, the user application is in charge of polling for messages when it needs to, for example after processing the previous ones.
// subscribes to the topic
consumer.subscribeFuture("test").onComplete{
case Success(result) => {
println("Consumer subscribed")
vertx.setPeriodic(1000, (timerId: java.lang.Long) => {
consumer.pollFuture(100).onComplete{
case Success(result) => {
var records = result
for ( i <- 0 until records.size()) {
var record = records.recordAt(i)
println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
}
}
case Failure(cause) => println("Failure")
}
})
}
case Failure(cause) => println("Failure")
}
After subscribing successfully, the application starts a periodic timer in order to execute the poll and get messages from Kafka periodically.
You can change the subscribed topics, or assigned partitions, after you have started to consume messages, simply by calling subscribe() or assign() again.
Note that due to internal buffering of messages it is possible that the record handler will continue to observe messages from the old subscription or assignment after the subscribe() or assign() method's completion handler has been called. This is not the case for messages observed by the batch handler: once the completion handler has been called it will only observe messages read from the new subscription or assignment.
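For example, a minimal sketch that switches an already consuming consumer to a different (hypothetical) topic, keeping the registered handler in place:
consumer.subscribeFuture("another-topic").onComplete{
  case Success(result) => {
    // from now on the handler only observes records from the new subscription
    println("Now subscribed to another-topic")
  }
  case Failure(cause) => println("Failure")
}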
You can call partitionsFor to get information about the partitions for a specified topic:
// asking partitions information about specific topic
consumer.partitionsForFuture("test").onComplete{
case Success(result) => {
result.foreach(partitionInfo => {
println(partitionInfo)
})
}
case Failure(cause) => println("Failure")
}
In addition, listTopics provides all the available topics with the related partitions:
// asking information about available topics and related partitions
consumer.listTopicsFuture().onComplete{
case Success(result) => {
var map = result
map.foreach{
case (topic: String, partitions: java.util.List[io.vertx.scala.kafka.client.common.PartitionInfo]) => {
println(s"topic = ${topic}")
println(s"partitions = ${partitions}")
}}
}
case Failure(cause) => println("Failure")
}
In Apache Kafka the consumer is in charge of handling the offset of the last read message.
This is done by the commit operation, executed automatically every time a batch of messages is read from a topic partition; the configuration parameter enable.auto.commit must be set to true when the consumer is created.
Manual offset commit can be achieved with commit.
It can be used to achieve at-least-once delivery, making sure that the read messages are processed before committing the offset.
// consumer is processing read messages
// committing offset of the last read message
consumer.commitFuture().onComplete{
case Success(result) => {
println("Last read message offset committed")
}
case Failure(cause) => println("Failure")
}
Apache Kafka can retain messages for a long period of time, and the consumer can seek inside a topic partition, obtaining arbitrary access to the messages.
You can use seek to change the offset and start reading at a specific position:
var topicPartition = TopicPartition()
.setTopic("test")
.setPartition(0)
// seek to a specific offset
consumer.seekFuture(topicPartition, 10).onComplete{
case Success(result) => {
println("Seeking done")
}
case Failure(cause) => println("Failure")
}
When the consumer needs to re-read the stream from the beginning, it can use seekToBeginning:
var topicPartition = TopicPartition()
.setTopic("test")
.setPartition(0)
// seek to the beginning of the partition
consumer.seekToBeginningFuture(java.util.Collections.singleton(topicPartition)).onComplete{
case Success(result) => {
println("Seeking done")
}
case Failure(cause) => println("Failure")
}
Finally, seekToEnd can be used to seek to the end of the partition:
var topicPartition = TopicPartition()
.setTopic("test")
.setPartition(0)
// seek to the end of the partition
consumer.seekToEndFuture(java.util.Collections.singleton(topicPartition)).onComplete{
case Success(result) => {
println("Seeking done")
}
case Failure(cause) => println("Failure")
}
Note that due to internal buffering of messages it is possible that the record handler will continue to observe messages read from the original offset for a time after the seek*() method's completion handler has been called. This is not the case for messages observed by the batch handler: once the seek*() completion handler has been called it will only observe messages read from the new offset.
You can use the beginningOffsets API, introduced in Kafka 0.10.1.1, to get the first offset for a given partition. In contrast to seekToBeginning, it does not change the consumer's offset.
var topicPartitions = new java.util.HashSet[TopicPartition]()
var topicPartition = TopicPartition()
.setTopic("test")
.setPartition(0)
topicPartitions.add(topicPartition)
consumer.beginningOffsetsFuture(topicPartitions).onComplete{
case Success(result) => {
var results = result
results.foreach{
case (topicPartition: io.vertx.scala.kafka.client.common.TopicPartition, beginningOffset: java.lang.Long) => {
println(s"Beginning offset for topic=${topicPartition.getTopic()}, partition=${topicPartition.getPartition()}, beginningOffset=${beginningOffset}")
}}
}
case Failure(cause) => println("Failure")
}
// Convenience method for single-partition lookup
consumer.beginningOffsetsFuture(topicPartition).onComplete{
case Success(result) => {
var beginningOffset = result
println(s"Beginning offset for topic=${todo-renderDataObjectMemberSelect}, partition=${todo-renderDataObjectMemberSelect}, beginningOffset=${beginningOffset}")
}
case Failure(cause) => println("Failure")
}
You can use the endOffsets API, introduced in Kafka 0.10.1.1, to get the last offset for a given partition. In contrast to seekToEnd, it does not change the consumer's offset.
var topicPartitions = new java.util.HashSet[TopicPartition]()
var topicPartition = TopicPartition()
.setTopic("test")
.setPartition(0)
topicPartitions.add(topicPartition)
consumer.endOffsetsFuture(topicPartitions).onComplete{
case Success(result) => {
var results = result
results.foreach{
case (topicPartition: io.vertx.scala.kafka.client.common.TopicPartition, endOffset: java.lang.Long) => {
println(s"End offset for topic=${topicPartition.getTopic()}, partition=${topicPartition.getPartition()}, endOffset=${endOffset}")
}}
}
case Failure(cause) => println("Failure")
}
// Convenience method for single-partition lookup
consumer.endOffsetsFuture(topicPartition).onComplete{
case Success(result) => {
var endOffset = result
println(s"End offset for topic=${todo-renderDataObjectMemberSelect}, partition=${todo-renderDataObjectMemberSelect}, endOffset=${endOffset}")
}
case Failure(cause) => println("Failure")
}
You can use the offsetsForTimes API, introduced in Kafka 0.10.1.1, to look up an offset by timestamp, i.e. the search parameter is an epoch timestamp and the call returns the lowest offset with an ingestion timestamp greater than or equal to the given timestamp.
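A minimal sketch, assuming the Scala binding exposes an offsetsForTimesFuture method mirroring the Java offsetsForTimes API and returning an OffsetAndTimestamp with getOffset() and getTimestamp() accessors:
var topicPartition = TopicPartition()
  .setTopic("test")
  .setPartition(0)

// epoch timestamp (in milliseconds) for "60 seconds ago"
var timestamp = System.currentTimeMillis() - 60000

// assumption: offsetsForTimesFuture mirrors the Java offsetsForTimes method
consumer.offsetsForTimesFuture(topicPartition, timestamp).onComplete{
  case Success(offsetAndTimestamp) => {
    println(s"Offset = ${offsetAndTimestamp.getOffset()}, timestamp = ${offsetAndTimestamp.getTimestamp()}")
  }
  case Failure(cause) => println("Failure")
}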
A consumer can control the incoming message flow, pausing and resuming the read operation from a topic: for example, it can pause the message flow when it needs more time to process the current messages, and then resume to continue message processing.
In the case of partition-specific pause and resume, it is possible that the record handler will continue to observe messages from a paused partition for a time after the pause() method's completion handler has been called. This is not the case for messages observed by the batch handler: once the pause() completion handler has been called it will only observe messages from those partitions which are not paused.
var topicPartition = TopicPartition()
.setTopic("test")
.setPartition(0)
// registering the handler for incoming messages
consumer.handler((record: io.vertx.scala.kafka.client.consumer.KafkaConsumerRecord[String, String]) => {
println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
// i.e. pause/resume on partition 0, after reading message up to offset 5
if ((record.partition() == 0) && (record.offset() == 5)) {
// pause the read operations
consumer.pauseFuture(topicPartition).onComplete{
case Success(result) => {
println("Paused")
// resume read operation after a specific time
vertx.setTimer(5000, (timeId: java.lang.Long) => {
// resume read operations
consumer.resume(topicPartition)
})
}
case Failure(cause) => println("Failure")
}
}
})
Call close to close the consumer. Closing the consumer closes any open connections and releases all consumer resources.
The close is actually asynchronous and might not complete until some time after the call has returned. If you want to be notified when the actual close has completed then you can pass in a handler.
This handler will then be called when the close has fully completed.
consumer.closeFuture().onComplete{
case Success(result) => {
println("Consumer is now closed")
}
case Failure(cause) => {
println(s"$cause")
}
}
You can use write to send messages (records) to a topic.
The simplest way to send a message is to specify only the destination topic and the related value, omitting its key and partition; in this case the messages are sent in a round-robin fashion across all the partitions of the topic.
for ( i <- 0 until 5) {
// only topic and message value are specified, round robin on destination partitions
var record = KafkaProducerRecord.create("test", s"message_${i}")
producer.write(record)
}
You can receive metadata about the sent message, such as its topic, its destination partition and its assigned offset:
for ( i <- 0 until 5) {
// only topic and message value are specified, round robin on destination partitions
var record = KafkaProducerRecord.create("test", s"message_${i}")
producer.sendFuture(record).onComplete{
case Success(result) => {
var recordMetadata = result
println(s"Message ${record.value()} written on topic=${todo-renderDataObjectMemberSelect}, partition=${todo-renderDataObjectMemberSelect}, offset=${todo-renderDataObjectMemberSelect}")
}
case Failure(cause) => println("Failure")
}
}
When you need to assign a partition to a message, you can specify its partition identifier or its key:
for ( i <- 0 until 10) {
// a destination partition is specified
var record = KafkaProducerRecord.create("test", null, s"message_${i}", 0)
producer.write(record)
}
Since the producer identifies the destination partition by hashing the key, you can use this to guarantee that all messages with the same key are sent to the same partition and retain their order:
for ( i <- 0 until 10) {
// i.e. defining different keys for odd and even messages
var key = i % 2
// a key is specified, all messages with same key will be sent to the same partition
var record = KafkaProducerRecord.create("test", java.lang.String.valueOf(key), s"message_${i}")
producer.write(record)
}
Note: the shared producer is created on the first createShared call and its configuration is defined at that moment; every subsequent usage of the shared producer must use the same configuration.
Sometimes you want to share the same producer between several verticles or contexts.
Calling KafkaProducer.createShared returns a producer that can be shared safely:
// Create a shared producer identified by 'the-producer'
var producer1 = KafkaProducer.createShared(vertx, "the-producer", config)
// Sometimes later you can close it
producer1.close()
The same resources (thread, connection) will be shared between the producers returned by this method.
When you are done with the producer, just close it; when all shared producers are closed, the resources will be released for you.
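As a sketch of these sharing semantics, calling createShared again with the same name returns another reference to the same underlying producer; the resources are only released once every reference has been closed:
// both references share the same underlying resources
var producerA = KafkaProducer.createShared(vertx, "the-producer", config)
var producerB = KafkaProducer.createShared(vertx, "the-producer", config)

producerA.close()
// the resources are still held, since producerB is open
producerB.close()
// now all shared producers are closed and the resources are released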
Call close to close the producer. Closing the producer closes any open connections and releases all producer resources.
The close is actually asynchronous and might not complete until some time after the call has returned. If you want to be notified when the actual close has completed then you can pass in a handler.
This handler will then be called when the close has fully completed.
producer.closeFuture().onComplete{
case Success(result) => {
println("Producer is now closed")
}
case Failure(cause) => {
println(s"$cause")
}
}
You can call partitionsFor to get information about the partitions for a specified topic:
// asking partitions information about specific topic
producer.partitionsForFuture("test").onComplete{
case Success(result) => {
result.foreach(partitionInfo => {
println(partitionInfo)
})
}
case Failure(cause) => println("Failure")
}
Error handling (e.g. timeouts) between a Kafka client (consumer or producer) and the Kafka cluster is done using exceptionHandler:
// setting handler for errors
consumer.exceptionHandler((e: java.lang.Throwable) => {
println(s"Error = ${e.getMessage()}")
})
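The producer exposes the same exceptionHandler; a minimal sketch:
// setting handler for errors on the producer side as well
producer.exceptionHandler((e: java.lang.Throwable) => {
  println(s"Error = ${e.getMessage()}")
})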
If you’re creating consumers and producers from inside verticles, those consumers and producers will be automatically closed when the verticle is undeployed.
The Vert.x Kafka client comes out of the box with serializers and deserializers for buffers, JSON objects and JSON arrays.
In a consumer you can use buffers:
// Creating a consumer able to deserialize to buffers
var config = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> "io.vertx.kafka.client.serialization.BufferDeserializer",
  "value.deserializer" -> "io.vertx.kafka.client.serialization.BufferDeserializer",
  "group.id" -> "my_group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> "false"
)

// Creating a consumer able to deserialize to json object
config = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> "io.vertx.kafka.client.serialization.JsonObjectDeserializer",
  "value.deserializer" -> "io.vertx.kafka.client.serialization.JsonObjectDeserializer",
  "group.id" -> "my_group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> "false"
)

// Creating a consumer able to deserialize to json array
config = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> "io.vertx.kafka.client.serialization.JsonArrayDeserializer",
  "value.deserializer" -> "io.vertx.kafka.client.serialization.JsonArrayDeserializer",
  "group.id" -> "my_group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> "false"
)
Or in a producer:
// Creating a producer able to serialize to buffers
var config = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.serializer" -> "io.vertx.kafka.client.serialization.BufferSerializer",
  "value.serializer" -> "io.vertx.kafka.client.serialization.BufferSerializer",
  "acks" -> "1"
)

// Creating a producer able to serialize to json object
config = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.serializer" -> "io.vertx.kafka.client.serialization.JsonObjectSerializer",
  "value.serializer" -> "io.vertx.kafka.client.serialization.JsonObjectSerializer",
  "acks" -> "1"
)

// Creating a producer able to serialize to json array
config = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.serializer" -> "io.vertx.kafka.client.serialization.JsonArraySerializer",
  "value.serializer" -> "io.vertx.kafka.client.serialization.JsonArraySerializer",
  "acks" -> "1"
)
This component provides a Vert.x wrapper around the most important functions of Kafka's AdminUtils. AdminUtils is used to create, modify, and delete topics. Other functionality covered by AdminUtils, but not by this wrapper, includes partition management, broker configuration management, etc.
Warning: this class is now deprecated; see KafkaAdminClient instead.
You can call createTopic to create a topic.
Parameters are: topic name, number of partitions, number of replicas, and the usual callback to handle the result.
It might return an error, e.g. if the number of requested replicas is greater than the number of brokers.
var adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true)
// Create topic 'myNewTopic' with 2 partitions and 1 replica
adminUtils.createTopicFuture("myNewTopic", 2, 1).onComplete{
case Success(result) => {
println("Creation of topic myNewTopic successful!")
}
case Failure(cause) => {
println(s"$cause")
}
}
You can call deleteTopic to delete a topic.
Parameters are: topic name, and the usual callback to handle the result.
It might return an error, e.g. if the topic does not exist.
var adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true)
// Delete topic 'myNewTopic'
adminUtils.deleteTopicFuture("myNewTopic").onComplete{
case Success(result) => {
println("Deletion of topic myNewTopic successful!")
}
case Failure(cause) => {
println(s"$cause")
}
}
If you need to update the configuration of a topic, e.g. you want to update the retention policy, you can call changeTopicConfig to update the topic.
Parameters are: topic name, a Map (String → String) with the parameters to be changed, and the usual callback to handle the result.
It might return an error, e.g. if the topic does not exist.
var adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true)
// Set retention to 1000 ms and max size of the topic partition to 1 kiB
var properties = Map(
  "delete.retention.ms" -> "1000",
  "retention.bytes" -> "1024"
)
adminUtils.changeTopicConfigFuture("myNewTopic", properties).onComplete{
case Success(result) => {
println("Configuration change of topic myNewTopic successful!")
}
case Failure(cause) => {
println(s"$cause")
}
}
If you want to check if a topic exists, you can call topicExists.
Parameters are: topic name, and the usual callback to handle the result, which receives a boolean indicating whether the topic exists.
var adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true)
adminUtils.topicExistsFuture("myNewTopic").onComplete{
case Success(result) => {
println(s"Topic myNewTopic exists: ${result}")
}
case Failure(cause) => {
println(s"$cause")
}
}