Class: Kafka::Config
- Inherits:
-
Object
- Object
- Kafka::Config
- Defined in:
- lib/kafka/config.rb
Instance Method Summary collapse
-
#get(key) ⇒ nil
Retrieve the configured value for the key.
-
#initialize(opts = {}) ⇒ Config
constructor
Create a new Config for initializing a Kafka Consumer or Producer.
- #on_consume(&block) ⇒ Object
-
#on_delivery_report(&block) ⇒ Object
Callback for the delivery status of a message published to the Kafka cluster.
-
#on_error(&block) ⇒ Object
Callback for errors from the cluster.
-
#on_log(&block) ⇒ Object
Callback for log messages.
-
#on_offset_commit(&block) ⇒ Object
Callback for result of automatic or manual offset commits.
-
#on_stats(&block) ⇒ Object
Callback for connetion stats.
-
#on_throttle(&block) ⇒ Object
Callback for when Brokers throttle a client.
-
#set(key, val) ⇒ Object
Set configratuon option
key
tovalue
. -
#to_ffi ⇒ Kafka::FFI::Config
Allocate and configure a new Kafka::FFI::Config that mirrors this Config.
Constructor Details
#initialize(opts = {}) ⇒ Config
Create a new Config for initializing a Kafka Consumer or Producer. This config is reusable and can be used to configure multiple Consumers or Producers.
14 15 16 17 18 19 20 21 22 |
# File 'lib/kafka/config.rb', line 14 def initialize(opts = {}) @opts = {} @callbacks = {} # Use #set to rekey the options as strings and type check the value. opts.each_pair do |key, val| set(key, val) end end |
Instance Method Details
#get(key) ⇒ nil
Retrieve the configured value for the key.
28 29 30 |
# File 'lib/kafka/config.rb', line 28 def get(key) @opts[key.to_s] end |
#on_consume(&block) ⇒ Object
Consumer only
67 68 69 |
# File 'lib/kafka/config.rb', line 67 def on_consume(&block) @callbacks[:consume] = block end |
#on_delivery_report(&block) ⇒ Object
Producer only
Callback for the delivery status of a message published to the Kafka cluster.
60 61 62 |
# File 'lib/kafka/config.rb', line 60 def on_delivery_report(&block) @callbacks[:delivery_report] = block end |
#on_error(&block) ⇒ Object
Callback for errors from the cluster. Most errors are informational and should be ignored as librdkafka will attempt to recover. However fatal errors can be reported which should cause the system to gracefully shutdown.
86 87 88 |
# File 'lib/kafka/config.rb', line 86 def on_error(&block) @callbacks[:error] = block end |
#on_log(&block) ⇒ Object
Callback for log messages
100 101 102 |
# File 'lib/kafka/config.rb', line 100 def on_log(&block) @callbacks[:log] = block end |
#on_offset_commit(&block) ⇒ Object
Consumer only
Callback for result of automatic or manual offset commits.
76 77 78 |
# File 'lib/kafka/config.rb', line 76 def on_offset_commit(&block) @callbacks[:offset_commit] = block end |
#on_stats(&block) ⇒ Object
Callback for connetion stats
107 108 109 |
# File 'lib/kafka/config.rb', line 107 def on_stats(&block) @callbacks[:stats] = block end |
#on_throttle(&block) ⇒ Object
Callback for when Brokers throttle a client
93 94 95 |
# File 'lib/kafka/config.rb', line 93 def on_throttle(&block) @callbacks[:throttle] = block end |
#set(key, val) ⇒ Object
Set configratuon option key
to value
.
40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/kafka/config.rb', line 40 def set(key, val) key = key.to_s @opts[key] = case val when String, Integer, true, false, nil val else raise TypeError, "#{key}'s value must be a String, Integer, true, or false" end nil end |
#to_ffi ⇒ Kafka::FFI::Config
Allocate and configure a new Kafka::FFI::Config that mirrors this Config. The returned Kafka::FFI::Config should be either passed to initialize a new Client or eventually destroyed. Once passed to a Client, the Config is now owned by the Client and should not be modified or destroyed.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/kafka/config.rb', line 117 def to_ffi conf = Kafka::FFI.rd_kafka_conf_new @opts.each do |name, value| conf.set(name, value) end # Omitted callbacks: # - background_event - Requires lower level usage # - rebalance - Requires knowing the rebalance semantics # - all socket - Unknown need at this level # - ssl_cert_verify - Currently not needed # - oauthbearer_token_refresh - Unable to test @callbacks.each do |name, callback| case name when :delivery_report then conf.set_dr_msg_cb(&callback) when :consume then conf.set_consume_cb(&callback) when :offset_commit then conf.set_offset_commit_cb(&callback) when :error then conf.set_error_cb(&callback) when :throttle then conf.set_throttle_cb(&callback) when :log then conf.set_log_cb(&callback) when :stats then conf.set_stats_cb(&callback) end end conf end |