Class: VertxKafkaClient::KafkaProducer

Inherits:
Object
  • Object
show all
Includes:
Vertx::WriteStream
Defined in:
/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb

Overview

Vert.x Kafka producer.

The write method provides global control over writing a record.

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Method Details

+ (::VertxKafkaClient::KafkaProducer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)

Create a new KafkaProducer instance

Parameters:

  • vertx (::Vertx::Vertx) (defaults to: nil)
    Vert.x instance to use
  • config (Hash{String => String}) (defaults to: nil)
    Kafka producer configuration
  • keyType (Class, nil) (defaults to: nil)
    class type for the key serialization
  • valueType (Class, nil) (defaults to: nil)
    class type for the value serialization

Returns:

Raises:

  • (ArgumentError)

61
62
63
64
65
66
67
68
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 61

# Create a new KafkaProducer instance by delegating to the Java
# KafkaProducer.create factory.
#
# Two argument shapes are accepted, and a block is never allowed:
# * vertx and config only (keyType and valueType both nil)
# * vertx, config, keyType and valueType (both types given as Class objects)
#
# @param [::Vertx::Vertx] vertx Vert.x instance to use; its class must define j_del
# @param [Hash{String => String}] config Kafka producer configuration
# @param [Class, nil] keyType class type for the key serialization
# @param [Class, nil] valueType class type for the value serialization
# @return [::VertxKafkaClient::KafkaProducer] the wrapper built by Utils.safe_create
# @raise [ArgumentError] if the arguments match neither shape
def self.create(vertx=nil,config=nil,keyType=nil,valueType=nil)
  # Preconditions shared by both accepted shapes.
  base_ok = !block_given? && vertx.class.method_defined?(:j_del) && config.instance_of?(Hash)

  if base_ok && keyType.nil? && valueType.nil?
    j_create = Java::IoVertxKafkaClientProducer::KafkaProducer.java_method(
      :create,
      [Java::IoVertxCore::Vertx.java_class, Java::JavaUtil::Map.java_class]
    )
    j_producer = j_create.call(vertx.j_del, Hash[config.map { |key, value| [key, value] }])
    return ::Vertx::Util::Utils.safe_create(j_producer, ::VertxKafkaClient::KafkaProducer, nil, nil)
  end

  if base_ok && keyType.instance_of?(Class) && valueType.instance_of?(Class)
    j_create = Java::IoVertxKafkaClientProducer::KafkaProducer.java_method(
      :create,
      [
        Java::IoVertxCore::Vertx.java_class,
        Java::JavaUtil::Map.java_class,
        Java::JavaLang::Class.java_class,
        Java::JavaLang::Class.java_class
      ]
    )
    j_producer = j_create.call(
      vertx.j_del,
      Hash[config.map { |key, value| [key, value] }],
      ::Vertx::Util::Utils.j_class_of(keyType),
      ::Vertx::Util::Utils.j_class_of(valueType)
    )
    return ::Vertx::Util::Utils.safe_create(
      j_producer,
      ::VertxKafkaClient::KafkaProducer,
      ::Vertx::Util::Utils.v_type_of(keyType),
      ::Vertx::Util::Utils.v_type_of(valueType)
    )
  end

  raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})"
end

+ (::VertxKafkaClient::KafkaProducer) create_shared(vertx = nil, name = nil, config = nil, keyType = nil, valueType = nil)

Get or create a KafkaProducer instance which shares its stream with any other KafkaProducer created with the same name

Parameters:

  • vertx (::Vertx::Vertx) (defaults to: nil)
    Vert.x instance to use
  • name (String) (defaults to: nil)
    the producer name to identify it
  • config (Hash{String => String}) (defaults to: nil)
    Kafka producer configuration
  • keyType (Class, nil) (defaults to: nil)
    class type for the key serialization
  • valueType (Class, nil) (defaults to: nil)
    class type for the value serialization

Returns:

Raises:

  • (ArgumentError)

47
48
49
50
51
52
53
54
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 47

