Class: Kafka::FFI::TopicConfig
- Inherits: OpaquePointer (Object > OpaquePointer > Kafka::FFI::TopicConfig)
- Defined in: lib/kafka/ffi/topic_config.rb
Overview
TopicConfig can be passed to Topic.new to configure how the client interacts with the Topic.
Instance Attribute Summary
Attributes inherited from OpaquePointer
Class Method Summary
- .new ⇒ Object
Instance Method Summary
- #destroy ⇒ Object
  Free all resources used by the topic config.
- #dup ⇒ TopicConfig
  Duplicate the current config.
- #get(key) ⇒ String, :unknown
  Get the current config value for the given key.
- #set(key, value) ⇒ Object
  Set the config option at key to value.
- #set_partitioner_cb {|topic, key, partition_count| ... } ⇒ Object
  Sets a custom partitioner callback that is called for each message to determine which partition to publish the message to.
Methods inherited from OpaquePointer
by_ref, from_native, inherited, #initialize, to_native
Constructor Details
This class inherits a constructor from Kafka::FFI::OpaquePointer
Class Method Details
.new ⇒ Object
    # File 'lib/kafka/ffi/topic_config.rb', line 10
    def self.new
      Kafka::FFI.rd_kafka_topic_conf_new
    end
Instance Method Details
#destroy ⇒ Object
Free all resources used by the topic config.
Note: Never call #destroy on a TopicConfig that has been passed to Kafka::FFI.rd_kafka_topic_new, since the handle takes ownership of the config.
    # File 'lib/kafka/ffi/topic_config.rb', line 120
    def destroy
      if !pointer.null?
        ::Kafka::FFI.rd_kafka_topic_conf_destroy(self)
      end
    end
#dup ⇒ TopicConfig
Duplicate the current config.
    # File 'lib/kafka/ffi/topic_config.rb', line 80
    def dup
      ::Kafka::FFI.rd_kafka_topic_conf_dup(self)
    end
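One way #dup and #destroy might be combined is to experiment on a throwaway copy while leaving the original untouched. The sketch below is not part of the library: `with_scratch_config` is a hypothetical helper, and it assumes the copy is never passed to Kafka::FFI.rd_kafka_topic_new inside the block, because the helper always destroys the copy afterwards.

```ruby
# Hypothetical helper (not provided by the gem): yields a duplicate of the
# given config and frees the duplicate once the block finishes, even if the
# block raises. Assumes the block does NOT hand the copy to
# rd_kafka_topic_new, which would take ownership of it.
def with_scratch_config(config)
  copy = config.dup
  yield copy
ensure
  copy.destroy if copy
end

# Possible usage with a real TopicConfig (assumed to exist as `config`):
#   with_scratch_config(config) do |scratch|
#     scratch.set("message.timeout.ms", 5_000)
#     scratch.get("message.timeout.ms")
#   end
```

The method returns whatever the block returns, so a value read from the scratch copy can be used after the copy has been freed.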
#get(key) ⇒ String, :unknown
Get the current config value for the given key.
    # File 'lib/kafka/ffi/topic_config.rb', line 50
    def get(key)
      key = key.to_s

      # Will contain the size of the value at key
      size = ::FFI::MemoryPointer.new(:size_t)

      # Make an initial request for the size of buffer we need to allocate.
      # When trying to make a guess at the potential size the code would often
      # segfault due to rd_kafka_conf_get reallocating the buffer.
      err = ::Kafka::FFI.rd_kafka_topic_conf_get(self, key, ::FFI::Pointer::NULL, size)
      if err != :ok
        return err
      end

      # Allocate a string long enough to contain the whole value.
      value = ::FFI::MemoryPointer.new(:char, size.read(:size_t))
      err = ::Kafka::FFI.rd_kafka_topic_conf_get(self, key, value, size)
      if err != :ok
        return err
      end

      value.read_string
    ensure
      size.free if size
      value.free if value
    end
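Since #get returns either a String or an error Symbol such as :unknown, callers may want to normalize the result. The following is a minimal sketch, assuming only the return contract documented above; `topic_option` is a hypothetical helper name, not part of the gem.

```ruby
# Hypothetical helper (not provided by the gem): reads a config value and
# falls back to `default` when #get reports an error Symbol (for example
# :unknown) instead of returning a String.
def topic_option(config, key, default = nil)
  value = config.get(key)
  value.is_a?(Symbol) ? default : value
end

# Possible usage with a real TopicConfig (assumed to exist as `config`):
#   topic_option(config, "message.timeout.ms")      # String value, if the key is known
#   topic_option(config, "no.such.key", "fallback") # "fallback" if #get returns a Symbol
```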
#set(key, value) ⇒ Object
Set the config option at key to value. The configuration options match those used by librdkafka (and the Java client).
    # File 'lib/kafka/ffi/topic_config.rb', line 24
    def set(key, value)
      key = key.to_s
      value = value.to_s

      error = ::FFI::MemoryPointer.new(:char, 512)
      result = ::Kafka::FFI.rd_kafka_topic_conf_set(self, key, value, error, error.size)

      # See config_result enum in ffi.rb
      case result
      when :ok
        nil
      when :unknown
        raise Kafka::FFI::UnknownConfigKey.new(key, value, error.read_string)
      when :invalid
        raise Kafka::FFI::InvalidConfigValue.new(key, value, error.read_string)
      end
    ensure
      error.free if error
    end
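Because #set converts both key and value with to_s, several options can be applied from a Hash in one pass. This is a minimal sketch; `apply_topic_options` is a hypothetical helper, and the option names in the usage comment are librdkafka topic properties chosen only for illustration.

```ruby
# Hypothetical helper (not provided by the gem): applies each key/value pair
# to any object that responds to #set(key, value), such as a TopicConfig.
# Errors raised by #set (UnknownConfigKey, InvalidConfigValue) propagate to
# the caller, so options after a failing one are not applied.
def apply_topic_options(config, options)
  options.each { |key, value| config.set(key, value) }
  config
end

# Possible usage with a real TopicConfig:
#   config = Kafka::FFI::TopicConfig.new
#   apply_topic_options(config,
#     "request.required.acks" => -1,
#     "message.timeout.ms"    => 30_000)
```

Returning the config allows the call to be chained or passed straight on to whatever consumes it.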
#set_partitioner_cb {|topic, key, partition_count| ... } ⇒ Object
Sets a custom partitioner callback that is called for each message to determine which partition to publish the message to.
    # File 'lib/kafka/ffi/topic_config.rb', line 99
    def set_partitioner_cb
      if !block_given?
        raise ArgumentError, "set_partitioner_cb must be called with a block"
      end

      # @todo How do we guarantee the block does not get garbage collected?
      # @todo Support opaque pointers?
      cb = ::FFI::Function.new(:int, [:pointer, :string, :size_t, :int32, :pointer, :pointer]) do |topic, key, _, partitions, _, _|
        topic = Topic.new(topic)
        yield(topic, key, partitions)
      end

      ::Kafka::FFI.rd_kafka_topic_conf_set_partitioner_cb(self, cb)
    end
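The block's return value is handed back through an :int callback, so it should be an Integer partition number. Below is a sketch of one possible partitioner that hashes the key with CRC32. The constant name `KEY_HASH_PARTITIONER` and the choice to send keyless messages to partition 0 are assumptions made for illustration, not behavior defined by the library.

```ruby
require "zlib"

# Hypothetical partitioner: maps a message key to a stable partition in the
# range 0...partition_count. The parameters mirror the block arguments of
# set_partitioner_cb (topic, key, partition_count); the topic is unused here.
KEY_HASH_PARTITIONER = lambda do |_topic, key, partition_count|
  # Arbitrary choice for this sketch: messages without a key go to partition 0.
  return 0 if key.nil?

  Zlib.crc32(key) % partition_count
end

# Possible usage with a real TopicConfig (assumed to exist as `config`):
#   config.set_partitioner_cb(&KEY_HASH_PARTITIONER)
```

Keeping the lambda in a constant also keeps a long-lived reference to the block itself, although that alone does not address the garbage collection question about the FFI::Function wrapper raised in the @todo above.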