Class: Kafka::FFI::Admin::NewTopic
- Inherits:
-
OpaquePointer
- Object
- OpaquePointer
- Kafka::FFI::Admin::NewTopic
- Defined in:
- lib/kafka/ffi/admin/new_topic.rb
Instance Attribute Summary
Attributes inherited from OpaquePointer
Class Method Summary collapse
-
.new(name, partitions, replication_factor) ⇒ NewTopic
Create a new NewTopic for passing to Admin::Client#create_topics.
Instance Method Summary collapse
-
#destroy ⇒ Object
Release the memory held by NewTopic back to the system.
-
#set_config(name, value) ⇒ Object
Set the broker side topic configuration name/value pair.
-
#set_replica_assignment(partition, broker_ids) ⇒ Object
Set the broker assignment for partition to the replica set in broker_ids.
Methods inherited from OpaquePointer
by_ref, from_native, inherited, #initialize, to_native
Constructor Details
This class inherits a constructor from Kafka::FFI::OpaquePointer
Class Method Details
.new(name, partitions, replication_factor) ⇒ NewTopic
Create a new NewTopic for passing to Admin::Client#create_topics. It is the application's responsiblity to call #destroy when done with the object.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/kafka/ffi/admin/new_topic.rb', line 23 def self.new(name, partitions, replication_factor) # Allocate memory for the error message error = ::FFI::MemoryPointer.new(:char, 512) if name.nil? || name.empty? raise ArgumentError, " name is required and cannot be blank" end obj = ::Kafka::FFI.rd_kafka_NewTopic_new(name, partitions, replication_factor, error, error.size) if obj.nil? raise ArgumentError, error.read_string end obj ensure error.free end |
Instance Method Details
#destroy ⇒ Object
Release the memory held by NewTopic back to the system. This must be called by the application when it is done with the object.
85 86 87 88 89 |
# File 'lib/kafka/ffi/admin/new_topic.rb', line 85 def destroy if !pointer.null? ::Kafka::FFI.rd_kafka_NewTopic_destroy(self) end end |
#set_config(name, value) ⇒ Object
Set the broker side topic configuration name/value pair.
74 75 76 77 78 79 80 81 |
# File 'lib/kafka/ffi/admin/new_topic.rb', line 74 def set_config(name, value) err = ::Kafka::FFI.rd_kafka_NewTopic_set_config(self, name, value) if err != :ok raise ::Kafka::ResponseError, err end nil end |
#set_replica_assignment(partition, broker_ids) ⇒ Object
If called, must be call consecutively for each partition, starting at 0.
new must have been called with replication_factor of -1
Set the broker assignment for partition to the replica set in broker_ids.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/kafka/ffi/admin/new_topic.rb', line 52 def set_replica_assignment(partition, broker_ids) broker_ids = Array(broker_ids) brokers = ::FFI::MemoryPointer.new(:int32, broker_ids.size) error = ::FFI::MemoryPointer.new(:char, 512) brokers.write_array_of_int32(broker_ids) err = ::Kafka::FFI.rd_kafka_NewTopic_set_replica_assignment(self, partition, brokers, broker_ids.size, error, error.size) if err != :ok raise ::Kafka::ResponseError, err end nil ensure error.free brokers.free end |