# Get or create a KafkaProducer that shares its stream with any other
# KafkaProducer created with the same name, by delegating to the Java
# KafkaProducer.createShared factory.
#
# Two argument shapes are accepted, and a block is never allowed:
# * vertx, name and config only (keyType and valueType both nil)
# * vertx, name, config, keyType and valueType (both types given as Class objects)
#
# @param [::Vertx::Vertx] vertx Vert.x instance to use; its class must define j_del
# @param [String] name the producer name to identify it
# @param [Hash{String => String}] config Kafka producer configuration
# @param [Class, nil] keyType class type for the key serialization
# @param [Class, nil] valueType class type for the value serialization
# @return [::VertxKafkaClient::KafkaProducer] the wrapper built by Utils.safe_create
# @raise [ArgumentError] if the arguments match neither shape
def self.create_shared(vertx=nil,name=nil,config=nil,keyType=nil,valueType=nil)
  # Preconditions shared by both accepted shapes.
  base_ok = !block_given? &&
            vertx.class.method_defined?(:j_del) &&
            name.instance_of?(String) &&
            config.instance_of?(Hash)

  if base_ok && keyType.nil? && valueType.nil?
    j_create = Java::IoVertxKafkaClientProducer::KafkaProducer.java_method(
      :createShared,
      [Java::IoVertxCore::Vertx.java_class, Java::java.lang.String.java_class, Java::JavaUtil::Map.java_class]
    )
    j_producer = j_create.call(vertx.j_del, name, Hash[config.map { |key, value| [key, value] }])
    return ::Vertx::Util::Utils.safe_create(j_producer, ::VertxKafkaClient::KafkaProducer, nil, nil)
  end

  if base_ok && keyType.instance_of?(Class) && valueType.instance_of?(Class)
    j_create = Java::IoVertxKafkaClientProducer::KafkaProducer.java_method(
      :createShared,
      [
        Java::IoVertxCore::Vertx.java_class,
        Java::java.lang.String.java_class,
        Java::JavaUtil::Map.java_class,
        Java::JavaLang::Class.java_class,
        Java::JavaLang::Class.java_class
      ]
    )
    j_producer = j_create.call(
      vertx.j_del,
      name,
      Hash[config.map { |key, value| [key, value] }],
      ::Vertx::Util::Utils.j_class_of(keyType),
      ::Vertx::Util::Utils.j_class_of(valueType)
    )
    return ::Vertx::Util::Utils.safe_create(
      j_producer,
      ::VertxKafkaClient::KafkaProducer,
      ::Vertx::Util::Utils.v_type_of(keyType),
      ::Vertx::Util::Utils.v_type_of(valueType)
    )
  end

  raise ArgumentError, "Invalid arguments when calling create_shared(#{vertx},#{name},#{config},#{keyType},#{valueType})"
end

Instance Method Details

- (void) close(timeout = nil) { ... }

This method returns an undefined value.

Close the producer

Parameters:

  • timeout (Fixnum) (defaults to: nil)
    timeout to wait for closing

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)

155
156
157
158
159
160
161
162
163
164
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 155

