Class: VertxKafkaClient::KafkaConsumer

Inherits:
Object
  • Object
show all
Includes:
Vertx::ReadStream
Defined in:
/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb

Overview

Vert.x Kafka consumer.

You receive Kafka records by providing a #handler. As messages arrive the handler will be called with the records.

The #pause and #resume provides global control over reading the records from the consumer.

The #pause and #resume provides finer grained control over reading records for specific Topic/Partition, these are Kafka's specific operations.

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Method Details

+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)

Create a new KafkaConsumer instance

Parameters:

  • vertx (::Vertx::Vertx) (defaults to: nil)
    Vert.x instance to use
  • config (Hash{String => String}) (defaults to: nil)
    Kafka consumer configuration
  • keyType (Nil) (defaults to: nil)
    class type for the key deserialization
  • valueType (Nil) (defaults to: nil)
    class type for the value deserialization

Returns:

Raises:

  • (ArgumentError)


37
38
39
40
41
42
43
44
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 37

def self.create(vertx=nil,config=nil,keyType=nil,valueType=nil)
  if vertx.class.method_defined?(:j_del) && config.class == Hash && !block_given? && keyType == nil && valueType == nil
    return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class]).call(vertx.j_del,Hash[config.map { |k,v| [k,v] }]),::VertxKafkaClient::KafkaConsumer, nil, nil)
  elsif vertx.class.method_defined?(:j_del) && config.class == Hash && keyType.class == Class && valueType.class == Class && !block_given?
    return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class,Java::JavaLang::Class.java_class,Java::JavaLang::Class.java_class]).call(vertx.j_del,Hash[config.map { |k,v| [k,v] }],::Vertx::Util::Utils.j_class_of(keyType),::Vertx::Util::Utils.j_class_of(valueType)),::VertxKafkaClient::KafkaConsumer, ::Vertx::Util::Utils.v_type_of(keyType), ::Vertx::Util::Utils.v_type_of(valueType))
  end
  raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})"
end

Instance Method Details

- (self) assign(topicPartition) - (self) assign(topicPartitions) - (self) assign(topicPartition, completionHandler) { ... } - (self) assign(topicPartitions, completionHandler) { ... }

Manually assign a list of partition to this consumer.

Due to internal buffering of messages, when reassigning the old set of partitions may remain in effect (as observed by the record handler)} until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new set of partitions.

