Class: Kafka::FFI::Config
- Inherits: OpaquePointer (Object > OpaquePointer > Kafka::FFI::Config)
- Defined in: lib/kafka/ffi/config.rb
Instance Attribute Summary
Attributes inherited from OpaquePointer
Class Method Summary
Instance Method Summary
- #destroy ⇒ Object
  Free all resources used by the config.
- #dup ⇒ Config
  Duplicate the current config.
- #dup_filter(*filter) ⇒ Object
  Duplicate the config but do not copy any config options that match the filtered keys.
- #get(key) ⇒ String, :unknown
  Get the current config value for the given key.
- #initialize(ptr) ⇒ Config (constructor)
  A new instance of Config.
- #set(key, value) ⇒ Object
  Set the config option at key to value.
- #set_background_event_cb {|client, event, opaque| ... } ⇒ Object (also: #background_event_cb=)
  Set the callback that will be used for events published to the background queue.
- #set_closesocket_cb(&block) ⇒ Object (also: #closesocket_cb=)
- #set_connect_cb(&block) ⇒ Object (also: #connect_cb=)
- #set_consume_cb {|message, opaque| ... } ⇒ Object (also: #consume_cb=)
  Set consume callback for use with consumer_poll.
- #set_dr_msg_cb {|client, message, opaque| ... } ⇒ Object (also: #dr_msg_cb=)
  Set delivery report callback for the config.
- #set_error_cb {|client, error, reason, opaque| ... } ⇒ Object (also: #error_cb=)
  Set error callback that is used by librdkafka to signal warnings and errors back to the application.
- #set_events(events_mask) ⇒ Object
  Enable event sourcing.
- #set_log_cb {|client, level, facility, message| ... } ⇒ Object (also: #log_cb=)
  Set the logging callback.
- #set_oauthbearer_token_refresh_cb(&block) ⇒ Object (also: #oauthbearer_token_refresh_cb=)
- #set_offset_commit_cb {|client, error, offsets| ... } ⇒ Object (also: #offset_commit_cb=)
  Set offset commit callback which is called when offsets are committed by the consumer.
- #set_open_cb(&block) ⇒ Object (also: #open_cb=)
- #set_rebalance_cb {|client, error, partitions, opaque| ... } ⇒ Object (also: #rebalance_cb=)
  Set rebalance callback for use with consumer group balancing.
- #set_socket_cb(&block) ⇒ Object (also: #socket_cb=)
- #set_ssl_cert(cert_type, cert_enc, certificate) ⇒ Object (also: #ssl_cert=)
  Set the certificate for secure communication with the Kafka cluster.
- #set_ssl_cert_verify_cb(&block) ⇒ Object (also: #ssl_cert_verify_cb=)
- #set_stats_cb {|client, json, json_len, opaque| ... } ⇒ Object (also: #stats_cb=)
  Set statistics callback that is triggered every statistics.interval.ms with a JSON document containing connection statistics.
- #set_throttle_cb {|client, broker_name, broker_id, throttle_ms, opaque| ... } ⇒ Object (also: #throttle_cb=)
  Set throttle callback that is used to forward broker throttle times to the application.
Methods inherited from OpaquePointer
by_ref, from_native, inherited, to_native
Constructor Details
#initialize(ptr) ⇒ Config
Returns a new instance of Config.
# File 'lib/kafka/ffi/config.rb', line 12
def initialize(ptr)
  super(ptr)

  # Maintain references to all of the set callbacks to avoid them being
  # garbage collected.
  @callbacks = {}
end
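The reason for keeping the blocks in @callbacks can be illustrated without librdkafka. The following is a minimal sketch in plain Ruby; CallbackRegistry and register are hypothetical names and not part of Kafka::FFI. It only shows the retention pattern: a block stored in an instance variable stays reachable for as long as its owner does, so it cannot be collected while native code may still call it.

```ruby
# Hypothetical stand-in for a wrapper around a native config handle.
class CallbackRegistry
  attr_reader :callbacks

  def initialize
    # Holding each block here keeps it reachable for as long as the
    # registry itself is alive.
    @callbacks = {}
  end

  # Store the block under +name+ before it would be handed to native code.
  def register(name, &block)
    raise ArgumentError, "block required" unless block

    @callbacks[name] = block
  end
end

registry = CallbackRegistry.new
registry.register(:log_cb) { |level, message| "#{level}: #{message}" }
puts registry.callbacks[:log_cb].call(6, "connected")
```

Registering a second block under the same name replaces the first, which then becomes eligible for collection.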
Class Method Details
.new ⇒ Object
# File 'lib/kafka/ffi/config.rb', line 8
def self.new
  Kafka::FFI.rd_kafka_conf_new
end
Instance Method Details
#destroy ⇒ Object
Free all resources used by the config.
Note: Never call #destroy on a Config that has been passed to Kafka::FFI.rd_kafka_new, since the handle will take ownership of the config.
# File 'lib/kafka/ffi/config.rb', line 376
def destroy
  if !pointer.null?
    ::Kafka::FFI.rd_kafka_conf_destroy(self)
  end
end
#dup ⇒ Config
Duplicate the current config.
# File 'lib/kafka/ffi/config.rb', line 86
def dup
  ::Kafka::FFI.rd_kafka_conf_dup(self)
end
#dup_filter(*filter) ⇒ Object
Duplicate the config but do not copy any config options that match the filtered keys.
# File 'lib/kafka/ffi/config.rb', line 92
def dup_filter(*filter)
  ptr = ::FFI::MemoryPointer.new(:pointer, filter.length)

  ptr.write_array_of_pointer(
    filter.map { |str| ::FFI::MemoryPointer.from_string(str) },
  )

  ::Kafka::FFI.rd_kafka_conf_dup_filter(self, filter.length, ptr)
ensure
  ptr.free
end
#get(key) ⇒ String, :unknown
Get the current config value for the given key.
# File 'lib/kafka/ffi/config.rb', line 56
def get(key)
  key = key.to_s

  # Will contain the size of the value at key
  size = ::FFI::MemoryPointer.new(:size_t)

  # Make an initial request for the size of buffer we need to allocate.
  # When trying to make a guess at the potential size the code would often
  # segfault due to rd_kafka_conf_get reallocating the buffer.
  err = ::Kafka::FFI.rd_kafka_conf_get(self, key, ::FFI::Pointer::NULL, size)
  if err != :ok
    return err
  end

  # Allocate a string long enough to contain the whole value.
  value = ::FFI::MemoryPointer.new(:char, size.read(:size_t))
  err = ::Kafka::FFI.rd_kafka_conf_get(self, key, value, size)
  if err != :ok
    return err
  end

  value.read_string
ensure
  size.free if size
  value.free if value
end
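The "ask for the size, then fetch" sequence used by #get can be sketched in plain Ruby. This is an illustration only: fake_conf_get, FAKE_STORE and read_value are hypothetical stand-ins for the native call, not real Kafka::FFI or librdkafka functions, and a one-element Array plays the role of the size_t out-parameter.

```ruby
# Hypothetical backing store for the stand-in native getter.
FAKE_STORE = { "client.id" => "rdkafka" }.freeze

# Stand-in for a native getter. When +buffer+ is nil only the required size
# (value length plus a trailing NUL) is reported through +size_box+;
# otherwise the value is also copied into +buffer+.
def fake_conf_get(key, buffer, size_box)
  value = FAKE_STORE[key]
  return :unknown if value.nil?

  size_box[0] = value.bytesize + 1
  buffer&.replace(value)
  :ok
end

def read_value(key)
  size_box = [0]

  # First call: ask how much space is needed, passing no buffer.
  err = fake_conf_get(key, nil, size_box)
  return err if err != :ok

  # Second call: pass a buffer sized from the first call's answer.
  buffer = String.new(capacity: size_box[0])
  err = fake_conf_get(key, buffer, size_box)
  return err if err != :ok

  buffer
end

puts read_value("client.id")
p read_value("no.such.key")
```

As in #get, an unknown key is reported by returning the error symbol rather than by raising.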
#set(key, value) ⇒ Object
Set the config option at key to value. The configuration options match those used by librdkafka (and the Java client).
# File 'lib/kafka/ffi/config.rb', line 30
def set(key, value)
  key = key.to_s
  value = value.to_s

  error = ::FFI::MemoryPointer.new(:char, 512)
  result = ::Kafka::FFI.rd_kafka_conf_set(self, key, value, error, error.size)

  # See config_result enum in ffi.rb
  case result
  when :ok
    nil
  when :unknown
    raise Kafka::FFI::UnknownConfigKey.new(key, value, error.read_string)
  when :invalid
    raise Kafka::FFI::InvalidConfigValue.new(key, value, error.read_string)
  end
ensure
  error.free if error
end
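Because #set raises a different error for an unknown key than for an invalid value, a caller can tell the two apart. The sketch below shows one way to apply several options and collect the failures. Everything in it is a hypothetical stand-in: DemoConfig, the Demo* error classes and apply_options are not part of Kafka::FFI, and the error constructor merely mirrors the (key, value, message) arguments seen in #set. The common DemoConfigError parent is an assumption of this sketch.

```ruby
# Hypothetical error hierarchy mirroring the (key, value, message) arguments.
class DemoConfigError < StandardError
  attr_reader :key, :value

  def initialize(key, value, message)
    super(message)
    @key = key
    @value = value
  end
end

class DemoUnknownConfigKey < DemoConfigError; end
class DemoInvalidConfigValue < DemoConfigError; end

# Hypothetical config that knows two properties and validates their values.
class DemoConfig
  RULES = { "client.id" => /.+/, "linger.ms" => /\A\d+\z/ }.freeze

  def set(key, value)
    key = key.to_s
    value = value.to_s
    rule = RULES[key]

    raise DemoUnknownConfigKey.new(key, value, "unknown property #{key}") if rule.nil?
    raise DemoInvalidConfigValue.new(key, value, "invalid value for #{key}") unless rule.match?(value)

    nil
  end
end

# Apply every option and collect failures instead of stopping at the first.
def apply_options(config, options)
  options.each_with_object({}) do |(key, value), failures|
    begin
      config.set(key, value)
    rescue DemoConfigError => e
      failures[e.key] = e.class
    end
  end
end

options = { "client.id" => "app", "linger.ms" => "soon", "bogus" => 1 }
p apply_options(DemoConfig.new, options)
```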
#set_background_event_cb {|client, event, opaque| ... } ⇒ Object
Also known as: background_event_cb=
Set the callback that will be used for events published to the background queue. This enables a background thread that runs internal to librdkafka and can be used as a standard receiver for APIs that take a queue.
Note: The application is responsible for calling #destroy on the event.
Note: The application must not call #destroy on the Client inside the callback.
# File 'lib/kafka/ffi/config.rb', line 153
def set_background_event_cb(&block)
  @callbacks[:background_event_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_background_event_cb(self, &block)
end
#set_closesocket_cb(&block) ⇒ Object
Also known as: closesocket_cb=
# File 'lib/kafka/ffi/config.rb', line 316
def set_closesocket_cb(&block)
  @callbacks[:closesocket_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_closesocket_cb(self, &block)
end
#set_connect_cb(&block) ⇒ Object
Also known as: connect_cb=
# File 'lib/kafka/ffi/config.rb', line 310
def set_connect_cb(&block)
  @callbacks[:connect_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_connect_cb(self, &block)
end
#set_consume_cb {|message, opaque| ... } ⇒ Object
Also known as: consume_cb=
Set consume callback for use with consumer_poll.
Note: Consumer only.
# File 'lib/kafka/ffi/config.rb', line 186
def set_consume_cb(&block)
  @callbacks[:consume_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_consume_cb(self, &block)
end
#set_dr_msg_cb {|client, message, opaque| ... } ⇒ Object
Also known as: dr_msg_cb=
Set delivery report callback for the config. The delivery report callback will be called once for each message accepted by Producer#produce. The Message will have #error set in the event of a producer error.
The callback is called when a message is successfully produced or if librdkafka encountered a permanent failure.
Note: Producer only.
# File 'lib/kafka/ffi/config.rb', line 173
def set_dr_msg_cb(&block)
  @callbacks[:dr_msg_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_dr_msg_cb(self, &block)
end
#set_error_cb {|client, error, reason, opaque| ... } ⇒ Object
Also known as: error_cb=
Set error callback that is used by librdkafka to signal warnings and errors back to the application. These errors should generally be considered informational and non-permanent; librdkafka will try to recover from all types of errors.
# File 'lib/kafka/ffi/config.rb', line 243
def set_error_cb(&block)
  @callbacks[:error_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_error_cb(self, &block)
end
#set_events(events_mask) ⇒ Object
Enable event sourcing. Convenience method to set the enabled_events option as an integer. events_mask may be an Integer bitmask or an Array of Integers and event type Symbols, which are OR-ed together into a single mask.
# File 'lib/kafka/ffi/config.rb', line 118
def set_events(events_mask)
  mask = events_mask

  # Support passing an Array of event Integers and/or Symbols
  if events_mask.is_a?(Array)
    mask = 0
    enum = ::Kafka::FFI.enum_type(:event_type)

    events_mask.each do |val|
      case val
      when Integer then mask |= val
      when Symbol then mask |= (enum[val] || 0)
      end
    end
  end

  ::Kafka::FFI.rd_kafka_conf_set_events(self, mask)
end
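The mask building in #set_events can be tried in isolation. The sketch below is an assumption-laden stand-in: EVENT_BITS and events_mask are hypothetical names, the bit values follow librdkafka's RD_KAFKA_EVENT_* constants, and the symbol names are guesses that may not match the library's :event_type enum.

```ruby
# Bit values follow librdkafka's RD_KAFKA_EVENT_* constants. The symbol
# names are an assumption made for this sketch.
EVENT_BITS = {
  dr: 0x1,
  fetch: 0x2,
  log: 0x4,
  error: 0x8,
  rebalance: 0x10,
  offset_commit: 0x20,
  stats: 0x40,
}.freeze

# Combine Integers and Symbols into one bitmask. Unknown symbols contribute
# nothing, and a plain Integer is passed through unchanged.
def events_mask(events)
  return events unless events.is_a?(Array)

  events.reduce(0) do |mask, val|
    case val
    when Integer then mask | val
    when Symbol then mask | EVENT_BITS.fetch(val, 0)
    else mask
    end
  end
end

puts format("0x%x", events_mask([:dr, :error]))
```

Note that silently ignoring an unknown symbol mirrors the `|| 0` fallback in the source; a stricter variant could raise instead.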
#set_log_cb {|client, level, facility, message| ... } ⇒ Object
Also known as: log_cb=
Set the logging callback. By default librdkafka will print to stderr (or syslog if configured).
Note: The application MUST NOT call any librdkafka APIs or do any prolonged work in a log_cb unless logs have been forwarded to a queue via set_log_queue.
# File 'lib/kafka/ffi/config.rb', line 276
def set_log_cb(&block)
  @callbacks[:log_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_log_cb(self, &block)
end
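A log callback typically does little more than forward the line to an existing logger. The following sketch builds such a callable around Ruby's standard Logger. logger_severity and build_log_cb are hypothetical helpers, and the sketch assumes that level arrives as a syslog severity Integer (0 to 7) and that facility and message arrive as Strings.

```ruby
require "logger"
require "stringio"

# Map syslog severities (0 = emergency .. 7 = debug) onto Logger severities.
def logger_severity(level)
  case level
  when 0..3 then Logger::ERROR
  when 4 then Logger::WARN
  when 5, 6 then Logger::INFO
  else Logger::DEBUG
  end
end

# Build a callable with the |client, level, facility, message| shape that
# does nothing but forward the line, keeping the work in the callback short.
def build_log_cb(logger)
  lambda do |_client, level, facility, message|
    logger.add(logger_severity(level), message, facility)
  end
end

io = StringIO.new
log_cb = build_log_cb(Logger.new(io))
log_cb.call(nil, 6, "CONNECT", "broker connection established")
puts io.string
```

With the real Config the callable would be passed as the block, for example `config.set_log_cb(&log_cb)`.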
#set_oauthbearer_token_refresh_cb(&block) ⇒ Object
Also known as: oauthbearer_token_refresh_cb=
# File 'lib/kafka/ffi/config.rb', line 298
def set_oauthbearer_token_refresh_cb(&block)
  @callbacks[:oauthbearer_token_refresh_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_oauthbearer_token_refresh_cb(self, &block)
end
#set_offset_commit_cb {|client, error, offsets| ... } ⇒ Object
Also known as: offset_commit_cb=
Set offset commit callback which is called when offsets are committed by the consumer.
Note: Consumer only.
# File 'lib/kafka/ffi/config.rb', line 226
def set_offset_commit_cb(&block)
  @callbacks[:offset_commit_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_offset_commit_cb(self, &block)
end
#set_open_cb(&block) ⇒ Object
Also known as: open_cb=
# File 'lib/kafka/ffi/config.rb', line 322
def set_open_cb(&block)
  if ::FFI::Platform.windows?
    raise Error, "set_open_cb is not available on Windows"
  end

  @callbacks[:open_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_open_cb(self, &block)
end
#set_rebalance_cb {|client, error, partitions, opaque| ... } ⇒ Object
Also known as: rebalance_cb=
Set rebalance callback for use with consumer group balancing. Setting the rebalance callback turns off librdkafka's automatic handling of assignment/revocation and delegates that responsibility to the application's callback.
Note: Consumer only.
# File 'lib/kafka/ffi/config.rb', line 209
def set_rebalance_cb(&block)
  @callbacks[:rebalance_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_rebalance_cb(self, &block)
end
#set_socket_cb(&block) ⇒ Object
Also known as: socket_cb=
# File 'lib/kafka/ffi/config.rb', line 304
def set_socket_cb(&block)
  @callbacks[:socket_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_socket_cb(self, &block)
end
#set_ssl_cert(cert_type, cert_enc, certificate) ⇒ Object
Also known as: ssl_cert=
Set the certificate for secure communication with the Kafka cluster.
Note: The private key may require a password, which must be specified with the ssl.key.password property prior to calling this function.
Note: Private and public keys, in PEM format, can be set with the ssl.key.pem and ssl.certificate.pem configuration properties.
# File 'lib/kafka/ffi/config.rb', line 355
def set_ssl_cert(cert_type, cert_enc, certificate)
  # MemoryPointer lives in the FFI namespace.
  error = ::FFI::MemoryPointer.new(:char, 512)

  # The config handle (self) is the first argument of rd_kafka_conf_set_ssl_cert.
  err = ::Kafka::FFI.rd_kafka_conf_set_ssl_cert(self, cert_type, cert_enc, certificate, certificate.bytesize, error, error.size)
  if err != :ok
    # Property name isn't exact since this appears to have some routing
    # based on cert type to determine the exact key.
    raise ConfigError, "ssl_cert", error.read_string
  end

  nil
ensure
  error.free if error
end
#set_ssl_cert_verify_cb(&block) ⇒ Object
Also known as: ssl_cert_verify_cb=
# File 'lib/kafka/ffi/config.rb', line 332
def set_ssl_cert_verify_cb(&block)
  @callbacks[:ssl_cert_verify_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_ssl_cert_verify_cb(self, &block)
end
#set_stats_cb {|client, json, json_len, opaque| ... } ⇒ Object
Also known as: stats_cb=
Set statistics callback that is triggered every statistics.interval.ms with a JSON document containing connection statistics.
# File 'lib/kafka/ffi/config.rb', line 292
def set_stats_cb(&block)
  @callbacks[:stats_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_stats_cb(self, &block)
end
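Since the statistics arrive as a JSON document, a callback will usually parse it and keep the parts it needs. The sketch below is a hypothetical StatsCollector, not part of Kafka::FFI, and it assumes the json argument reaches Ruby as a String. Returning 0 follows the librdkafka convention that the library keeps ownership of the JSON buffer and frees it after the callback returns.

```ruby
require "json"

# Hypothetical collector that keeps the most recent statistics snapshot.
class StatsCollector
  attr_reader :latest

  # Returns a callable matching the |client, json, json_len, opaque| shape.
  def callback
    lambda do |_client, json, _json_len, _opaque|
      @latest = JSON.parse(json)
      0 # leave ownership of the JSON buffer with librdkafka
    end
  end
end

# A trimmed-down sample document; real snapshots carry many more fields.
sample = '{"name":"rdkafka#producer-1","type":"producer","msg_cnt":3}'

collector = StatsCollector.new
collector.callback.call(nil, sample, sample.bytesize, nil)
puts collector.latest["type"]
```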
#set_throttle_cb {|client, broker_name, broker_id, throttle_ms, opaque| ... } ⇒ Object
Also known as: throttle_cb=
Set throttle callback that is used to forward broker throttle times to the application.
# File 'lib/kafka/ffi/config.rb', line 258
def set_throttle_cb(&block)
  @callbacks[:throttle_cb] = block
  ::Kafka::FFI.rd_kafka_conf_set_throttle_cb(self, &block)
end
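One possible use of the throttle callback is to record the worst throttle time seen per broker. The sketch below is a hypothetical ThrottleTracker in plain Ruby. It is not part of Kafka::FFI and assumes that broker_name arrives as a String and throttle_ms as an Integer.

```ruby
# Hypothetical tracker that remembers the largest throttle time per broker.
class ThrottleTracker
  def initialize
    @worst_ms = Hash.new(0)
  end

  # Returns a callable matching the
  # |client, broker_name, broker_id, throttle_ms, opaque| shape.
  def callback
    lambda do |_client, broker_name, _broker_id, throttle_ms, _opaque|
      @worst_ms[broker_name] = [@worst_ms[broker_name], throttle_ms].max
    end
  end

  # Largest throttle time recorded for +broker_name+, or 0 if none was seen.
  def worst_for(broker_name)
    @worst_ms[broker_name]
  end
end

tracker = ThrottleTracker.new
throttle_cb = tracker.callback
throttle_cb.call(nil, "broker-1:9092", 1, 120, nil)
throttle_cb.call(nil, "broker-1:9092", 1, 40, nil)
puts tracker.worst_for("broker-1:9092")
```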