From 68865c56e1a60c518f647ecc19b0374bd9ffe30e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AE=D1=80=D1=83=D1=81=D0=BE=D0=B2=20=D0=92=D0=BB=D0=B0?= =?UTF-8?q?=D0=B4=D0=B8=D1=81=D0=BB=D0=B0=D0=B2?= Date: Thu, 10 Jun 2021 12:59:43 +0300 Subject: [PATCH 1/5] Fixed handling acks when multiple is true, confirms only unconfirmed messages --- lib/bunny/channel.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/bunny/channel.rb b/lib/bunny/channel.rb index 607262b62..43c0f3e35 100644 --- a/lib/bunny/channel.rb +++ b/lib/bunny/channel.rb @@ -1789,7 +1789,7 @@ def handle_basic_return(basic_return, properties, content) # @private def handle_ack_or_nack(delivery_tag_before_offset, multiple, nack) delivery_tag = delivery_tag_before_offset + @delivery_tag_offset - confirmed_range_start = multiple ? @delivery_tag_offset + 1 : delivery_tag + confirmed_range_start = multiple ? @delivery_tag_offset + @unconfirmed_set.first : delivery_tag confirmed_range_end = delivery_tag confirmed_range = (confirmed_range_start..confirmed_range_end) From 66568fdac88fd07cec9b257ca35bdec8f65a6c1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AE=D1=80=D1=83=D1=81=D0=BE=D0=B2=20=D0=92=D0=BB=D0=B0?= =?UTF-8?q?=D0=B4=D0=B8=D1=81=D0=BB=D0=B0=D0=B2?= Date: Thu, 10 Jun 2021 13:00:42 +0300 Subject: [PATCH 2/5] Issue 615 benchmark --- spec/issues/issue615_spec.rb | 110 +++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 spec/issues/issue615_spec.rb diff --git a/spec/issues/issue615_spec.rb b/spec/issues/issue615_spec.rb new file mode 100644 index 000000000..02145660e --- /dev/null +++ b/spec/issues/issue615_spec.rb @@ -0,0 +1,110 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe 'multiple atribute handling on acks' do + # Monkey patch with 2 implementations of confirmed subsets + counters for testing + class Bunny::Channel + attr_accessor :multiples, :old_implementation + + def handle_ack_or_nack(delivery_tag_before_offset, multiple, nack) + range_start_testing = old_implementation ? 1 : @unconfirmed_set.first # choose between subsets + + delivery_tag = delivery_tag_before_offset + @delivery_tag_offset + confirmed_range_start = multiple ? @delivery_tag_offset + range_start_testing : delivery_tag + confirmed_range_end = delivery_tag + confirmed_range = (confirmed_range_start..confirmed_range_end) + + # just counting multiples to proof we recieved some + @multiples = @multiples.to_i + 1 if multiple + + @unconfirmed_set_mutex.synchronize do + if nack + @nacked_set.merge(@unconfirmed_set & confirmed_range) + end + + @unconfirmed_set.subtract(confirmed_range) + + @only_acks_received = (@only_acks_received && !nack) + + @confirms_continuations.push(true) if @unconfirmed_set.empty? + + if @confirms_callback + confirmed_range.each do |tag| + @confirms_callback.call(tag, false, nack) + end + end + end + end + end + + def measure + result = {} + result[:begin] = Process.clock_gettime(Process::CLOCK_MONOTONIC) + yield(result) + rescue => e + result[:exception] = e + ensure + result[:end] = Process.clock_gettime(Process::CLOCK_MONOTONIC) + result[:duration] = (result[:end] - result[:begin]).round(2) + return result + end + + before(:all) do + @connection = Bunny.new(username: 'bunny_gem', + password: 'bunny_password', + vhost: 'bunny_testbed', + automatic_recovery: false, + write_timeout: 0, + read_timeout: 0) + @connection.start + end + + after(:all) do + @connection.close if @connection.open? + end + + let(:messages) { 10_000 } + let(:channel) { @connection.create_channel } + let(:queue) { channel.queue('bunny.basic.ack.multiple-acks', auto_delete: true) } + + context 'when multiple atribute used' do + it 'faster with new implementation' do + queue.subscribe(manual_ack: false) + + channel.confirm_select + + old = measure do + channel.old_implementation = true + exchange = channel.default_exchange + + messages.times do + exchange.publish('small message', routing_key: queue.name) + end + + channel.wait_for_confirms + + expect(channel.multiples).to be >= 1 + end + puts "Old implementation duration: #{old[:duration]} seconds" + + channel.multiples = 0 # clean up + + new = measure do + channel.old_implementation = false + exchange = channel.default_exchange + + messages.times do + exchange.publish('small message', routing_key: queue.name) + end + + channel.wait_for_confirms + + expect(channel.multiples).to be >= 1 + end + puts "New implementation duration: #{new[:duration]} seconds" + + expect(new[:duration]).to be < old[:duration] + end + end +end From 4eaf480ef8c0fe1f709361efe9ffe72e5fb84cac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AE=D1=80=D1=83=D1=81=D0=BE=D0=B2=20=D0=92=D0=BB=D0=B0?= =?UTF-8?q?=D0=B4=D0=B8=D1=81=D0=BB=D0=B0=D0=B2?= Date: Thu, 10 Jun 2021 13:09:34 +0300 Subject: [PATCH 3/5] Issue 615 benchmark --- spec/issues/issue615_spec.rb | 72 ++++++++++++++++++------------------ 1 file changed, 37 insertions(+), 35 deletions(-) diff --git a/spec/issues/issue615_spec.rb b/spec/issues/issue615_spec.rb index 02145660e..23dcf15ae 100644 --- a/spec/issues/issue615_spec.rb +++ b/spec/issues/issue615_spec.rb @@ -3,41 +3,6 @@ require 'spec_helper' describe 'multiple atribute handling on acks' do - # Monkey patch with 2 implementations of confirmed subsets + counters for testing - class Bunny::Channel - attr_accessor :multiples, :old_implementation - - def handle_ack_or_nack(delivery_tag_before_offset, multiple, nack) - range_start_testing = old_implementation ? 1 : @unconfirmed_set.first # choose between subsets - - delivery_tag = delivery_tag_before_offset + @delivery_tag_offset - confirmed_range_start = multiple ? @delivery_tag_offset + range_start_testing : delivery_tag - confirmed_range_end = delivery_tag - confirmed_range = (confirmed_range_start..confirmed_range_end) - - # just counting multiples to proof we recieved some - @multiples = @multiples.to_i + 1 if multiple - - @unconfirmed_set_mutex.synchronize do - if nack - @nacked_set.merge(@unconfirmed_set & confirmed_range) - end - - @unconfirmed_set.subtract(confirmed_range) - - @only_acks_received = (@only_acks_received && !nack) - - @confirms_continuations.push(true) if @unconfirmed_set.empty? - - if @confirms_callback - confirmed_range.each do |tag| - @confirms_callback.call(tag, false, nack) - end - end - end - end - end - def measure result = {} result[:begin] = Process.clock_gettime(Process::CLOCK_MONOTONIC) @@ -60,6 +25,43 @@ def measure @connection.start end + before do + # Monkey patch with 2 implementations of confirmed subsets + counters for testing + class Bunny::Channel + attr_accessor :multiples, :old_implementation + + def handle_ack_or_nack(delivery_tag_before_offset, multiple, nack) + range_start_testing = old_implementation ? 1 : @unconfirmed_set.first # choose between subsets + + delivery_tag = delivery_tag_before_offset + @delivery_tag_offset + confirmed_range_start = multiple ? @delivery_tag_offset + range_start_testing : delivery_tag + confirmed_range_end = delivery_tag + confirmed_range = (confirmed_range_start..confirmed_range_end) + + # just counting multiples to proof we recieved some + @multiples = @multiples.to_i + 1 if multiple + + @unconfirmed_set_mutex.synchronize do + if nack + @nacked_set.merge(@unconfirmed_set & confirmed_range) + end + + @unconfirmed_set.subtract(confirmed_range) + + @only_acks_received = (@only_acks_received && !nack) + + @confirms_continuations.push(true) if @unconfirmed_set.empty? + + if @confirms_callback + confirmed_range.each do |tag| + @confirms_callback.call(tag, false, nack) + end + end + end + end + end + end + after(:all) do @connection.close if @connection.open? end From 5ce1f700c810972148a556fe126a49fe898257e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AE=D1=80=D1=83=D1=81=D0=BE=D0=B2=20=D0=92=D0=BB=D0=B0?= =?UTF-8?q?=D0=B4=D0=B8=D1=81=D0=BB=D0=B0=D0=B2?= Date: Fri, 11 Jun 2021 13:26:10 +0300 Subject: [PATCH 4/5] Using min instead of first, since there is no guarantee that set would be sorted --- lib/bunny/channel.rb | 2 +- spec/issues/issue615_spec.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/bunny/channel.rb b/lib/bunny/channel.rb index 43c0f3e35..b1d4d444e 100644 --- a/lib/bunny/channel.rb +++ b/lib/bunny/channel.rb @@ -1789,7 +1789,7 @@ def handle_basic_return(basic_return, properties, content) # @private def handle_ack_or_nack(delivery_tag_before_offset, multiple, nack) delivery_tag = delivery_tag_before_offset + @delivery_tag_offset - confirmed_range_start = multiple ? @delivery_tag_offset + @unconfirmed_set.first : delivery_tag + confirmed_range_start = multiple ? @delivery_tag_offset + @unconfirmed_set.min : delivery_tag confirmed_range_end = delivery_tag confirmed_range = (confirmed_range_start..confirmed_range_end) diff --git a/spec/issues/issue615_spec.rb b/spec/issues/issue615_spec.rb index 23dcf15ae..28445cbaa 100644 --- a/spec/issues/issue615_spec.rb +++ b/spec/issues/issue615_spec.rb @@ -31,7 +31,7 @@ class Bunny::Channel attr_accessor :multiples, :old_implementation def handle_ack_or_nack(delivery_tag_before_offset, multiple, nack) - range_start_testing = old_implementation ? 1 : @unconfirmed_set.first # choose between subsets + range_start_testing = old_implementation ? 1 : @unconfirmed_set.min # choose between subsets delivery_tag = delivery_tag_before_offset + @delivery_tag_offset confirmed_range_start = multiple ? @delivery_tag_offset + range_start_testing : delivery_tag From c7ee631ae874c5c0b56ca86c057bfb8176149f25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AE=D1=80=D1=83=D1=81=D0=BE=D0=B2=20=D0=92=D0=BB=D0=B0?= =?UTF-8?q?=D0=B4=D0=B8=D1=81=D0=BB=D0=B0=D0=B2?= Date: Wed, 16 Jun 2021 11:04:02 +0300 Subject: [PATCH 5/5] Cleaned up temp specs --- spec/issues/issue615_spec.rb | 112 ----------------------------------- 1 file changed, 112 deletions(-) delete mode 100644 spec/issues/issue615_spec.rb diff --git a/spec/issues/issue615_spec.rb b/spec/issues/issue615_spec.rb deleted file mode 100644 index 28445cbaa..000000000 --- a/spec/issues/issue615_spec.rb +++ /dev/null @@ -1,112 +0,0 @@ -# frozen_string_literal: true - -require 'spec_helper' - -describe 'multiple atribute handling on acks' do - def measure - result = {} - result[:begin] = Process.clock_gettime(Process::CLOCK_MONOTONIC) - yield(result) - rescue => e - result[:exception] = e - ensure - result[:end] = Process.clock_gettime(Process::CLOCK_MONOTONIC) - result[:duration] = (result[:end] - result[:begin]).round(2) - return result - end - - before(:all) do - @connection = Bunny.new(username: 'bunny_gem', - password: 'bunny_password', - vhost: 'bunny_testbed', - automatic_recovery: false, - write_timeout: 0, - read_timeout: 0) - @connection.start - end - - before do - # Monkey patch with 2 implementations of confirmed subsets + counters for testing - class Bunny::Channel - attr_accessor :multiples, :old_implementation - - def handle_ack_or_nack(delivery_tag_before_offset, multiple, nack) - range_start_testing = old_implementation ? 1 : @unconfirmed_set.min # choose between subsets - - delivery_tag = delivery_tag_before_offset + @delivery_tag_offset - confirmed_range_start = multiple ? @delivery_tag_offset + range_start_testing : delivery_tag - confirmed_range_end = delivery_tag - confirmed_range = (confirmed_range_start..confirmed_range_end) - - # just counting multiples to proof we recieved some - @multiples = @multiples.to_i + 1 if multiple - - @unconfirmed_set_mutex.synchronize do - if nack - @nacked_set.merge(@unconfirmed_set & confirmed_range) - end - - @unconfirmed_set.subtract(confirmed_range) - - @only_acks_received = (@only_acks_received && !nack) - - @confirms_continuations.push(true) if @unconfirmed_set.empty? - - if @confirms_callback - confirmed_range.each do |tag| - @confirms_callback.call(tag, false, nack) - end - end - end - end - end - end - - after(:all) do - @connection.close if @connection.open? - end - - let(:messages) { 10_000 } - let(:channel) { @connection.create_channel } - let(:queue) { channel.queue('bunny.basic.ack.multiple-acks', auto_delete: true) } - - context 'when multiple atribute used' do - it 'faster with new implementation' do - queue.subscribe(manual_ack: false) - - channel.confirm_select - - old = measure do - channel.old_implementation = true - exchange = channel.default_exchange - - messages.times do - exchange.publish('small message', routing_key: queue.name) - end - - channel.wait_for_confirms - - expect(channel.multiples).to be >= 1 - end - puts "Old implementation duration: #{old[:duration]} seconds" - - channel.multiples = 0 # clean up - - new = measure do - channel.old_implementation = false - exchange = channel.default_exchange - - messages.times do - exchange.publish('small message', routing_key: queue.name) - end - - channel.wait_for_confirms - - expect(channel.multiples).to be >= 1 - end - puts "New implementation duration: #{new[:duration]} seconds" - - expect(new[:duration]).to be < old[:duration] - end - end -end