Class: Kafka::Producer
- Inherits: Object
- Hierarchy: Object, Kafka::Producer
- Defined in: lib/kafka/producer.rb, lib/kafka/producer/delivery_report.rb
Defined Under Namespace
Classes: DeliveryReport
Instance Attribute Summary
- #client ⇒ Kafka::FFI::Producer (readonly)
  Returns the backing Kafka::FFI::Producer.
Instance Method Summary
- #close(timeout: 30000) ⇒ Object
  Gracefully shut down the Producer, flushing any pending deliveries and finally releasing any memory back to the system.
- #flush(timeout: 1000) ⇒ Object
  Wait until all outstanding produce requests are completed.
- #initialize(config) ⇒ Producer (constructor)
  Initialize a new Producer for the configured cluster.
- #produce(topic, payload, key: nil, partition: nil, headers: nil, timestamp: nil, &block) {|report| ... } ⇒ DeliveryReport
  Produce and publish a message to the Kafka cluster.
Constructor Details
#initialize(config) ⇒ Producer
Initialize a new Producer for the configured cluster.
# File 'lib/kafka/producer.rb', line 21

def initialize(config)
  config = config.dup

  # Configure callbacks
  config.on_delivery_report(&method(:on_delivery_report))

  @client = ::Kafka::FFI::Producer.new(config)

  # Periodically call poll on the client to ensure callbacks are fired.
  @poller = Poller.new(@client)
end
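The constructor duplicates the config before attaching its delivery callback, so the caller's object is presumably left unmodified. The sketch below illustrates that pattern in isolation. SketchConfig and SketchProducer are hypothetical stand-ins written for this example, not the real Kafka::Config or Kafka::Producer, and the callback storage is an assumption.

```ruby
# Hypothetical stand-in for a config object (NOT the real Kafka::Config).
class SketchConfig
  attr_reader :callbacks

  def initialize
    @callbacks = {}
  end

  # Object#dup is shallow, so give each copy its own callbacks hash.
  def initialize_copy(source)
    super
    @callbacks = source.callbacks.dup
  end

  def on_delivery_report(&block)
    @callbacks[:delivery_report] = block
  end
end

# Hypothetical producer that mirrors the shape of the constructor above.
class SketchProducer
  attr_reader :config, :reports

  def initialize(config)
    # Work on a copy so the caller's config is never mutated.
    config = config.dup

    # Bind an instance method as the callback, as the real constructor does.
    config.on_delivery_report(&method(:on_delivery_report))

    @config = config
    @reports = []
  end

  private

  def on_delivery_report(report)
    @reports << report
  end
end

shared = SketchConfig.new
producer = SketchProducer.new(shared)

# Simulate the client firing the delivery callback.
producer.config.callbacks[:delivery_report].call(:delivered)
```

Because the callback is registered on the copy, the same config object can be reused to build several producers without one producer's callback overwriting another's.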
Instance Attribute Details
#client ⇒ Kafka::FFI::Producer (readonly)
Returns the backing Kafka::FFI::Producer.
# File 'lib/kafka/producer.rb', line 16

def client
  @client
end
Instance Method Details
#close(timeout: 30000) ⇒ Object
Gracefully shut down the Producer, flushing any pending deliveries and finally releasing any memory back to the system.
Note: Once #close is called, it is no longer safe to call any other method on the Producer.
# File 'lib/kafka/producer.rb', line 97

def close(timeout: 30000)
  # @see https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#producer
  @poller.stop

  @client.flush(timeout: timeout)
  @client.poll

  # Client handles destroying cached Topics
  @client.destroy
end
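Since no method may be called on the Producer after #close, one way to make the lifetime explicit is to scope the producer to a block and close it in an ensure clause. The following is a minimal sketch of that idea. The with_producer helper and the StubProducer class are hypothetical and exist only for this example. With the real library you would pass a Kafka::Producer instead of the stub.

```ruby
# Hypothetical stand-in that only tracks whether #close has run.
class StubProducer
  attr_reader :produced

  def initialize
    @produced = []
    @closed = false
  end

  def produce(topic, payload)
    raise "producer is closed" if @closed

    @produced << [topic, payload]
  end

  # Same keyword signature as Producer#close; the stub ignores the timeout.
  def close(timeout: 30000)
    @closed = true
  end

  def closed?
    @closed
  end
end

# Hypothetical helper: yields the producer, then always closes it,
# even when the block raises.
def with_producer(producer, timeout: 30000)
  yield producer
ensure
  producer.close(timeout: timeout)
end

stub = StubProducer.new
with_producer(stub) { |producer| producer.produce("events", "hello") }
```

Keeping every call inside the block means no code path can reach the producer after it has been closed.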
#flush(timeout: 1000) ⇒ Object
Wait until all outstanding produce requests are completed.
# File 'lib/kafka/producer.rb', line 85

def flush(timeout: 1000)
  @client.flush(timeout: timeout)
end
#produce(topic, payload, key: nil, partition: nil, headers: nil, timestamp: nil, &block) {|report| ... } ⇒ DeliveryReport
Produce and publish a message to the Kafka cluster.
# File 'lib/kafka/producer.rb', line 59

def produce(topic, payload, key: nil, partition: nil, headers: nil, timestamp: nil, &block)
  report = DeliveryReport.new(&block)

  # Allocate a pointer to a small chunk of memory. We will use the pointer
  # (not the value it points to) as a key for looking up the DeliveryReport
  # in the callback.
  #
  # Using a MemoryPointer as a key also ensures we have a reference to the
  # Pointer so it doesn't get garbage collected away and it can be freed it
  # in the callback since the raw FFI::Pointer disallows #free as FFI
  # doesn't believe we allocated it.
  opaque = Kafka::FFI::Opaque.new(report)

  @client.produce(topic, payload, key: key, partition: partition, headers: headers, timestamp: timestamp, opaque: opaque)

  report
rescue
  opaque.free

  raise
end
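#produce returns its DeliveryReport immediately, and the block is only invoked later, once the delivery callback fires on the polling thread. The sketch below shows one way such a report object could work, using a Mutex and ConditionVariable. SketchReport and its methods (done, wait, successful?) are assumptions made for illustration and are not the actual Kafka::Producer::DeliveryReport implementation.

```ruby
# Hypothetical report object (NOT the real DeliveryReport): it is handed
# back right away and completed later from another thread.
class SketchReport
  def initialize(&block)
    @callback = block
    @mutex = Mutex.new
    @done = ConditionVariable.new
    @finished = false
    @error = nil
  end

  # Called by the delivery callback; a nil error means success.
  def done(error = nil)
    @mutex.synchronize do
      @error = error
      @finished = true
      @done.broadcast
    end

    # Run the user's block outside the lock so it can query the report.
    @callback&.call(self)
  end

  # Block until the report completes; returns false if the timeout
  # (in seconds) elapses first.
  def wait(timeout: 5)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

    @mutex.synchronize do
      until @finished
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining <= 0

        @done.wait(@mutex, remaining)
      end
    end

    true
  end

  def successful?
    @mutex.synchronize { @finished && @error.nil? }
  end
end

seen = []
report = SketchReport.new { |r| seen << r.successful? }

# Simulate the poller thread confirming delivery a little later.
worker = Thread.new do
  sleep 0.05
  report.done
end

finished = report.wait(timeout: 2)
worker.join
```

The caller can either block on the report or rely on the block passed at construction, which matches the two usage styles suggested by the `&block` and `{|report| ... }` parts of the #produce signature.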