Vert.x Kafka client

This component provides a Kafka client for reading and sending messages from/to an Apache Kafka cluster.

As consumer, the API provides methods for subscribing to a topic partition receiving messages asynchronously or reading them as a stream (even with the possibility to pause/resume the stream).

As producer, the API provides methods for sending message to a topic partition like writing on a stream.

Using the Vert.x Kafka client

To use this component, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-kafka-client</artifactId>
 <version>4.5.11</version>
</dependency>
  • Gradle (in your build.gradle file):

compile io.vertx:vertx-kafka-client:4.5.11

Creating Kafka clients

Creating consumers and producers is quite similar and on how it works using the native Kafka client library.

They need to be configured with a bunch of properties as described in the official Apache Kafka documentation, for the consumer and for the producer.

To achieve that, a map can be configured with such properties passing it to one of the static creation methods exposed by KafkaConsumer and KafkaProducer

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// use consumer for interacting with Apache Kafka
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

In the above example, a KafkaConsumer instance is created using a map instance in order to specify the Kafka nodes list to connect (just one) and the deserializers to use for getting key and value from each received message.

Likewise a producer can be created

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("acks", "1");

// use producer for interacting with Apache Kafka
KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);
The event loop that creates the KafkaConsumer will be the one to handle its messages. For example if you want messages to be handled on a Verticle’s event loop, create the Kafka Consumer within the Verticle’s start method.

Receiving messages from a topic joining a consumer group

In order to start receiving messages from Kafka topics, the consumer can use the subscribe method for subscribing to a set of topics being part of a consumer group (specified by the properties on creation).

It’s also possible to use the subscribe method for subscribing to more topics specifying a Java regex.

You also need to register a handler for handling incoming messages using the handler.

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// subscribe to several topics with list
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
consumer.subscribe(topics);

// or using a Java regex
Pattern pattern = Pattern.compile("topic\\d");
consumer.subscribe(pattern);

// or just subscribe to a single topic
consumer.subscribe("a-single-topic");

The handler can be registered before or after the call to subscribe(); messages won’t be consumed until both methods have been called. This allows you to call subscribe(), then seek() and finally handler() in order to only consume messages starting from a particular offset, for example.

A handler can also be passed during subscription to be aware of the subscription result and being notified when the operation is completed.

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// subscribe to several topics
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
consumer
  .subscribe(topics)
  .onSuccess(v ->
    System.out.println("subscribed")
  ).onFailure(cause ->
    System.out.println("Could not subscribe " + cause.getMessage())
  );

// or just subscribe to a single topic
consumer
  .subscribe("a-single-topic")
  .onSuccess(v ->
    System.out.println("subscribed")
  ).onFailure(cause ->
    System.out.println("Could not subscribe " + cause.getMessage())
  );

Using the consumer group way, the Kafka cluster assigns partitions to the consumer taking into account other connected consumers in the same consumer group, so that partitions can be spread across them.

The Kafka cluster handles partitions re-balancing when a consumer leaves the group (so assigned partitions are free to be assigned to other consumers) or a new consumer joins the group (so it wants partitions to read from).

You can register handlers on a KafkaConsumer to be notified of the partitions revocations and assignments by the Kafka cluster using partitionsRevokedHandler and partitionsAssignedHandler.

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// registering handlers for assigned and revoked partitions
consumer.partitionsAssignedHandler(topicPartitions -> {
  System.out.println("Partitions assigned");
  for (TopicPartition topicPartition : topicPartitions) {
    System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
  }
});

consumer.partitionsRevokedHandler(topicPartitions -> {
  System.out.println("Partitions revoked");
  for (TopicPartition topicPartition : topicPartitions) {
    System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
  }
});

// subscribes to the topic
consumer
  .subscribe("test")
  .onSuccess(v ->
    System.out.println("subscribed")
  ).onFailure(cause ->
    System.out.println("Could not subscribe " + cause.getMessage())
  );

After joining a consumer group for receiving messages, a consumer can decide to leave the consumer group in order to not get messages anymore using unsubscribe

consumer.unsubscribe();

You can add an handler to be notified of the result