Overloads:

  • - (self) assign(topicPartition)

    Parameters:

    • topicPartition (Hash)
      partition which want assigned
  • - (self) assign(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      partitions which want assigned
  • - (self) assign(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      partition which want assigned

    Yields:

    • handler called on operation completed
  • - (self) assign(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      partitions which want assigned

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 200

def assign(param_1=nil)
  if param_1.class == Hash && !block_given?
    @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling assign(#{param_1})"
end

- (self) assignment { ... }

Get the set of partitions currently assigned to this consumer.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


219
220
221
222
223
224
225
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 219

def assignment
  if block_given?
    @j_del.java_method(:assignment, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling assignment()"
end

- (self) batch_handler { ... }

Set the handler to be used when batches of messages are fetched from the Kafka server. Batch handlers need to take care not to block the event loop when dealing with large batches. It is better to process records individually using the record handler.

Yields:

  • handler called when batches of messages are fetched

Returns:

  • (self)

Raises:

  • (ArgumentError)


411
412
413
414
415
416
417
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 411

def batch_handler
  if block_given?
    @j_del.java_method(:batchHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecords, nil, nil)) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling batch_handler()"
end

- (void) beginning_offsets(topicPartition = nil) { ... }

This method returns an undefined value.

Get the first offset for the given partitions.

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    the partition to get the earliest offset.

Yields:

  • handler called on operation completed. Returns the earliest available offset for the given partition

Raises:

  • (ArgumentError)


455
456
457
458
459
460
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 455

def beginning_offsets(topicPartition=nil)
  if topicPartition.class == Hash && block_given?
    return @j_del.java_method(:beginningOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling beginning_offsets(#{topicPartition})"
end

- (void) close { ... }

This method returns an undefined value.

Close the consumer

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


421
422
423
424
425
426
427
428
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 421

def close
  if !block_given?
    return @j_del.java_method(:close, []).call()
  elsif block_given?
    return @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling close()"
end

- (void) commit { ... }

This method returns an undefined value.

Commit current offsets for all the subscribed list of topics and partition.

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


376
377
378
379
380
381
382
383
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 376

def commit
  if !block_given?
    return @j_del.java_method(:commit, []).call()
  elsif block_given?
    return @j_del.java_method(:commit, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling commit()"
end

- (void) committed(topicPartition = nil) { ... }

This method returns an undefined value.

Get the last committed offset for the given partition (whether the commit happened by this process or another).

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    topic partition for getting last committed offset

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


388
389
390
391
392
393
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 388

def committed(topicPartition=nil)
  if topicPartition.class == Hash && block_given?
    return @j_del.java_method(:committed, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling committed(#{topicPartition})"
end

- (self) end_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


137
138
139
140
141
142
143
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 137

def end_handler
  if block_given?
    @j_del.java_method(:endHandler, [Java::IoVertxCore::Handler.java_class]).call(Proc.new { yield })
    return self
  end
  raise ArgumentError, "Invalid arguments when calling end_handler()"
end

- (void) end_offsets(topicPartition = nil) { ... }

This method returns an undefined value.

Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    the partition to get the end offset.

Yields:

  • handler called on operation completed. The end offset for the given partition.

Raises:

  • (ArgumentError)


466
467
468
469
470
471
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 466

def end_offsets(topicPartition=nil)
  if topicPartition.class == Hash && block_given?
    return @j_del.java_method(:endOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling end_offsets(#{topicPartition})"
end

- (self) exception_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


47
48
49
50
51
52
53
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 47

def exception_handler
  if block_given?
    @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.from_throwable(event)) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling exception_handler()"
end

- (self) handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


56
57
58
59
60
61
62
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 56

def handler
  if block_given?
    @j_del.java_method(:handler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecord, nil, nil)) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling handler()"
end

- (void) offsets_for_times(topicPartition = nil, timestamp = nil) { ... }

This method returns an undefined value.

Look up the offset for the given partition by timestamp. Note: the result might be null in case for the given timestamp no offset can be found -- e.g., when the timestamp refers to the future

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    TopicPartition to query.
  • timestamp (Fixnum) (defaults to: nil)
    Timestamp to be used in the query.

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


445
446
447
448
449
450
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 445

def offsets_for_times(topicPartition=nil,timestamp=nil)
  if topicPartition.class == Hash && timestamp.class == Fixnum && block_given?
    return @j_del.java_method(:offsetsForTimes, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::JavaLang::Long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),timestamp,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling offsets_for_times(#{topicPartition},#{timestamp})"
end

- (self) partitions_assigned_handler { ... }

Set the handler called when topic partitions are assigned to the consumer

Yields:

  • handler called on assigned topic partitions

Returns:

  • (self)

Raises:

  • (ArgumentError)


271
272
273
274
275
276
277
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 271

def partitions_assigned_handler
  if block_given?
    @j_del.java_method(:partitionsAssignedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling partitions_assigned_handler()"
end

- (self) partitions_for(topic = nil) { ... }

Get metadata about the partitions for a given topic.

Parameters:

  • topic (String) (defaults to: nil)
    topic partition for which getting partitions info

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


398
399
400
401
402
403
404
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 398

def partitions_for(topic=nil)
  if topic.class == String && block_given?
    @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(topic,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result.to_a.map { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})"
end

- (self) partitions_revoked_handler { ... }

Set the handler called when topic partitions are revoked to the consumer

Yields:

  • handler called on revoked topic partitions

Returns:

  • (self)

Raises:

  • (ArgumentError)


261
262
263
264
265
266
267
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 261

def partitions_revoked_handler
  if block_given?
    @j_del.java_method(:partitionsRevokedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling partitions_revoked_handler()"
end

- (self) pause - (self) pause(topicPartition) - (self) pause(topicPartitions) - (self) pause(topicPartition, completionHandler) { ... } - (self) pause(topicPartitions, completionHandler) { ... }

Suspend fetching from the requested partitions.

Due to internal buffering of messages, the will continue to observe messages from the given topicParations until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will not see messages from the given topicParations.

Overloads:

  • - (self) pause(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition from which suspend fetching
  • - (self) pause(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which suspend fetching
  • - (self) pause(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition from which suspend fetching

    Yields:

    • handler called on operation completed
  • - (self) pause(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which suspend fetching

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 84

def pause(param_1=nil)
  if !block_given? && param_1 == nil
    @j_del.java_method(:pause, []).call()
    return self
  elsif param_1.class == Hash && !block_given?
    @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling pause(#{param_1})"
end

- (void) paused { ... }

This method returns an undefined value.

Get the set of partitions that were previously paused by a call to pause(Set).

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


252
253
254
255
256
257
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 252

def paused
  if block_given?
    return @j_del.java_method(:paused, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling paused()"
end

- (::VertxKafkaClient::KafkaConsumer) poll_timeout(timeout = nil)

Sets the poll timeout (in ms) for the underlying native Kafka Consumer. Defaults to 1000. Setting timeout to a lower value results in a more 'responsive' client, because it will block for a shorter period if no data is available in the assigned partition and therefore allows subsequent actions to be executed with a shorter delay. At the same time, the client will poll more frequently and thus will potentially create a higher load on the Kafka Broker.

Parameters:

  • timeout (Fixnum) (defaults to: nil)
    The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the native Kafka consumer's buffer, else returns empty. Must not be negative.

Returns:

Raises:

  • (ArgumentError)


478
479
480
481
482
483
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 478

def poll_timeout(timeout=nil)
  if timeout.class == Fixnum && !block_given?
    return ::Vertx::Util::Utils.safe_create(@j_del.java_method(:pollTimeout, [Java::long.java_class]).call(timeout),::VertxKafkaClient::KafkaConsumer, nil, nil)
  end
  raise ArgumentError, "Invalid arguments when calling poll_timeout(#{timeout})"
end

- (void) position(partition = nil) { ... }

This method returns an undefined value.

Get the offset of the next record that will be fetched (if a record with that offset exists).

Parameters:

  • partition (Hash) (defaults to: nil)
    The partition to get the position for

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


433
434
435
436
437
438
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 433

def position(partition=nil)
  if partition.class == Hash && block_given?
    return @j_del.java_method(:position, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(partition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling position(#{partition})"
end

- (self) resume - (self) resume(topicPartition) - (self) resume(topicPartitions) - (self) resume(topicPartition, completionHandler) { ... } - (self) resume(topicPartitions, completionHandler) { ... }

Resume specified partitions which have been paused with pause.

Overloads:

  • - (self) resume(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition from which resume fetching
  • - (self) resume(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which resume fetching
  • - (self) resume(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition from which resume fetching

    Yields:

    • handler called on operation completed
  • - (self) resume(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which resume fetching

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 116

def resume(param_1=nil)
  if !block_given? && param_1 == nil
    @j_del.java_method(:resume, []).call()
    return self
  elsif param_1.class == Hash && !block_given?
    @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling resume(#{param_1})"
end

- (self) seek(topicPartition = nil, offset = nil) { ... }

Overrides the fetch offsets that the consumer will use on the next poll.

Due to internal buffering of messages, the will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new offset.

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    topic partition for which seek
  • offset (Fixnum) (defaults to: nil)
    offset to seek inside the topic partition

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


291
292
293
294
295
296
297
298
299
300
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 291

def seek(topicPartition=nil,offset=nil)
  if topicPartition.class == Hash && offset.class == Fixnum && !block_given?
    @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset)
    return self
  elsif topicPartition.class == Hash && offset.class == Fixnum && block_given?
    @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling seek(#{topicPartition},#{offset})"
end

- (self) seekToBeginning(topicPartition) - (self) seekToBeginning(topicPartitions) - (self) seekToBeginning(topicPartition, completionHandler) { ... } - (self) seekToBeginning(topicPartitions, completionHandler) { ... }

Seek to the first offset for each of the given partitions.

Due to internal buffering of messages, the will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new offset.

Overloads:

  • - (self) seekToBeginning(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek
  • - (self) seekToBeginning(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek
  • - (self) seekToBeginning(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek

    Yields:

    • handler called on operation completed
  • - (self) seekToBeginning(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 321

def seek_to_beginning(param_1=nil)
  if param_1.class == Hash && !block_given?
    @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling seek_to_beginning(#{param_1})"
end

- (self) seekToEnd(topicPartition) - (self) seekToEnd(topicPartitions) - (self) seekToEnd(topicPartition, completionHandler) { ... } - (self) seekToEnd(topicPartitions, completionHandler) { ... }

Seek to the last offset for each of the given partitions.

Due to internal buffering of messages, the will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new offset.

Overloads:

  • - (self) seekToEnd(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek
  • - (self) seekToEnd(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek
  • - (self) seekToEnd(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek

    Yields:

    • handler called on operation completed
  • - (self) seekToEnd(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 357

def seek_to_end(param_1=nil)
  if param_1.class == Hash && !block_given?
    @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling seek_to_end(#{param_1})"
end

- (self) subscribe(topic) - (self) subscribe(topics) - (self) subscribe(topic, completionHandler) { ... } - (self) subscribe(topics, completionHandler) { ... }

Subscribe to the given list of topics to get dynamically assigned partitions.

Due to internal buffering of messages, when changing the subscribed topics the old set of topics may remain in effect (as observed by the record handler}) until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new set of topics.

Overloads:

  • - (self) subscribe(topic)

    Parameters:

    • topic (String)
      topic to subscribe to
  • - (self) subscribe(topics)

    Parameters:

    • topics (Set<String>)
      topics to subscribe to
  • - (self) subscribe(topic, completionHandler) { ... }

    Parameters:

    • topic (String)
      topic to subscribe to

    Yields:

    • handler called on operation completed
  • - (self) subscribe(topics, completionHandler) { ... }

    Parameters:

    • topics (Set<String>)
      topics to subscribe to

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 164

def subscribe(param_1=nil)
  if param_1.class == String && !block_given?
    @j_del.java_method(:subscribe, [Java::java.lang.String.java_class]).call(param_1)
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| element }))
    return self
  elsif param_1.class == String && block_given?
    @j_del.java_method(:subscribe, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(param_1,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| element }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling subscribe(#{param_1})"
end

- (self) subscription { ... }

Get the current subscription.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


242
243
244
245
246
247
248
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 242

def subscription
  if block_given?
    @j_del.java_method(:subscription, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt } : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling subscription()"
end

- (self) unsubscribe { ... }

Unsubscribe from topics currently subscribed with subscribe.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


229
230
231
232
233
234
235
236
237
238
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 229

def unsubscribe
  if !block_given?
    @j_del.java_method(:unsubscribe, []).call()
    return self
  elsif block_given?
    @j_del.java_method(:unsubscribe, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling unsubscribe()"
end