Class: Kafka::FFI::Client

Inherits:
OpaquePointer
Defined in:
lib/kafka/ffi/client.rb

Overview

Note:

Naming this is hard; librdkafka primarily just refers to it as "a handle" to an instance. It's more akin to an internal service, and this Client talks to that service via its API.

Client is a handle to a configured librdkafka instance that begins operation once created. Client is an abstract class and will provide either a Consumer or Producer based on the type being created. Each Client instance can either produce messages to or consume messages from topics, but not both.


Direct Known Subclasses

Consumer, Producer

Instance Attribute Summary

Attributes inherited from OpaquePointer

#pointer

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from OpaquePointer

by_ref, inherited, to_native

Constructor Details

#initialize(ptr) ⇒ Client

Returns a new instance of Client.



# File 'lib/kafka/ffi/client.rb', line 101

def initialize(ptr)
  super(ptr)

  @topics = {}
end

Class Method Details

.from_native(ptr, _ctx) ⇒ Object



# File 'lib/kafka/ffi/client.rb', line 69

def self.from_native(ptr, _ctx)
  if !ptr.is_a?(::FFI::Pointer)
    raise TypeError, "from_native can only convert from a ::FFI::Pointer to #{self}"
  end

  # Converting from a null pointer should return nil. Likely this was
  # caused by rd_kafka_new returning an error and a NULL pointer for the
  # Client.
  if ptr.null?
    return nil
  end

  # Build a temporary Client to pass to rd_kafka_type. There is a bit of a
  # chicken and egg problem here. We can't create the final class until
  # after we know the type. But for type safety we want to pass a Client.
  cfg = allocate
  cfg.send(:initialize, ptr)
  type = ::Kafka::FFI.rd_kafka_type(cfg)

  klass =
    case type
    when :producer then Producer
    when :consumer then Consumer
    else
      raise ArgumentError, "unknown Kafka client type: #{type}"
    end

  client = klass.allocate
  client.send(:initialize, ptr)
  client
end

.new(type, config = nil) ⇒ Consumer, Producer

Create a new Client of type with the given configuration.

Parameters:

  • type (:consumer, :producer)

    Type of Kafka instance to create.

  • config (nil) (defaults to: nil)

    Use librdkafka default config

  • config (Config, Kafka::Config) (defaults to: nil)

    Configuration for the instance.

  • config (Hash{[String, Symbol] => [String, Integer, nil, Boolean]}) (defaults to: nil)

    Configuration options for the instance.

Returns:



# File 'lib/kafka/ffi/client.rb', line 34

def self.new(type, config = nil)
  error = ::FFI::MemoryPointer.new(:char, 512)

  # Convenience for passing in a Kafka::Config instead of building a
  # Kafka::FFI::Config since Kafka::Config provides a way to create a
  # config from a Hash.
  config =
    case config
    when Config, nil     then config
    when ::Kafka::Config then config.to_ffi
    when Hash            then ::Kafka::Config.new(config).to_ffi
    else
      raise ArgumentError, "config must be one of nil, Config, ::Kafka::Config, or Hash"
    end

  client = Kafka::FFI.rd_kafka_new(type, config, error, error.size)
  if client.nil?
    raise Error, error.read_string
  end

  if config
    # Store a reference to the config on the Client instance. We do this to
    # tie the Config's lifecycle to the Client instance in Ruby since they
    # are already tied in librdkafka. This ensures that any Ruby objects
    # referenced in the config (like callbacks) are not garbage collected.
    #
    # Using instance_variable_set to avoid exposing an API method that
    # could cause confusion from end users since the config cannot be
    # changed after initialization.
    client.instance_variable_set(:@config, config)
  end

  client
end
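
For example, a minimal sketch of creating clients from a plain Hash of config options (the broker address is an assumption for illustration):

# Hypothetical bootstrap address; any reachable broker works.
producer = Kafka::FFI::Client.new(:producer, "bootstrap.servers" => "localhost:9092")

consumer = Kafka::FFI::Client.new(:consumer, {
  "bootstrap.servers" => "localhost:9092",
  "group.id"          => "example-group",
})

producer.is_a?(Kafka::FFI::Producer) # => true
consumer.is_a?(Kafka::FFI::Consumer) # => true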

Instance Method Details

#alter_configs(resources, options: nil, timeout: 5000) ⇒ nil, Array<Admin::ConfigResource>

Note:

AlterConfigs will replace all existing configuration for the given resources, reverting all unspecified config options to their default values.

Note:

At most one :broker type ConfigResource can be specified per call to alter_configs since the changes must be sent to the broker specified in the resource.

Update the configuration for the specified resources. Updates may succeed for a subset of the provided resources while others fail. The configuration for a particular resource is updated atomically, replacing values using the provided ConfigResource (set via set_config) and reverting any unspecified config options to their default values.

Application is responsible for calling #destroy on the returned results when done with the results.

Parameters:

  • resources (Admin::ConfigResource)

    Resource to alter configs for.

  • resources (Array<Admin::ConfigResource>)

    List of resources with their configs to update. At most one of type :broker is allowed per call.

  • options (Admin::AdminOptions) (defaults to: nil)

    Admin API request options

  • timeout (Integer) (defaults to: 5000)

    Time to wait in milliseconds for request to complete.

Returns:

See Also:

  • rd_kafka_AlterConfigs


# File 'lib/kafka/ffi/client.rb', line 564

def alter_configs(resources, options: nil, timeout: 5000)
  resources = Array(resources)

  # AlterConfigs wants an array of Admin::ConfigResource
  list = ::FFI::MemoryPointer.new(:pointer, resources.length)
  list.write_array_of_pointer(resources.map(&:pointer))

  # Queue to receive the result
  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_AlterConfigs(self, list, resources.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
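
A sketch of updating a topic's retention, assuming Admin::ConfigResource.new takes a resource type and name (the exact constructor signature is an assumption; set_config is referenced above):

# Hypothetical constructor arguments for illustration.
resource = Kafka::FFI::Admin::ConfigResource.new(:topic, "events")
resource.set_config("retention.ms", "86400000")

result = client.alter_configs(resource)
result.destroy if result # nil means the request timed out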

#brokers_add(brokers) ⇒ Integer

Note:

It is preferred to set brokers through the metadata.broker.list or bootstrap.servers config options.

Adds one or more brokers to the Client's list of initial bootstrap brokers. Additional brokers will be discovered automatically once the Client connects to a broker, by querying the broker metadata.

Examples:

Add multiple brokers

client.brokers_add(["kafka_1:9092", "kafka_2:9092"])

Add a single broker with protocol

client.brokers_add("PLAINTEXT://localhost:9096")

Parameters:

  • brokers (String)

    Comma separated list of broker addresses to add.

  • brokers (Array<String>)

    Array of broker addresses to add.

Returns:

  • (Integer)

    number of brokers successfully added

See Also:

  • rd_kafka_brokers_add


# File 'lib/kafka/ffi/client.rb', line 421

def brokers_add(brokers)
  if brokers.is_a?(Array)
    brokers = brokers.join(",")
  end

  ::Kafka::FFI.rd_kafka_brokers_add(self, brokers)
end

#cluster_id ⇒ Object

Note:

requires the api.version.request config option set to true

Retrieves the Client's Cluster ID



# File 'lib/kafka/ffi/client.rb', line 125

def cluster_id
  id = ::Kafka::FFI.rd_kafka_clusterid(self)

  if id.null?
    return nil
  end

  id.read_string
ensure
  if !id.null?
    ::Kafka::FFI.rd_kafka_mem_free(self, id)
  end
end
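
For example (assuming api.version.request is enabled; the ID shown is illustrative):

client.cluster_id # => e.g. "wBGjkGkRQ_6HhqSa0HxIbA", or nil if unavailable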

#config ⇒ Object

Note:

The returned config is read-only and tied to the lifetime of the Client. Don't try to modify or destroy the config.

Retrieve the current configuration used by the Client.



# File 'lib/kafka/ffi/client.rb', line 111

def config
  ::Kafka::FFI.rd_kafka_conf(self)
end

#controller_id(timeout: 1000) ⇒ Integer

Note:

requires the api.version.request config option set to true

Retrieves the current Controller ID as reported by broker metadata.

Parameters:

  • timeout (Integer) (defaults to: 1000)

    Maximum time to wait in milliseconds. Specify 0 for a non-blocking call.

Returns:

  • (Integer)

    controller broker id or -1 if no ID could be retrieved before the timeout.



# File 'lib/kafka/ffi/client.rb', line 148

def controller_id(timeout: 1000)
  ::Kafka::FFI.rd_kafka_controllerid(self, timeout)
end

#create_partitions(requests, options: nil, timeout: 5000) ⇒ nil, Admin::Result<Admin::TopicResult>

Create additional partition(s) for a topic on the cluster.

Application is responsible for calling #destroy on the returned results when done with the results.

Parameters:

  • requests (Admin::NewPartitions)

    Details about partitions to create and possibly broker assignments for those partitions.

  • requests (Array<Admin::NewPartitions>)

    List of partition details.

  • options (Admin::AdminOptions) (defaults to: nil)

    Admin API request options

  • timeout (Integer) (defaults to: 5000)

    Time to wait in milliseconds for request to complete.

Returns:



# File 'lib/kafka/ffi/client.rb', line 514

def create_partitions(requests, options: nil, timeout: 5000)
  requests = Array(requests)

  # CreatePartitions wants an array of Admin::NewPartitions
  list = ::FFI::MemoryPointer.new(:pointer, requests.length)
  list.write_array_of_pointer(requests.map(&:pointer))

  # Queue to receive the result
  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_CreatePartitions(self, list, requests.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
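
A sketch of growing a topic to 10 partitions, assuming Admin::NewPartitions.new takes the topic name and the new total partition count (mirroring librdkafka's rd_kafka_NewPartitions_new; the signature is an assumption):

# Hypothetical constructor for illustration: topic name, new total count.
request = Kafka::FFI::Admin::NewPartitions.new("events", 10)

result = client.create_partitions(request, timeout: 10_000)
result.destroy if result # nil means the request timed out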

#create_topics(topics, options: nil, timeout: 5000) ⇒ nil, Admin::Result<TopicResult>

Create topics in the cluster with the given configuration.

Application is responsible for calling #destroy on the returned results when done with the results.

Parameters:

  • topics (NewTopic, Array<NewTopic>)

    List of topics to create on the cluster.

  • timeout (Integer) (defaults to: 5000)

    Time in milliseconds to wait for a reply.

Returns:

  • (nil)

    Create timed out

  • (Admin::Result<TopicResult>)

    Response from the cluster with details about the creation of the list of topics or any errors.

Raises:



# File 'lib/kafka/ffi/client.rb', line 446

def create_topics(topics, options: nil, timeout: 5000)
  topics = Array(topics)

  # CreateTopic wants an array of topics
  list = ::FFI::MemoryPointer.new(:pointer, topics.length)
  list.write_array_of_pointer(topics.map(&:pointer))

  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_CreateTopics(self, list, topics.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
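
A sketch of creating a topic, assuming Admin::NewTopic.new takes a name, partition count, and replication factor (mirroring librdkafka's rd_kafka_NewTopic_new; the signature is an assumption):

# Hypothetical constructor for illustration: name, partitions, replication factor.
topic = Kafka::FFI::Admin::NewTopic.new("events", 3, 1)

result = client.create_topics(topic, timeout: 10_000)
result.destroy if result # nil means the request timed out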

#default_topic_conf_dup ⇒ TopicConfig

Create a copy of the Client's default topic configuration object. The caller is now responsible for ownership of the new config.

Returns:



# File 'lib/kafka/ffi/client.rb', line 384

def default_topic_conf_dup
  ::Kafka::FFI.rd_kafka_default_topic_conf_dup(self)
end

#delete_topics(topics, options: nil, timeout: 5000) ⇒ nil, Array<TopicResult>

Delete a list of Topics from the cluster.

Application is responsible for calling #destroy on the returned results when done with the results.

Parameters:

  • topics (DeleteTopic, Array<DeleteTopic>)

    List of topics to delete

  • timeout (Integer) (defaults to: 5000)

    Time to wait in milliseconds for the deletion to complete.

Returns:

  • (nil)

    Delete timed out

  • (Array<TopicResult>)

    Response from the cluster with details about the deletion of the list of topics or any errors.



# File 'lib/kafka/ffi/client.rb', line 479

def delete_topics(topics, options: nil, timeout: 5000)
  topics = Array(topics)

  # DeleteTopics wants an array of topics
  list = ::FFI::MemoryPointer.new(:pointer, topics.length)
  list.write_array_of_pointer(topics.map(&:pointer))

  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_DeleteTopics(self, list, topics.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
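
A sketch, assuming Admin::DeleteTopic.new takes the topic name (an assumption mirroring librdkafka's rd_kafka_DeleteTopic_new):

# Hypothetical constructor for illustration.
topic = Kafka::FFI::Admin::DeleteTopic.new("events")

result = client.delete_topics(topic)
result.destroy if result # nil means the request timed out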

#describe_configs(resources, options: nil, timeout: 5000) ⇒ nil, Admin::Result<Admin::ConfigResource>

Get configuration for the specified resources.

Application is responsible for calling #destroy on the returned results when done with the results.

Parameters:

  • resources (Admin::ConfigResource)

    Resource to request configuration details for.

  • resources (Array<Admin::ConfigResource>)

    List of resources to get config details for.

  • options (Admin::AdminOptions) (defaults to: nil)

    Admin API request options

  • timeout (Integer) (defaults to: 5000)

    Time to wait in milliseconds for request to complete.

Returns:

See Also:

  • rd_kafka_DescribeConfigs


# File 'lib/kafka/ffi/client.rb', line 603

def describe_configs(resources, options: nil, timeout: 5000)
  resources = Array(resources)

  # DescribeConfigs wants an array of Admin::ConfigResource pointers
  list = ::FFI::MemoryPointer.new(:pointer, resources.length)
  list.write_array_of_pointer(resources.map(&:pointer))

  # Queue to receive the result
  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_DescribeConfigs(self, list, resources.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
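
A sketch of fetching a topic's config, with the same hypothetical ConfigResource constructor as in the alter_configs example above:

resource = Kafka::FFI::Admin::ConfigResource.new(:topic, "events") # hypothetical args

result = client.describe_configs(resource)
result.destroy if result # nil means the request timed out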

#destroy ⇒ Object

Release all of the resources used by this Client. This may block until the instance has finished its shutdown procedure. Always destroy any associated resources and cleanly shut down the instance before calling destroy.



# File 'lib/kafka/ffi/client.rb', line 628

def destroy
  if !pointer.null?
    # Clean up any cached topics before destroying the Client.
    @topics.each do |_, topic|
      ::Kafka::FFI.rd_kafka_topic_destroy(topic)
    end
    @topics.clear

    ::Kafka::FFI.rd_kafka_destroy(self)
  end
end

#get_background_queue ⇒ Queue?

Note:

The returned Queue must not be polled, forwarded, or otherwise managed by the application. It may only be used as the destination queue passed to queue-enabled APIs.

Note:

The caller must call #destroy on the Queue when finished with it

Get a reference to the background thread queue. The background queue is automatically polled by librdkafka and is fully managed internally.

Returns:

  • (Queue)

    Background queue

  • (nil)

    Background queue is disabled



# File 'lib/kafka/ffi/client.rb', line 265

def get_background_queue
  ::Kafka::FFI.rd_kafka_queue_get_background(self)
end

#get_main_queue ⇒ Queue

Note:

Application must call #destroy on this queue when finished.

Get a reference to the main librdkafka event queue. This is the queue that is served by rd_kafka_poll.

Returns:

  • (Queue)

    Main client Event queue



# File 'lib/kafka/ffi/client.rb', line 250

def get_main_queue
  ::Kafka::FFI.rd_kafka_queue_get_main(self)
end

#group_list(group: nil, timeout: 1000) ⇒ Kafka::FFI::GroupList

Note:

Application must call #destroy to release the list when done

List and describe client groups in the cluster.

Returns:

Raises:



# File 'lib/kafka/ffi/client.rb', line 367

def group_list(group: nil, timeout: 1000)
  list = ::FFI::MemoryPointer.new(:pointer)

  err = ::Kafka::FFI.rd_kafka_list_groups(self, group, list, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  GroupList.new(list.read_pointer)
ensure
  list.free
end
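
For example, fetching the full group list and releasing it when done:

groups = client.group_list(timeout: 2000)
begin
  # Inspect the GroupList here.
ensure
  groups.destroy
end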

#metadata(local_only: false, topic: nil, timeout: 1000) ⇒ Metadata

Retrieve metadata from the Kafka cluster

Parameters:

  • local_only (Boolean) (defaults to: false)

    Only request info about locally known topics, don't query all topics in the cluster.

  • topic (String, Topic) (defaults to: nil)

    Only request info about this topic.

  • timeout (Integer) (defaults to: 1000)

    Request timeout in milliseconds

Returns:

  • (Metadata)

    Details about the state of the cluster.

Raises:



# File 'lib/kafka/ffi/client.rb', line 341

def metadata(local_only: false, topic: nil, timeout: 1000)
  ptr = ::FFI::MemoryPointer.new(:pointer)

  # Need to use a Topic reference if asking for only information about a
  # single topic.
  if topic.is_a?(String)
    topic = self.topic(topic)
  end

  err = ::Kafka::FFI.rd_kafka_metadata(self, local_only, topic, ptr, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  Kafka::FFI::Metadata.new(ptr.read_pointer)
ensure
  ptr.free
end
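
For example, fetching details about a single topic (Metadata is assumed to respond to #destroy, matching librdkafka's rd_kafka_metadata_destroy):

md = client.metadata(topic: "events", timeout: 5000)
begin
  # Inspect broker and topic details here.
ensure
  md.destroy # assumption: releases the underlying metadata struct
end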

#name ⇒ String

Retrieve the Kafka handle name.

Returns:

  • (String)

    handle / client name



# File 'lib/kafka/ffi/client.rb', line 118

def name
  ::Kafka::FFI.rd_kafka_name(self)
end

#offsets_for_times(list, timeout: 1000) ⇒ TopicPartitionList

Look up the offsets for the given partitions by timestamp. The offset for each partition will be the earliest offset whose timestamp is greater than or equal to the timestamp set in the TopicPartitionList.

Parameters:

  • list (TopicPartitionList)

    List of TopicPartitions to fetch offsets for. The TopicPartitions in the list will be modified based on the results of the query.

Returns:

Raises:



# File 'lib/kafka/ffi/client.rb', line 318

def offsets_for_times(list, timeout: 1000)
  if list.nil?
    raise ArgumentError, "list cannot be nil"
  end

  err = ::Kafka::FFI.rd_kafka_offsets_for_times(self, list, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  list
end
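
A sketch, assuming TopicPartitionList exposes a constructor and #add for seeding topic/partition entries (both are assumptions about the TopicPartitionList API):

list = Kafka::FFI::TopicPartitionList.new # hypothetical constructor
list.add("events", 0)                     # hypothetical: add topic "events", partition 0

client.offsets_for_times(list, timeout: 2000)
# Each TopicPartition in the list now holds the earliest offset whose
# timestamp is at or after the timestamp it was seeded with.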

#outq_len ⇒ Integer Also known as: out_queue_len

Returns the current length of the outbound queue. This is the sum of several factors, including outbound messages, pending callbacks, and waiting acknowledgements.

An application should wait for the return value to reach 0 before terminating to make sure outstanding messages, requests, callbacks, and events are fully processed.

Returns:

  • (Integer)

    Number of outbound items still pending



# File 'lib/kafka/ffi/client.rb', line 397

def outq_len
  ::Kafka::FFI.rd_kafka_outq_len(self)
end
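
For example, a shutdown drain loop that serves callbacks until everything outstanding has been processed:

client.poll(timeout: 100) while client.outq_len > 0
client.destroy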

#pause_partitions(list) ⇒ TopicPartitionList

Pause producing or consuming of the provided list of partitions. The list is updated to include any errors.

Parameters:

Returns:

  • (TopicPartitionList)

    List of partitions with errors set for any of the TopicPartitions that failed.

Raises:



# File 'lib/kafka/ffi/client.rb', line 216

def pause_partitions(list)
  err = ::Kafka::FFI.rd_kafka_pause_partitions(self, list)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  list
end

#poll(timeout: 250) ⇒ Integer

Note:

Do not call in a Consumer after poll_set_consumer has been called.

Polls for events on the Client, causing callbacks to be fired. This is used by both the Producer and Consumer to ensure callbacks are processed in a timely manner.

Parameters:

  • timeout (Integer) (defaults to: 250)

    Time in milliseconds to wait for an event. 0 - Non-blocking call, returning immediately if there are no events. -1 - Wait indefinitely for an event.

Returns:

  • (Integer)

    Number of events served



# File 'lib/kafka/ffi/client.rb', line 203

def poll(timeout: 250)
  ::Kafka::FFI.rd_kafka_poll(self, timeout)
end
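
A typical pattern is a dedicated thread that serves callbacks until the application signals shutdown (the @shutdown flag here is a hypothetical application-managed value):

# @shutdown is a hypothetical flag the application sets on exit.
poller = Thread.new do
  client.poll(timeout: 250) until @shutdown
end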

#query_watermark_offsets(topic, partition, timeout: 1000) ⇒ Range

Query the broker for the oldest and newest offsets for the partition.

Parameters:

  • topic (String)

    Name of the topic to get offsets for

  • partition (Integer)

    Partition of the topic to get offsets for

Returns:

  • (Range)

    Range of known offsets

Raises:



# File 'lib/kafka/ffi/client.rb', line 295

def query_watermark_offsets(topic, partition, timeout: 1000)
  low  = ::FFI::MemoryPointer.new(:int64)
  high = ::FFI::MemoryPointer.new(:int64)

  err = ::Kafka::FFI.rd_kafka_query_watermark_offsets(self, topic, partition, low, high, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  Range.new(low.read_int64, high.read_int64, false)
end
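
For example:

offsets = client.query_watermark_offsets("events", 0, timeout: 2000)
offsets.first # => oldest available offset (low watermark)
offsets.last  # => high watermark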

#resume_partitions(list) ⇒ TopicPartitionList

Resume producing or consuming of the provided list of partitions.

Parameters:

Returns:

  • (TopicPartitionList)

    List of partitions with errors set for any of the TopicPartitions that failed.

Raises:



# File 'lib/kafka/ffi/client.rb', line 233

def resume_partitions(list)
  err = ::Kafka::FFI.rd_kafka_resume_partitions(self, list)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  list
end

#set_log_queue(dest) ⇒ Object

Forward librdkafka logs (and debug logs) to the specified queue. This allows the application to serve log callbacks in its thread of choice.

Parameters:

  • dest (Queue)

    Destination Queue for logs

  • dest (nil)

    Forward logs to the Client's main queue

Raises:



# File 'lib/kafka/ffi/client.rb', line 276

def set_log_queue(dest)
  err = ::Kafka::FFI.rd_kafka_set_log_queue(self, dest)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end
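
For example, routing logs to a dedicated queue and polling it on a thread of your choosing (Queue.new(client) mirrors how this class builds queues internally):

logs = Kafka::FFI::Queue.new(client)
client.set_log_queue(logs)

# Elsewhere, serve log events from the queue.
event = logs.poll(timeout: 250)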

#topic(name, config = nil) ⇒ Topic

Note:

The returned Topic is owned by the Client and will be destroyed when the Client is destroyed.

Create or fetch the Topic with the given name. The first time topic is called for a given name, a configuration can be passed for the topic.

Parameters:

  • name (String)

    Name of the topic

  • config (TopicConfig, nil) (defaults to: nil)

    Config options for the topic. This can only be passed for the first call of topic per topic name since a Topic can only be configured at creation.

Returns:

  • (Topic)

    Topic instance.

Raises:



# File 'lib/kafka/ffi/client.rb', line 169

def topic(name, config = nil)
  topic = @topics[name]
  if topic
    if config
      # Make this an exception because it's probably a programmer error
      # that _should_ only primarily happen during development due to
      # misunderstanding the semantics.
      raise TopicAlreadyConfigured, "#{name} was already configured"
    end

    return topic
  end

  # @todo - Keep list of topics and manage their lifecycle?
  topic = ::Kafka::FFI.rd_kafka_topic_new(self, name, config)
  if topic.nil?
    raise ::Kafka::ResponseError, ::Kafka::FFI.rd_kafka_last_error
  end

  @topics[name] = topic
  topic
end
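
For example (topic_config stands in for a TopicConfig built elsewhere):

events = client.topic("events")       # created with default config
client.topic("events").equal?(events) # => true, instances are cached by name

client.topic("events", topic_config)  # raises TopicAlreadyConfigured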