# Close the producer.
#
# Accepted call shapes:
# * close                      - no timeout, no block
# * close { |err| ... }        - completion handler only
# * close(timeout) { |err| } - timeout plus completion handler
#
# A timeout without a block is rejected, as is a non-integer timeout.
#
# @param [Integer, nil] timeout timeout to wait for closing (handed to the Java long overload)
# @yield [err] handler called on operation completed; receives the failure cause, or nil on success
# @return the value returned by the underlying Java close call
# @raise [ArgumentError] if the arguments match none of the accepted shapes
def close(timeout=nil)
  if !block_given? && timeout == nil
    return @j_del.java_method(:close, []).call()
  elsif block_given? && timeout == nil
    return @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  elsif timeout.is_a?(Integer) && block_given?
    # Integer rather than Fixnum: the Fixnum constant was deprecated in Ruby 2.4
    # and removed in 3.2, where referencing it raises NameError.
    return @j_del.java_method(:close, [Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(timeout,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling close(#{timeout})"
end

- (self) drain_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)

109
110
111
112
113
114
115
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 109

# Register a drain handler on the underlying Java producer.
#
# @yield called with no arguments each time the Java side invokes the handler
# @return [self]
# @raise [ArgumentError] if no block is given
def drain_handler
  unless block_given?
    raise ArgumentError, "Invalid arguments when calling drain_handler()"
  end

  j_setter = @j_del.java_method(:drainHandler, [Java::IoVertxCore::Handler.java_class])
  # Whatever the Java handler is invoked with is dropped; the block gets no arguments.
  j_setter.call(proc { yield })
  self
end

- (void) end(data = nil) { ... }

This method returns an undefined value.

Same as #end but with a handler called when the operation completes

Parameters:

Yields:

Raises:

  • (ArgumentError)

28
29
30
31
32
33
34
35
36
37
38
39
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 28

# End the stream, optionally writing a final record and/or registering a
# completion handler.
#
# Accepted call shapes:
# * end                        - no record, no block
# * end { |err| ... }          - completion handler only
# * end(data)                  - final record only
# * end(data) { |err| ... }    - final record plus completion handler
#
# @param data a record whose class defines j_del, or nil
# @yield [err] receives the failure cause, or nil on success
# @return the value returned by the underlying Java end call
# @raise [ArgumentError] if data is neither nil nor an object whose class defines j_del
def end(data=nil)
  if data.nil?
    return @j_del.java_method(:end, []).call unless block_given?

    j_end = @j_del.java_method(:end, [Java::IoVertxCore::Handler.java_class])
    return j_end.call(proc { |ar| yield(ar.failed ? ar.cause : nil) })
  end

  if data.class.method_defined?(:j_del)
    unless block_given?
      j_end = @j_del.java_method(:end, [Java::IoVertxKafkaClientProducer::KafkaProducerRecord.java_class])
      return j_end.call(data.j_del)
    end

    j_end = @j_del.java_method(
      :end,
      [Java::IoVertxKafkaClientProducer::KafkaProducerRecord.java_class, Java::IoVertxCore::Handler.java_class]
    )
    return j_end.call(data.j_del, proc { |ar| yield(ar.failed ? ar.cause : nil) })
  end

  raise ArgumentError, "Invalid arguments when calling end(#{data})"
end

- (self) exception_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)

71
72
73
74
75
76
77
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 71

# Register an exception handler on the underlying Java producer.
#
# @yield [error] receives the event after conversion by ::Vertx::Util::Utils.from_throwable
# @return [self]
# @raise [ArgumentError] if no block is given
def exception_handler
  unless block_given?
    raise ArgumentError, "Invalid arguments when calling exception_handler()"
  end

  j_setter = @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class])
  j_setter.call(proc { |event| yield(::Vertx::Util::Utils.from_throwable(event)) })
  self
end

- (self) flush { ... }

Invoking this method makes all buffered records immediately available to write

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)

144
145
146
147
148
149
150
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 144

# Invoking this method makes all buffered records immediately available to write.
#
# @yield handler called on operation completed; the block is called with no arguments
# @return [self]
# @raise [ArgumentError] if no block is given
def flush
  unless block_given?
    raise ArgumentError, "Invalid arguments when calling flush()"
  end

  j_flush = @j_del.java_method(:flush, [Java::IoVertxCore::Handler.java_class])
  # Whatever the Java handler is invoked with is dropped; the block gets no arguments.
  j_flush.call(proc { yield })
  self
end

- (self) partitions_for(topic = nil) { ... }

Get the partition metadata for the given topic.

Parameters:

  • topic (String) (defaults to: nil)
    topic for which to get partition info

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)

134
135
136
137
138
139
140
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 134

