Class: Kafka::FFI::Producer
- Inherits: Client
  - Object
    - OpaquePointer
      - Client
        - Kafka::FFI::Producer
- Defined in: lib/kafka/ffi/producer.rb
Instance Attribute Summary
Attributes inherited from OpaquePointer
Class Method Summary
Instance Method Summary
- #flush(timeout: 1000) ⇒ Object
  Wait until all outstanding produce requests are completed.
- #produce(topic, payload, key: nil, partition: nil, headers: nil, timestamp: nil, opaque: nil) ⇒ Object
  Produce and send a single message to the Kafka cluster.
- #purge(queued: true, inflight: true, blocking: false) ⇒ Object
  Purge messages currently handled by the Producer.
Methods inherited from Client
#alter_configs, #brokers_add, #cluster_id, #config, #controller_id, #create_partitions, #create_topics, #default_topic_conf_dup, #delete_topics, #describe_configs, #destroy, from_native, #get_background_queue, #get_main_queue, #group_list, #initialize, #metadata, #name, #offsets_for_times, #outq_len, #pause_partitions, #poll, #query_watermark_offsets, #resume_partitions, #set_log_queue, #topic
Methods inherited from OpaquePointer
by_ref, from_native, inherited, #initialize, to_native
Constructor Details
This class inherits a constructor from Kafka::FFI::Client
Class Method Details
.new(config = nil) ⇒ Object
# File 'lib/kafka/ffi/producer.rb', line 8

def self.new(config = nil)
  super(:producer, config)
end
Instance Method Details
#flush(timeout: 1000) ⇒ Object
Wait until all outstanding produce requests are completed. This should typically be done prior to destroying a producer to ensure all queued and in-flight requests are completed before terminating.
# File 'lib/kafka/ffi/producer.rb', line 110

def flush(timeout: 1000)
  err = ::Kafka::FFI.rd_kafka_flush(self, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end
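Because flush blocks for at most `timeout` milliseconds, it can return before the outbound queue is actually empty. Callers typically pair it with `#outq_len` (inherited from Client) in a retry loop before destroying the producer. Below is a minimal runnable sketch of that pattern; since a real flush needs a broker connection, it uses a stand-in object, and `FakeProducer` and `drain` are illustrative names, not part of the API:

```ruby
# FakeProducer stands in for a real Producer so the sketch runs offline.
# It mimics only the #flush and #outq_len signatures documented here.
FakeProducer = Struct.new(:pending) do
  def outq_len
    pending
  end

  def flush(timeout: 1000)
    # Pretend each flush call delivers up to two queued messages.
    self.pending = [pending - 2, 0].max
    nil
  end
end

# Retry flush until the outbound queue is empty or attempts are exhausted,
# returning the number of messages still queued.
def drain(producer, attempts: 5)
  attempts.times do
    producer.flush(timeout: 1000)
    break if producer.outq_len.zero?
  end
  producer.outq_len
end

p drain(FakeProducer.new(5)) # => 0
```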
#produce(topic, payload, key: nil, partition: nil, headers: nil, timestamp: nil, opaque: nil) ⇒ Object
Produce and send a single message to the Kafka cluster.
# File 'lib/kafka/ffi/producer.rb', line 36

def produce(topic, payload, key: nil, partition: nil, headers: nil, timestamp: nil, opaque: nil)
  args = [
    # Ensure librdkafka copies the payload into its own memory since the
    # string backing it could be garbage collected.
    :vtype, :msgflags, :int, Kafka::FFI::RD_KAFKA_MSG_F_COPY,
  ]

  if payload
    args.append(:vtype, :value, :buffer_in, payload, :size_t, payload.bytesize)
  end

  # The partitioning key is optional
  if key
    args.append(:vtype, :key, :buffer_in, key, :size_t, key.bytesize)
  end

  # Partition will default to being auto assigned by the configured
  # partitioning strategy.
  if partition
    args.append(:vtype, :partition, :int32, partition)
  end

  # Headers are optional and can be passed as either a reference to a
  # Header object or individual key/value pairs. This only supports the
  # Header object because supporting individual key + value pairs would
  # complicate the argument handling.
  if headers
    args.append(:vtype, :headers, :pointer, headers.pointer)
  end

  case topic
  when Topic
    args.append(:vtype, :rkt, :pointer, topic.pointer)
  when String
    args.append(:vtype, :topic, :string, topic)
  else
    raise ArgumentError, "topic must be either a Topic or String"
  end

  if opaque
    args.append(:vtype, :opaque, :pointer, opaque.pointer)
  end

  if timestamp
    ts =
      case timestamp
      # Convert a Time to milliseconds since the epoch, including the
      # sub-second component (nanoseconds -> milliseconds).
      when Time then (timestamp.to_i * 1000) + (timestamp.nsec / 1_000_000)
      when Integer then timestamp
      else
        raise ArgumentError, "timestamp must be nil, a Time, or an Integer"
      end

    args.append(:vtype, :timestamp, :int64, ts)
  end

  # Add the sentinel value to denote the end of the argument list.
  args.append(:vtype, :end)

  err = ::Kafka::FFI.rd_kafka_producev(self, *args)
  if err != :ok
    # The only documented error is RD_KAFKA_RESP_ERR__CONFLICT should both
    # HEADER and HEADERS keys be passed in. There is no way for HEADER to
    # be passed to producev based on the above implementation.
    raise ::Kafka::ResponseError, err
  end

  nil
end
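The `timestamp:` argument accepts either a `Time` or an Integer already expressed in epoch milliseconds, which is the unit librdkafka expects. A standalone sketch of that conversion, where `to_kafka_timestamp` is an illustrative helper rather than part of the API; note the sub-second component is nanoseconds and must be divided by 1,000,000 to yield milliseconds:

```ruby
# Convert a produce timestamp to epoch milliseconds, mirroring the
# Time/Integer cases accepted by #produce. Illustrative helper only.
def to_kafka_timestamp(timestamp)
  case timestamp
  when Time    then (timestamp.to_i * 1000) + (timestamp.nsec / 1_000_000)
  when Integer then timestamp # assumed to already be epoch milliseconds
  else raise ArgumentError, "timestamp must be nil, a Time, or an Integer"
  end
end

t = Time.at(1_500_000_000, 250_000) # 250,000 microseconds = 250 ms
p to_kafka_timestamp(t)             # => 1500000000250
p to_kafka_timestamp(42)            # => 42
```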
#purge(queued: true, inflight: true, blocking: false) ⇒ Object
Purge messages currently handled by the Producer. By default this purges all queued and in-flight messages asynchronously.
# File 'lib/kafka/ffi/producer.rb', line 130

def purge(queued: true, inflight: true, blocking: false)
  mask = 0
  mask |= RD_KAFKA_PURGE_F_QUEUE if queued
  mask |= RD_KAFKA_PURGE_F_INFLIGHT if inflight
  mask |= RD_KAFKA_PURGE_F_NON_BLOCKING if blocking

  err = ::Kafka::FFI.rd_kafka_purge(self, mask)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end
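The keyword arguments are OR-ed together into a single bitmask before being passed to `rd_kafka_purge`. A runnable sketch of that mask construction; the flag values shown match librdkafka's rdkafka.h (`RD_KAFKA_PURGE_F_QUEUE = 0x1`, `RD_KAFKA_PURGE_F_INFLIGHT = 0x2`, `RD_KAFKA_PURGE_F_NON_BLOCKING = 0x4`), but treat them and `purge_mask` as illustrative here:

```ruby
# Flag values as defined in librdkafka's rdkafka.h (illustrative copies).
RD_KAFKA_PURGE_F_QUEUE        = 0x1
RD_KAFKA_PURGE_F_INFLIGHT     = 0x2
RD_KAFKA_PURGE_F_NON_BLOCKING = 0x4

# Build the purge bitmask the same way #purge does from its keyword args.
def purge_mask(queued: true, inflight: true, blocking: false)
  mask = 0
  mask |= RD_KAFKA_PURGE_F_QUEUE if queued
  mask |= RD_KAFKA_PURGE_F_INFLIGHT if inflight
  mask |= RD_KAFKA_PURGE_F_NON_BLOCKING if blocking
  mask
end

p purge_mask                                # => 3 (queued + in-flight)
p purge_mask(queued: true, inflight: false) # => 1
```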