From ccb99a45e886071246ad39ea3fd7436d2914544c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=B4mulo=20A=2E=20Ceccon?= Date: Wed, 13 Jul 2022 14:20:07 -0300 Subject: [PATCH] Avoid race between multi-message ack and publish PR #617 introduced an optimization for multi-message acks but failed to protect @unconfirmed_set. In edge cases #basic_publish would attempt to modify it during iteration with Enumerable#min: /lib/ruby/3.1.0/set.rb:522:in `add': can't add a new key into hash during iteration (RuntimeError) from /lib/bunny/channel.rb:555:in `block in basic_publish' from /lib/bunny/channel.rb:554:in `synchronize' from /lib/bunny/channel.rb:554:in `basic_publish' from /lib/bunny/exchange.rb:141:in `publish' --- lib/bunny/channel.rb | 10 +++--- spec/stress/multi_message_ack_spec.rb | 48 +++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 5 deletions(-) create mode 100644 spec/stress/multi_message_ack_spec.rb diff --git a/lib/bunny/channel.rb b/lib/bunny/channel.rb index 920e26cad..924170679 100644 --- a/lib/bunny/channel.rb +++ b/lib/bunny/channel.rb @@ -1788,12 +1788,12 @@ def handle_basic_return(basic_return, properties, content) # @private def handle_ack_or_nack(delivery_tag_before_offset, multiple, nack) - delivery_tag = delivery_tag_before_offset + @delivery_tag_offset - confirmed_range_start = multiple ? @delivery_tag_offset + @unconfirmed_set.min : delivery_tag - confirmed_range_end = delivery_tag - confirmed_range = (confirmed_range_start..confirmed_range_end) - @unconfirmed_set_mutex.synchronize do + delivery_tag = delivery_tag_before_offset + @delivery_tag_offset + confirmed_range_start = multiple ? @delivery_tag_offset + @unconfirmed_set.min : delivery_tag + confirmed_range_end = delivery_tag + confirmed_range = (confirmed_range_start..confirmed_range_end) + if nack @nacked_set.merge(@unconfirmed_set & confirmed_range) end diff --git a/spec/stress/multi_message_ack_spec.rb b/spec/stress/multi_message_ack_spec.rb new file mode 100644 index 000000000..2ab4c34ff --- /dev/null +++ b/spec/stress/multi_message_ack_spec.rb @@ -0,0 +1,48 @@ +require "spec_helper" + +unless ENV["CI"] + describe "Subscription acknowledging multi-messages" do + before :all do + @connection = Bunny.new(username: "bunny_gem", password: "bunny_password", + vhost: "bunny_testbed", automatically_recover: false) + @connection.start + end + + let(:max_messages) { 100_000 } + + it "successfully completes" do + body = "." + + ch = @connection.create_channel + ch.confirm_select + + q = ch.queue("multi-messages") + + m = Mutex.new + acks = 0 + pubs = 0 + last = Time.now + + q.subscribe(manual_ack: true) do |delivery_info, _, _| + sleep(0) if rand < 0.01 + ch.ack(delivery_info.delivery_tag) + + m.synchronize do + acks += 1 + now = Time.now + if now - last > 0.5 + puts "Ack multi-message: acks=#{acks} pubs=#{pubs}" + last = now + end + end + end + + (1..max_messages).each do + q.publish(".") + m.synchronize { pubs += 1 } + end + + sleep 0.1 while acks < pubs + end + end +end