Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle graceful shutdown of ConsumerWorkPool when canceling a consumer #437

Merged
merged 4 commits into from
Oct 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/bunny/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ def basic_cancel(consumer_tag)
@last_basic_cancel_ok = wait_on_continuations
end

maybe_kill_consumer_work_pool! unless any_consumers?
@work_pool.shutdown(true) unless any_consumers?

@last_basic_cancel_ok
end
Expand Down
17 changes: 15 additions & 2 deletions lib/bunny/consumer_work_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ class ConsumerWorkPool
attr_reader :size
attr_reader :abort_on_exception

def initialize(size = 1, abort_on_exception = false)
def initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60)
@size = size
@abort_on_exception = abort_on_exception
@shutdown_timeout = shutdown_timeout
@shutdown_mutex = ::Mutex.new
@shutdown_conditional = ::ConditionVariable.new
@queue = ::Queue.new
@paused = false
end
Expand Down Expand Up @@ -53,14 +56,20 @@ def busy?
[email protected]?
end

def shutdown
def shutdown(wait_for_workers = false)
@running = false

@size.times do
submit do |*args|
throw :terminate
end
end

return unless wait_for_workers && @shutdown_timeout

@shutdown_mutex.synchronize do
@shutdown_conditional.wait(@shutdown_mutex, @shutdown_timeout)
end
end

def join(timeout = nil)
Expand Down Expand Up @@ -102,6 +111,10 @@ def run_loop
end
end
end

@shutdown_mutex.synchronize do
@shutdown_conditional.signal unless busy?
end
end
end
end
4 changes: 2 additions & 2 deletions lib/bunny/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -337,14 +337,14 @@ def transport_write_timeout
# opened (this operation is very fast and inexpensive).
#
# @return [Bunny::Channel] Newly opened channel
def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false)
def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60)
raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n

@channel_mutex.synchronize do
if n && (ch = @channels[n])
ch
else
ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception))
ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception, consumer_pool_shutdown_timeout))
ch.open
ch
end
Expand Down
62 changes: 62 additions & 0 deletions spec/higher_level_api/integration/basic_cancel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,66 @@
expect(delivered_data).to be_empty
end
end

context "with a worker pool shutdown timeout configured" do
let(:queue_name) { "bunny.queues.#{rand}" }

it "processes the message if processing completes within the timeout" do
delivered_data = []
consumer = nil

t = Thread.new do
ch = connection.create_channel(nil, 1, false, 5)
q = ch.queue(queue_name, :auto_delete => true, :durable => false)

consumer = Bunny::Consumer.new(ch, q)
consumer.on_delivery do |_, _, payload|
sleep 2
delivered_data << payload
end

q.subscribe_with(consumer, :block => false)
end
t.abort_on_exception = true
sleep 1.0

ch = connection.create_channel
ch.default_exchange.publish("", :routing_key => queue_name)
sleep 0.7

consumer.cancel
sleep 1.0

expect(delivered_data).to_not be_empty
end

it "kills the consumer if processing takes longer than the timeout" do
delivered_data = []
consumer = nil

t = Thread.new do
ch = connection.create_channel(nil, 1, false, 1)
q = ch.queue(queue_name, :auto_delete => true, :durable => false)

consumer = Bunny::Consumer.new(ch, q)
consumer.on_delivery do |_, _, payload|
sleep 3
delivered_data << payload
end

q.subscribe_with(consumer, :block => false)
end
t.abort_on_exception = true
sleep 1.0

ch = connection.create_channel
ch.default_exchange.publish("", :routing_key => queue_name)
sleep 0.7

consumer.cancel
sleep 1.0

expect(delivered_data).to be_empty
end
end
end