Class: Kafka::Admin
- Inherits: Object (Object < Kafka::Admin)
- Defined in: lib/kafka/admin.rb
Overview
Admin provides a client for accessing the librdkafka Admin API to make changes to the cluster. The API provides ways to create topics, delete topics, add new partitions to a topic, and manage configs.
Instance Method Summary
- #create_topic(name, partitions, replication_factor, wait: true, validate: false, timeout: nil) ⇒ nil, TopicResult
  Create a topic with the given name, number of partitions, and number of replicas per partition (replication factor).
- #delete_topic(name, wait: true, validate: false, timeout: nil) ⇒ nil, TopicResult
  Delete the topic with the given name.
- #describe_config(type, name, wait: true, validate: false, timeout: nil) ⇒ ConfigResource
  Get current config settings for the resource.
- #destroy ⇒ Object (also: #close)
  Destroy the Client, releasing all used resources back to the system.
- #initialize(config = nil) ⇒ Admin (constructor)
  Create a new Admin client for accessing the librdkafka Admin API.
- #metadata(local_only: false, topic: nil, timeout: 1000) ⇒ Metadata
  Retrieve metadata for the cluster.
Constructor Details
#initialize(config = nil) ⇒ Admin
Create a new Admin client for accessing the librdkafka Admin API.
# File 'lib/kafka/admin.rb', line 14
def initialize(config = nil)
  # Wrap a Producer since it appears to allocate the fewest resources.
  @client = ::Kafka::FFI::Producer.new(config)
end
Instance Method Details
#create_topic(name, partitions, replication_factor, wait: true, validate: false, timeout: nil) ⇒ nil, TopicResult
Create a topic with the given name, number of partitions, and number of replicas per partition (replication factor). The total number of partition replicas stored across the cluster will be partitions x replication_factor.
# File 'lib/kafka/admin.rb', line 39
def create_topic(name, partitions, replication_factor, wait: true, validate: false, timeout: nil)
  req = ::Kafka::FFI::Admin::NewTopic.new(name, partitions, replication_factor)
  opts = new_options(:create_topics, wait: wait, validate: validate, timeout: timeout)

  res = @client.create_topics(req, options: opts)
  if res
    res[0]
  end
ensure
  opts.destroy
  req.destroy
end
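As a worked example of the partitions x replication_factor arithmetic described for #create_topic, the following minimal sketch computes how many partition replicas the cluster ends up storing for a topic. The helper name total_replicas is hypothetical and is not part of Kafka::Admin.

```ruby
# Hypothetical helper (not part of Kafka::Admin): number of partition
# replicas the cluster stores for a topic.
def total_replicas(partitions, replication_factor)
  partitions * replication_factor
end

# A topic with 6 partitions and a replication factor of 3.
puts total_replicas(6, 3) # prints 18
```

The topic itself still has only 6 partitions in this case; the replication factor controls how many copies of each partition exist.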
#delete_topic(name, wait: true, validate: false, timeout: nil) ⇒ nil, TopicResult
Delete the topic with the given name.
# File 'lib/kafka/admin.rb', line 67
def delete_topic(name, wait: true, validate: false, timeout: nil)
  req = ::Kafka::FFI::Admin::DeleteTopic.new(name)
  opts = new_options(:create_topics, wait: wait, validate: validate, timeout: timeout)

  res = @client.delete_topics(req, options: opts)
  if res
    res[0]
  end
ensure
  opts.destroy
  req.destroy
end
#describe_config(type, name, wait: true, validate: false, timeout: nil) ⇒ ConfigResource
Get current config settings for the resource.
# File 'lib/kafka/admin.rb', line 89
def describe_config(type, name, wait: true, validate: false, timeout: nil)
  req = ::Kafka::FFI::Admin::ConfigResource.new(type, name)
  opts = new_options(:create_topics, wait: wait, validate: validate, timeout: timeout)

  res = @client.describe_configs(req, options: opts)
  if res
    res[0]
  end
ensure
  opts.destroy
  req.destroy
end
#destroy ⇒ Object Also known as: close
Destroy the Client, releasing all used resources back to the system. It is the application's responsibility to call #destroy when done with the client.
# File 'lib/kafka/admin.rb', line 114
def destroy
  @client.destroy
end
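Because the application is responsible for calling #destroy, one common way to make sure that happens is to wrap the client's use in an ensure block. The sketch below illustrates that pattern only: StandInAdmin is a hypothetical stand-in that mimics #destroy and its #close alias so the example runs without a broker, and with_admin is a hypothetical helper, not something Kafka::Admin provides.

```ruby
# Stand-in for Kafka::Admin so the sketch runs without a broker or the gem.
# Only #destroy and its #close alias mirror the documented interface.
class StandInAdmin
  attr_reader :destroyed

  def initialize
    @destroyed = false
  end

  def destroy
    @destroyed = true
  end
  alias close destroy
end

# Hypothetical helper: yield a client and always destroy it afterwards,
# even when the block raises.
def with_admin(admin)
  yield admin
ensure
  admin.destroy
end

admin = StandInAdmin.new
begin
  with_admin(admin) { |_a| raise "simulated failure" }
rescue RuntimeError
  # The error still propagates to the caller; cleanup has already run.
end
puts admin.destroyed # prints true
```

With the real client, the same shape would presumably be with_admin(Kafka::Admin.new(config)) { |admin| ... }, so that the client is released even if an admin call raises.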
#metadata(local_only: false, topic: nil, timeout: 1000) ⇒ Metadata
Retrieve metadata for the cluster.
# File 'lib/kafka/admin.rb', line 107
def metadata(local_only: false, topic: nil, timeout: 1000)
  @client.metadata(local_only: local_only, topic: topic, timeout: timeout)
end