Class: Kafka::FFI::Client
- Inherits: OpaquePointer (Object > OpaquePointer > Kafka::FFI::Client)
- Defined in: lib/kafka/ffi/client.rb
Overview
Naming this is hard; librdkafka primarily refers to it as "a handle" to an instance. It is more akin to an internal service, and this Client speaks that service's API.
Client is a handle to a configured librdkafka instance that begins operation once created. Client is an abstract class and will provide either a Consumer or Producer based on the type being created. Each Client instance can either produce or consume messages to / from topics and cannot do both.
Instance Attribute Summary
Attributes inherited from OpaquePointer
Class Method Summary
- .from_native(ptr, _ctx) ⇒ Object
- .new(type, config = nil) ⇒ Consumer, Producer
  Create a new Client of type with the given configuration.

Instance Method Summary

- #alter_configs(resources, options: nil, timeout: 5000) ⇒ nil, Array<Admin::ConfigResource>
  Update the configuration for the specified resources.
- #brokers_add(brokers) ⇒ Integer
  Adds one or more brokers to the Client's list of initial bootstrap brokers.
- #cluster_id ⇒ Object
  Retrieves the Client's Cluster ID.
- #config ⇒ Object
  Retrieve the current configuration used by the Client.
- #controller_id(timeout: 1000) ⇒ Integer
  Retrieves the current Controller ID as reported by broker metadata.
- #create_partitions(requests, options: nil, timeout: 5000) ⇒ nil, Admin::Result<Admin::TopicResult>
  Create additional partition(s) for a topic on the cluster.
- #create_topics(topics, options: nil, timeout: 5000) ⇒ nil, Admin::Result<TopicResult>
  Create topics in the cluster with the given configuration.
- #default_topic_conf_dup ⇒ TopicConfig
  Create a copy of the Client's default topic configuration object.
- #delete_topics(topics, options: nil, timeout: 5000) ⇒ nil, Array<TopicResult>
  Delete a list of Topics from the cluster.
- #describe_configs(resources, options: nil, timeout: 5000) ⇒ nil, Admin::Result<Admin::ConfigResource>
  Get configuration for the specified resources.
- #destroy ⇒ Object
  Release all of the resources used by this Client.
- #get_background_queue ⇒ Queue?
  Get a reference to the background thread queue.
- #get_main_queue ⇒ Queue
  Get a reference to the main librdkafka event queue.
- #group_list(group: nil, timeout: 1000) ⇒ Kafka::FFI::GroupList
  List and describe client groups in the cluster.
- #initialize(ptr) ⇒ Client (constructor)
  A new instance of Client.
- #metadata(local_only: false, topic: nil, timeout: 1000) ⇒ Metadata
  Retrieve metadata from the Kafka cluster.
- #name ⇒ String
  Retrieve the Kafka handle name.
- #offsets_for_times(list, timeout: 1000) ⇒ TopicPartitionList
  Look up the offsets for the given partitions by timestamp.
- #outq_len ⇒ Integer (also: #out_queue_len)
  Returns the current length of the outbound queue.
- #pause_partitions(list) ⇒ TopicPartitionList
  Pause producing or consuming of the provided list of partitions.
- #poll(timeout: 250) ⇒ Integer
  Polls for events on the Client, causing callbacks to be fired.
- #query_watermark_offsets(topic, partition, timeout: 1000) ⇒ Range
  Query the broker for the oldest and newest offsets for the partition.
- #resume_partitions(list) ⇒ TopicPartitionList
  Resume producing or consuming of the provided list of partitions.
- #set_log_queue(dest) ⇒ Object
  Forward librdkafka and debug logs to the specified queue.
- #topic(name, config = nil) ⇒ Topic
  Create or fetch the Topic with the given name.
Methods inherited from OpaquePointer
Constructor Details
#initialize(ptr) ⇒ Client
Returns a new instance of Client.
# File 'lib/kafka/ffi/client.rb', line 101

def initialize(ptr)
  super(ptr)

  @topics = {}
end
Class Method Details
.from_native(ptr, _ctx) ⇒ Object
# File 'lib/kafka/ffi/client.rb', line 69

def self.from_native(ptr, _ctx)
  if !ptr.is_a?(::FFI::Pointer)
    raise TypeError, "from_native can only convert from a ::FFI::Pointer to #{self}"
  end

  # Converting from a null pointer should return nil. Likely this was
  # caused by rd_kafka_new returning an error and a NULL pointer for the
  # Client.
  if ptr.null?
    return nil
  end

  # Build a temporary Client to pass to rd_kafka_type. There is a bit of a
  # chicken and egg problem here. We can't create the final class until
  # after we know the type. But for type safety we want to pass a Client.
  cfg = allocate
  cfg.send(:initialize, ptr)

  type = ::Kafka::FFI.rd_kafka_type(cfg)
  klass =
    case type
    when :producer then Producer
    when :consumer then Consumer
    else
      raise ArgumentError, "unknown Kafka client type: #{type}"
    end

  client = klass.allocate
  client.send(:initialize, ptr)
  client
end
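The case statement in from_native maps librdkafka's reported handle type to a concrete Ruby class. A minimal plain-Ruby sketch of that dispatch, using local stub classes (Producer, Consumer, and class_for here are illustrative stand-ins, not the gem's own definitions):

```ruby
# Stand-in classes so the dispatch can be shown in isolation.
class Producer; end
class Consumer; end

# Map a handle type symbol to its class, raising on unknown types,
# mirroring the pattern used by Client.from_native.
def class_for(type)
  case type
  when :producer then Producer
  when :consumer then Consumer
  else
    raise ArgumentError, "unknown Kafka client type: #{type}"
  end
end

class_for(:producer) # => Producer
class_for(:consumer) # => Consumer
```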
.new(type, config = nil) ⇒ Consumer, Producer
Create a new Client of type with the given configuration.
# File 'lib/kafka/ffi/client.rb', line 34

def self.new(type, config = nil)
  error = ::FFI::MemoryPointer.new(:char, 512)

  # Convenience for passing in a Kafka::Config instead of building a
  # Kafka::FFI::Config since Kafka::Config provides a way to create a
  # config from a Hash.
  config =
    case config
    when Config, nil     then config
    when ::Kafka::Config then config.to_ffi
    when Hash            then ::Kafka::Config.new(config).to_ffi
    else
      raise ArgumentError, "config must be one of nil, Config, ::Kafka::Config, or Hash"
    end

  client = Kafka::FFI.rd_kafka_new(type, config, error, error.size)
  if client.nil?
    raise Error, error.read_string
  end

  if config
    # Store a reference to the config on the Client instance. We do this to
    # tie the Config's lifecycle to the Client instance in Ruby since they
    # are already tied in librdkafka. This ensures that any Ruby objects
    # referenced in the config (like callbacks) are not garbage collected.
    #
    # Using instance_variable_set to avoid exposing an API method that
    # could cause confusion from end users since the config cannot be
    # changed after initialization.
    client.instance_variable_set(:@config, config)
  end

  client
end
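The config coercion in .new accepts several input types. A reduced plain-Ruby sketch of the same case/when dispatch, handling only the nil and Hash branches (coerce_config is an illustrative name, not part of the gem; the real method additionally accepts Config and ::Kafka::Config):

```ruby
# Coerce a caller-supplied config into a uniform shape: nil passes
# through (librdkafka defaults apply), a Hash is normalized to string
# keys and values, and anything else is rejected.
def coerce_config(config)
  case config
  when nil  then nil
  when Hash then config.map { |k, v| [k.to_s, v.to_s] }.to_h
  else
    raise ArgumentError, "config must be one of nil or Hash"
  end
end

coerce_config(nil)               # => nil
coerce_config("linger.ms" => 5)  # => { "linger.ms" => "5" }
```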
Instance Method Details
#alter_configs(resources, options: nil, timeout: 5000) ⇒ nil, Array<Admin::ConfigResource>
AlterConfigs will replace all existing configuration for the given resources, reverting all unspecified config options to their default values.
At most one :broker type ConfigResource can be specified per call to alter_configs since the changes must be sent to the broker specified in the resource.
Update the configuration for the specified resources. Updates may succeed for a subset of the provided resources while others fail. The configuration for a particular resource is updated atomically, replacing values using the provided ConfigResource (set via set_config) and reverting any unspecified config options to their default values.
Application is responsible for calling #destroy on the returned results when done with the results.
# File 'lib/kafka/ffi/client.rb', line 564

def alter_configs(resources, options: nil, timeout: 5000)
  resources = Array(resources)

  # AlterConfigs wants an array of Admin::ConfigResource pointers
  list = ::FFI::MemoryPointer.new(:pointer, resources.length)
  list.write_array_of_pointer(resources.map(&:pointer))

  # Queue to receive the result
  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_AlterConfigs(self, list, resources.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
#brokers_add(brokers) ⇒ Integer
It is preferred to set brokers through the metadata.broker.list or bootstrap.servers config options.
Adds one or more brokers to the Client's list of initial bootstrap brokers. Additional brokers will be discovered automatically once the Client connects to a broker by querying the broker metadata.
# File 'lib/kafka/ffi/client.rb', line 421

def brokers_add(brokers)
  if brokers.is_a?(Array)
    brokers = brokers.join(",")
  end

  ::Kafka::FFI.rd_kafka_brokers_add(self, brokers)
end
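Since brokers_add joins an Array into librdkafka's comma-separated broker string, the string and Array call forms are equivalent. The join itself is plain Ruby (the broker addresses below are placeholders):

```ruby
# An Array of host:port strings is collapsed into the single
# comma-separated form that rd_kafka_brokers_add expects.
brokers = ["broker1:9092", "broker2:9092"]
brokers = brokers.join(",") if brokers.is_a?(Array)

brokers # => "broker1:9092,broker2:9092"
```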
#cluster_id ⇒ Object
Requires the config option api.version.request set to true.
Retrieves the Client's Cluster ID.
# File 'lib/kafka/ffi/client.rb', line 125

def cluster_id
  id = ::Kafka::FFI.rd_kafka_clusterid(self)

  if id.null?
    return nil
  end

  id.read_string
ensure
  if !id.null?
    ::Kafka::FFI.rd_kafka_mem_free(self, id)
  end
end
#config ⇒ Object
The returned config is read-only and tied to the lifetime of the Client. Don't try to modify or destroy the config.
Retrieve the current configuration used by the Client.
# File 'lib/kafka/ffi/client.rb', line 111

def config
  ::Kafka::FFI.rd_kafka_conf(self)
end
#controller_id(timeout: 1000) ⇒ Integer
Requires the config option api.version.request set to true.
Retrieves the current Controller ID as reported by broker metadata.
# File 'lib/kafka/ffi/client.rb', line 148

def controller_id(timeout: 1000)
  ::Kafka::FFI.rd_kafka_controllerid(self, timeout)
end
#create_partitions(requests, options: nil, timeout: 5000) ⇒ nil, Admin::Result<Admin::TopicResult>
Create additional partition(s), and possibly broker assignments for those partitions, for a topic on the cluster.
Application is responsible for calling #destroy on the returned results when done with the results.
# File 'lib/kafka/ffi/client.rb', line 514

def create_partitions(requests, options: nil, timeout: 5000)
  requests = Array(requests)

  # NewPartitions wants an array of Admin::NewPartitions
  list = ::FFI::MemoryPointer.new(:pointer, requests.length)
  list.write_array_of_pointer(requests.map(&:pointer))

  # Queue to receive the result
  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_CreatePartitions(self, list, requests.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
#create_topics(topics, options: nil, timeout: 5000) ⇒ nil, Admin::Result<TopicResult>
Create topics in the cluster with the given configuration.
Application is responsible for calling #destroy on the returned results when done with the results.
# File 'lib/kafka/ffi/client.rb', line 446

def create_topics(topics, options: nil, timeout: 5000)
  topics = Array(topics)

  # CreateTopics wants an array of topics
  list = ::FFI::MemoryPointer.new(:pointer, topics.length)
  list.write_array_of_pointer(topics.map(&:pointer))

  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_CreateTopics(self, list, topics.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
#default_topic_conf_dup ⇒ TopicConfig
Create a copy of the Client's default topic configuration object. The caller is now responsible for ownership of the new config.
# File 'lib/kafka/ffi/client.rb', line 384

def default_topic_conf_dup
  ::Kafka::FFI.rd_kafka_default_topic_conf_dup(self)
end
#delete_topics(topics, options: nil, timeout: 5000) ⇒ nil, Array<TopicResult>
Delete a list of Topics from the cluster.
Application is responsible for calling #destroy on the returned results when done with the results.
# File 'lib/kafka/ffi/client.rb', line 479

def delete_topics(topics, options: nil, timeout: 5000)
  topics = Array(topics)

  # DeleteTopics wants an array of topics
  list = ::FFI::MemoryPointer.new(:pointer, topics.length)
  list.write_array_of_pointer(topics.map(&:pointer))

  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_DeleteTopics(self, list, topics.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
#describe_configs(resources, options: nil, timeout: 5000) ⇒ nil, Admin::Result<Admin::ConfigResource>
Get configuration for the specified resources.
Application is responsible for calling #destroy on the returned results when done with the results.
# File 'lib/kafka/ffi/client.rb', line 603

def describe_configs(resources, options: nil, timeout: 5000)
  resources = Array(resources)

  # DescribeConfigs wants an array of Admin::ConfigResource pointers
  list = ::FFI::MemoryPointer.new(:pointer, resources.length)
  list.write_array_of_pointer(resources.map(&:pointer))

  # Queue to receive the result
  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_DescribeConfigs(self, list, resources.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
#destroy ⇒ Object
Release all of the resources used by this Client. This may block until the instance has finished its shutdown procedure. Always make sure to destroy any associated resources and cleanly shut down the instance before calling destroy.
# File 'lib/kafka/ffi/client.rb', line 628

def destroy
  if !pointer.null?
    # Clean up any cached topics before destroying the Client.
    @topics.each do |_, topic|
      ::Kafka::FFI.rd_kafka_topic_destroy(topic)
    end
    @topics.clear

    ::Kafka::FFI.rd_kafka_destroy(self)
  end
end
#get_background_queue ⇒ Queue?
The returned Queue must not be polled, forwarded, or otherwise managed by the application. It may only be used as the destination queue passed to queue-enabled APIs.
The caller must call #destroy on the Queue when finished with it.
Get a reference to the background thread queue. The background queue is automatically polled by librdkafka and is fully managed internally.
# File 'lib/kafka/ffi/client.rb', line 265

def get_background_queue
  ::Kafka::FFI.rd_kafka_queue_get_background(self)
end
#get_main_queue ⇒ Queue
Application must call #destroy on this queue when finished.
Get a reference to the main librdkafka event queue. This is the queue that is served by rd_kafka_poll.
# File 'lib/kafka/ffi/client.rb', line 250

def get_main_queue
  ::Kafka::FFI.rd_kafka_queue_get_main(self)
end
#group_list(group: nil, timeout: 1000) ⇒ Kafka::FFI::GroupList
Application must call #destroy to release the list when done
List and describe client groups in the cluster.
# File 'lib/kafka/ffi/client.rb', line 367

def group_list(group: nil, timeout: 1000)
  list = ::FFI::MemoryPointer.new(:pointer)

  err = ::Kafka::FFI.rd_kafka_list_groups(self, group, list, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  GroupList.new(list.read_pointer)
ensure
  list.free
end
#metadata(local_only: false, topic: nil, timeout: 1000) ⇒ Metadata
Retrieve metadata from the Kafka cluster.
# File 'lib/kafka/ffi/client.rb', line 341

def metadata(local_only: false, topic: nil, timeout: 1000)
  ptr = ::FFI::MemoryPointer.new(:pointer)

  # Need to use a Topic reference if asking for only information about a
  # single topic.
  if topic.is_a?(String)
    topic = self.topic(topic)
  end

  err = ::Kafka::FFI.rd_kafka_metadata(self, local_only, topic, ptr, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  Kafka::FFI::Metadata.new(ptr.read_pointer)
ensure
  ptr.free
end
#name ⇒ String
Retrieve the Kafka handle name.
# File 'lib/kafka/ffi/client.rb', line 118

def name
  ::Kafka::FFI.rd_kafka_name(self)
end
#offsets_for_times(list, timeout: 1000) ⇒ TopicPartitionList
Look up the offsets for the given partitions by timestamp. The offset for each partition will be the earliest offset whose timestamp is greater than or equal to the timestamp set in the TopicPartitionList.
# File 'lib/kafka/ffi/client.rb', line 318

def offsets_for_times(list, timeout: 1000)
  if list.nil?
    raise ArgumentError, "list cannot be nil"
  end

  err = ::Kafka::FFI.rd_kafka_offsets_for_times(self, list, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  list
end
#outq_len ⇒ Integer Also known as: out_queue_len
Returns the current length of the outbound queue. This is the sum of several factors including outbound messages, pending callbacks, waiting acknowledgements, etc...
An application should wait for the return value to reach 0 before terminating to make sure outstanding messages, requests, callbacks, and events are fully processed.
# File 'lib/kafka/ffi/client.rb', line 397

def outq_len
  ::Kafka::FFI.rd_kafka_outq_len(self)
end
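Putting the shutdown advice above into code: a sketch of a drain loop that serves callbacks via #poll until the outbound queue reaches 0, then destroys the Client. drain_and_destroy and its deadline guard are illustrative helpers, not part of the gem:

```ruby
# Drain the Client's outbound queue before destroying it, polling so
# delivery reports and other callbacks keep firing while we wait. Gives
# up after deadline_seconds to avoid blocking shutdown forever.
def drain_and_destroy(client, deadline_seconds: 30)
  deadline = Time.now + deadline_seconds

  while client.outq_len > 0 && Time.now < deadline
    client.poll(timeout: 250) # serve callbacks while waiting
  end

  client.destroy
end
```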
#pause_partitions(list) ⇒ TopicPartitionList
Pause producing or consuming of the provided list of partitions. The list is updated to include any errors.
# File 'lib/kafka/ffi/client.rb', line 216

def pause_partitions(list)
  err = ::Kafka::FFI.rd_kafka_pause_partitions(self, list)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  list
end
#poll(timeout: 250) ⇒ Integer
Do not call in a Consumer after poll_set_consumer has been called.
Polls for events on the Client, causing callbacks to be fired. This is used by both the Producer and Consumer to ensure callbacks are processed in a timely manner.
Pass timeout: -1 to wait indefinitely for an event.
# File 'lib/kafka/ffi/client.rb', line 203

def poll(timeout: 250)
  ::Kafka::FFI.rd_kafka_poll(self, timeout)
end
#query_watermark_offsets(topic, partition, timeout: 1000) ⇒ Range
Query the broker for the oldest and newest offsets for the partition.
# File 'lib/kafka/ffi/client.rb', line 295

def query_watermark_offsets(topic, partition, timeout: 1000)
  low  = ::FFI::MemoryPointer.new(:int64)
  high = ::FFI::MemoryPointer.new(:int64)

  err = ::Kafka::FFI.rd_kafka_query_watermark_offsets(self, topic, partition, low, high, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  Range.new(low.read_int64, high.read_int64, false)
end
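Since the result is an inclusive Range of the oldest and newest offsets, it is easy to derive roughly how many messages a partition currently retains. The offset values below are illustrative, showing only the shape of the returned Range:

```ruby
# Shape of the Range built by query_watermark_offsets: low and high
# watermarks with exclude_end = false.
offsets = Range.new(250, 700, false)

oldest = offsets.min # => 250
newest = offsets.max # => 700

# Roughly the number of messages currently retained in the partition.
newest - oldest # => 450
```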
#resume_partitions(list) ⇒ TopicPartitionList
Resume producing or consuming of the provided list of partitions.
# File 'lib/kafka/ffi/client.rb', line 233

def resume_partitions(list)
  err = ::Kafka::FFI.rd_kafka_resume_partitions(self, list)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  list
end
#set_log_queue(dest) ⇒ Object
Forward librdkafka and debug logs to the specified queue. This allows the application to serve log callbacks in its thread of choice.
# File 'lib/kafka/ffi/client.rb', line 276

def set_log_queue(dest)
  err = ::Kafka::FFI.rd_kafka_set_log_queue(self, dest)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end
#topic(name, config = nil) ⇒ Topic
The returned Topic is owned by the Client and will be destroyed when the Client is destroyed.
Create or fetch the Topic with the given name. The first time topic is called for a given name, a configuration can be passed for the topic.
# File 'lib/kafka/ffi/client.rb', line 169

def topic(name, config = nil)
  topic = @topics[name]
  if topic
    if config
      # Make this an exception because it's probably a programmer error
      # that _should_ only primarily happen during development due to
      # misunderstanding the semantics.
      raise TopicAlreadyConfigured, "#{name} was already configured"
    end

    return topic
  end

  # @todo - Keep list of topics and manage their lifecycle?
  topic = ::Kafka::FFI.rd_kafka_topic_new(self, name, config)
  if topic.nil?
    raise ::Kafka::ResponseError, ::Kafka::FFI.rd_kafka_last_error
  end

  @topics[name] = topic
  topic
end
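The caching semantics above (a config may only be supplied on the first call for a given name; later configured calls raise) can be sketched with a plain-Ruby stand-in. TopicCache is illustrative and not part of the gem:

```ruby
# Raised when a config is passed for a topic name that was already
# created, mirroring the gem's TopicAlreadyConfigured error.
class TopicAlreadyConfigured < StandardError; end

# Minimal stand-in for the Client's internal @topics cache.
class TopicCache
  def initialize
    @topics = {}
  end

  # Return the cached topic for name, or create and cache a new one.
  # Passing a config for an already-cached name is a programmer error.
  def topic(name, config = nil)
    if (cached = @topics[name])
      raise TopicAlreadyConfigured, "#{name} was already configured" if config

      return cached
    end

    @topics[name] = { name: name, config: config }
  end
end
```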