consumer
  .unsubscribe()
  .onSuccess(v ->
    System.out.println("Consumer unsubscribed")
  );

Receiving messages from a topic requesting specific partitions

Besides being part of a consumer group for receiving messages from a topic, a consumer can ask for a specific topic partition. When the consumer is not part of a consumer group the overall application cannot rely on the re-balancing feature.

You can use assign in order to ask for specific partitions.

consumer.handler(record -> {
  System.out.println("key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

//
Set<TopicPartition> topicPartitions = new HashSet<>();
topicPartitions.add(new TopicPartition()
  .setTopic("test")
  .setPartition(0));

// requesting to be assigned the specific partition
consumer
  .assign(topicPartitions)
  .onSuccess(v -> System.out.println("Partition assigned"))
  // After the assignment is completed, get the assigned partitions to this consumer
  .compose(v -> consumer.assignment())
  .onSuccess(partitions -> {
    for (TopicPartition topicPartition : partitions) {
      System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
    }
  });

As with subscribe(), the handler can be registered before or after the call to assign(); messages won’t be consumed until both methods have been called. This allows you to call assign(), then seek() and finally handler() in order to only consume messages starting from a particular offset, for example.

Calling assignment provides the list of the current assigned partitions.

Receiving messages with explicit polling

Other than using the internal polling mechanism in order to receive messages from Kafka, the client can subscribe to a topic, avoiding to register the handler for getting the messages and then using the poll method.

In this way, the user application is in charge to execute the poll for getting messages when it needs, for example after processing the previous ones.

consumer
  .subscribe("test")
  .onSuccess(v -> {
    System.out.println("Consumer subscribed");

    // Let's poll every second
    vertx.setPeriodic(1000, timerId ->
      consumer
        .poll(Duration.ofMillis(100))
        .onSuccess(records -> {
          for (int i = 0; i < records.size(); i++) {
            KafkaConsumerRecord<String, String> record = records.recordAt(i);
            System.out.println("key=" + record.key() + ",value=" + record.value() +
              ",partition=" + record.partition() + ",offset=" + record.offset());
          }
        })
        .onFailure(cause -> {
          System.out.println("Something went wrong when polling " + cause.toString());
          cause.printStackTrace();

          // Stop polling if something went wrong
          vertx.cancelTimer(timerId);
        })
    );
});

After subscribing successfully, the application start a periodic timer in order to execute the poll and getting messages from Kafka periodically.

Changing the subscription or assignment

You can change the subscribed topics, or assigned partitions after you have started to consume messages, simply by calling subscribe() or assign() again.

Note that due to internal buffering of messages it is possible that the record handler will continue to observe messages from the old subscription or assignment after the subscribe() or assign() method’s completion handler has been called. This is not the case for messages observed by the batch handler: Once the completion handler has been called it will only observe messages read from the subscription or assignment.

Getting topic partition information

You can call the partitionsFor to get information about partitions for a specified topic

consumer
  .partitionsFor("test")
  .onSuccess(partitions -> {
    for (PartitionInfo partitionInfo : partitions) {
      System.out.println(partitionInfo);
    }
  });

In addition listTopics provides all available topics with related partitions

consumer
  .listTopics()
  .onSuccess(partitionsTopicMap ->
    partitionsTopicMap.forEach((topic, partitions) -> {
      System.out.println("topic = " + topic);
      System.out.println("partitions = " + partitions);
    })
  );

Manual offset commit

In Apache Kafka the consumer is in charge to handle the offset of the last read message.

This is executed by the commit operation executed automatically every time a bunch of messages are read from a topic partition. The configuration parameter enable.auto.commit must be set to true when the consumer is created.

Manual offset commit, can be achieved with commit. It can be used to achieve at least once delivery to be sure that the read messages are processed before committing the offset.

consumer.commit().onSuccess(v ->
  System.out.println("Last read message offset committed")
);

Seeking in a topic partition

Apache Kafka can retain messages for a long period of time and the consumer can seek inside a topic partition and obtain arbitrary access to the messages.

You can use seek to change the offset for reading at a specific position

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// seek to a specific offset
consumer
  .seek(topicPartition, 10)
  .onSuccess(v -> System.out.println("Seeking done"));

When the consumer needs to re-read the stream from the beginning, it can use seekToBeginning

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// seek to the beginning of the partition
consumer
  .seekToBeginning(Collections.singleton(topicPartition))
  .onSuccess(v -> System.out.println("Seeking done"));

Finally seekToEnd can be used to come back at the end of the partition

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// seek to the end of the partition
consumer
  .seekToEnd(Collections.singleton(topicPartition))
  .onSuccess(v -> System.out.println("Seeking done"));

Note that due to internal buffering of messages it is possible that the record handler will continue to observe messages read from the original offset for a time after the seek*() method’s completion handler has been called. This is not the case for messages observed by the batch handler: Once the seek*() completion handler has been called it will only observe messages read from the new offset.

Offset lookup

You can use the beginningOffsets API introduced in Kafka 0.10.1.1 to get the first offset for a given partition. In contrast to seekToBeginning, it does not change the consumer’s offset.

Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);

consumer
  .beginningOffsets(topicPartitions)
  .onSuccess(results ->
    results.forEach((topic, beginningOffset) ->
      System.out.println(
        "Beginning offset for topic=" + topic.getTopic() + ", partition=" +
          topic.getPartition() + ", beginningOffset=" + beginningOffset
      )
    )
  );

// Convenience method for single-partition lookup
consumer
  .beginningOffsets(topicPartition)
  .onSuccess(beginningOffset ->
    System.out.println(
      "Beginning offset for topic=" + topicPartition.getTopic() + ", partition=" +
        topicPartition.getPartition() + ", beginningOffset=" + beginningOffset
    )
  );

You can use the endOffsets API introduced in Kafka 0.10.1.1 to get the last offset for a given partition. In contrast to seekToEnd, it does not change the consumer’s offset.

Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);

consumer.endOffsets(topicPartitions)
  .onSuccess(results ->
    results.forEach((topic, beginningOffset) ->
      System.out.println(
        "End offset for topic=" + topic.getTopic() + ", partition=" +
          topic.getPartition() + ", beginningOffset=" + beginningOffset
      )
    )
  );

// Convenience method for single-partition lookup
consumer
  .endOffsets(topicPartition)
  .onSuccess(endOffset ->
    System.out.println(
      "End offset for topic=" + topicPartition.getTopic() + ", partition=" +
        topicPartition.getPartition() + ", endOffset=" + endOffset
    )
);

You can use the offsetsForTimes API introduced in Kafka 0.10.1.1 to look up an offset by timestamp, i.e. search parameter is an epoch timestamp and the call returns the lowest offset with ingestion timestamp >= given timestamp.

Map<TopicPartition, Long> topicPartitionsWithTimestamps = new HashMap<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);

