Class KafkaConsumer<K,V>
- java.lang.Object
-
- io.vertx.reactivex.kafka.client.consumer.KafkaConsumer<K,V>
-
- All Implemented Interfaces:
ReadStream<KafkaConsumerRecord<K,V>>, StreamBase
public class KafkaConsumer<K,V> extends Object implements ReadStream<KafkaConsumerRecord<K,V>>
Vert.x Kafka consumer.
You receive Kafka records by providing a handler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecord<K, V>>). As messages arrive, the handler is called with the records.
pause() and resume() provide global control over reading the records from the consumer.
pause(Set) and resume(Set) provide finer-grained control over reading records for specific topic partitions; these are Kafka-specific operations.
NOTE: This class has been automatically generated from the original non RX-ified interface using Vert.x codegen.
-
-
Field Summary
Fields
Modifier and Type, Field
static io.vertx.lang.rx.TypeArg<KafkaConsumer> __TYPE_ARG
io.vertx.lang.rx.TypeArg<K> __typeArg_0
io.vertx.lang.rx.TypeArg<V> __typeArg_1
-
Constructor Summary
Constructors
KafkaConsumer(KafkaConsumer delegate)
KafkaConsumer(Object delegate, io.vertx.lang.rx.TypeArg<K> typeArg_0, io.vertx.lang.rx.TypeArg<V> typeArg_1)
-
Method Summary
Modifier and Type, Method, Description
Future<Void> assign(TopicPartition topicPartition)
Manually assign a partition to this consumer.
Future<Void> assign(Set<TopicPartition> topicPartitions)
Manually assign a set of partitions to this consumer.
Future<Set<TopicPartition>> assignment()
Get the set of partitions currently assigned to this consumer.
KafkaConsumer<K,V> batchHandler(Handler<KafkaConsumerRecords<K,V>> handler)
Set the handler to be used when batches of messages are fetched from the Kafka server.
Future<Long> beginningOffsets(TopicPartition topicPartition)
Get the first offset for the given partition.
Future<Void> close()
Close the consumer.
Future<Void> commit()
Commit current offsets for all the subscribed topics and partitions.
Future<OffsetAndMetadata> committed(TopicPartition topicPartition)
Get the last committed offset for the given partition (whether the commit happened by this process or another).
static <K,V> KafkaConsumer<K,V> create(Vertx vertx, KafkaClientOptions options)
Create a new KafkaConsumer instance.
static <K,V> KafkaConsumer<K,V> create(Vertx vertx, KafkaClientOptions options, Class<K> keyType, Class<V> valueType)
Create a new KafkaConsumer instance.
static <K,V> KafkaConsumer<K,V> create(Vertx vertx, Map<String,String> config)
Create a new KafkaConsumer instance.
static <K,V> KafkaConsumer<K,V> create(Vertx vertx, Map<String,String> config, Class<K> keyType, Class<V> valueType)
Create a new KafkaConsumer instance.
static <K,V> KafkaConsumer<K,V> create(Vertx vertx, org.apache.kafka.clients.consumer.Consumer<K,V> consumer)
Create a new KafkaConsumer instance from a native Consumer.
static <K,V> KafkaConsumer<K,V> create(Vertx vertx, org.apache.kafka.clients.consumer.Consumer<K,V> consumer, KafkaClientOptions options)
Create a new KafkaConsumer instance from a native Consumer.
long demand()
Returns the current demand.
KafkaConsumer<K,V> endHandler(Handler<Void> endHandler)
Set an end handler.
Future<Long> endOffsets(TopicPartition topicPartition)
Get the last offset for the given partition.
boolean equals(Object o)
KafkaConsumer<K,V> exceptionHandler(Handler<Throwable> handler)
Set an exception handler on the read stream.
KafkaConsumer<K,V> fetch(long amount)
Fetch the specified amount of elements.
KafkaConsumer getDelegate()
KafkaConsumer<K,V> handler(Handler<KafkaConsumerRecord<K,V>> handler)
Set a data handler.
int hashCode()
static <K,V> KafkaConsumer<K,V> newInstance(KafkaConsumer arg)
static <K,V> KafkaConsumer<K,V> newInstance(KafkaConsumer arg, io.vertx.lang.rx.TypeArg<K> __typeArg_K, io.vertx.lang.rx.TypeArg<V> __typeArg_V)
Future<OffsetAndTimestamp> offsetsForTimes(TopicPartition topicPartition, Long timestamp)
Look up the offset for the given partition by timestamp.
KafkaConsumer<K,V> partitionsAssignedHandler(Handler<Set<TopicPartition>> handler)
Set the handler called when topic partitions are assigned to the consumer.
Future<List<PartitionInfo>> partitionsFor(String topic)
Get metadata about the partitions for a given topic.
KafkaConsumer<K,V> partitionsRevokedHandler(Handler<Set<TopicPartition>> handler)
Set the handler called when topic partitions are revoked from the consumer.
KafkaConsumer<K,V> pause()
Pause the ReadStream; it sets the buffer in fetch mode and clears the actual demand.
Future<Void> pause(TopicPartition topicPartition)
Suspend fetching from the requested partition.
Future<Void> pause(Set<TopicPartition> topicPartitions)
Suspend fetching from the requested partitions.
Future<Set<TopicPartition>> paused()
Get the set of partitions that were previously paused by a call to pause(Set).
Pipe<KafkaConsumerRecord<K,V>> pipe()
Pause this stream and return a Pipe to transfer the elements of this stream to a destination WriteStream.
Future<Void> pipeTo(WriteStream<KafkaConsumerRecord<K,V>> dst)
Pipe this ReadStream to the WriteStream.
Future<KafkaConsumerRecords<K,V>> poll(java.time.Duration timeout)
Executes a poll for getting messages from Kafka.
KafkaConsumer<K,V> pollTimeout(java.time.Duration timeout)
Sets the poll timeout for the underlying native Kafka Consumer.
Future<Long> position(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).
KafkaConsumer<K,V> resume()
Resume reading, and set the buffer in flowing mode.
Future<Void> resume(TopicPartition topicPartition)
Resume the specified partition, which has been paused with pause.
Future<Void> resume(Set<TopicPartition> topicPartitions)
Resume the specified partitions, which have been paused with pause.
Completable rxAssign(TopicPartition topicPartition)
Manually assign a partition to this consumer.
Completable rxAssign(Set<TopicPartition> topicPartitions)
Manually assign a set of partitions to this consumer.
Single<Set<TopicPartition>> rxAssignment()
Get the set of partitions currently assigned to this consumer.
Single<Long> rxBeginningOffsets(TopicPartition topicPartition)
Get the first offset for the given partition.
Completable rxClose()
Close the consumer.
Completable rxCommit()
Commit current offsets for all the subscribed topics and partitions.
Single<OffsetAndMetadata> rxCommitted(TopicPartition topicPartition)
Get the last committed offset for the given partition (whether the commit happened by this process or another).
Single<Long> rxEndOffsets(TopicPartition topicPartition)
Get the last offset for the given partition.
Single<OffsetAndTimestamp> rxOffsetsForTimes(TopicPartition topicPartition, Long timestamp)
Look up the offset for the given partition by timestamp.
Single<List<PartitionInfo>> rxPartitionsFor(String topic)
Get metadata about the partitions for a given topic.
Completable rxPause(TopicPartition topicPartition)
Suspend fetching from the requested partition.
Completable rxPause(Set<TopicPartition> topicPartitions)
Suspend fetching from the requested partitions.
Single<Set<TopicPartition>> rxPaused()
Get the set of partitions that were previously paused by a call to pause(Set).
Completable rxPipeTo(WriteStream<KafkaConsumerRecord<K,V>> dst)
Pipe this ReadStream to the WriteStream.
Single<KafkaConsumerRecords<K,V>> rxPoll(java.time.Duration timeout)
Executes a poll for getting messages from Kafka.
Single<Long> rxPosition(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).
Completable rxResume(TopicPartition topicPartition)
Resume the specified partition, which has been paused with pause.
Completable rxResume(Set<TopicPartition> topicPartitions)
Resume the specified partitions, which have been paused with pause.
Completable rxSeek(TopicPartition topicPartition, long offset)
Overrides the fetch offsets that the consumer will use on the next poll.
Completable rxSeek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata)
Overrides the fetch offsets that the consumer will use on the next poll.
Completable rxSeekToBeginning(TopicPartition topicPartition)
Seek to the first offset for the given partition.
Completable rxSeekToBeginning(Set<TopicPartition> topicPartitions)
Seek to the first offset for each of the given partitions.
Completable rxSeekToEnd(TopicPartition topicPartition)
Seek to the last offset for the given partition.
Completable rxSeekToEnd(Set<TopicPartition> topicPartitions)
Seek to the last offset for each of the given partitions.
Completable rxSubscribe(String topic)
Subscribe to the given topic to get dynamically assigned partitions.
Completable rxSubscribe(Pattern pattern)
Subscribe to all topics matching the specified pattern to get dynamically assigned partitions.
Completable rxSubscribe(Set<String> topics)
Subscribe to the given list of topics to get dynamically assigned partitions.
Single<Set<String>> rxSubscription()
Get the current subscription.
Completable rxUnsubscribe()
Unsubscribe from topics currently subscribed with subscribe.
Future<Void> seek(TopicPartition topicPartition, long offset)
Overrides the fetch offsets that the consumer will use on the next poll.
Future<Void> seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata)
Overrides the fetch offsets that the consumer will use on the next poll.
Future<Void> seekToBeginning(TopicPartition topicPartition)
Seek to the first offset for the given partition.
Future<Void> seekToBeginning(Set<TopicPartition> topicPartitions)
Seek to the first offset for each of the given partitions.
Future<Void> seekToEnd(TopicPartition topicPartition)
Seek to the last offset for the given partition.
Future<Void> seekToEnd(Set<TopicPartition> topicPartitions)
Seek to the last offset for each of the given partitions.
Future<Void> subscribe(String topic)
Subscribe to the given topic to get dynamically assigned partitions.
Future<Void> subscribe(Pattern pattern)
Subscribe to all topics matching the specified pattern to get dynamically assigned partitions.
Future<Void> subscribe(Set<String> topics)
Subscribe to the given list of topics to get dynamically assigned partitions.
Future<Set<String>> subscription()
Get the current subscription.
Flowable<KafkaConsumerRecord<K,V>> toFlowable()
Observable<KafkaConsumerRecord<K,V>> toObservable()
String toString()
Future<Void> unsubscribe()
Unsubscribe from topics currently subscribed with subscribe.
-
-
-
Field Detail
-
__TYPE_ARG
public static final io.vertx.lang.rx.TypeArg<KafkaConsumer> __TYPE_ARG
-
__typeArg_0
public final io.vertx.lang.rx.TypeArg<K> __typeArg_0
-
__typeArg_1
public final io.vertx.lang.rx.TypeArg<V> __typeArg_1
-
-
Constructor Detail
-
KafkaConsumer
public KafkaConsumer(KafkaConsumer delegate)
-
-
Method Detail
-
getDelegate
public KafkaConsumer getDelegate()
- Specified by:
getDelegate in interface ReadStream<K>
- Specified by:
getDelegate in interface StreamBase
-
toObservable
public Observable<KafkaConsumerRecord<K,V>> toObservable()
- Specified by:
toObservable in interface ReadStream<K>
-
toFlowable
public Flowable<KafkaConsumerRecord<K,V>> toFlowable()
- Specified by:
toFlowable in interface ReadStream<K>
-
pipe
public Pipe<KafkaConsumerRecord<K,V>> pipe()
Pause this stream and return a Pipe to transfer the elements of this stream to a destination WriteStream. The stream will be resumed when the pipe is wired to a WriteStream.
- Specified by:
pipe in interface ReadStream<K>
- Returns:
- a pipe
-
pipeTo
public Future<Void> pipeTo(WriteStream<KafkaConsumerRecord<K,V>> dst)
Pipe this ReadStream to the WriteStream.
Elements emitted by this stream will be written to the write stream until this stream ends or fails.
- Specified by:
pipeTo in interface ReadStream<K>
- Parameters:
dst - the destination write stream
- Returns:
- a future notified, with the outcome, when the write stream has ended
-
rxPipeTo
public Completable rxPipeTo(WriteStream<KafkaConsumerRecord<K,V>> dst)
Pipe this ReadStream to the WriteStream.
Elements emitted by this stream will be written to the write stream until this stream ends or fails.
- Specified by:
rxPipeTo in interface ReadStream<K>
- Parameters:
dst - the destination write stream
- Returns:
- a Completable notified, with the outcome, when the write stream has ended
-
create
public static <K,V> KafkaConsumer<K,V> create(Vertx vertx, Map<String,String> config)
Create a new KafkaConsumer instance.
- Parameters:
vertx - Vert.x instance to use
config - Kafka consumer configuration
- Returns:
- an instance of the KafkaConsumer
-
create
public static <K,V> KafkaConsumer<K,V> create(Vertx vertx, Map<String,String> config, Class<K> keyType, Class<V> valueType)
Create a new KafkaConsumer instance.
- Parameters:
vertx - Vert.x instance to use
config - Kafka consumer configuration
keyType - class type for the key deserialization
valueType - class type for the value deserialization
- Returns:
- an instance of the KafkaConsumer
-
create
public static <K,V> KafkaConsumer<K,V> create(Vertx vertx, KafkaClientOptions options)
Create a new KafkaConsumer instance.
- Parameters:
vertx - Vert.x instance to use
options - Kafka consumer options
- Returns:
- an instance of the KafkaConsumer
-
create
public static <K,V> KafkaConsumer<K,V> create(Vertx vertx, KafkaClientOptions options, Class<K> keyType, Class<V> valueType)
Create a new KafkaConsumer instance.
- Parameters:
vertx - Vert.x instance to use
options - Kafka consumer options
keyType - class type for the key deserialization
valueType - class type for the value deserialization
- Returns:
- an instance of the KafkaConsumer
-
exceptionHandler
public KafkaConsumer<K,V> exceptionHandler(Handler<Throwable> handler)
Description copied from interface: ReadStream
Set an exception handler on the read stream.
- Specified by:
exceptionHandler in interface ReadStream<K>
- Specified by:
exceptionHandler in interface StreamBase
- Parameters:
handler - the exception handler
- Returns:
- a reference to this, so the API can be used fluently
-
handler
public KafkaConsumer<K,V> handler(Handler<KafkaConsumerRecord<K,V>> handler)
Description copied from interface: ReadStream
Set a data handler. As data is read, the handler will be called with the data.
- Specified by:
handler in interface ReadStream<K>
- Returns:
- a reference to this, so the API can be used fluently
-
pause
public KafkaConsumer<K,V> pause()
Description copied from interface: ReadStream
Pause the ReadStream; it sets the buffer in fetch mode and clears the actual demand.
While it's paused, no data will be sent to the data handler.
- Specified by:
pause in interface ReadStream<K>
- Returns:
- a reference to this, so the API can be used fluently
-
resume
public KafkaConsumer<K,V> resume()
Description copied from interface: ReadStream
Resume reading, and set the buffer in flowing mode. If the ReadStream has been paused, reading will recommence on it.
- Specified by:
resume in interface ReadStream<K>
- Returns:
- a reference to this, so the API can be used fluently
-
fetch
public KafkaConsumer<K,V> fetch(long amount)
Description copied from interface: ReadStream
Fetch the specified amount of elements. If the ReadStream has been paused, reading will recommence with the specified amount of items; otherwise the specified amount will be added to the current stream demand.
- Specified by:
fetch in interface ReadStream<K>
- Returns:
- a reference to this, so the API can be used fluently
-
endHandler
public KafkaConsumer<K,V> endHandler(Handler<Void> endHandler)
Description copied from interface: ReadStream
Set an end handler. Once the stream has ended, and there is no more data to be read, this handler will be called.
- Specified by:
endHandler in interface ReadStream<K>
- Returns:
- a reference to this, so the API can be used fluently
-
demand
public long demand()
Returns the current demand.
- If the stream is in flowing mode, returns Long.MAX_VALUE.
- If the stream is in fetch mode, returns the current number of elements still to be delivered, or 0 if paused.
- Returns:
- current demand
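The interplay of pause(), resume(), fetch(long) and demand() described above can be pictured with a small counter. The sketch below is a toy model using only the JDK, not the Vert.x implementation: the class DemandModel and its tryDeliver() method are hypothetical names, and it assumes that a new stream starts in flowing mode and that demand saturates at Long.MAX_VALUE.

```java
/** Toy model of ReadStream demand accounting (hypothetical, not Vert.x code). */
final class DemandModel {
    // Assumption: a new stream starts in flowing mode, i.e. unbounded demand.
    private long demand = Long.MAX_VALUE;

    /** Switches to fetch mode and clears the outstanding demand. */
    DemandModel pause() {
        demand = 0L;
        return this;
    }

    /** Switches to flowing mode, modelled here as unbounded demand. */
    DemandModel resume() {
        demand = Long.MAX_VALUE;
        return this;
    }

    /** Adds the amount to the current demand, saturating at Long.MAX_VALUE. */
    DemandModel fetch(long amount) {
        if (amount < 0L) {
            throw new IllegalArgumentException("amount must be >= 0");
        }
        demand += amount;
        if (demand < 0L) {
            demand = Long.MAX_VALUE; // arithmetic overflow is treated as unbounded
        }
        return this;
    }

    /** Long.MAX_VALUE in flowing mode, otherwise the number of elements still owed. */
    long demand() {
        return demand;
    }

    /** Tries to hand one element to the handler; false when there is no demand. */
    boolean tryDeliver() {
        if (demand == 0L) {
            return false;
        }
        if (demand != Long.MAX_VALUE) {
            demand--; // flowing mode never runs out, fetch mode counts down
        }
        return true;
    }

    public static void main(String[] args) {
        DemandModel stream = new DemandModel();
        stream.pause().fetch(3);  // fetch mode with three elements requested
        stream.tryDeliver();      // one element reaches the handler
        System.out.println(stream.demand()); // prints 2
    }
}
```

In this model pause() followed by fetch(n) gives explicit back-pressure, while resume() returns to unbounded delivery.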
-
subscribe
public Future<Void> subscribe(String topic)
Subscribe to the given topic to get dynamically assigned partitions.
Due to internal buffering of messages, when changing the subscribed topic the old topic may remain in effect (as observed by the record handler) until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new topic.
- Parameters:
topic - topic to subscribe to
- Returns:
- a Future completed with the operation result
-
rxSubscribe
public Completable rxSubscribe(String topic)
Subscribe to the given topic to get dynamically assigned partitions.
Due to internal buffering of messages, when changing the subscribed topic the old topic may remain in effect (as observed by the record handler) until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new topic.
- Parameters:
topic - topic to subscribe to
- Returns:
- a Completable that completes with the operation result
-
subscribe
public Future<Void> subscribe(Set<String> topics)
Subscribe to the given list of topics to get dynamically assigned partitions.
Due to internal buffering of messages, when changing the subscribed topics the old set of topics may remain in effect (as observed by the record handler) until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new set of topics.
- Parameters:
topics - topics to subscribe to
- Returns:
- a Future completed with the operation result
-
rxSubscribe
public Completable rxSubscribe(Set<String> topics)
Subscribe to the given list of topics to get dynamically assigned partitions.
Due to internal buffering of messages, when changing the subscribed topics the old set of topics may remain in effect (as observed by the record handler) until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new set of topics.
- Parameters:
topics - topics to subscribe to
- Returns:
- a Completable that completes with the operation result
-
assign
public Future<Void> assign(TopicPartition topicPartition)
Manually assign a partition to this consumer.
Due to internal buffering of messages, when reassigning, the old partition may remain in effect (as observed by the record handler) until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new partition.
- Parameters:
topicPartition - partition to assign
- Returns:
- a Future completed with the operation result
-
rxAssign
public Completable rxAssign(TopicPartition topicPartition)
Manually assign a partition to this consumer.
Due to internal buffering of messages, when reassigning, the old partition may remain in effect (as observed by the record handler) until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new partition.
- Parameters:
topicPartition - partition to assign
- Returns:
- a Completable that completes with the operation result
-
assign
public Future<Void> assign(Set<TopicPartition> topicPartitions)
Manually assign a set of partitions to this consumer.
Due to internal buffering of messages, when reassigning, the old set of partitions may remain in effect (as observed by the record handler) until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new set of partitions.
- Parameters:
topicPartitions - partitions to assign
- Returns:
- a Future completed with the operation result
-
rxAssign
public Completable rxAssign(Set<TopicPartition> topicPartitions)
Manually assign a set of partitions to this consumer.
Due to internal buffering of messages, when reassigning, the old set of partitions may remain in effect (as observed by the record handler) until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new set of partitions.
- Parameters:
topicPartitions - partitions to assign
- Returns:
- a Completable that completes with the operation result
-
assignment
public Future<Set<TopicPartition>> assignment()
Get the set of partitions currently assigned to this consumer.
- Returns:
- a future notified when the operation completes
-
rxAssignment
public Single<Set<TopicPartition>> rxAssignment()
Get the set of partitions currently assigned to this consumer.
- Returns:
- a Single notified when the operation completes
-
unsubscribe
public Future<Void> unsubscribe()
Unsubscribe from topics currently subscribed with subscribe.
- Returns:
- a Future completed with the operation result
-
rxUnsubscribe
public Completable rxUnsubscribe()
Unsubscribe from topics currently subscribed with subscribe.
- Returns:
- a Completable that completes with the operation result
-
subscription
public Future<Set<String>> subscription()
Get the current subscription.
- Returns:
- a future notified when the operation completes
-
rxSubscription
public Single<Set<String>> rxSubscription()
Get the current subscription.
- Returns:
- a Single notified when the operation completes
-
pause
public Future<Void> pause(TopicPartition topicPartition)
Suspend fetching from the requested partition.
Due to internal buffering of messages, the record handler will continue to observe messages from the given topicPartition until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will not see messages from the given topicPartition.
- Parameters:
topicPartition - topic partition from which to suspend fetching
- Returns:
- a Future completed with the operation result
-
rxPause
public Completable rxPause(TopicPartition topicPartition)
Suspend fetching from the requested partition.
Due to internal buffering of messages, the record handler will continue to observe messages from the given topicPartition until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will not see messages from the given topicPartition.
- Parameters:
topicPartition - topic partition from which to suspend fetching
- Returns:
- a Completable that completes with the operation result
-
pause
public Future<Void> pause(Set<TopicPartition> topicPartitions)
Suspend fetching from the requested partitions.
Due to internal buffering of messages, the record handler will continue to observe messages from the given topicPartitions until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will not see messages from the given topicPartitions.
- Parameters:
topicPartitions - topic partitions from which to suspend fetching
- Returns:
- a Future completed with the operation result
-
rxPause
public Completable rxPause(Set<TopicPartition> topicPartitions)
Suspend fetching from the requested partitions.
Due to internal buffering of messages, the record handler will continue to observe messages from the given topicPartitions until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will not see messages from the given topicPartitions.
- Parameters:
topicPartitions - topic partitions from which to suspend fetching
- Returns:
- a Completable that completes with the operation result
-
paused
public Future<Set<TopicPartition>> paused()
Get the set of partitions that were previously paused by a call to pause(Set).
- Returns:
- a future notified when the operation completes
-
rxPaused
public Single<Set<TopicPartition>> rxPaused()
Get the set of partitions that were previously paused by a call to pause(Set).
- Returns:
- a Single notified when the operation completes
-
resume
public Future<Void> resume(TopicPartition topicPartition)
Resume the specified partition, which has been paused with pause.
- Parameters:
topicPartition - topic partition from which to resume fetching
- Returns:
- a Future completed with the operation result
-
rxResume
public Completable rxResume(TopicPartition topicPartition)
Resume the specified partition, which has been paused with pause.
- Parameters:
topicPartition - topic partition from which to resume fetching
- Returns:
- a Completable that completes with the operation result
-
resume
public Future<Void> resume(Set<TopicPartition> topicPartitions)
Resume the specified partitions, which have been paused with pause.
- Parameters:
topicPartitions - topic partitions from which to resume fetching
- Returns:
- a Future completed with the operation result
-
rxResume
public Completable rxResume(Set<TopicPartition> topicPartitions)
Resume the specified partitions, which have been paused with pause.
- Parameters:
topicPartitions - topic partitions from which to resume fetching
- Returns:
- a Completable that completes with the operation result
-
partitionsRevokedHandler
public KafkaConsumer<K,V> partitionsRevokedHandler(Handler<Set<TopicPartition>> handler)
Set the handler called when topic partitions are revoked from the consumer.
- Parameters:
handler - handler called on revoked topic partitions
- Returns:
- current KafkaConsumer instance
-
partitionsAssignedHandler
public KafkaConsumer<K,V> partitionsAssignedHandler(Handler<Set<TopicPartition>> handler)
Set the handler called when topic partitions are assigned to the consumer.
- Parameters:
handler - handler called on assigned topic partitions
- Returns:
- current KafkaConsumer instance
-
seek
public Future<Void> seek(TopicPartition topicPartition, long offset)
Overrides the fetch offsets that the consumer will use on the next poll.
Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new offset.
- Parameters:
topicPartition - topic partition on which to seek
offset - offset to seek to inside the topic partition
- Returns:
- a Future completed with the operation result
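The buffering caveat above can be pictured with a toy model in which records are prefetched into a queue before they reach the per-record handler. The sketch below uses only the JDK and is not the Vert.x or Kafka implementation: the class BufferedSeekModel and its methods are hypothetical names, and offsets stand in for whole records.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a prefetching consumer (hypothetical, not Vert.x code). */
final class BufferedSeekModel {
    // Offsets already fetched from the log but not yet given to the record handler.
    private final Deque<Long> buffer = new ArrayDeque<>();
    // Offset that the next simulated poll starts reading from.
    private long fetchOffset;

    BufferedSeekModel(long startOffset) {
        this.fetchOffset = startOffset;
    }

    /** Simulates one poll: prefetches count consecutive offsets into the buffer. */
    void poll(int count) {
        for (int i = 0; i < count; i++) {
            buffer.add(fetchOffset++);
        }
    }

    /** Moves the fetch position; records that are already buffered stay buffered. */
    void seek(long offset) {
        this.fetchOffset = offset;
    }

    /** Delivers up to max buffered offsets to the record handler, oldest first. */
    List<Long> deliver(int max) {
        List<Long> delivered = new ArrayList<>();
        while (delivered.size() < max && !buffer.isEmpty()) {
            delivered.add(buffer.poll());
        }
        return delivered;
    }

    public static void main(String[] args) {
        BufferedSeekModel consumer = new BufferedSeekModel(0L);
        consumer.poll(3);                        // buffer holds offsets 0, 1, 2
        System.out.println(consumer.deliver(1)); // [0]
        consumer.seek(100L);                     // the seek has completed here
        System.out.println(consumer.deliver(2)); // [1, 2], still the old position
        consumer.poll(2);                        // the next poll uses the new offset
        System.out.println(consumer.deliver(2)); // [100, 101]
    }
}
```

In this model, only records from polls issued after the seek reflect the new offset, which mirrors the distinction the text draws between the record handler and the batch handler.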
-
rxSeek
public Completable rxSeek(TopicPartition topicPartition, long offset)
Overrides the fetch offsets that the consumer will use on the next poll.
Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new offset.
- Parameters:
topicPartition - topic partition on which to seek
offset - offset to seek to inside the topic partition
- Returns:
- a Completable that completes with the operation result
-
seek
public Future<Void> seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata)
Overrides the fetch offsets that the consumer will use on the next poll.
Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new offset.
- Parameters:
topicPartition - topic partition on which to seek
offsetAndMetadata - offset to seek to inside the topic partition
- Returns:
- a Future completed with the operation result
-
rxSeek
public Completable rxSeek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata)
Overrides the fetch offsets that the consumer will use on the next poll.
Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new offset.
- Parameters:
topicPartition - topic partition on which to seek
offsetAndMetadata - offset to seek to inside the topic partition
- Returns:
- a Completable that completes with the operation result
-
seekToBeginning
public Future<Void> seekToBeginning(TopicPartition topicPartition)
Seek to the first offset for the given partition.
Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new offset.
- Parameters:
topicPartition - topic partition on which to seek
- Returns:
- a Future completed with the operation result
-
rxSeekToBeginning
public Completable rxSeekToBeginning(TopicPartition topicPartition)
Seek to the first offset for the given partition.
Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new offset.
- Parameters:
topicPartition - topic partition on which to seek
- Returns:
- a Completable that completes with the operation result
-
seekToBeginning
public Future<Void> seekToBeginning(Set<TopicPartition> topicPartitions)
Seek to the first offset for each of the given partitions.
Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new offset.
- Parameters:
topicPartitions - topic partitions on which to seek
- Returns:
- a Future completed with the operation result
-
rxSeekToBeginning
public Completable rxSeekToBeginning(Set<TopicPartition> topicPartitions)
Seek to the first offset for each of the given partitions.
Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new offset.
- Parameters:
topicPartitions - topic partitions on which to seek
- Returns:
- a Completable that completes with the operation result
-
seekToEnd
public Future<Void> seekToEnd(TopicPartition topicPartition)
Seek to the last offset for the given partition.
Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new offset.
Parameters:
topicPartition - topic partition to seek
Returns:
a Future completed with the operation result
-
rxSeekToEnd
public Completable rxSeekToEnd(TopicPartition topicPartition)
Seek to the last offset for the given partition.
Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new offset.
Parameters:
topicPartition - topic partition to seek
Returns:
a Completable completed with the operation result
-
seekToEnd
public Future<Void> seekToEnd(Set<TopicPartition> topicPartitions)
Seek to the last offset for each of the given partitions.
Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new offset.
Parameters:
topicPartitions - topic partitions to seek
Returns:
a Future completed with the operation result
-
rxSeekToEnd
public Completable rxSeekToEnd(Set<TopicPartition> topicPartitions)
Seek to the last offset for each of the given partitions.
Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new offset.
Parameters:
topicPartitions - topic partitions to seek
Returns:
a Completable completed with the operation result
-
commit
public Future<Void> commit()
Commit current offsets for all the subscribed topics and partitions.
-
rxCommit
public Completable rxCommit()
Commit current offsets for all the subscribed topics and partitions.
-
committed
public Future<OffsetAndMetadata> committed(TopicPartition topicPartition)
Get the last committed offset for the given partition (whether the commit happened by this process or another).
Parameters:
topicPartition - topic partition for which to get the last committed offset
Returns:
a Future notified when the operation completes
-
rxCommitted
public Single<OffsetAndMetadata> rxCommitted(TopicPartition topicPartition)
Get the last committed offset for the given partition (whether the commit happened by this process or another).
Parameters:
topicPartition - topic partition for which to get the last committed offset
Returns:
a Single notified when the operation completes
-
partitionsFor
public Future<List<PartitionInfo>> partitionsFor(String topic)
Get metadata about the partitions for a given topic.
Parameters:
topic - topic for which to get partition info
Returns:
a Future notified when the operation completes
-
rxPartitionsFor
public Single<List<PartitionInfo>> rxPartitionsFor(String topic)
Get metadata about the partitions for a given topic.
Parameters:
topic - topic for which to get partition info
Returns:
a Single notified when the operation completes
-
batchHandler
public KafkaConsumer<K,V> batchHandler(Handler<KafkaConsumerRecords<K,V>> handler)
Set the handler to be used when batches of messages are fetched from the Kafka server. Batch handlers need to take care not to block the event loop when dealing with large batches. It is better to process records individually using the record handler, handler(Handler).
Parameters:
handler - handler called when batches of messages are fetched
Returns:
current KafkaConsumer instance
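The warning about large batches can be illustrated with a small sketch. It assumes one possible mitigation, cutting a batch into fixed-size slices so that each unit of work stays short; `BatchSlices` and `slices` are hypothetical names, not part of the Vert.x API, and how the slices are scheduled (for example on a worker context) is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Vert.x Kafka client.
final class BatchSlices {
    private BatchSlices() {}

    /** Splits a batch into consecutive slices of at most sliceSize elements, preserving order. */
    static <T> List<List<T>> slices(List<T> batch, int sliceSize) {
        if (sliceSize <= 0) {
            throw new IllegalArgumentException("sliceSize must be positive: " + sliceSize);
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < batch.size(); start += sliceSize) {
            int end = Math.min(start + sliceSize, batch.size());
            // Copy the view so each slice is independent of the original batch.
            result.add(new ArrayList<>(batch.subList(start, end)));
        }
        return result;
    }
}
```

Each slice can then be handled in its own short step instead of iterating the whole batch in one go.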
-
rxClose
public Completable rxClose()
Close the consumer.
-
position
public Future<Long> position(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).
Parameters:
partition - the partition to get the position for
Returns:
a Future notified when the operation completes
-
rxPosition
public Single<Long> rxPosition(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).
Parameters:
partition - the partition to get the position for
Returns:
a Single notified when the operation completes
-
offsetsForTimes
public Future<OffsetAndTimestamp> offsetsForTimes(TopicPartition topicPartition, Long timestamp)
Look up the offset for the given partition by timestamp. Note: the result may be null if no offset can be found for the given timestamp, e.g. when the timestamp refers to the future.
Parameters:
topicPartition - TopicPartition to query
timestamp - timestamp to be used in the query
Returns:
a Future notified when the operation completes
-
rxOffsetsForTimes
public Single<OffsetAndTimestamp> rxOffsetsForTimes(TopicPartition topicPartition, Long timestamp)
Look up the offset for the given partition by timestamp. Note: the result may be null if no offset can be found for the given timestamp, e.g. when the timestamp refers to the future.
Parameters:
topicPartition - TopicPartition to query
timestamp - timestamp to be used in the query
Returns:
a Single notified when the operation completes
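The null-result note can be made concrete with a small in-memory model. This is a sketch only: `TimestampIndex` is a hypothetical stand-in for one partition's timestamp index, not a Vert.x or Kafka class, and it assumes the lookup returns the earliest offset whose timestamp is greater than or equal to the query, with timestamps that grow with offsets.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical in-memory model of a partition's timestamp-to-offset index.
final class TimestampIndex {
    private final TreeMap<Long, Long> offsetByTimestamp = new TreeMap<>();

    /** Registers a record; the first offset seen for a timestamp is kept. */
    void add(long timestamp, long offset) {
        offsetByTimestamp.putIfAbsent(timestamp, offset);
    }

    /** Offset of the first record at or after the timestamp, or null if there is none. */
    Long offsetForTime(long timestamp) {
        Map.Entry<Long, Long> entry = offsetByTimestamp.ceilingEntry(timestamp);
        return entry == null ? null : entry.getValue();
    }

    /** Same lookup, wrapped so callers cannot forget the "not found" case. */
    Optional<Long> findOffsetForTime(long timestamp) {
        return Optional.ofNullable(offsetForTime(timestamp));
    }
}
```

A caller that receives the raw result should check for null before seeking to it; wrapping the value in an `Optional`, as in `findOffsetForTime`, is one way to make that check hard to skip.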
-
beginningOffsets
public Future<Long> beginningOffsets(TopicPartition topicPartition)
Get the first offset for the given partition.
Parameters:
topicPartition - the partition to get the earliest offset for
Returns:
a Future notified when the operation completes
-
rxBeginningOffsets
public Single<Long> rxBeginningOffsets(TopicPartition topicPartition)
Get the first offset for the given partition.
Parameters:
topicPartition - the partition to get the earliest offset for
Returns:
a Single notified when the operation completes
-
endOffsets
public Future<Long> endOffsets(TopicPartition topicPartition)
Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
Parameters:
topicPartition - the partition to get the end offset for
Returns:
a Future notified when the operation completes
-
rxEndOffsets
public Single<Long> rxEndOffsets(TopicPartition topicPartition)
Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
Parameters:
topicPartition - the partition to get the end offset for
Returns:
a Single notified when the operation completes
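Because the end offset is the offset of the upcoming message rather than of the last available one, offset arithmetic is easy to get off by one. The sketch below spells it out with plain numbers; `OffsetMath` is a hypothetical helper, and it assumes the position value is the offset of the next record to be fetched, as described for position(TopicPartition).

```java
// Hypothetical helper, not part of the Vert.x Kafka client: plain offset arithmetic.
final class OffsetMath {
    private OffsetMath() {}

    /** Offset of the last available message; -1 when the end offset is 0. */
    static long lastAvailableOffset(long endOffset) {
        return endOffset - 1;
    }

    /** Records still to be fetched: distance from the next fetch position to the end offset. */
    static long lag(long endOffset, long position) {
        return Math.max(0L, endOffset - position);
    }
}
```

For example, with an end offset of 42 the last available message has offset 41, and a consumer whose position is 40 still has two records to fetch.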
-
create
public static <K,V> KafkaConsumer<K,V> create(Vertx vertx, org.apache.kafka.clients.consumer.Consumer<K,V> consumer)
Create a new KafkaConsumer instance from a native Kafka Consumer.
Parameters:
vertx - Vert.x instance to use
consumer - the Kafka consumer to wrap
Returns:
an instance of the KafkaConsumer
-
create
public static <K,V> KafkaConsumer<K,V> create(Vertx vertx, org.apache.kafka.clients.consumer.Consumer<K,V> consumer, KafkaClientOptions options)
Create a new KafkaConsumer instance from a native Kafka Consumer.
Parameters:
vertx - Vert.x instance to use
consumer - the Kafka consumer to wrap
options - options used only for tracing settings
Returns:
an instance of the KafkaConsumer
-
subscribe
public Future<Void> subscribe(Pattern pattern)
Subscribe to all topics matching the specified pattern to get dynamically assigned partitions.
Due to internal buffering of messages, when changing the subscribed topics the old set of topics may remain in effect (as observed by the record handler) until some time after the returned Future completes. In contrast, once the returned Future completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new set of topics.
Parameters:
pattern - Pattern to subscribe to
Returns:
a Future completed with the operation result
-
rxSubscribe
public Completable rxSubscribe(Pattern pattern)
Subscribe to all topics matching the specified pattern to get dynamically assigned partitions.
Due to internal buffering of messages, when changing the subscribed topics the old set of topics may remain in effect (as observed by the record handler) until some time after the returned Completable completes. In contrast, once the returned Completable completes, the batchHandler(io.vertx.core.Handler<io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecords<K, V>>) will only see messages consistent with the new set of topics.
Parameters:
pattern - Pattern to subscribe to
Returns:
a Completable completed with the operation result
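Before subscribing with a pattern, it can help to preview which topic names the pattern would select. The following sketch uses only `java.util.regex`; `TopicPatternPreview` is a hypothetical helper, and it assumes the pattern must match the whole topic name (full-match semantics), which should be confirmed against the Kafka client in use.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the Vert.x Kafka client.
final class TopicPatternPreview {
    private TopicPatternPreview() {}

    /** Returns the topics whose full name matches the pattern, in input order. */
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
            // matches() requires the entire name to match, unlike find().
            .filter(topic -> pattern.matcher(topic).matches())
            .collect(Collectors.toList());
    }
}
```

Under that assumption, the pattern `orders-.*` selects `orders-eu` and `orders-us` but not `old-orders-eu`.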
-
pollTimeout
public KafkaConsumer<K,V> pollTimeout(java.time.Duration timeout)
Sets the poll timeout for the underlying native Kafka Consumer. Defaults to 1000ms. Setting the timeout to a lower value makes the client more responsive, because it blocks for a shorter period when no data is available in the assigned partitions, so subsequent actions run with a shorter delay. At the same time, the client polls more frequently and may therefore put a higher load on the Kafka broker.
Parameters:
timeout - the time spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records currently available in the native Kafka consumer's buffer, or empty if there are none. Must not be negative.
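The constraint on the timeout can be checked before the value is handed to the consumer. A minimal sketch, assuming the caller wants to fail fast on a bad value; `PollTimeouts` and `requireNonNegative` are hypothetical names, not part of the Vert.x API.

```java
import java.time.Duration;

// Hypothetical guard, not part of the Vert.x Kafka client.
final class PollTimeouts {
    private PollTimeouts() {}

    /** Returns the timeout unchanged, or throws if it is negative. Zero is allowed. */
    static Duration requireNonNegative(Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("poll timeout must not be negative: " + timeout);
        }
        return timeout;
    }
}
```

A call such as `consumer.pollTimeout(PollTimeouts.requireNonNegative(Duration.ofMillis(250)))` would then reject a negative value at the call site.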
-
poll
public Future<KafkaConsumerRecords<K,V>> poll(java.time.Duration timeout)
Executes a poll for getting messages from Kafka.
Parameters:
timeout - the maximum time to block (must not be greater than Long.MAX_VALUE milliseconds)
Returns:
a Future notified when the operation completes
-
rxPoll
public Single<KafkaConsumerRecords<K,V>> rxPoll(java.time.Duration timeout)
Executes a poll for getting messages from Kafka.
Parameters:
timeout - the maximum time to block (must not be greater than Long.MAX_VALUE milliseconds)
Returns:
a Single notified when the operation completes
-
newInstance
public static <K,V> KafkaConsumer<K,V> newInstance(KafkaConsumer arg)
-
newInstance
public static <K,V> KafkaConsumer<K,V> newInstance(KafkaConsumer arg, io.vertx.lang.rx.TypeArg<K> __typeArg_K, io.vertx.lang.rx.TypeArg<V> __typeArg_V)
-
-