diff --git a/spec/higher_level_api/integration/connection_recovery_spec.rb b/spec/higher_level_api/integration/connection_recovery_spec.rb index 1fc09c7ea..8cf0b29b8 100644 --- a/spec/higher_level_api/integration/connection_recovery_spec.rb +++ b/spec/higher_level_api/integration/connection_recovery_spec.rb @@ -3,37 +3,27 @@ describe "Connection recovery" do let(:http_client) { RabbitMQ::HTTP::Client.new("http://127.0.0.1:15672") } + let(:logger) { Logger.new($stderr).tap {|logger| logger.level = Logger::FATAL} } + let(:recovery_interval) { 0.2 } it "reconnects after grace period" do with_open do |c| close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery - expect(c).to be_open + wait_on_loss_and_recovery_of { connections.any? } end end it "reconnects after grace period (with multiple hosts)" do with_open_multi_host do |c| close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery - expect(c).to be_open + wait_on_loss_and_recovery_of { connections.any? } end end it "reconnects after grace period (with multiple hosts, including a broken one)" do with_open_multi_broken_host do |c| close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery - expect(c).to be_open + wait_on_loss_and_recovery_of { connections.any? } end end @@ -42,10 +32,8 @@ ch1 = c.create_channel ch2 = c.create_channel close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery + poll_until { channels.count.zero? } + poll_until { channels.count == 2 } expect(ch1).to be_open expect(ch2).to be_open end @@ -56,10 +44,8 @@ ch1 = c.create_channel ch2 = c.create_channel close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery + poll_until { channels.count.zero? } + poll_until { channels.count == 2 } expect(ch1).to be_open expect(ch2).to be_open end @@ -70,10 +56,8 @@ ch1 = c.create_channel ch2 = c.create_channel close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery + poll_until { channels.count.zero? } + poll_until { channels.count == 2 } expect(ch1).to be_open expect(ch2).to be_open end @@ -86,10 +70,7 @@ expect(ch.prefetch_count).to eq 11 expect(ch.prefetch_global).to be false close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery + wait_on_loss_and_recovery_of { connections.any? } expect(ch).to be_open expect(ch.prefetch_count).to eq 11 expect(ch.prefetch_global).to be false @@ -103,10 +84,7 @@ expect(ch.prefetch_count).to eq 42 expect(ch.prefetch_global).to be true close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery + wait_on_loss_and_recovery_of { connections.any? } expect(ch).to be_open expect(ch.prefetch_count).to eq 42 expect(ch.prefetch_global).to be true @@ -119,10 +97,7 @@ ch.confirm_select expect(ch).to be_using_publisher_confirms close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery + wait_on_loss_and_recovery_of { connections.any? } expect(ch).to be_open expect(ch).to be_using_publisher_confirms end @@ -134,10 +109,7 @@ ch.tx_select expect(ch).to be_using_tx close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery + wait_on_loss_and_recovery_of { connections.any? } expect(ch).to be_open expect(ch).to be_using_tx end @@ -148,10 +120,7 @@ ch = c.create_channel q = ch.queue("bunny.tests.recovery.client-named#{rand}") close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery + wait_on_loss_and_recovery_of { connections.any? } expect(ch).to be_open ensure_queue_recovery(ch, q) q.delete @@ -164,10 +133,7 @@ ch = c.create_channel q = ch.queue("", :exclusive => true) close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery + wait_on_loss_and_recovery_of { connections.any? } expect(ch).to be_open ensure_queue_recovery(ch, q) end @@ -180,10 +146,7 @@ q = ch.queue("", :exclusive => true) q.bind(x) close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery + wait_on_loss_and_recovery_of { connections.any? } expect(ch).to be_open ensure_queue_binding_recovery(ch, x, q) end @@ -223,11 +186,7 @@ 10.times { c.create_channel } expect(c.queue_exists?(q)).to eq false close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery - expect(c.queue_exists?(q)).to eq false + wait_on_loss_and_recovery_of { channels.any? } # make sure the connection isn't closed shortly after # due to "second 'channel.open' seen". MK. expect(c).to be_open @@ -248,15 +207,12 @@ delivered = true end close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery + wait_on_loss_and_recovery_of { connections.any? } expect(ch).to be_open q.publish("") - sleep 0.5 - expect(delivered).to eq true + + poll_until { delivered } end end @@ -266,18 +222,11 @@ with_open do |c| ch = c.create_channel q = ch.queue("", :exclusive => true) - n.times do - q.subscribe do |_, _, _| - delivered = true - end - end + n.times { q.subscribe { |_, _, _| } } close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery - sleep 1 + wait_on_loss_and_recovery_of { connections.any? } expect(ch).to be_open + sleep 0.5 expect(q.consumer_count).to eq n end @@ -295,11 +244,8 @@ qs << ch.queue("", :exclusive => true) end close_all_connections! - sleep 0.1 - expect(c).not_to be_open - - wait_for_recovery - sleep 1 + wait_on_loss_and_recovery_of { queue_names.include?(qs.first.name) } + sleep 0.5 expect(ch).to be_open qs.each do |q| @@ -309,11 +255,17 @@ end it "tries to recover for a given number of attempts" do + pending "Need a fix for https://github.com/ruby-amqp/bunny/issues/408" with_recovery_attempts_limited_to(2) do |c| close_all_connections! - expect(c).to receive(:start).exactly(2).times.and_raise(Bunny::TCPConnectionFailed.new("test")) + wait_on_loss_and_recovery_of { connections.any? } - wait_for_recovery + close_all_connections! + wait_on_loss_and_recovery_of { connections.any? } + + close_all_connections! + sleep(recovery_interval + 0.5) + expect(connections).to be_empty end end @@ -321,8 +273,20 @@ def exchange_names_in_vhost(vhost) http_client.list_exchanges(vhost).map {|e| e["name"]} end + def connections + http_client.list_connections + end + + def channels + http_client.list_channels + end + + def queue_names + http_client.list_queues.map {|q| q["name"]} + end + def close_all_connections! - http_client.list_connections.each do |conn_info| + connections.each do |conn_info| close_ignoring_permitted_exceptions(conn_info.name) end end @@ -332,76 +296,63 @@ def close_ignoring_permitted_exceptions(connection_name) rescue Bunny::ConnectionForced end - def wait_for_recovery - sleep 1.5 - end - def wait_on_loss_and_recovery_of(&probe) - poll_while probe - poll_until probe + poll_while &probe + poll_until &probe end - def poll_while(probe) - Timeout::timeout(30) { + def poll_while(&probe) + Timeout::timeout(10) { sleep 0.1 while probe[] } end - def poll_until(probe) - Timeout::timeout(30) { + def poll_until(&probe) + Timeout::timeout(10) { sleep 0.1 until probe[] } end - def with_open(c = Bunny.new(:network_recovery_interval => 0.2, :recover_from_connection_close => true), &block) - begin - c.start - block.call(c) - ensure - c.close - end + def with_open(c = Bunny.new(network_recovery_interval: recovery_interval, + recover_from_connection_close: true, + logger: logger), &block) + c.start + block.call(c) + ensure + c.close end - def with_open_multi_host( c = Bunny.new( :hosts => ["127.0.0.1", "localhost"], - :network_recovery_interval => 0.2, - :recover_from_connection_close => true), &block) - begin - c.start - block.call(c) - ensure - c.close - end + def with_open_multi_host(&block) + c = Bunny.new(hosts: ["127.0.0.1", "localhost"], + network_recovery_interval: recovery_interval, + recover_from_connection_close: true, + logger: logger) + with_open(c, &block) end - def with_open_multi_broken_host( c = Bunny.new( :hosts => ["broken", "127.0.0.1", "localhost"], - :hosts_shuffle_strategy => Proc.new { |hosts| hosts }, # We do not shuffle for these tests so we always hit the broken host - :network_recovery_interval => 0.2, - :recover_from_connection_close => true), &block) - begin - c.start - block.call(c) - ensure - c.close - end + def with_open_multi_broken_host(&block) + c = Bunny.new(hosts: ["broken", "127.0.0.1", "localhost"], + hosts_shuffle_strategy: Proc.new { |hosts| hosts }, # We do not shuffle for these tests so we always hit the broken host + network_recovery_interval: recovery_interval, + recover_from_connection_close: true, + logger: logger) + with_open(c, &block) end def with_recovery_attempts_limited_to(attempts = 3, &block) - c = Bunny.new(:recover_from_connection_close => true, :network_recovery_interval => 0.2, :recovery_attempts => attempts) - begin - c.start - block.call(c) - ensure - c.close - end + c = Bunny.new(recover_from_connection_close: true, + network_recovery_interval: recovery_interval, + recovery_attempts: attempts, + logger: logger) + with_open(c, &block) end def ensure_queue_recovery(ch, q) ch.confirm_select q.purge x = ch.default_exchange - x.publish("msg", :routing_key => q.name) + x.publish("msg", routing_key: q.name) ch.wait_for_confirms - sleep 0.5 expect(q.message_count).to eq 1 q.purge end @@ -409,9 +360,8 @@ def ensure_queue_recovery(ch, q) def ensure_queue_binding_recovery(ch, x, q, routing_key = "") ch.confirm_select q.purge - x.publish("msg", :routing_key => routing_key) + x.publish("msg", routing_key: routing_key) ch.wait_for_confirms - sleep 0.5 expect(q.message_count).to eq 1 q.purge end