Skip to content

Commit

Permalink
Merge pull request #505 from ttych/rdkafka-update
Browse files Browse the repository at this point in the history
out_rdkafka2: add patch for new version of rdkafka
  • Loading branch information
ashie authored Jul 31, 2024
2 parents 27efbd0 + 8a07392 commit b6b50a0
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 43 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ jobs:
- ubuntu-latest
rdkafka_versions:
- { min: '>= 0.6.0', max: '< 0.12.0' }
- { min: '>= 0.12.0', max: '>= 0.12.0' }
- { min: '>= 0.12.0', max: '< 0.14.0' }
- { min: '>= 0.14.0', max: '< 0.16.0' }
- { min: '>= 0.16.0', max: '>= 0.16.0' }
bundler_version:
- '2.5.16'
# rdkafka 0.15.2 is the last version which supports Ruby 2.7
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,6 @@ You need to install rdkafka gem.
partition_key (string) :default => 'partition'
partition_key_key (string) :default => 'partition_key'
message_key_key (string) :default => 'message_key'
default_topic (string) :default => nil
use_default_for_unknown_topic (bool) :default => false
use_default_for_unknown_partition_error (bool) :default => false
default_partition_key (string) :default => nil
Expand Down
53 changes: 12 additions & 41 deletions lib/fluent/plugin/out_rdkafka2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,19 @@

require 'rdkafka'

# This is required for `rdkafka` version >= 0.12.0
# Overriding the close method in order to provide a time limit for when it should be forcibly closed
class Rdkafka::Producer::Client
# return false if producer is forcefully closed, otherwise return true
def close(timeout=nil)
return unless @native

# Indicate to polling thread that we're closing
@polling_thread[:closing] = true
# Wait for the polling thread to finish up
thread = @polling_thread.join(timeout)

Rdkafka::Bindings.rd_kafka_destroy(@native)

@native = nil

return !thread.nil?
end
end

class Rdkafka::Producer
# return false if producer is forcefully closed, otherwise return true
def close(timeout = nil)
rdkafka_version = Rdkafka::VERSION || '0.0.0'
# Rdkafka version >= 0.12.0 changed its internals
if Gem::Version::create(rdkafka_version) >= Gem::Version.create('0.12.0')
ObjectSpace.undefine_finalizer(self)

return @client.close(timeout)
end

@closing = true
# Wait for the polling thread to finish up
# If the broker isn't alive, the thread doesn't exit
if timeout
thr = @polling_thread.join(timeout)
return !!thr
else
@polling_thread.join
return true
end
begin
rdkafka_version = Gem::Version::create(Rdkafka::VERSION)
if rdkafka_version < Gem::Version.create('0.12.0')
require_relative 'rdkafka_patch/0_11_0'
elsif rdkafka_version == Gem::Version.create('0.12.0')
require_relative 'rdkafka_patch/0_12_0'
elsif rdkafka_version >= Gem::Version.create('0.14.0')
require_relative 'rdkafka_patch/0_14_0'
elsif rdkafka_version >= Gem::Version.create('0.16.0')
require_relative 'rdkafka_patch/0_16_0'
end
rescue LoadError, NameError
raise "unable to patch rdkafka."
end

module Fluent::Plugin
Expand Down
15 changes: 15 additions & 0 deletions lib/fluent/plugin/rdkafka_patch/0_11_0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class Rdkafka::Producer
# return false if producer is forcefully closed, otherwise return true
def close(timeout = nil)
@closing = true
# Wait for the polling thread to finish up
# If the broker isn't alive, the thread doesn't exit
if timeout
thr = @polling_thread.join(timeout)
return !!thr
else
@polling_thread.join
return true
end
end
end
27 changes: 27 additions & 0 deletions lib/fluent/plugin/rdkafka_patch/0_12_0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# This is required for `rdkafka` version >= 0.12.0
# Overriding the close method in order to provide a time limit for when it should be forcibly closed
class Rdkafka::Producer::Client
# return false if producer is forcefully closed, otherwise return true
def close(timeout=nil)
return unless @native

# Indicate to polling thread that we're closing
@polling_thread[:closing] = true
# Wait for the polling thread to finish up
thread = @polling_thread.join(timeout)

Rdkafka::Bindings.rd_kafka_destroy(@native)

@native = nil

return !thread.nil?
end
end

class Rdkafka::Producer
def close(timeout = nil)
ObjectSpace.undefine_finalizer(self)

return @client.close(timeout)
end
end
44 changes: 44 additions & 0 deletions lib/fluent/plugin/rdkafka_patch/0_14_0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
class Rdkafka::NativeKafka
# return false if producer is forcefully closed, otherwise return true
def close(timeout=nil, object_id=nil)
return true if closed?

synchronize do
# Indicate to the outside world that we are closing
@closing = true

thread_status = :unknown
if @polling_thread
# Indicate to polling thread that we're closing
@polling_thread[:closing] = true

# Wait for the polling thread to finish up,
# this can be aborted in practice if this
# code runs from a finalizer.
thread_status = @polling_thread.join(timeout)
end

# Destroy the client after locking both mutexes
@poll_mutex.lock

# This check prevents a race condition, where we would enter the close in two threads
# and after unlocking the primary one that hold the lock but finished, ours would be unlocked
# and would continue to run, trying to destroy inner twice
if @inner
Rdkafka::Bindings.rd_kafka_destroy(@inner)
@inner = nil
@opaque = nil
end

!thread_status.nil?
end
end
end

class Rdkafka::Producer
def close(timeout = nil)
return true if closed?
ObjectSpace.undefine_finalizer(self)
@native_kafka.close(timeout)
end
end
55 changes: 55 additions & 0 deletions lib/fluent/plugin/rdkafka_patch/0_16_0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
class Rdkafka::NativeKafka
# return false if producer is forcefully closed, otherwise return true
def close(timeout=nil, object_id=nil)
return true if closed?

synchronize do
# Indicate to the outside world that we are closing
@closing = true

thread_status = :unknown
if @polling_thread
# Indicate to polling thread that we're closing
@polling_thread[:closing] = true

# Wait for the polling thread to finish up,
# this can be aborted in practice if this
# code runs from a finalizer.
thread_status = @polling_thread.join(timeout)
end

# Destroy the client after locking both mutexes
@poll_mutex.lock

# This check prevents a race condition, where we would enter the close in two threads
# and after unlocking the primary one that hold the lock but finished, ours would be unlocked
# and would continue to run, trying to destroy inner twice
retun unless @inner

Rdkafka::Bindings.rd_kafka_destroy(@inner)
@inner = nil
@opaque = nil

!thread_status.nil?
end
end
end

class Rdkafka::Producer
def close(timeout = nil)
return true if closed?
ObjectSpace.undefine_finalizer(self)

@native_kafka.close(timeout) do
# We need to remove the topics references objects before we destroy the producer,
# otherwise they would leak out
@topics_refs_map.each_value do |refs|
refs.each_value do |ref|
Rdkafka::Bindings.rd_kafka_topic_destroy(ref)
end
end
end

@topics_refs_map.clear
end
end

0 comments on commit b6b50a0

Please sign in to comment.