Class: Kafka::FFI::Event
- Inherits:
-
OpaquePointer
- Object
- OpaquePointer
- Kafka::FFI::Event
- Defined in:
- lib/kafka/ffi/event.rb
Defined Under Namespace
Classes: LogMessage
Instance Attribute Summary
Attributes inherited from OpaquePointer
Instance Method Summary collapse
-
#config_string ⇒ String?
Returns the configuration for the event or nil if the configuration property is not set or not applicable for the event type.
-
#destroy ⇒ Object
Destroy the event, releasing it's resources back to the system.
-
#error ⇒ nil, Kafka::ResponseError
Returns the error code for the event or nil if there was no error.
-
#error_is_fatal ⇒ Boolean
(also: #error_is_fatal?)
Returns true or false if the Event represents a fatal error.
-
#error_string ⇒ nil, String
Returns a description of the error or nil when there is no error.
-
#log ⇒ Event::LogMessage
Returns the log message attached to the event.
-
#messages {|message| ... } ⇒ Array<Message>?
Retrieve the set of messages.
-
#name ⇒ String
Returns the name of the event's type.
-
#stats ⇒ nil, String
Extracts stats from the event.
-
#topic_partition ⇒ TopicPartition
Returns the topic partition from the Event.
-
#topic_partition_list ⇒ TopicPartitionList
Returns the topic partition list from the Event.
-
#type ⇒ Symbol
Returns the event's type.
Methods inherited from OpaquePointer
by_ref, from_native, inherited, #initialize, to_native
Constructor Details
This class inherits a constructor from Kafka::FFI::OpaquePointer
Instance Method Details
#config_string ⇒ String?
Returns the configuration for the event or nil if the configuration property is not set or not applicable for the event type.
Events:
- RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH
101 102 103 |
# File 'lib/kafka/ffi/event.rb', line 101 def config_string ::Kafka::FFI.rd_kafka_event_string(self) end |
#destroy ⇒ Object
Is the applicaiton responsible for calling #destroy?
Destroy the event, releasing it's resources back to the system.
207 208 209 210 211 212 213 |
# File 'lib/kafka/ffi/event.rb', line 207 def destroy # It is safe to call destroy even if the Event's pointer is NULL but it # doesn't do anything so might as well guard against it just in case. if !pointer.null? ::Kafka::FFI.rd_kafka_event_destroy(self) end end |
#error ⇒ nil, Kafka::ResponseError
Returns the error code for the event or nil if there was no error.
111 112 113 114 115 116 |
# File 'lib/kafka/ffi/event.rb', line 111 def error err = ::Kafka::FFI.rd_kafka_event_error(self) if err != :ok ::Kafka::ResponseError.new(err, error_string) end end |
#error_is_fatal ⇒ Boolean Also known as: error_is_fatal?
Returns true or false if the Event represents a fatal error.
129 130 131 |
# File 'lib/kafka/ffi/event.rb', line 129 def error_is_fatal error && ::Kafka::FFI.rd_kafka_event_error_is_fatal(self) end |
#error_string ⇒ nil, String
Returns a description of the error or nil when there is no error.
122 123 124 |
# File 'lib/kafka/ffi/event.rb', line 122 def error_string ::Kafka::FFI.rd_kafka_event_error_string(self) end |
#log ⇒ Event::LogMessage
Returns the log message attached to the event.
Events:
- RD_KAFKA_EVENT_LOG
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/kafka/ffi/event.rb', line 140 def log facility = ::FFI::MemoryPointer.new(:pointer) message = ::FFI::MemoryPointer.new(:pointer) level = ::FFI::MemoryPointer.new(:pointer) exists = ::Kafka::FFI.rd_kafka_event_log(self, facility, message, level) if exists != 0 # Event type does not support log messages. return nil end LogMessage.new( facility.read_pointer.read_string, message.read_pointer.read_string, level.read_int, ) ensure facility.free message.free level.free end |
#messages {|message| ... } ⇒ Array<Message>?
Do not call #destroy on the Messages
Retrieve the set of messages. Can take a block to iterate over the set of Messages rather than return them.
Events:
- RD_KAFKA_EVENT_FETCH
- RD_KAFKA_EVENT_DR
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/kafka/ffi/event.rb', line 48 def messages # This departs from the librdkafka API due to having a collection of # methods that have funky semantics for Ruby. # @todo Messages are only on RD_KAFKA_EVENT_FETCH and RD_KAFKA_EVENT_DR. # Need to test what happens with other event types. # No block so fetch all of the messages and return them as an array. if !block_given? count = ::Kafka::FFI.rd_kafka_event_message_count(self) if count == 0 return [] end begin # Allocates enough memory for the full set but only converts as many # as were returned. # @todo Retrieve all until sum(ret) == count? ptr = ::FFI::MemoryPointer.new(:pointer, count) ret = ::Kafka::FFI.rd_kafka_event_message_array(self, ptr, count) # Map the return pointers to Messages return ptr.read_array_of_pointer(ret).map! { |p| Message.from_native(p) } ensure ptr.free end end # Block was passed so use rd_kafka_event_message_next begin ptr = ::FFI::MemoryPointer.new(:pointer) loop do msg = ::Kafka::FFI.rd_kafka_event_message_next(self) if msg.null? break end yield(msg) end ensure ptr.free end end |
#name ⇒ String
Returns the name of the event's type.
30 31 32 |
# File 'lib/kafka/ffi/event.rb', line 30 def name ::Kafka::FFI.rd_kafka_event_name(self) end |
#stats ⇒ nil, String
Extracts stats from the event
Events:
- RD_KAFKA_EVENT_STATS
169 170 171 172 173 174 175 176 177 |
# File 'lib/kafka/ffi/event.rb', line 169 def stats # Calling stats on an unsupported type causes a segfault with librdkafka # 1.3.0. if type != :stats return nil end ::Kafka::FFI.rd_kafka_event_stats(self) end |
#topic_partition ⇒ TopicPartition
The application MUST call #destroy on the TopicPartition when done.
Returns the topic partition from the Event.
Events:
- RD_KAFKA_EVENT_ERROR
200 201 202 |
# File 'lib/kafka/ffi/event.rb', line 200 def topic_partition ::Kafka::FFI.rd_kafka_event_topic_partition(self) end |
#topic_partition_list ⇒ TopicPartitionList
Application MUST NOT call #destroy on the list
Returns the topic partition list from the Event.
Events:
- RD_KAFKA_EVENT_REBALANCE
- RD_KAFKA_EVENT_OFFSET_COMMIT
188 189 190 |
# File 'lib/kafka/ffi/event.rb', line 188 def topic_partition_list ::Kafka::FFI.rd_kafka_event_topic_partition_list(self) end |
#type ⇒ Symbol
Returns the event's type
23 24 25 |
# File 'lib/kafka/ffi/event.rb', line 23 def type ::Kafka::FFI.rd_kafka_event_type(self) end |