Class: Kafka::FFI::Admin::NewPartitions
- Inherits: OpaquePointer (Object > OpaquePointer > Kafka::FFI::Admin::NewPartitions)
- Defined in: lib/kafka/ffi/admin/new_partitions.rb
Instance Attribute Summary
Attributes inherited from OpaquePointer
Class Method Summary
- .new(topic, partition_count) ⇒ Object
Allocates a new NewPartitions request for passing to CreatePartitions to increase the number of partitions for an existing topic.
Instance Method Summary
- #destroy ⇒ Object
Destroy and free the NewPartitions, releasing its resources back to the system.
- #set_replica_assignment(partition_index, broker_ids) ⇒ Object
Assign the partition by index, relative to existing partition count, to be replicated on the set of brokers specified by broker_ids.
Methods inherited from OpaquePointer
by_ref, from_native, inherited, #initialize, to_native
Constructor Details
This class inherits a constructor from Kafka::FFI::OpaquePointer
Class Method Details
.new(topic, partition_count) ⇒ Object
Allocates a new NewPartitions request for passing to CreatePartitions to increase the number of partitions for an existing topic.
    # File 'lib/kafka/ffi/admin/new_partitions.rb', line 15
    def self.new(topic, partition_count)
      error = ::FFI::MemoryPointer.new(:char, 512)

      if topic.nil? || topic.empty?
        # Check in Ruby as nil will cause a segfault as of 1.3.0
        raise ArgumentError, "topic name is required"
      end

      req = ::Kafka::FFI.rd_kafka_NewPartitions_new(topic, partition_count, error, error.size)
      if req.nil?
        raise ArgumentError, error.read_string
      end

      req
    ensure
      error.free
    end
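As a rough illustration of calling this constructor defensively, the sketch below validates the arguments in plain Ruby before they would reach .new. The helper name check_new_partitions_args and its current_count parameter are hypothetical and not part of Kafka::FFI. The sketch also assumes that partition_count is the desired total number of partitions for the topic rather than the number being added.

```ruby
# Hypothetical pre-flight check; not part of Kafka::FFI.
# Assumption: partition_count is the new TOTAL partition count for the topic.
def check_new_partitions_args(topic, current_count, partition_count)
  if topic.nil? || topic.empty?
    # Mirrors the guard in .new so a nil topic never reaches the C call.
    raise ArgumentError, "topic name is required"
  end

  unless partition_count.is_a?(Integer) && partition_count > current_count
    raise ArgumentError,
          "partition_count (#{partition_count.inspect}) must be an Integer greater than #{current_count}"
  end

  # Number of partitions the request would add.
  partition_count - current_count
end

# With the gem loaded, the request itself would then be built with:
#   req = Kafka::FFI::Admin::NewPartitions.new("events", 6)
added = check_new_partitions_args("events", 4, 6)
puts "request would add #{added} partitions"
```

Checking in Ruby first keeps the failure as an ordinary ArgumentError with a readable message instead of relying on the error string returned by the native call.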
Instance Method Details
#destroy ⇒ Object
Destroy and free the NewPartitions, releasing its resources back to the system.
    # File 'lib/kafka/ffi/admin/new_partitions.rb', line 72
    def destroy
      ::Kafka::FFI.rd_kafka_NewPartitions_destroy(self)
    end
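Because #destroy has to be called explicitly, one way to avoid leaking a request is to pair creation and destruction with begin/ensure. The sketch below assumes the caller owns the request and is responsible for destroying it. The helper with_new_partitions and the FakeNewPartitions stand-in are hypothetical; the stand-in exists only so the example runs without librdkafka.

```ruby
# Stand-in for Kafka::FFI::Admin::NewPartitions so the sketch runs without
# librdkafka; it only records whether #destroy was called.
class FakeNewPartitions
  attr_reader :topic, :partition_count, :destroyed

  def initialize(topic, partition_count)
    @topic = topic
    @partition_count = partition_count
    @destroyed = false
  end

  def destroy
    @destroyed = true
  end
end

# Hypothetical helper: builds a request with the given class, yields it, and
# always destroys it afterwards, even if the block raises.
def with_new_partitions(klass, topic, partition_count)
  req = klass.new(topic, partition_count)
  yield req
ensure
  # req is nil if klass.new itself raised, hence the safe navigation.
  req&.destroy
end

seen = nil
with_new_partitions(FakeNewPartitions, "events", 6) do |req|
  seen = req
  # With the real class, the request would be handed to CreatePartitions here.
end
puts "destroyed: #{seen.destroyed}"
```

With the gem loaded, Kafka::FFI::Admin::NewPartitions could be passed as klass in place of the stand-in.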
#set_replica_assignment(partition_index, broker_ids) ⇒ Object
Note: This MUST either be called for all new partitions or not at all.
Assign the new partition at partition_index, counted relative to the existing partition count, to be replicated on the set of brokers specified by broker_ids. If called, this method must be called consecutively for each new partition being created, starting with an index of 0.
    # File 'lib/kafka/ffi/admin/new_partitions.rb', line 54
    def set_replica_assignment(partition_index, broker_ids)
      error = ::FFI::MemoryPointer.new(:char, 512)

      broker_list = ::FFI::MemoryPointer.new(:int32, broker_ids.length)
      broker_list.write_array_of_int32(broker_ids)

      resp = ::Kafka::FFI.rd_kafka_NewPartitions_set_replica_assignment(self, partition_index, broker_list, broker_ids.length, error, error.size)
      if resp != :ok
        raise ::Kafka::ResponseError.new(resp, error.read_string)
      end

      nil
    ensure
      error.free
    end
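The indexing rule above (one call per new partition, in order, starting at 0) can be sketched as a loop over precomputed broker lists. The helper round_robin_assignments and the RecordingRequest stand-in are hypothetical and not part of Kafka::FFI. The round-robin placement is only one possible policy, chosen here for illustration.

```ruby
# Hypothetical helper: builds one broker list per NEW partition, rotating
# through broker_ids so replicas are spread across brokers.
def round_robin_assignments(new_partition_count, broker_ids, replication_factor)
  if replication_factor > broker_ids.length
    raise ArgumentError, "replication_factor exceeds number of brokers"
  end

  Array.new(new_partition_count) do |index|
    # rotate returns a new array; the first N brokers become the replicas.
    broker_ids.rotate(index).first(replication_factor)
  end
end

# Stand-in that records calls instead of talking to librdkafka.
class RecordingRequest
  attr_reader :calls

  def initialize
    @calls = []
  end

  def set_replica_assignment(partition_index, broker_ids)
    @calls << [partition_index, broker_ids]
    nil
  end
end

req = RecordingRequest.new
# Growing a topic by 3 partitions: indexes 0, 1, 2 refer to the NEW partitions
# only, regardless of how many partitions the topic already has.
round_robin_assignments(3, [1, 2, 3], 2).each_with_index do |brokers, index|
  req.set_replica_assignment(index, brokers)
end
req.calls.each do |index, brokers|
  puts "new partition #{index} -> brokers #{brokers.inspect}"
end
```

Iterating with each_with_index keeps the calls consecutive and zero-based, which is the ordering the method requires.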