Class: Kafka::FFI::Message
- Inherits:
-
FFI::Struct
- Object
- FFI::Struct
- Kafka::FFI::Message
- Defined in:
- lib/kafka/ffi/message.rb
Defined Under Namespace
Classes: Header
Instance Method Summary collapse
-
#destroy ⇒ Object
Frees resources used by the messages and hands ownership by to librdkafka.
-
#detach_headers ⇒ nil, Message::Headers
Get the Message's headers and detach them from the Message (setting its headers to nil).
-
#error ⇒ nil, Kafka::ResponseError
Retrieve the error associated with this message.
-
#headers ⇒ nil, Message::Headers
Get the message header list.
-
#key ⇒ nil, String
Returns the optional message key used to publish the message.
-
#latency ⇒ Integer?
Retrieve the latency since the Message was published to Kafka.
-
#offset ⇒ Integer, RD_KAFKA_OFFSET_INVALID
Returns the message's offset as published in the topic's partition.
-
#opaque ⇒ nil, FFI::Pointer
Returns the per message opaque pointer that was given to produce.
-
#partition ⇒ Integer
Returns the partition the message was published to.
-
#payload ⇒ String
Returns the message's payload.
-
#set_headers(headers) ⇒ Object
(also: #headers=)
Replace the Message's headers with a new set.
-
#timestamp ⇒ Integer?
Retrieve the timestamp for a consumed message.
-
#topic ⇒ nil, String
Returns the name of the Topic the Message was published to.
Instance Method Details
#destroy ⇒ Object
Frees resources used by the messages and hands ownership by to librdkafka. The application should call destroy when done processing the message.
199 200 201 202 203 |
# File 'lib/kafka/ffi/message.rb', line 199 def destroy if !null? ::Kafka::FFI.rd_kafka_message_destroy(self) end end |
#detach_headers ⇒ nil, Message::Headers
Get the Message's headers and detach them from the Message (setting its headers to nil). Calling detach_headers means the applicaiton is now the owner of the returned Message::Header and must eventually call #destroy when the application is done with them.
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/kafka/ffi/message.rb', line 140 def detach_headers ptr = ::FFI::MemoryPointer.new(:pointer) err = ::Kafka::FFI.rd_kafka_message_detach_headers(self, ptr) case err when :ok if ptr.null? nil else Message::Headers.new(ptr) end when RD_KAFKA_RESP_ERR__NOENT # Messages does not have headers nil else raise ::Kafka::ResponseError, err end ensure ptr.free end |
#error ⇒ nil, Kafka::ResponseError
Retrieve the error associated with this message. For consumers this is used to report per-topic+partition consumer errors. For producers this is set when received in the dr_msg_cb callback to signify a fatal error publishing the message.
26 27 28 29 30 |
# File 'lib/kafka/ffi/message.rb', line 26 def error if self[:err] != :ok ::Kafka::ResponseError.new(self[:err]) end end |
#headers ⇒ nil, Message::Headers
Get the message header list
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/kafka/ffi/message.rb', line 110 def headers ptr = ::FFI::MemoryPointer.new(:pointer) err = ::Kafka::FFI.rd_kafka_message_headers(self, ptr) case err when :ok if ptr.null? nil else Message::Headers.new(ptr) end when RD_KAFKA_RESP_ERR__NOENT # Messages does not have headers nil else raise ::Kafka::ResponseError, err end ensure ptr.free end |
#key ⇒ nil, String
Returns the optional message key used to publish the message. This key is
used for partition assignment based on the partitioner
or
partitioner_cb
config options.
50 51 52 53 54 55 56 |
# File 'lib/kafka/ffi/message.rb', line 50 def key if self[:key].null? return nil end self[:key].read_string(self[:key_len]) end |
#latency ⇒ Integer?
Retrieve the latency since the Message was published to Kafka.
191 192 193 194 |
# File 'lib/kafka/ffi/message.rb', line 191 def latency latency = ::Kafka::FFI.rd_kafka_message_latency(self) latency == -1 ? nil : latency end |
#offset ⇒ Integer, RD_KAFKA_OFFSET_INVALID
Returns the message's offset as published in the topic's partition. When error != nil, offset the error occurred at.
71 72 73 |
# File 'lib/kafka/ffi/message.rb', line 71 def offset self[:offset] end |
#opaque ⇒ nil, FFI::Pointer
Using the opaque is dangerous and requires that the application maintain a reference to the object passed to produce. Failing to do so will cause segfaults due to the object having been garbage collected.
Returns the per message opaque pointer that was given to produce. This is a pointer to a Ruby object owned by the application.
88 89 90 |
# File 'lib/kafka/ffi/message.rb', line 88 def opaque self[:private] end |
#partition ⇒ Integer
Returns the partition the message was published to.
61 62 63 |
# File 'lib/kafka/ffi/message.rb', line 61 def partition self[:partition] end |
#payload ⇒ String
Returns the message's payload. When error != nil, will contain a string describing the error.
96 97 98 99 100 101 102 |
# File 'lib/kafka/ffi/message.rb', line 96 def payload if self[:payload].null? return nil end self[:payload].read_string(self[:len]) end |
#set_headers(headers) ⇒ Object Also known as: headers=
The Message takes ownership of the headers and they will be destroyed automatically with the Message.
Replace the Message's headers with a new set.
167 168 169 |
# File 'lib/kafka/ffi/message.rb', line 167 def set_headers(headers) ::Kafka::FFI.rd_kafka_set_headers(self, headers) end |
#timestamp ⇒ Integer?
Retrieve the timestamp for a consumed message.
181 182 183 184 185 |
# File 'lib/kafka/ffi/message.rb', line 181 def timestamp # @todo: Type (second param) [rd_kafka_timestamp_type_t enum] ts = ::Kafka::FFI.rd_kafka_message_timestamp(self, nil) ts == -1 ? nil : ts end |
#topic ⇒ nil, String
Returns the name of the Topic the Message was published to.
36 37 38 39 40 41 42 |
# File 'lib/kafka/ffi/message.rb', line 36 def topic if self[:rkt].nil? return nil end self[:rkt].name end |