Class: Kafka::Admin

Inherits:
Object
Defined in:
lib/kafka/admin.rb

Overview

Admin provides a client for accessing the librdkafka Admin API to make changes to the cluster. The API provides ways to create topics, delete topics, add new partitions to a topic, and manage configs.

Instance Method Summary

Constructor Details

#initialize(config = nil) ⇒ Admin

Create a new Admin client for accessing the librdkafka Admin API.

Parameters:



# File 'lib/kafka/admin.rb', line 14

def initialize(config = nil)
  # Wrap a Producer since it appears to allocate the fewest resources.
  @client = ::Kafka::FFI::Producer.new(config)
end

Instance Method Details

#create_topic(name, partitions, replication_factor, wait: true, validate: false, timeout: nil) ⇒ nil, TopicResult

Create a topic with the given name, number of partitions, and number of replicas per partition (replication factor). The total number of partition replicas stored across the cluster will be partitions x replication_factor.
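As a quick check of the arithmetic above: a topic with 6 partitions and a replication factor of 3 is stored as 18 partition copies across the cluster. The helper name `total_replicas` is hypothetical and is not part of Kafka::Admin.

```ruby
# Hypothetical helper (not part of Kafka::Admin): number of partition
# copies the cluster stores for a topic.
def total_replicas(partitions, replication_factor)
  partitions * replication_factor
end

total_replicas(6, 3) # => 18
```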

Parameters:

  • name (String)

    Name of the topic to create

  • partitions (Integer)

    Number of partitions the topic will have

  • replication_factor (Integer)

    Number of replicas per partition to have in the cluster.

  • wait (Boolean) (defaults to: true)

    Wait up to timeout milliseconds for topic creation to propagate to the cluster before returning.

  • validate (Boolean) (defaults to: false)

    Only validate the request

  • timeout (Integer) (defaults to: nil)

    Time to wait in milliseconds for each operation to complete. Total request execution time may be longer than timeout due to multiple operations being done. Defaults to socket.timeout.ms config setting.

Returns:

  • (nil)

    Create timed out

  • (TopicResult)

    Response from the cluster indicating whether the topic was created, along with any errors.
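Because a timeout is reported as nil rather than as an exception, callers may want to make that case explicit. The following is a minimal sketch, not part of the gem: `TopicCreateTimeout` and `create_topic!` are hypothetical names, and `admin` is assumed to be any object that responds to #create_topic as documented above.

```ruby
# Hypothetical error class, introduced only for this sketch.
class TopicCreateTimeout < StandardError; end

# Hypothetical wrapper: converts the documented nil-on-timeout return
# value of #create_topic into an exception.
def create_topic!(admin, name, partitions, replication_factor, timeout: nil)
  result = admin.create_topic(name, partitions, replication_factor, timeout: timeout)
  raise TopicCreateTimeout, "timed out creating topic #{name.inspect}" if result.nil?

  # A non-nil result only means the cluster responded. It may still
  # describe an error, so inspect it before relying on the topic.
  result
end

# Usage, assuming the kafka gem is loaded and `config` is a valid config:
#
#   admin = Kafka::Admin.new(config)
#   result = create_topic!(admin, "events", 6, 3, timeout: 5_000)
```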



# File 'lib/kafka/admin.rb', line 39

def create_topic(name, partitions, replication_factor, wait: true, validate: false, timeout: nil)
  req = ::Kafka::FFI::Admin::NewTopic.new(name, partitions, replication_factor)
  opts = new_options(:create_topics, wait: wait, validate: validate, timeout: timeout)

  res = @client.create_topics(req, options: opts)
  if res
    res[0]
  end
ensure
  opts.destroy
  req.destroy
end

#delete_topic(name, wait: true, validate: false, timeout: nil) ⇒ nil, TopicResult

Delete the topic with the given name

Parameters:

  • name (String)

    Name of the topic to delete

  • wait (Boolean) (defaults to: true)

    Wait up to timeout milliseconds for topic deletion to propagate to the cluster before returning.

  • validate (Boolean) (defaults to: false)

    Only validate the request

  • timeout (Integer) (defaults to: nil)

    Time to wait in milliseconds for each operation to complete. Total request execution time may be longer than timeout due to multiple operations being done. Defaults to socket.timeout.ms config setting.

Returns:

  • (nil)

    Delete timed out

  • (TopicResult)

    Response from the cluster with details about the deletion or any errors.
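Since a nil return only says that the delete timed out, one possible way to handle it is a bounded retry. This is a sketch under assumptions: `delete_topic_with_retry` is a hypothetical helper, `admin` is any object responding to #delete_topic as documented above, and the attempt count and timeout are arbitrary. A timed-out delete may still have taken effect on the cluster, so a later attempt can return a result that reports an error.

```ruby
# Hypothetical helper (not part of the gem): retries #delete_topic while
# it returns nil (timed out), up to `attempts` times.
def delete_topic_with_retry(admin, name, attempts: 3, timeout: 5_000)
  attempts.times do
    result = admin.delete_topic(name, timeout: timeout)
    # Any non-nil value is a response from the cluster; stop retrying.
    return result unless result.nil?
  end

  nil # every attempt timed out
end
```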



# File 'lib/kafka/admin.rb', line 67

def delete_topic(name, wait: true, validate: false, timeout: nil)
  req = ::Kafka::FFI::Admin::DeleteTopic.new(name)
  opts = new_options(:create_topics, wait: wait, validate: validate, timeout: timeout)

  res = @client.delete_topics(req, options: opts)
  if res
    res[0]
  end
ensure
  opts.destroy
  req.destroy
end

#describe_config(type, name, wait: true, validate: false, timeout: nil) ⇒ ConfigResource

Get current config settings for the resource.

Examples:

Get configuration for a topic

describe_config(:topic, "events")

Parameters:

  • type (:broker, :topic, :group)

    Type of resource

  • name (String)

    Name of the resource

Returns:

  • (ConfigResource)
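Extending the example above, a small guard can reject resource types other than the three documented ones before the request is sent. This is only a sketch: `RESOURCE_TYPES` and `describe_resource` are hypothetical names, and `admin` is assumed to be any object responding to #describe_config as documented here.

```ruby
# The resource types listed in the parameter documentation above.
RESOURCE_TYPES = %i[broker topic group].freeze

# Hypothetical helper (not part of the gem): validates the type locally,
# then delegates to #describe_config.
def describe_resource(admin, type, name)
  unless RESOURCE_TYPES.include?(type)
    raise ArgumentError, "unknown resource type #{type.inspect}, expected one of #{RESOURCE_TYPES.inspect}"
  end

  # name is documented as a String, so coerce symbols or integers.
  admin.describe_config(type, name.to_s)
end

# Usage, assuming `admin` is a Kafka::Admin instance:
#
#   describe_resource(admin, :topic, "events")
```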


# File 'lib/kafka/admin.rb', line 89

def describe_config(type, name, wait: true, validate: false, timeout: nil)
  req = ::Kafka::FFI::Admin::ConfigResource.new(type, name)
  opts = new_options(:create_topics, wait: wait, validate: validate, timeout: timeout)

  res = @client.describe_configs(req, options: opts)
  if res
    res[0]
  end
ensure
  opts.destroy
  req.destroy
end

#destroy ⇒ Object

Also known as: close

Destroy the Client, releasing all used resources back to the system. It is the application's responsibility to call #destroy when done with the client.



# File 'lib/kafka/admin.rb', line 114

def destroy
  @client.destroy
end
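One way to make sure #destroy is always called is to wrap the client's use in a block with an ensure clause. The following is a minimal sketch: `with_admin` is a hypothetical helper that is not provided by the gem, and `admin` is assumed to be any object responding to #destroy.

```ruby
# Hypothetical helper (not part of the gem): yields the admin client to
# the block and always calls #destroy afterwards, even if the block raises.
def with_admin(admin)
  yield admin
ensure
  admin.destroy
end

# Usage, assuming the kafka gem is loaded and `config` is a valid config:
#
#   with_admin(Kafka::Admin.new(config)) do |admin|
#     admin.describe_config(:topic, "events")
#   end
```

The helper returns the value of the block, so results can be passed on to the caller after the client has been released.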

#metadata(local_only: false, topic: nil, timeout: 1000) ⇒ Metadata

Retrieve metadata for the cluster

Returns:

  • (Metadata)

See Also:



# File 'lib/kafka/admin.rb', line 107

def metadata(local_only: false, topic: nil, timeout: 1000)
  @client.metadata(local_only: local_only, topic: topic, timeout: timeout)
end
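The keyword arguments in the signature above can be combined to limit the request to one topic and to allow more time than the default of 1000. A minimal sketch follows: `topic_metadata` is a hypothetical helper, `admin` is assumed to be any object responding to #metadata with this signature, and the timeout is presumably in milliseconds as with the other methods, which this page does not state.

```ruby
# Hypothetical helper (not part of the gem): requests metadata for a
# single topic, passing through the documented keyword arguments.
def topic_metadata(admin, topic, timeout: 1_000)
  admin.metadata(topic: topic, timeout: timeout)
end

# Usage, assuming `admin` is a Kafka::Admin instance:
#
#   topic_metadata(admin, "events", timeout: 5_000)
```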