// We are interested in the offset for data ingested 60 seconds ago
long timestamp = (System.currentTimeMillis() - 60000);

topicPartitionsWithTimestamps.put(topicPartition, timestamp);
consumer
  .offsetsForTimes(topicPartitionsWithTimestamps)
  .onSuccess(results ->
    results.forEach((topic, offset) ->
      System.out.println(
        "Offset for topic=" + topic.getTopic() +
        ", partition=" + topic.getPartition() + "\n" +
        ", timestamp=" + timestamp + ", offset=" + offset.getOffset() +
        ", offsetTimestamp=" + offset.getTimestamp()
      )
    )
);

// Convenience method for single-partition lookup
consumer.offsetsForTimes(topicPartition, timestamp).onSuccess(offsetAndTimestamp ->
  System.out.println(
    "Offset for topic=" + topicPartition.getTopic() +
    ", partition=" + topicPartition.getPartition() + "\n" +
    ", timestamp=" + timestamp + ", offset=" + offsetAndTimestamp.getOffset() +
    ", offsetTimestamp=" + offsetAndTimestamp.getTimestamp()
  )
);

Message flow control

A consumer can control the incoming message flow and pause/resume the read operation from a topic, e.g it can pause the message flow when it needs more time to process the actual messages and then resume to continue message processing.

To achieve that you can use pause and resume.

