-
Notifications
You must be signed in to change notification settings - Fork 303
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Don't reconfirm already acked messages when multiple is true #617
Conversation
Don't reconfirm already acked messages when multiple is true. Fixes ruby-amqp#615
Thank you for the thorough PR. Note that you can add benchmarks under My test runs on a 4-year old CPU suggest that once the number of messages published goes above 20K or so, the new implementation begins significantly outperforming. In general, this implementation should be acceptable So I'd be happy to merge once you remove the temporary tests (or move one of them to |
Cleaned up temp specs
I deleted those specs, they've been created only for this PR, to show difference in performance. Thanks for your time and quick response! |
PR ruby-amqp#617 introduced an optimization for multi-message acks but failed to protect @unconfirmed_set. In edge cases #basic_publish would attempt to modify it during iteration with Enumerable#min: <ruby>/lib/ruby/3.1.0/set.rb:522:in `add': can't add a new key into hash during iteration (RuntimeError) from <bunny>/lib/bunny/channel.rb:555:in `block in basic_publish' from <bunny>/lib/bunny/channel.rb:554:in `synchronize' from <bunny>/lib/bunny/channel.rb:554:in `basic_publish' from <bunny>/lib/bunny/exchange.rb:141:in `publish'
PR ruby-amqp#617 introduced an optimization for multi-message acks but failed to protect @unconfirmed_set. In edge cases #basic_publish would attempt to modify it during iteration with Enumerable#min: <ruby>/lib/ruby/3.1.0/set.rb:522:in `add': can't add a new key into hash during iteration (RuntimeError) from <bunny>/lib/bunny/channel.rb:555:in `block in basic_publish' from <bunny>/lib/bunny/channel.rb:554:in `synchronize' from <bunny>/lib/bunny/channel.rb:554:in `basic_publish' from <bunny>/lib/bunny/exchange.rb:141:in `publish'
PR ruby-amqp#617 introduced an optimization for multi-message acks but failed to protect @unconfirmed_set. In edge cases #basic_publish would attempt to modify it during iteration with Enumerable#min: <ruby>/lib/ruby/3.1.0/set.rb:522:in `add': can't add a new key into hash during iteration (RuntimeError) from <bunny>/lib/bunny/channel.rb:555:in `block in basic_publish' from <bunny>/lib/bunny/channel.rb:554:in `synchronize' from <bunny>/lib/bunny/channel.rb:554:in `basic_publish' from <bunny>/lib/bunny/exchange.rb:141:in `publish'
In current implementation library acks all tags starting from tag 1 (plus channel recovery offset) every time it gets
multiple
flag regardless of previously acknowledged tags. So whenmultiple
flag occured ex. at delivery_tag 1000 (after 990 aked), we receiveconfirm_select callback
1000 times. And we receive this call callback again and again (hudren thousands times) while processing just 16k messages.After some research of other language implementation (java and python) we made a little fix to resolve this issue. RabbitMQ protocol extension strgongly enforces sequenced delivery_tag generation on publishing. There is
@unconfirmed_set
filled with not-yet-acked tags (previously acked tags already removed from it) so we just need to ack tags from first (smallest) unconfirmed position till currentlry delivered withmultiple
flag to fit RabbitMQ publisher confirms protocol extension.There is additional test case to test and compare old and new behaviour. It is not for enclusion in resulting PR - just for experiments. You can change messages count to higher value when difference increased drastically.
Fixes #615