Class: Kafka::FFI::Consumer
- Inherits: Client (ancestor chain: Object > OpaquePointer > Client > Kafka::FFI::Consumer)
- Defined in: lib/kafka/ffi/consumer.rb
Instance Attribute Summary
Attributes inherited from OpaquePointer
Class Method Summary
Instance Method Summary
-
#assign(list) ⇒ Object
Atomically assign the set of partitions to consume.
-
#assignment ⇒ Hash{String => Array<Integer>}
(also: #assignments)
List the current partition assignment(s) for the consumer.
-
#close ⇒ Object
Close down the consumer.
-
#commit(offsets, async) ⇒ Object
Commit the set of offsets from the given TopicPartitionList.
-
#commit_message(message, async) ⇒ Object
Commit the message's offset on the broker for the message's partition.
-
#committed(list, timeout: 1000) ⇒ TopicPartitionList
Retrieve committed offsets for topics + partitions.
-
#consumer_poll(timeout) {|message| ... } ⇒ Object
Poll the consumer's queue for a waiting Message and yield it to the given block.
-
#get_consumer_queue ⇒ Queue
Returns a reference to the consume queue.
-
#get_partition_queue(topic, partition) ⇒ Queue
Returns a reference to the partition's queue.
-
#get_watermark_offsets(topic, partition) ⇒ Array<(Integer, Integer)>
Get the last known (cached) low and high offsets for the partition.
-
#member_id ⇒ String
Retrieve the Consumer's broker assigned group Member ID.
-
#poll_set_consumer ⇒ Object
Redirect the main event queue to the Consumer's queue so the consumer doesn't need to poll from it separately for event callbacks to fire.
-
#subscribe(topic, *rest) ⇒ Object
Subscribe the consumer to receive Messages for a set of topics.
-
#subscription ⇒ Array<String>
(also: #subscriptions)
List the current topic subscriptions for the consumer.
-
#unsubscribe ⇒ Object
Unsubscribe from the current subscription set (i.e. all current subscriptions).
Methods inherited from Client
#alter_configs, #brokers_add, #cluster_id, #config, #controller_id, #create_partitions, #create_topics, #default_topic_conf_dup, #delete_topics, #describe_configs, #destroy, from_native, #get_background_queue, #get_main_queue, #group_list, #initialize, #metadata, #name, #offsets_for_times, #outq_len, #pause_partitions, #poll, #query_watermark_offsets, #resume_partitions, #set_log_queue, #topic
Methods inherited from OpaquePointer
by_ref, from_native, inherited, #initialize, to_native
Constructor Details
This class inherits a constructor from Kafka::FFI::Client
Class Method Details
.new(config = nil) ⇒ Object
  # File 'lib/kafka/ffi/consumer.rb', line 10

  def self.new(config = nil)
    super(:consumer, config)
  end
Instance Method Details
#assign(list) ⇒ Object
Atomically assign the set of partitions to consume. This will replace the existing assignment.
  # File 'lib/kafka/ffi/consumer.rb', line 171

  def assign(list)
    err = ::Kafka::FFI.rd_kafka_assign(self, list)
    if err != :ok
      raise ::Kafka::ResponseError, err
    end

    nil
  end
#assignment ⇒ Hash{String => Array<Integer>} Also known as: assignments
List the current partition assignment(s) for the consumer.
  # File 'lib/kafka/ffi/consumer.rb', line 188

  def assignment
    ptr = ::FFI::MemoryPointer.new(:pointer)

    err = ::Kafka::FFI.rd_kafka_assignment(self, ptr)
    if err != :ok
      raise ::Kafka::ResponseError, err
    end

    begin
      tpl = ::Kafka::FFI::TopicPartitionList.new(ptr.read_pointer)

      # { "topic" => [1, 2, 3] }
      tpl.elements.inject({}) do |memo, tp|
        (memo[tp.topic] ||= []) << tp.partition
        memo
      end
    ensure
      tpl.destroy
    end
  ensure
    ptr.free
  end
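The shape of the returned hash can be illustrated with plain Ruby. The Struct below is only a stand-in for the gem's TopicPartition elements, not part of its API:

```ruby
# Illustrative stand-in for Kafka::FFI::TopicPartition (topic + partition).
TP = Struct.new(:topic, :partition)

elements = [TP.new("events", 0), TP.new("events", 1), TP.new("logs", 2)]

# The same inject pattern used by #assignment: group partition ids by topic.
grouped = elements.inject({}) do |memo, tp|
  (memo[tp.topic] ||= []) << tp.partition
  memo
end

grouped # => { "events" => [0, 1], "logs" => [2] }
```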
#close ⇒ Object
Close down the consumer. This will block until the consumer has revoked its assignment(s), committed offsets, and left the consumer group. The maximum blocking time is roughly limited to the session.timeout.ms config option.

Ensure that destroy is called after the consumer is closed to free up resources.
  # File 'lib/kafka/ffi/consumer.rb', line 333

  def close
    err = ::Kafka::FFI.rd_kafka_consumer_close(self)
    if err != :ok
      raise ::Kafka::ResponseError, err
    end

    nil
  end
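Since destroy must follow close, shutdown is naturally a two-step sequence. A minimal sketch, assuming `consumer` is a live Kafka::FFI::Consumer (a broker is required to actually run this against Kafka):

```ruby
# Orderly shutdown: #close leaves the group and commits offsets (blocks),
# then #destroy (inherited from Client) releases the underlying handle.
def shutdown(consumer)
  consumer.close
ensure
  consumer.destroy
end
```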
#commit(offsets, async) ⇒ Object
Commit the set of offsets from the given TopicPartitionList.
  # File 'lib/kafka/ffi/consumer.rb', line 294

  def commit(offsets, async)
    err = ::Kafka::FFI.rd_kafka_commit(self, offsets, async)
    if err != :ok
      raise ::Kafka::ResponseError, err
    end

    nil
  end
#commit_message(message, async) ⇒ Object
Commit the message's offset on the broker for the message's partition.
  # File 'lib/kafka/ffi/consumer.rb', line 309

  def commit_message(message, async)
    if message.nil? || message.null?
      raise ArgumentError, "message cannot be nil/null"
    end

    err = ::Kafka::FFI.rd_kafka_commit_message(self, message, async)
    if err != :ok
      raise ::Kafka::ResponseError, err
    end

    nil
  end
#committed(list, timeout: 1000) ⇒ TopicPartitionList
Retrieve committed offsets for topics + partitions. The offset field for each TopicPartition in list will be set to the stored offset or RD_KAFKA_OFFSET_INVALID in case there was no stored offset for that partition. The error field is set if there was an error with the TopicPartition.
  # File 'lib/kafka/ffi/consumer.rb', line 230

  def committed(list, timeout: 1000)
    if list.nil?
      raise ArgumentError, "list cannot be nil"
    end

    err = ::Kafka::FFI.rd_kafka_committed(self, list, timeout)
    if err != :ok
      raise ::Kafka::ResponseError, err
    end

    # Return the list that was passed in as it should now be augmented with
    # the committed offsets and any errors fetching said offsets.
    list
  end
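Partitions with no stored offset come back as RD_KAFKA_OFFSET_INVALID, which librdkafka defines as -1001. A sketch of filtering them out, using plain sample data in place of the augmented TopicPartitionList (the Struct is illustrative, not part of the gem):

```ruby
# librdkafka's sentinel for "no stored offset".
OFFSET_INVALID = -1001

# Illustrative stand-in for an element of the returned TopicPartitionList.
Committed = Struct.new(:topic, :partition, :offset)

elements = [
  Committed.new("events", 0, 42),
  Committed.new("events", 1, OFFSET_INVALID),
]

# Keep only partitions that actually have a stored offset.
stored = elements.reject { |tp| tp.offset == OFFSET_INVALID }
pairs = stored.map { |tp| [tp.partition, tp.offset] }
# => [[0, 42]]
```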
#consumer_poll(timeout) {|message| ... } ⇒ Object
Poll the consumer's queue for a waiting Message, yielding it to the given block. The yielded message must not be retained by the application as it becomes unusable once the block completes.
  # File 'lib/kafka/ffi/consumer.rb', line 261

  def consumer_poll(timeout)
    if !block_given?
      raise ArgumentError, "consumer_poll must be passed a block"
    end

    msg = ::Kafka::FFI.rd_kafka_consumer_poll(self, timeout.to_i)

    # No message was available
    if msg.null?
      return nil
    end

    begin
      if msg.error
        raise msg.error
      end

      yield(msg)
    ensure
      msg.destroy
    end
  end
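A typical receive loop built on this contract: because the Message is destroyed when the block returns, the block copies out the fields it needs and the loop works with the copies. A sketch, assuming `consumer` is a subscribed Consumer (receiving real messages requires a live broker):

```ruby
# Drain messages until none arrive within 500ms. consumer_poll returns the
# block's result, or nil when the queue is empty.
def drain(consumer, received = [])
  loop do
    copied = consumer.consumer_poll(500) do |m|
      # The Message dies with the block; copy what we need.
      { topic: m.topic, offset: m.offset, payload: m.payload }
    end
    break if copied.nil? # no message was waiting

    received << copied
  end
  received
end
```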
#get_consumer_queue ⇒ Queue
Caller must call #destroy when done with the Queue.
Returns a reference to the consume queue. This is the queue served by consumer_poll.
  # File 'lib/kafka/ffi/consumer.rb', line 61

  def get_consumer_queue
    ::Kafka::FFI.rd_kafka_queue_get_consumer(self)
  end
#get_partition_queue(topic, partition) ⇒ Queue
Caller must call #destroy when done with the Queue.
Returns a reference to the partition's queue.
  # File 'lib/kafka/ffi/consumer.rb', line 71

  def get_partition_queue(topic, partition)
    ::Kafka::FFI.rd_kafka_queue_get_partition(self, topic, partition)
  end
#get_watermark_offsets(topic, partition) ⇒ Array<(Integer, Integer)>
Get the last known (cached) low and high offsets for the partition. This differs from query_watermark_offsets in that it does not query the brokers.
  # File 'lib/kafka/ffi/consumer.rb', line 41

  def get_watermark_offsets(topic, partition)
    low = ::FFI::MemoryPointer.new(:int64)
    high = ::FFI::MemoryPointer.new(:int64)

    err = ::Kafka::FFI.rd_kafka_get_watermark_offsets(self, topic, partition, low, high)
    if err != :ok
      raise ::Kafka::ResponseError, err
    end

    [low.read_int64, high.read_int64]
  end
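The low and high watermarks are what consumer lag calculations are built from. A sketch of the arithmetic with sample values; in practice the pair would come from `low, high = consumer.get_watermark_offsets(topic, partition)`:

```ruby
# Sample watermark values (stand-ins for a real get_watermark_offsets call).
low, high = 100, 1500

# Number of messages currently retained in the partition's log.
retained = high - low
# => 1400

# Lag for a consumer whose current position is a sample value: how far
# behind the newest message it is.
position = 1420
lag = high - position
# => 80
```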
#member_id ⇒ String
Retrieve the Consumer's broker assigned group Member ID.
  # File 'lib/kafka/ffi/consumer.rb', line 17

  def member_id
    id = ::Kafka::FFI.rd_kafka_memberid(self)

    if id.null?
      return nil
    end

    id.read_string
  ensure
    ::Kafka::FFI.rd_kafka_mem_free(self, id)
  end
#poll_set_consumer ⇒ Object
It is not permitted to call #poll after redirecting the main queue with poll_set_consumer.
Redirect the main event queue to the Consumer's queue so the consumer doesn't need to poll from it separately for event callbacks to fire.
  # File 'lib/kafka/ffi/consumer.rb', line 83

  def poll_set_consumer
    err = ::Kafka::FFI.rd_kafka_poll_set_consumer(self)
    if err != :ok
      raise ::Kafka::ResponseError, err
    end

    nil
  end
#subscribe(topic, *rest) ⇒ Object
Subscribe the consumer to receive Messages for a set of topics. The current set of subscriptions will be replaced.
  # File 'lib/kafka/ffi/consumer.rb', line 102

  def subscribe(topic, *rest)
    topics = [topic, rest].flatten

    tpl = TopicPartitionList.new(topics.length)
    topics.each do |t|
      tpl.add(t)
    end

    err = ::Kafka::FFI.rd_kafka_subscribe(self, tpl)
    if err != :ok
      raise ::Kafka::ResponseError, err
    end

    nil
  ensure
    tpl.destroy
  end
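The signature means subscribe takes one or more topic names rather than an array: the `[topic, rest].flatten` step normalizes both call styles to a single list. The helper name below is hypothetical, used only to demonstrate the flattening in isolation:

```ruby
# Mirrors the argument handling in #subscribe: a required first topic plus
# any number of additional topics, flattened into one list.
def topics_for(topic, *rest)
  [topic, rest].flatten
end

topics_for("events")                  # => ["events"]
topics_for("events", "logs", "audit") # => ["events", "logs", "audit"]
```

So `consumer.subscribe("events", "logs")` and `consumer.subscribe("events")` both work, but passing an array would need a splat: `consumer.subscribe(*topics)`.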
#subscription ⇒ Array<String> Also known as: subscriptions
List the current topic subscriptions for the consumer.
  # File 'lib/kafka/ffi/consumer.rb', line 139

  def subscription
    ptr = ::FFI::MemoryPointer.new(:pointer)

    err = ::Kafka::FFI.rd_kafka_subscription(self, ptr)
    if err != :ok
      raise ::Kafka::ResponseError, err
    end

    begin
      tpl = ::Kafka::FFI::TopicPartitionList.new(ptr.read_pointer)

      # Map the topic partition list to topic names.
      tpl.elements.map(&:topic)
    ensure
      tpl.destroy
    end
  ensure
    ptr.free
  end
#unsubscribe ⇒ Object
Unsubscribe from the current subscription set (i.e. all current subscriptions).
  # File 'lib/kafka/ffi/consumer.rb', line 124

  def unsubscribe
    err = ::Kafka::FFI.rd_kafka_unsubscribe(self)
    if err != :ok
      raise ::Kafka::ResponseError, err
    end

    nil
  end