Class: Kafka::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/consumer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Consumer

Returns a new instance of Consumer.

Parameters:

[View source]

17
18
19
20
21
22
23
24
# File 'lib/kafka/consumer.rb', line 17

def initialize(config)
  # Initialize the client
  @client = Kafka::FFI::Consumer.new(config)

  # Redirect the main event queue so calls to consumer_poll will fire
  # callbacks instead of having to have a separate poller thread.
  @client.poll_set_consumer
end

Instance Attribute Details

#clientKafka::FFI::Consumer (readonly)

Returns the backing Kafka::FFI::Consumer.


14
15
16
# File 'lib/kafka/consumer.rb', line 14

def client
  @client
end

Instance Method Details

#assignmentsHash{String => Array<Integer>}

Retrieves the set of topic + partition assignments for the consumer.

Examples:

consumer.assignment # => { "topic" => [1,2,3] }

Returns:

  • (Hash{String => Array<Integer>})

    List of partition assignments keyed by the topic name.

[View source]

45
46
47
# File 'lib/kafka/consumer.rb', line 45

def assignments
  @client.assignment
end

#closeObject

Note:

After calling #close it is unsafe to call any other method on the Consumer.

Gracefully shutdown the consumer and it's connections.

[View source]

73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/kafka/consumer.rb', line 73

def close
  # @see https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#high-level-kafkaconsumer

  # Gracefully shutdown the consumer, leaving the consumer group,
  # committing any remaining offsets, and releasing resources back to the
  # system.
  #
  # This will effectively call #close on the Client automatically. Trying
  # to follow the documentation and calling #close before #destroy caused
  # warnings due to brokers disconnecting but just calling #destroy fixes
  # that.
  @client.destroy
end

#commit(msg, async: false) ⇒ Object

Parameters:

  • msg (Consumer::Message)
[View source]

58
59
60
61
62
63
64
65
66
67
# File 'lib/kafka/consumer.rb', line 58

def commit(msg, async: false)
  list = Kafka::FFI::TopicPartitionList.new(1)

  list.add(msg.topic, msg.partition)
  list.set_offset(msg.topic, msg.partition, msg.offset + 1)

  @client.commit(list, async)
ensure
  list.destroy
end

#poll(timeout: 250, &block) ⇒ Object

Poll the consumer for waiting message.

Parameters:

  • timeout (Integer) (defaults to: 250)

    Time to wait in milliseconds for a message to be available.

[View source]

53
54
55
# File 'lib/kafka/consumer.rb', line 53

def poll(timeout: 250, &block)
  @client.consumer_poll(timeout, &block)
end

#subscribe(topic, *rest) ⇒ Object

Subscribe the consumer to the given list of topics. Once the subscriptions have become active and partitions assigned, calls to #poll will yield messages for the subscribed topics.

subscribe will set the list of subscriptions, removing any that are not included in the most recent call.

Parameters:

  • topic (String, Array<String>)

    Topics to subscribe to

[View source]

34
35
36
# File 'lib/kafka/consumer.rb', line 34

def subscribe(topic, *rest)
  @client.subscribe(topic, *rest)
end