Skip to content

Commit

Permalink
out_kafka2: Broker pool to take care of fetching metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Dipendra Singh <[email protected]>
  • Loading branch information
dipendra-singh authored and kenhys committed Jul 31, 2024
1 parent 2dece8b commit b0f3a10
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions lib/fluent/plugin/kafka_producer_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compresso
@max_buffer_bytesize = max_buffer_bytesize
@compressor = compressor
@partitioner = partitioner

# The set of topics that are produced to.
@target_topics = Set.new

# A buffer organized by topic/partition.
@buffer = MessageBuffer.new

Expand All @@ -116,7 +120,8 @@ def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_
if @transaction_manager.transactional? && !@transaction_manager.in_transaction?
raise 'You must trigger begin_transaction before producing messages'
end


@target_topics.add(topic)
@pending_message_queue.write(message)

nil
Expand Down Expand Up @@ -187,7 +192,7 @@ def transaction
def deliver_messages_with_retries
attempt = 0

#@cluster.add_target_topics(@target_topics)
@cluster.add_target_topics(@target_topics)

operation = ProduceOperation.new(
cluster: @cluster,
Expand Down

0 comments on commit b0f3a10

Please sign in to comment.