Class: Kafka::Producer::DeliveryReport
- Inherits: Object
  - Object
  - Kafka::Producer::DeliveryReport
- Defined in: lib/kafka/producer/delivery_report.rb
Instance Attribute Summary
- #error ⇒ nil, Kafka::ResponseError (readonly)
- #latency ⇒ nil, Integer (readonly)
  Returns the number of microseconds from when the message was enqueued for delivery until it was confirmed by the cluster or permanently failed.
- #offset ⇒ nil, Integer (readonly)
- #partition ⇒ nil, Integer (readonly)
- #topic ⇒ nil, String (readonly)
Instance Method Summary
- #done(message) ⇒ Object
  Set the response based on the message and notify anyone waiting on the result.
- #error? ⇒ Boolean
  Returns whether the report is for an error.
- #initialize(&block) ⇒ DeliveryReport (constructor)
  Initializes a new DeliveryReport.
- #received? ⇒ Boolean
  Returns true when the report has been received back from the Kafka cluster.
- #successful? ⇒ Boolean
  Returns whether the delivery was successful.
- #wait(timeout: 5000) ⇒ Object
  Wait for a report to be received for the delivery from the cluster.
Constructor Details
#initialize(&block) ⇒ DeliveryReport
Initializes a new DeliveryReport
# File 'lib/kafka/producer/delivery_report.rb', line 37

def initialize(&block)
  @mutex = Mutex.new
  @waiter = ConditionVariable.new

  @error = nil
  @topic = nil
  @offset = nil
  @latency = nil
  @partition = nil
  @callback = block

  # Will be set to true by a call to #done. Fast out for any callers to
  # #wait that may come in after done has already been called.
  @done = false
end
Instance Attribute Details
#error ⇒ nil, Kafka::ResponseError (readonly)
# File 'lib/kafka/producer/delivery_report.rb', line 8

def error
  @error
end
#latency ⇒ nil, Integer (readonly)
Latency is in microseconds (μs) while most other timestamps are in milliseconds.
Returns the number of microseconds from when the message was enqueued for delivery until it was confirmed by the cluster or permanently failed.
# File 'lib/kafka/producer/delivery_report.rb', line 31

def latency
  @latency
end
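Because #latency is in microseconds while the timeouts on this class are in milliseconds, a small conversion helper can prevent unit mix-ups. The following is a minimal sketch: `latency_ms` is a hypothetical helper, not part of the library, and it assumes the value is either nil or an Integer as documented above.

```ruby
# Hypothetical helper (not part of Kafka::Producer::DeliveryReport).
# Converts a latency in microseconds to milliseconds. A nil latency, which
# is what a report holds before it has been received, is passed through.
def latency_ms(latency_us)
  return nil if latency_us.nil?

  latency_us / 1000.0
end

latency_ms(2_500) # => 2.5
latency_ms(nil)   # => nil
```

Dividing by a Float keeps sub-millisecond precision; integer division would truncate 2500 μs to 2 ms.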
#offset ⇒ nil, Integer (readonly)
# File 'lib/kafka/producer/delivery_report.rb', line 16

def offset
  @offset
end
#partition ⇒ nil, Integer (readonly)
# File 'lib/kafka/producer/delivery_report.rb', line 20

def partition
  @partition
end
#topic ⇒ nil, String (readonly)
# File 'lib/kafka/producer/delivery_report.rb', line 12

def topic
  @topic
end
Instance Method Details
#done(message) ⇒ Object
Set the response based on the message and notify anyone waiting on the result.
# File 'lib/kafka/producer/delivery_report.rb', line 81

def done(message)
  @mutex.synchronize do
    @error = message.error

    @offset = message.offset
    @topic = message.topic
    @partition = message.partition
    @latency = message.latency

    @done = true
    @waiter.broadcast

    remove_instance_variable(:@mutex)
    remove_instance_variable(:@waiter)
  end

  if @callback
    @callback.call(self)
  end
end
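The callback flow in #done can be exercised without a Kafka cluster. The sketch below is an illustration only: `FakeMessage` and `SketchReport` are hypothetical stand-ins that copy the field assignments and callback invocation from the listing above while leaving out the locking, and `run_callback_sketch` is a made-up driver.

```ruby
# Hypothetical stand-in for the message object passed to #done. Only the
# readers used by the listing above are modelled.
FakeMessage = Struct.new(:error, :offset, :topic, :partition, :latency)

# Hypothetical, lock-free stand-in for DeliveryReport. It exists only to
# show when the block given to the constructor is called.
class SketchReport
  attr_reader :error, :offset, :topic, :partition, :latency

  def initialize(&block)
    @callback = block
    @done = false
  end

  def done(message)
    @error = message.error
    @offset = message.offset
    @topic = message.topic
    @partition = message.partition
    @latency = message.latency
    @done = true

    # The report passes itself to the callback, so the block can read the
    # populated attributes directly.
    @callback.call(self) if @callback
  end

  def received?
    @done
  end
end

# Drives one delivery through the stand-in and returns what the callback saw.
def run_callback_sketch
  seen = []
  report = SketchReport.new { |r| seen << [r.topic, r.partition, r.offset] }
  report.done(FakeMessage.new(nil, 42, "events", 0, 1_200))
  seen
end

run_callback_sketch # => [["events", 0, 42]]
```

In the listing above the callback runs after the synchronized block has finished, so a block that reads the report's attributes does not run while the mutex is held.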
#error? ⇒ Boolean
Returns whether the report is for an error.
# File 'lib/kafka/producer/delivery_report.rb', line 63

def error?
  # True only when an error has been recorded for the delivery.
  !error.nil?
end
#received? ⇒ Boolean
Returns true when the report has been received back from the Kafka cluster.
# File 'lib/kafka/producer/delivery_report.rb', line 58

def received?
  @done
end
#successful? ⇒ Boolean
Returns whether the delivery was successful.
# File 'lib/kafka/producer/delivery_report.rb', line 71

def successful?
  !error
end
#wait(timeout: 5000) ⇒ Object
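Wait for a report to be received for the delivery from the cluster. The timeout is given in milliseconds. If no report arrives before the timeout expires, Kafka::ResponseError is raised with RD_KAFKA_RESP_ERR__TIMED_OUT; otherwise nil is returned.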
# File 'lib/kafka/producer/delivery_report.rb', line 108

def wait(timeout: 5000)
  # Fast out since the delivery report has already been reported back from
  # the cluster.
  if @done
    return
  end

  @mutex.synchronize do
    # Convert from milliseconds to seconds to match Ruby's API. Takes
    # milliseconds to be consistent with librdkafka APIs.
    if timeout
      timeout /= 1000.0
    end

    @waiter.wait(@mutex, timeout)

    # No report was received for the message before we timed out waiting.
    if !@done
      raise ::Kafka::ResponseError, ::Kafka::FFI::RD_KAFKA_RESP_ERR__TIMED_OUT
    end
  end

  nil
end
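The handshake between #wait and #done is a standard Mutex plus ConditionVariable pattern and can be tried in isolation. The sketch below is not the library class: `SketchWaiter`, its `TimedOut` error, and the two driver methods are hypothetical. It also differs from the listing above in one respect: it loops against a monotonic deadline so that an early wakeup is not mistaken for a timeout.

```ruby
# Hypothetical stand-in showing a millisecond-timeout wait built on
# Mutex and ConditionVariable. Not part of the library.
class SketchWaiter
  # Stand-in for the Kafka::ResponseError raised by the real #wait.
  class TimedOut < StandardError; end

  def initialize
    @mutex = Mutex.new
    @waiter = ConditionVariable.new
    @done = false
  end

  # Marks the result as available and wakes every thread blocked in #wait.
  def done
    @mutex.synchronize do
      @done = true
      @waiter.broadcast
    end
  end

  # timeout is in milliseconds, as in #wait above; nil waits indefinitely.
  def wait(timeout: 5000)
    return if @done

    @mutex.synchronize do
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      deadline = timeout && now + timeout / 1000.0

      until @done
        remaining = deadline && deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimedOut, "no report within #{timeout} ms" if remaining && remaining <= 0

        # ConditionVariable#wait takes seconds, hence the conversion above.
        @waiter.wait(@mutex, remaining)
      end
    end

    nil
  end
end

# A second thread signals completion well inside the one second timeout.
def run_wait_sketch
  waiter = SketchWaiter.new
  signaller = Thread.new do
    sleep 0.05
    waiter.done
  end
  result = waiter.wait(timeout: 1000)
  signaller.join
  result
end

# Nothing ever calls #done, so the 20 ms timeout expires.
def run_timeout_sketch
  SketchWaiter.new.wait(timeout: 20)
  :received
rescue SketchWaiter::TimedOut
  :timed_out
end
```

Callers of the real #wait would rescue Kafka::ResponseError in place of the stand-in `TimedOut` error used here.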