Skip to content

Commit

Permalink
Avoid race between multi-message ack and publish
Browse files Browse the repository at this point in the history
PR ruby-amqp#617 introduced an optimization for multi-message acks but failed
to protect @unconfirmed_set. In edge cases #basic_publish would
attempt to modify it during iteration with Enumerable#min:

    <ruby>/lib/ruby/3.1.0/set.rb:522:in `add': can't add a new key
    into hash during iteration (RuntimeError)
    from <bunny>/lib/bunny/channel.rb:555:in `block in basic_publish'
    from <bunny>/lib/bunny/channel.rb:554:in `synchronize'
    from <bunny>/lib/bunny/channel.rb:554:in `basic_publish'
    from <bunny>/lib/bunny/exchange.rb:141:in `publish'
  • Loading branch information
romuloceccon committed Jul 13, 2022
1 parent 138a8bf commit ccb99a4
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 5 deletions.
10 changes: 5 additions & 5 deletions lib/bunny/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1788,12 +1788,12 @@ def handle_basic_return(basic_return, properties, content)

# @private
def handle_ack_or_nack(delivery_tag_before_offset, multiple, nack)
delivery_tag = delivery_tag_before_offset + @delivery_tag_offset
confirmed_range_start = multiple ? @delivery_tag_offset + @unconfirmed_set.min : delivery_tag
confirmed_range_end = delivery_tag
confirmed_range = (confirmed_range_start..confirmed_range_end)

@unconfirmed_set_mutex.synchronize do
delivery_tag = delivery_tag_before_offset + @delivery_tag_offset
confirmed_range_start = multiple ? @delivery_tag_offset + @unconfirmed_set.min : delivery_tag
confirmed_range_end = delivery_tag
confirmed_range = (confirmed_range_start..confirmed_range_end)

if nack
@nacked_set.merge(@unconfirmed_set & confirmed_range)
end
Expand Down
48 changes: 48 additions & 0 deletions spec/stress/multi_message_ack_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
require "spec_helper"

unless ENV["CI"]
describe "Subscription acknowledging multi-messages" do
before :all do
@connection = Bunny.new(username: "bunny_gem", password: "bunny_password",
vhost: "bunny_testbed", automatically_recover: false)
@connection.start
end

let(:max_messages) { 100_000 }

it "successfully completes" do
body = "."

ch = @connection.create_channel
ch.confirm_select

q = ch.queue("multi-messages")

m = Mutex.new
acks = 0
pubs = 0
last = Time.now

q.subscribe(manual_ack: true) do |delivery_info, _, _|
sleep(0) if rand < 0.01
ch.ack(delivery_info.delivery_tag)

m.synchronize do
acks += 1
now = Time.now
if now - last > 0.5
puts "Ack multi-message: acks=#{acks} pubs=#{pubs}"
last = now
end
end
end

(1..max_messages).each do
q.publish(".")
m.synchronize { pubs += 1 }
end

sleep 0.1 while acks < pubs
end
end
end

0 comments on commit ccb99a4

Please sign in to comment.