Skip to content

Commit

Permalink
Don't perform timeout control for basic.publish, fixes #101
Browse files Browse the repository at this point in the history
This increases throughput roughly by 350%. Applications that
cannot afford lose a single message should be using publisher
confirms anyway.

We can also introduce an alternative publishing function
should the need arise.
  • Loading branch information
Michael Klishin committed Feb 14, 2013
1 parent 73632b9 commit da6f972
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 8 deletions.
17 changes: 9 additions & 8 deletions lib/bunny/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -509,14 +509,15 @@ def basic_publish(payload, exchange, routing_key, opts = {})
@next_publish_seq_no += 1
end

@connection.send_frameset(AMQ::Protocol::Basic::Publish.encode(@id,
payload,
meta,
exchange_name,
routing_key,
meta[:mandatory],
false,
@connection.frame_max), self)
m = AMQ::Protocol::Basic::Publish.encode(@id,
payload,
meta,
exchange_name,
routing_key,
meta[:mandatory],
false,
@connection.frame_max)
@connection.send_frameset_without_timeout(m, self)

self
end
Expand Down
16 changes: 16 additions & 0 deletions lib/bunny/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,22 @@ def send_frameset(frames, channel)
end
end # send_frameset(frames)

# Sends multiple frames, one by one. For thread safety this method takes a channel
# object and synchronizes on it. Uses transport implementation that does not perform
# timeout control.
#
# @api private
def send_frameset_without_timeout(frames, channel)
# some developers end up sharing channels between threads and when multiple
# threads publish on the same channel aggressively, at some point frames will be
# delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
# locking. Note that "single frame" methods do not need this kind of synchronization. MK.
channel.synchronize do
frames.each { |frame| @transport.send_frame_without_timeout(frame) }
end
end # send_frameset_without_timeout(frames)

protected

# @api private
Expand Down
26 changes: 26 additions & 0 deletions lib/bunny/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ def write(*args)
end
alias send_raw write

# Writes data to the socket without timeout checks
def write_without_timeout(*args)
begin
raise Bunny::ConnectionError.new("No connection: socket is nil. ", @host, @port) if !@socket
@socket.write(*args)
rescue Errno::EPIPE, Errno::EAGAIN, Bunny::ClientTimeout, Bunny::ConnectionError, IOError => e
close

@session.handle_network_failure(e)
end
end

def close(reason = nil)
@socket.close if @socket and not @socket.closed?
@socket = nil
Expand Down Expand Up @@ -139,6 +151,20 @@ def send_frame(frame)
end
end

# Sends frame to the peer without timeout control.
#
# @raise [ConnectionClosedError]
# @private
def send_frame_without_timeout(frame)
if closed?
@session.handle_network_failure(ConnectionClosedError.new(frame))
else
frame.encode_to_array.each do |component|
write_without_timeout(component)
end
end
end


def self.reacheable?(host, port, timeout)
begin
Expand Down

0 comments on commit da6f972

Please sign in to comment.