Skip to content

Commit

Permalink
Recover exchanges and their bindings
Browse files Browse the repository at this point in the history
Introduce test helper in recovery spec to expect loss, then recovery of
a predicate.

Issue ruby-amqp#410
  • Loading branch information
camelpunch committed Jun 13, 2016
1 parent 6eea36e commit ee31af6
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 14 deletions.
8 changes: 8 additions & 0 deletions lib/bunny/exchange.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ def initialize(channel, type, name, opts = {})
@internal = @options[:internal]
@arguments = @options[:arguments]

@bindings = Set.new

declare! unless opts[:no_declare] || predeclared? || (@name == AMQ::Protocol::EMPTY_STRING)

@channel.register_exchange(self)
Expand Down Expand Up @@ -171,6 +173,7 @@ def delete(opts = {})
# @api public
def bind(source, opts = {})
@channel.exchange_bind(source, self, opts)
@bindings.add(source: source, opts: opts)

self
end
Expand All @@ -191,6 +194,7 @@ def bind(source, opts = {})
# @api public
def unbind(source, opts = {})
@channel.exchange_unbind(source, self, opts)
@bindings.delete(source: source, opts: opts)

self
end
Expand All @@ -217,6 +221,10 @@ def wait_for_confirms
# @private
def recover_from_network_failure
declare! unless predefined?

@bindings.each do |b|
bind(b[:source], b[:opts])
end
end


Expand Down
50 changes: 36 additions & 14 deletions spec/higher_level_api/integration/connection_recovery_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -189,30 +189,31 @@
end
end

it "recovers exchange bindings" do
it "recovers exchanges and their bindings" do
with_open do |c|
ch = c.create_channel
source = ch.fanout("amq.fanout")
destination = ch.fanout("bunny.tests.recovery.fanout")
routing_key = ""
source = ch.fanout("source.exchange.recovery.example", auto_delete: true)
destination = ch.fanout("destination.exchange.recovery.example", auto_delete: true)

destination.bind(source)

# Exchanges won't get auto-deleted on connection loss unless they have
# had an exclusive queue bound to them.
dst_queue = ch.queue("", exclusive: true)
dst_queue.bind(destination, routing_key: "")

src_queue = ch.queue("", exclusive: true)
src_queue.bind(source, routing_key: "")

close_all_connections!
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
expect(ch).to be_open
wait_on_loss_and_recovery_of { exchange_names_in_vhost("/").include?(source.name) }

ch.confirm_select
q = ch.queue("", :exclusive => true)
q.bind(destination, :routing_key => routing_key)

source.publish("msg", :routing_key => routing_key)
source.publish("msg", routing_key: "")
ch.wait_for_confirms
sleep 0.5
expect(q.message_count).to eq 1
q.delete
expect(dst_queue.message_count).to eq 1
end
end

Expand Down Expand Up @@ -316,6 +317,10 @@
end
end

def exchange_names_in_vhost(vhost)
http_client.list_exchanges(vhost).map {|e| e["name"]}
end

def close_all_connections!
http_client.list_connections.each do |conn_info|
close_ignoring_permitted_exceptions(conn_info.name)
Expand All @@ -331,6 +336,23 @@ def wait_for_recovery
sleep 1.5
end

def wait_on_loss_and_recovery_of(&probe)
poll_while probe
poll_until probe
end

def poll_while(probe)
Timeout::timeout(30) {
sleep 0.1 while probe[]
}
end

def poll_until(probe)
Timeout::timeout(30) {
sleep 0.1 until probe[]
}
end

def with_open(c = Bunny.new(:network_recovery_interval => 0.2, :recover_from_connection_close => true), &block)
begin
c.start
Expand Down
39 changes: 39 additions & 0 deletions spec/lib/bunny/exchange_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
require_relative '../../../lib/bunny/channel'
require_relative '../../../lib/bunny/exchange'

module Bunny
describe Exchange do
context "recovery" do
it "recovers exchange bindings, unless already unbound" do
ch = instance_double(Bunny::Channel,
exchange_declare: nil,
register_exchange: nil)
src1 = Exchange.new(ch, "direct", "src1")
src2 = Exchange.new(ch, "direct", "src2")
src3 = Exchange.new(ch, "direct", "src3")
dst = Exchange.new(ch, "direct", "dst")

original_binds_count = 5
expected_rebinds_count = 3
expected_total_binds = original_binds_count + expected_rebinds_count
allow(ch).to receive(:exchange_bind).exactly(expected_total_binds).times

dst.bind(src1, routing_key: "abc")
dst.bind(src2, routing_key: "def")
dst.bind(src2, routing_key: "ghi")
dst.bind(src3, routing_key: "jkl")
dst.bind(src3, routing_key: "jkl", arguments: ["foo", "bar"])

allow(ch).to receive(:exchange_unbind).twice
dst.unbind(src2, routing_key: "def")
dst.unbind(src3, routing_key: "jkl", arguments: ["foo", "bar"])

expect(ch).to receive(:exchange_bind).with(src1, dst, routing_key: "abc")
expect(ch).to receive(:exchange_bind).with(src2, dst, routing_key: "ghi")
expect(ch).to receive(:exchange_bind).with(src3, dst, routing_key: "jkl")

dst.recover_from_network_failure
end
end
end
end

0 comments on commit ee31af6

Please sign in to comment.