Skip to content

Commit

Permalink
Make sure Bunny::Queue#pop wraps metadata into B::GetResponse and B::…
Browse files Browse the repository at this point in the history
…MessageProperties

Fixes #212.
  • Loading branch information
michaelklishin committed May 13, 2014
1 parent a820fd4 commit be34b2c
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 25 deletions.
83 changes: 83 additions & 0 deletions lib/bunny/get_response.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
require "bunny/versioned_delivery_tag"

module Bunny
# Wraps {AMQ::Protocol::Basic::GetOk} to
# provide access to the delivery properties as immutable hash as
# well as methods.
class GetResponse

#
# Behaviors
#

include Enumerable

#
# API
#

# @return [Bunny::Channel] Channel this basic.get-ok response is on
attr_reader :channel

# @private
def initialize(get_ok, consumer, channel)
@get_ok = get_ok
@hash = {
:delivery_tag => @get_ok.delivery_tag,
:redelivered => @get_ok.redelivered,
:exchange => @get_ok.exchange,
:routing_key => @get_ok.routing_key,
:channel => channel
}
@channel = channel
end

# Iterates over the delivery properties
# @see Enumerable#each
def each(*args, &block)
@hash.each(*args, &block)
end

# Accesses delivery properties by key
# @see Hash#[]
def [](k)
@hash[k]
end

# @return [Hash] Hash representation of this delivery info
def to_hash
@hash
end

# @private
def to_s
to_hash.to_s
end

# @private
def inspect
to_hash.inspect
end

# @return [String] Delivery identifier that is used to acknowledge, reject and nack deliveries
def delivery_tag
@get_ok.delivery_tag
end

# @return [Boolean] true if this delivery is a redelivery (the message was requeued at least once)
def redelivered
@get_ok.redelivered
end
alias redelivered? redelivered

# @return [String] Name of the exchange this message was published to
def exchange
@get_ok.exchange
end

# @return [String] Routing key this message was published with
def routing_key
@get_ok.routing_key
end
end
end
37 changes: 17 additions & 20 deletions lib/bunny/queue.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "bunny/compatibility"
require "bunny/get_response"

module Bunny
# Represents AMQP 0.9.1 queue.
Expand Down Expand Up @@ -229,33 +230,29 @@ def subscribe_with(consumer, opts = {:block => false})
# puts "This is the message: " + payload + "\n\n"
# conn.close
def pop(opts = {:ack => false}, &block)
delivery_info, properties, content = @channel.basic_get(@name, opts)
get_response, properties, content = @channel.basic_get(@name, opts)

if block
block.call(delivery_info, properties, content)
if properties
di = GetResponse.new(get_response, properties, @channel)
mp = MessageProperties.new(properties)

block.call(di, mp, content)
else
block.call(nil, nil, nil)
end
else
[delivery_info, properties, content]
if properties
di = GetResponse.new(get_response, properties, @channel)
mp = MessageProperties.new(properties)
[di, mp, content]
else
[nil, nil, nil]
end
end
end
alias get pop

# Version of {Bunny::Queue#pop} that returns data in legacy format
# (as a hash).
# @return [Hash]
# @deprecated
def pop_as_hash(opts = {:ack => false}, &block)
delivery_info, properties, content = @channel.basic_get(@name, opts)

result = {:header => properties, :payload => content, :delivery_details => delivery_info}

if block
block.call(result)
else
result
end
end


# Publishes a message to the queue via default exchange. Takes the same arguments
# as {Bunny::Exchange#publish}
#
Expand Down
18 changes: 13 additions & 5 deletions spec/higher_level_api/integration/basic_get_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@
q = ch.queue("", :exclusive => true)
x = ch.default_exchange

x.publish("xyzzy", :routing_key => q.name)
msg = "xyzzy"
x.publish(msg, :routing_key => q.name)

sleep(0.5)
delivery_info, properties, content = q.pop
content.should == "xyzzy"
get_ok, properties, content = q.pop
expect(get_ok).to be_kind_of(Bunny::GetResponse)
expect(properties).to be_kind_of(Bunny::MessageProperties)
expect(properties.content_type).to eq("application/octet-stream")
expect(get_ok.routing_key).to eq(q.name)
expect(get_ok.delivery_tag).to be_kind_of(Bunny::VersionedDeliveryTag)
expect(content).to eq(msg)
q.message_count.should == 0

ch.close
Expand All @@ -38,8 +44,10 @@
q = ch.queue("", :exclusive => true)
q.purge

_, _, content = q.pop
content.should be_nil
get_empty, properties, content = q.pop
expect(get_empty).to eq(nil)
expect(properties).to eq(nil)
expect(content).to eq(nil)
q.message_count.should == 0

ch.close
Expand Down

0 comments on commit be34b2c

Please sign in to comment.