diff --git a/lib/fluent/plugin/kafka_producer_ext.rb b/lib/fluent/plugin/kafka_producer_ext.rb index c58fccb..e1b70a2 100644 --- a/lib/fluent/plugin/kafka_producer_ext.rb +++ b/lib/fluent/plugin/kafka_producer_ext.rb @@ -93,6 +93,10 @@ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compresso @max_buffer_bytesize = max_buffer_bytesize @compressor = compressor @partitioner = partitioner + + # The set of topics that are produced to. + @target_topics = Set.new + # A buffer organized by topic/partition. @buffer = MessageBuffer.new @@ -116,7 +120,8 @@ def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_ if @transaction_manager.transactional? && !@transaction_manager.in_transaction? raise 'You must trigger begin_transaction before producing messages' end - + + @target_topics.add(topic) @pending_message_queue.write(message) nil @@ -187,7 +192,7 @@ def transaction def deliver_messages_with_retries attempt = 0 - #@cluster.add_target_topics(@target_topics) + @cluster.add_target_topics(@target_topics) operation = ProduceOperation.new( cluster: @cluster,