Fix for reliable shutdowns of ConsumerWorkPool #341
Conversation
@dhananjaysathe CI failures seem directly related. Please investigate. Also, please adhere to the existing code style, e.g. we have a = 1 and not a=1.
```diff
@@ -165,7 +165,7 @@ class Channel
     # @param [Bunny::Session] connection AMQP 0.9.1 connection
     # @param [Integer] id Channel id, pass nil to make Bunny automatically allocate it
     # @param [Bunny::ConsumerWorkPool] work_pool Thread pool for delivery processing, by default of size 1
-    def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
+    def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1,10))
```
sorry to nitpick but please add a space between arguments, like we do everywhere else
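For illustration only, here is how the second ConsumerWorkPool argument from the diff above might be used to give a channel its own shutdown grace period. The exact wiring is an assumption based on this PR's diff, not documented Bunny API.

```ruby
require "bunny"

conn = Bunny.new
conn.start

# Assumption from the diff above: the second ConsumerWorkPool argument is the
# per-channel shutdown timeout in seconds (here, a pool of size 1 with a 10 s
# grace period). This is a sketch, not confirmed Bunny API.
pool = Bunny::ConsumerWorkPool.new(1, 10)
ch   = Bunny::Channel.new(conn, nil, pool)
ch.open

# ...register consumers on ch; per this PR's intent, closing the channel gives
# the pool up to 10 seconds to finish in-flight deliveries before it is killed.
```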
@dhananjaysathe so there are tests that rapidly open and close connections and channels. This change makes them effectively time out and makes using one-off channels a lot more time-consuming. I suggest that we change the default to what it was before (no grace period), document this new value, and see if we can reduce the wait later. FWIW, this is the first time this issue has been brought up in Bunny's multi-year history.
@dhananjaysathe I'm happy to include this feature if we can keep the default for some time (and update the tests).
Was anything like this ever implemented that I may have missed? I would love to initiate a clean shutdown where the consumers would stop pulling new messages and finish any existing work within a configured timeout.
No, it's still not done.
I am working on it now :)! I have it working so far (it required some differences from the original PR: I found the original did not actually work for me, since handling of basic_cancel called kill and never actually called shutdown on the worker pool). I am basically doing something like this: in consumer_work_pool, shutdown becomes shutdown(wait_for_workers = false). Then in the channel, when we handle basic_cancel, instead of calling kill on the worker pool I call worker_pool.shutdown(true). The timeout is configurable when creating the channel and defaults to NO timeout, and all other paths that shut down the worker pool, such as closing the connection, do not wait for the workers to finish (which makes sense, as the workers cannot ack if the connection is closed). I just want to write some specs around it and will get a PR across first thing tomorrow morning.
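A rough sketch of the approach described in the comment above; the class and method bodies below are illustrative assumptions, not Bunny's actual source.

```ruby
# Illustrative sketch only: names mirror the comment above, not Bunny's code.
class ConsumerWorkPool
  # shutdown gains an optional flag; by default it does not wait.
  def shutdown(wait_for_workers = false)
    @running = false
    @size.times { @queue.push(proc {}) } # wake blocked workers so they can exit

    return unless wait_for_workers && @shutdown_timeout
    # Give in-flight deliveries up to the configured timeout to finish.
    @shutdown_mutex.synchronize do
      @shutdown_conditional.wait(@shutdown_mutex, @shutdown_timeout)
    end
  end
end

class Channel
  # On basic.cancel, ask the pool to drain instead of killing it outright.
  def handle_basic_cancel(_method)
    @work_pool.shutdown(true)
  end

  # Other shutdown paths (e.g. connection close) keep the non-waiting default,
  # since workers cannot ack once the connection is gone.
end
```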
Sounds reasonable. Thank you very much!
Make the shutdown mechanism for the default ConsumerWorkPool more reliable. The shutdown uses a combination of a Mutex and a ConditionVariable with a sane default timeout (10s). One can optionally specify a per-channel "shutdown timeout" to configure this. The reordering of statements in the channel.close method ensures a shutdown of executing tasks (and perhaps forcefully killing the pool) before killing the connection, thereby eliminating the nondeterministic state of the shutdown of consumer procs in the worker thread pool.
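To make the described mechanism concrete, here is a minimal, self-contained, single-worker sketch of a Mutex + ConditionVariable timed shutdown wait. It mirrors the idea in the description and is not the actual Bunny implementation; all names are made up.

```ruby
# Minimal single-worker sketch; not Bunny's actual ConsumerWorkPool.
class TinyWorkPool
  def initialize(shutdown_timeout = 10)
    @queue            = Queue.new
    @shutdown_timeout = shutdown_timeout
    @shutdown_mutex   = Mutex.new
    @shutdown_cond    = ConditionVariable.new

    @worker = Thread.new do
      # Process jobs until the nil sentinel is seen.
      while (job = @queue.pop)
        job.call
      end
      # All queued work is done: wake whoever is waiting in #shutdown.
      @shutdown_mutex.synchronize { @shutdown_cond.signal }
    end
  end

  def submit(&block)
    @queue.push(block)
  end

  # Blocks until in-flight work finishes or the timeout elapses, so the caller
  # only closes the connection (or kills the thread) after consumers are done.
  def shutdown
    @shutdown_mutex.synchronize do
      @queue.push(nil) # sentinel ends the worker loop
      @shutdown_cond.wait(@shutdown_mutex, @shutdown_timeout)
    end
  end
end

pool = TinyWorkPool.new(10)
pool.submit { sleep 1; puts "delivery handled" }
pool.shutdown # returns once the job finishes, or after at most 10 seconds
```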
@michaelklishin based on our earlier discussions, this seems like a reliable way to achieve the same effect without relying on self-pipes. I used the ConditionVariable primitive and a mutex to achieve the same effect as the continuation_queue (which seemed like overkill). This is the optimal solution IMHO.