Class: Kafka::Consumer
- Inherits:
-
Object
- Object
- Kafka::Consumer
- Defined in:
- lib/kafka/consumer.rb
Instance Attribute Summary collapse
-
#client ⇒ Kafka::FFI::Consumer
readonly
Returns the backing Kafka::FFI::Consumer.
Instance Method Summary collapse
-
#assignments ⇒ Hash{String => Array<Integer>}
Retrieves the set of topic + partition assignments for the consumer.
-
#close ⇒ Object
Gracefully shutdown the consumer and it's connections.
- #commit(msg, async: false) ⇒ Object
-
#initialize(config) ⇒ Consumer
constructor
A new instance of Consumer.
-
#poll(timeout: 250, &block) ⇒ Object
Poll the consumer for waiting message.
-
#subscribe(topic, *rest) ⇒ Object
Subscribe the consumer to the given list of topics.
Constructor Details
#initialize(config) ⇒ Consumer
Returns a new instance of Consumer.
17 18 19 20 21 22 23 24 |
# File 'lib/kafka/consumer.rb', line 17 def initialize(config) # Initialize the client @client = Kafka::FFI::Consumer.new(config) # Redirect the main event queue so calls to consumer_poll will fire # callbacks instead of having to have a separate poller thread. @client.poll_set_consumer end |
Instance Attribute Details
#client ⇒ Kafka::FFI::Consumer (readonly)
Returns the backing Kafka::FFI::Consumer.
14 15 16 |
# File 'lib/kafka/consumer.rb', line 14 def client @client end |
Instance Method Details
#assignments ⇒ Hash{String => Array<Integer>}
Retrieves the set of topic + partition assignments for the consumer.
45 46 47 |
# File 'lib/kafka/consumer.rb', line 45 def assignments @client.assignment end |
#close ⇒ Object
After calling #close it is unsafe to call any other method on the Consumer.
Gracefully shutdown the consumer and it's connections.
73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/kafka/consumer.rb', line 73 def close # @see https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#high-level-kafkaconsumer # Gracefully shutdown the consumer, leaving the consumer group, # committing any remaining offsets, and releasing resources back to the # system. # # This will effectively call #close on the Client automatically. Trying # to follow the documentation and calling #close before #destroy caused # warnings due to brokers disconnecting but just calling #destroy fixes # that. @client.destroy end |
#commit(msg, async: false) ⇒ Object
58 59 60 61 62 63 64 65 66 67 |
# File 'lib/kafka/consumer.rb', line 58 def commit(msg, async: false) list = Kafka::FFI::TopicPartitionList.new(1) list.add(msg.topic, msg.partition) list.set_offset(msg.topic, msg.partition, msg.offset + 1) @client.commit(list, async) ensure list.destroy end |
#poll(timeout: 250, &block) ⇒ Object
Poll the consumer for waiting message.
53 54 55 |
# File 'lib/kafka/consumer.rb', line 53 def poll(timeout: 250, &block) @client.consumer_poll(timeout, &block) end |
#subscribe(topic, *rest) ⇒ Object
Subscribe the consumer to the given list of topics. Once the subscriptions have become active and partitions assigned, calls to #poll will yield messages for the subscribed topics.
subscribe will set the list of subscriptions, removing any that are not included in the most recent call.
34 35 36 |
# File 'lib/kafka/consumer.rb', line 34 def subscribe(topic, *rest) @client.subscribe(topic, *rest) end |