In the case of the partition-specific pause and resume it is possible that the record handler will continue to observe messages from a paused partition for a time after the pause() method’s completion handler has been called. This is not the case for messages observed by the batch handler: Once the pause() completion handler has been called it will only observe messages from those partitions which are not paused.

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// registering the handler for incoming messages
consumer.handler(record -> {
  System.out.println("key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());

  // i.e. pause/resume on partition 0, after reading message up to offset 5
  if ((record.partition() == 0) && (record.offset() == 5)) {

    // pause the read operations
    consumer.pause(topicPartition)
      .onSuccess(v -> System.out.println("Paused"))
      .onSuccess(v -> vertx.setTimer(5000, timeId ->
        // resume read operations
        consumer.resume(topicPartition)
      ));
  }
});

Closing a consumer

Call close to close the consumer. Closing the consumer closes any open connections and releases all consumer resources.

The close is actually asynchronous and might not complete until some time after the call has returned. If you want to be notified when the actual close has completed then you can pass in a handler.

This handler will then be called when the close has fully completed.

consumer
  .close()
  .onSuccess(v -> System.out.println("Consumer is now closed"))
  .onFailure(cause -> System.out.println("Close failed: " + cause));

Sending messages to a topic

You can use write to send messages (records) to a topic.

The simplest way to send a message is to specify only the destination topic and the related value, omitting its key or partition, in this case the messages are sent in a round robin fashion across all the partitions of the topic.

for (int i = 0; i < 5; i++) {

  // only topic and message value are specified, round robin on destination partitions
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", "message_" + i);

  producer.write(record);
}

You can receive message sent metadata like its topic, its destination partition and its assigned offset.

for (int i = 0; i < 5; i++) {

  // only topic and message value are specified, round robin on destination partitions
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", "message_" + i);

  producer.send(record).onSuccess(recordMetadata ->
    System.out.println(
      "Message " + record.value() + " written on topic=" + recordMetadata.getTopic() +
      ", partition=" + recordMetadata.getPartition() +
      ", offset=" + recordMetadata.getOffset()
    )
  );
}

When you need to assign a partition to a message, you can specify its partition identifier or its key

for (int i = 0; i < 10; i++) {

  // a destination partition is specified
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", null, "message_" + i, 0);

  producer.write(record);
}

Since the producers identifies the destination using key hashing, you can use that to guarantee that all messages with the same key are sent to the same partition and retain the order.

for (int i = 0; i < 10; i++) {

  // i.e. defining different keys for odd and even messages
  int key = i % 2;

  // a key is specified, all messages with same key will be sent to the same partition
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", String.valueOf(key), "message_" + i);

  producer.write(record);
}
the shared producer is created on the first createShared call and its configuration is defined at this moment, shared producer usage must use the same configuration.

Sharing a producer

Sometimes you want to share the same producer from within several verticles or contexts.

Calling KafkaProducer.createShared returns a producer that can be shared safely.

KafkaProducer<String, String> producer1 = KafkaProducer.createShared(vertx, "the-producer", config);

// Sometimes later you can close it
producer1.close();

The same resources (thread, connection) will be shared between the producer returned by this method.

When you are done with the producer, just close it, when all shared producers are closed, the resources will be released for you.

Closing a producer

Call close to close the producer. Closing the producer closes any open connections and releases all producer resources.

The close is actually asynchronous and might not complete until some time after the call has returned. If you want to be notified when the actual close has completed then you can pass in a handler.

This handler will then be called when the close has fully completed.

producer
  .close()
  .onSuccess(v -> System.out.println("Producer is now closed"))
  .onFailure(cause -> System.out.println("Close failed: " + cause));

Getting topic partition information

You can call the partitionsFor to get information about partitions for a specified topic:

producer
  .partitionsFor("test")
  .onSuccess(partitions ->
    partitions.forEach(System.out::println)
  );

Handling errors

Errors handling (e.g timeout) between a Kafka client (consumer or producer) and the Kafka cluster is done using exceptionHandler or exceptionHandler

consumer.exceptionHandler(e -> {
  System.out.println("Error = " + e.getMessage());
});

Automatic clean-up in verticles

If you’re creating consumers and producer from inside verticles, those consumers and producers will be automatically closed when the verticle is undeployed.

Using Vert.x serializers/deserializers

Vert.x Kafka client comes out of the box with serializers and deserializers for buffers, json object and json array.

In a consumer you can use buffers

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// Creating a consumer able to deserialize to json object
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// Creating a consumer able to deserialize to json array
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

Or in a producer

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
config.put("acks", "1");

// Creating a producer able to serialize to json object
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
config.put("acks", "1");

// Creating a producer able to serialize to json array
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
config.put("acks", "1");

You can also specify the serializers/deserializers at creation time:

In a consumer

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// Creating a consumer able to deserialize buffers
KafkaConsumer<Buffer, Buffer> bufferConsumer = KafkaConsumer.create(vertx, config, Buffer.class, Buffer.class);

// Creating a consumer able to deserialize json objects
KafkaConsumer<JsonObject, JsonObject> jsonObjectConsumer = KafkaConsumer.create(vertx, config, JsonObject.class, JsonObject.class);

// Creating a consumer able to deserialize json arrays
KafkaConsumer<JsonArray, JsonArray> jsonArrayConsumer = KafkaConsumer.create(vertx, config, JsonArray.class, JsonArray.class);

Or in a producer

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("acks", "1");

// Creating a producer able to serialize to buffers
KafkaProducer<Buffer, Buffer> bufferProducer = KafkaProducer.create(vertx, config, Buffer.class, Buffer.class);

// Creating a producer able to serialize to json objects
KafkaProducer<JsonObject, JsonObject> jsonObjectProducer = KafkaProducer.create(vertx, config, JsonObject.class, JsonObject.class);

// Creating a producer able to serialize to json arrays
KafkaProducer<JsonArray, JsonArray> jsonArrayProducer = KafkaProducer.create(vertx, config, JsonArray.class, JsonArray.class);

RxJava 3 API

The Kafka client provides an Rxified version of the original API.

Observable<KafkaConsumerRecord<String, Long>> observable = consumer.toObservable();

observable
  .map(record -> record.value())
  .buffer(256)
  .map(
  list -> list.stream().mapToDouble(n -> n).average()
).subscribe(val -> {

  // Obtained an average

});

Automatic trace propagation

When Vert.x is configured with Tracing enabled (see setTracingOptions), traces will be automatically propagated with Kafka messages.

Kafka producers will add a span to traces when a message is being written, trace contexts will be propagated as Kafka headers, and the consumers will also create a span while reading a message.

Following the OpenTracing semantic convention, span tags are:

  • span.kind, it is either consumer or producer

  • peer.address can be configured from setTracePeerAddress. If unset, it will be the configured bootstrap server.

  • peer.hostname is parsed from peer.address

  • peer.port is parsed from peer.address

  • peer.service is always kafka

  • message_bus.destination, which is set to the topic in use

Vert.x Kafka Admin Client

This component provides a Vert.x wrapper around the Kafka Admin Client API. The Kafka Admin Client is used to create, modify, and delete topics. It also provides methods for handling ACLs (Access Control Lists), consumer groups and many more.

Creating the Kafka Admin Client

Creating the admin client is quite similar on how it works using the native Kafka client library.

It needs to be configured with a bunch of properties as described in the official Apache Kafka documentation, for the admin.

To achieve that, a map can be configured with such properties passing it to one of the static creation methods exposed by KafkaAdminClient.

Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaAdminClient adminClient = KafkaAdminClient.create(vertx, config);

Using the Kafka Admin Client

Listing topics

You can call the listTopics for listing the topics in the cluster. The only parameter is the usual callback to handle the result, which provides the topics list.

adminClient.listTopics().onSuccess(topics ->
    System.out.println("Topics= " + topics)
);

Describe topics

You can call describeTopics to describe topics in the cluster. Describing a topic means getting all related metadata like number of partitions, replicas, leader, in-sync replicas and so on. The needed parameters are the list of topics names to describe, and the usual callback to handle the result providing a map with topic names and related TopicDescription.

adminClient.describeTopics(Collections.singletonList("my-topic")).onSuccess(topics -> {
  TopicDescription topicDescription = topics.get("first-topic");

  System.out.println("Topic name=" + topicDescription.getName() +
      " isInternal= " + topicDescription.isInternal() +
      " partitions= " + topicDescription.getPartitions().size());

  for (TopicPartitionInfo topicPartitionInfo : topicDescription.getPartitions()) {
    System.out.println("Partition id= " + topicPartitionInfo.getPartition() +
      " leaderId= " + topicPartitionInfo.getLeader().getId() +
      " replicas= " + topicPartitionInfo.getReplicas() +
      " isr= " + topicPartitionInfo.getIsr());
  }
});

Create topic

You can call createTopics to create topics in the cluster. The needed parameters are the list of the topics to create, and the usual callback to handle the result. The topics to create are defined via the NewTopic class specifying the name, the number of partitions and the replication factor. It is also possible to describe the replicas assignment, mapping each replica to the broker id, instead of specifying the number of partitions and the replication factor (which in this case has to be set to -1).

adminClient.createTopics(Collections.singletonList(new NewTopic("testCreateTopic", 1, (short)1)))
  .onSuccess(v -> {
    // topics created successfully
  })
  .onFailure(cause -> {
    // something went wrong when creating the topics
  });

Delete topic

You can call deleteTopics to delete topics in the cluster. The needed parameters are the list of the topics to delete, and the usual callback to handle the result.

adminClient.deleteTopics(Collections.singletonList("topicToDelete"))
  .onSuccess(v -> {
    // topics deleted successfully
  })
  .onFailure(cause -> {
    // something went wrong when removing the topics
  });

Describe configuration

You can call describeConfigs to describe resources configuration. Describing resources configuration means getting all configuration information for cluster resources like topics or brokers. The needed parameters are the list of the resources for which you want the configuration, and the usual callback to handle the result. The resources are described by a collection of ConfigResource while the result maps each resource with a corresponding Config which as more ConfigEntry for each configuration parameter.

adminClient.describeConfigs(Collections.singletonList(
  new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, "my-topic"))).onSuccess(configs -> {
  // check the configurations
});

