Skip to content

Commit

Permalink
Rework recovery test
Browse files Browse the repository at this point in the history
- Wait for state changes on HTTP API, with timeouts. This is about 3x
  faster than the previous constant sleeps on my machine.
- Silence log output
- Pend out an example that was incorrectly passing
  (attempt limits: ruby-amqp#408)

Issue ruby-amqp#410
  • Loading branch information
camelpunch committed Jun 13, 2016
1 parent ee31af6 commit 1ca321b
Showing 1 changed file with 81 additions and 131 deletions.
212 changes: 81 additions & 131 deletions spec/higher_level_api/integration/connection_recovery_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,27 @@

describe "Connection recovery" do
let(:http_client) { RabbitMQ::HTTP::Client.new("http://127.0.0.1:15672") }
let(:logger) { Logger.new($stderr).tap {|logger| logger.level = Logger::FATAL} }
let(:recovery_interval) { 0.2 }

it "reconnects after grace period" do
with_open do |c|
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
expect(c).to be_open
wait_on_loss_and_recovery_of { connections.any? }
end
end

it "reconnects after grace period (with multiple hosts)" do
with_open_multi_host do |c|
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
expect(c).to be_open
wait_on_loss_and_recovery_of { connections.any? }
end
end

it "reconnects after grace period (with multiple hosts, including a broken one)" do
with_open_multi_broken_host do |c|
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
expect(c).to be_open
wait_on_loss_and_recovery_of { connections.any? }
end
end

Expand All @@ -42,10 +32,8 @@
ch1 = c.create_channel
ch2 = c.create_channel
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
poll_until { channels.count.zero? }
poll_until { channels.count == 2 }
expect(ch1).to be_open
expect(ch2).to be_open
end
Expand All @@ -56,10 +44,8 @@
ch1 = c.create_channel
ch2 = c.create_channel
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
poll_until { channels.count.zero? }
poll_until { channels.count == 2 }
expect(ch1).to be_open
expect(ch2).to be_open
end
Expand All @@ -70,10 +56,8 @@
ch1 = c.create_channel
ch2 = c.create_channel
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
poll_until { channels.count.zero? }
poll_until { channels.count == 2 }
expect(ch1).to be_open
expect(ch2).to be_open
end
Expand All @@ -86,10 +70,7 @@
expect(ch.prefetch_count).to eq 11
expect(ch.prefetch_global).to be false
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
wait_on_loss_and_recovery_of { connections.any? }
expect(ch).to be_open
expect(ch.prefetch_count).to eq 11
expect(ch.prefetch_global).to be false
Expand All @@ -103,10 +84,7 @@
expect(ch.prefetch_count).to eq 42
expect(ch.prefetch_global).to be true
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
wait_on_loss_and_recovery_of { connections.any? }
expect(ch).to be_open
expect(ch.prefetch_count).to eq 42
expect(ch.prefetch_global).to be true
Expand All @@ -119,10 +97,7 @@
ch.confirm_select
expect(ch).to be_using_publisher_confirms
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
wait_on_loss_and_recovery_of { connections.any? }
expect(ch).to be_open
expect(ch).to be_using_publisher_confirms
end
Expand All @@ -134,10 +109,7 @@
ch.tx_select
expect(ch).to be_using_tx
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
wait_on_loss_and_recovery_of { connections.any? }
expect(ch).to be_open
expect(ch).to be_using_tx
end
Expand All @@ -148,10 +120,7 @@
ch = c.create_channel
q = ch.queue("bunny.tests.recovery.client-named#{rand}")
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
wait_on_loss_and_recovery_of { connections.any? }
expect(ch).to be_open
ensure_queue_recovery(ch, q)
q.delete
Expand All @@ -164,10 +133,7 @@
ch = c.create_channel
q = ch.queue("", :exclusive => true)
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
wait_on_loss_and_recovery_of { connections.any? }
expect(ch).to be_open
ensure_queue_recovery(ch, q)
end
Expand All @@ -180,10 +146,7 @@
q = ch.queue("", :exclusive => true)
q.bind(x)
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
wait_on_loss_and_recovery_of { connections.any? }
expect(ch).to be_open
ensure_queue_binding_recovery(ch, x, q)
end
Expand Down Expand Up @@ -223,11 +186,7 @@
10.times { c.create_channel }
expect(c.queue_exists?(q)).to eq false
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
expect(c.queue_exists?(q)).to eq false
wait_on_loss_and_recovery_of { channels.any? }
# make sure the connection isn't closed shortly after
# due to "second 'channel.open' seen". MK.
expect(c).to be_open
Expand All @@ -248,15 +207,12 @@
delivered = true
end
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
wait_on_loss_and_recovery_of { connections.any? }
expect(ch).to be_open

q.publish("")
sleep 0.5
expect(delivered).to eq true

poll_until { delivered }
end
end

Expand All @@ -266,18 +222,11 @@
with_open do |c|
ch = c.create_channel
q = ch.queue("", :exclusive => true)
n.times do
q.subscribe do |_, _, _|
delivered = true
end
end
n.times { q.subscribe { |_, _, _| } }
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
sleep 1
wait_on_loss_and_recovery_of { connections.any? }
expect(ch).to be_open
sleep 0.5

expect(q.consumer_count).to eq n
end
Expand All @@ -295,11 +244,8 @@
qs << ch.queue("", :exclusive => true)
end
close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
sleep 1
wait_on_loss_and_recovery_of { queue_names.include?(qs.first.name) }
sleep 0.5
expect(ch).to be_open

qs.each do |q|
Expand All @@ -309,20 +255,38 @@
end

it "tries to recover for a given number of attempts" do
pending "Need a fix for https://github.com/ruby-amqp/bunny/issues/408"
with_recovery_attempts_limited_to(2) do |c|
close_all_connections!
expect(c).to receive(:start).exactly(2).times.and_raise(Bunny::TCPConnectionFailed.new("test"))
wait_on_loss_and_recovery_of { connections.any? }

wait_for_recovery
close_all_connections!
wait_on_loss_and_recovery_of { connections.any? }

close_all_connections!
sleep(recovery_interval + 0.5)
expect(connections).to be_empty
end
end

def exchange_names_in_vhost(vhost)
http_client.list_exchanges(vhost).map {|e| e["name"]}
end

def connections
http_client.list_connections
end

def channels
http_client.list_channels
end

def queue_names
http_client.list_queues.map {|q| q["name"]}
end

def close_all_connections!
http_client.list_connections.each do |conn_info|
connections.each do |conn_info|
close_ignoring_permitted_exceptions(conn_info.name)
end
end
Expand All @@ -332,86 +296,72 @@ def close_ignoring_permitted_exceptions(connection_name)
rescue Bunny::ConnectionForced
end

def wait_for_recovery
sleep 1.5
end

def wait_on_loss_and_recovery_of(&probe)
poll_while probe
poll_until probe
poll_while &probe
poll_until &probe
end

def poll_while(probe)
Timeout::timeout(30) {
def poll_while(&probe)
Timeout::timeout(10) {
sleep 0.1 while probe[]
}
end

def poll_until(probe)
Timeout::timeout(30) {
def poll_until(&probe)
Timeout::timeout(10) {
sleep 0.1 until probe[]
}
end

def with_open(c = Bunny.new(:network_recovery_interval => 0.2, :recover_from_connection_close => true), &block)
begin
c.start
block.call(c)
ensure
c.close
end
def with_open(c = Bunny.new(network_recovery_interval: recovery_interval,
recover_from_connection_close: true,
logger: logger), &block)
c.start
block.call(c)
ensure
c.close
end

def with_open_multi_host( c = Bunny.new( :hosts => ["127.0.0.1", "localhost"],
:network_recovery_interval => 0.2,
:recover_from_connection_close => true), &block)
begin
c.start
block.call(c)
ensure
c.close
end
def with_open_multi_host(&block)
c = Bunny.new(hosts: ["127.0.0.1", "localhost"],
network_recovery_interval: recovery_interval,
recover_from_connection_close: true,
logger: logger)
with_open(c, &block)
end

def with_open_multi_broken_host( c = Bunny.new( :hosts => ["broken", "127.0.0.1", "localhost"],
:hosts_shuffle_strategy => Proc.new { |hosts| hosts }, # We do not shuffle for these tests so we always hit the broken host
:network_recovery_interval => 0.2,
:recover_from_connection_close => true), &block)
begin
c.start
block.call(c)
ensure
c.close
end
def with_open_multi_broken_host(&block)
c = Bunny.new(hosts: ["broken", "127.0.0.1", "localhost"],
hosts_shuffle_strategy: Proc.new { |hosts| hosts }, # We do not shuffle for these tests so we always hit the broken host
network_recovery_interval: recovery_interval,
recover_from_connection_close: true,
logger: logger)
with_open(c, &block)
end

def with_recovery_attempts_limited_to(attempts = 3, &block)
c = Bunny.new(:recover_from_connection_close => true, :network_recovery_interval => 0.2, :recovery_attempts => attempts)
begin
c.start
block.call(c)
ensure
c.close
end
c = Bunny.new(recover_from_connection_close: true,
network_recovery_interval: recovery_interval,
recovery_attempts: attempts,
logger: logger)
with_open(c, &block)
end

def ensure_queue_recovery(ch, q)
ch.confirm_select
q.purge
x = ch.default_exchange
x.publish("msg", :routing_key => q.name)
x.publish("msg", routing_key: q.name)
ch.wait_for_confirms
sleep 0.5
expect(q.message_count).to eq 1
q.purge
end

def ensure_queue_binding_recovery(ch, x, q, routing_key = "")
ch.confirm_select
q.purge
x.publish("msg", :routing_key => routing_key)
x.publish("msg", routing_key: routing_key)
ch.wait_for_confirms
sleep 0.5
expect(q.message_count).to eq 1
q.purge
end
Expand Down

0 comments on commit 1ca321b

Please sign in to comment.