# Get the partition metadata for the given topic.
#
# @param [String] topic topic to look up; must be exactly a String
# @yield [err, partitions] handler called on operation completed. err is the
#   failure cause or nil. partitions is nil on failure; on success it is an
#   Array with one parsed-JSON entry per result element (nil elements stay nil).
# @return [self]
# @raise [ArgumentError] if topic is not a String or no block is given
def partitions_for(topic=nil)
  unless topic.instance_of?(String) && block_given?
    raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})"
  end

  j_partitions_for = @j_del.java_method(
    :partitionsFor,
    [Java::java.lang.String.java_class, Java::IoVertxCore::Handler.java_class]
  )
  j_handler = proc do |ar|
    failure = ar.failed ? ar.cause : nil
    partitions =
      if ar.succeeded
        ar.result.to_a.map { |info| info.nil? ? nil : JSON.parse(info.toJson.encode) }
      end
    yield(failure, partitions)
  end
  j_partitions_for.call(topic, j_handler)
  self
end

- (self) send(record = nil) { ... }

Asynchronously write a record to a topic

Parameters:

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)

120
121
122
123
124
125
126
127
128
129
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 120

# Asynchronously write a record to a topic.
#
# @param record a record whose class defines j_del
# @yield [err, metadata] handler called on operation completed. err is the
#   failure cause or nil. metadata is the parsed JSON of the result when the
#   operation succeeded with a non-nil result, otherwise nil.
# @return [self]
# @raise [ArgumentError] if the class of record does not define j_del
def send(record=nil)
  unless record.class.method_defined?(:j_del)
    raise ArgumentError, "Invalid arguments when calling send(#{record})"
  end

  j_record_type = Java::IoVertxKafkaClientProducer::KafkaProducerRecord.java_class
  if block_given?
    j_send = @j_del.java_method(:send, [j_record_type, Java::IoVertxCore::Handler.java_class])
    j_handler = proc do |ar|
      failure = ar.failed ? ar.cause : nil
      metadata = nil
      metadata = JSON.parse(ar.result.toJson.encode) if ar.succeeded && ar.result != nil
      yield(failure, metadata)
    end
    j_send.call(record.j_del, j_handler)
  else
    @j_del.java_method(:send, [j_record_type]).call(record.j_del)
  end
  self
end

- (self) set_write_queue_max_size(i = nil)

Parameters:

  • i (Fixnum) (defaults to: nil)

Returns:

  • (self)

Raises:

  • (ArgumentError)

93
94
95
96
97
98
99
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 93

# Set the maximum size of the write queue on the underlying Java producer.
#
# @param [Integer] i the new maximum size (handed to the Java int overload)
# @return [self]
# @raise [ArgumentError] if i is not an Integer or a block is given
def set_write_queue_max_size(i=nil)
  # Integer rather than Fixnum: the Fixnum constant was deprecated in Ruby 2.4
  # and removed in 3.2, where referencing it raises NameError.
  if i.is_a?(Integer) && !block_given?
    @j_del.java_method(:setWriteQueueMaxSize, [Java::int.java_class]).call(i)
    return self
  end
  raise ArgumentError, "Invalid arguments when calling set_write_queue_max_size(#{i})"
end

- (self) write(data = nil) { ... }

Parameters:

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)

81
82
83
84
85
86
87
88
89
90
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 81

# Write a record through the underlying Java producer.
#
# @param data a record whose class defines j_del
# @yield [err] optional completion handler; receives the failure cause, or nil on success
# @return [self]
# @raise [ArgumentError] if the class of data does not define j_del
def write(data=nil)
  unless data.class.method_defined?(:j_del)
    raise ArgumentError, "Invalid arguments when calling write(#{data})"
  end

  j_record_type = Java::IoVertxKafkaClientProducer::KafkaProducerRecord.java_class
  if block_given?
    j_write = @j_del.java_method(:write, [j_record_type, Java::IoVertxCore::Handler.java_class])
    j_write.call(data.j_del, proc { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    @j_del.java_method(:write, [j_record_type]).call(data.j_del)
  end
  self
end

- (true, false) write_queue_full?

Returns:

  • (true, false)

Raises:

  • (ArgumentError)

101
102
103
104
105
106
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 101

# Ask the underlying Java producer whether its write queue is full.
#
# @return [true, false] the value returned by the Java writeQueueFull call
# @raise [ArgumentError] if a block is given
def write_queue_full?
  if block_given?
    raise ArgumentError, "Invalid arguments when calling write_queue_full?()"
  end

  @j_del.java_method(:writeQueueFull, []).call
end