Alter configuration

You can call alterConfigs to alter resources configuration. Altering resources configuration means updating configuration information for cluster resources like topics or brokers. The needed parameters are the list of the resources with the related configurations to updated, and the usual callback to handle the result. It is possible to alter configurations for different resources with just one call. The input parameter maps each ConfigResource with the corresponding Config you want to apply.

ConfigResource resource = new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, "my-topic");
// create a entry for updating the retention.ms value on the topic
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "51000");
Map<ConfigResource, Config> updateConfig = new HashMap<>();
updateConfig.put(resource, new Config(Collections.singletonList(retentionEntry)));
adminClient.alterConfigs(updateConfig)
  .onSuccess(v -> {
    // configuration altered successfully
  })
  .onFailure(cause -> {
    // something went wrong when altering configs
  });

List consumer groups

You can call the listConsumerGroups for listing the consumer groups in the cluster. The only parameter is the usual callback to handle the result, which provides the consumer groups list.

adminClient.listConsumerGroups().onSuccess(consumerGroups ->
  System.out.println("ConsumerGroups= " + consumerGroups)
);

Describe consumer groups

You can call describeConsumerGroups to describe consumer groups in the cluster. Describing a consumer group means getting all related information like members, related ids, topics subscribed, partitions assignment and so on. The needed parameters are the list of consumer groups names to describe, and the usual callback to handle the result providing a map with consumer group names and related MemberDescription.

adminClient.describeTopics(Collections.singletonList("my-topic")).onSuccess(topics -> {
  TopicDescription topicDescription = topics.get("first-topic");

  System.out.println("Topic name=" + topicDescription.getName() +
      " isInternal= " + topicDescription.isInternal() +
      " partitions= " + topicDescription.getPartitions().size());

  for (TopicPartitionInfo topicPartitionInfo : topicDescription.getPartitions()) {
    System.out.println("Partition id= " + topicPartitionInfo.getPartition() +
      " leaderId= " + topicPartitionInfo.getLeader().getId() +
      " replicas= " + topicPartitionInfo.getReplicas() +
      " isr= " + topicPartitionInfo.getIsr());
  }
});