Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

READY: Add various socket timeouts #270

Merged
merged 11 commits into from
Jun 9, 2016
18 changes: 17 additions & 1 deletion lib/riak/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Client
HOST_REGEX = /^(?:(?:(?:[a-zA-Z\d](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.)*(?:[a-zA-Z](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.?|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|\[(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|(?:(?:[a-fA-F\d]{1,4}:)*[a-fA-F\d]{1,4})?::(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}))?)\])$/n

# Valid constructor options.
VALID_OPTIONS = [:nodes, :client_id, :protobuffs_backend, :authentication, :max_retries] | Node::VALID_OPTIONS
VALID_OPTIONS = [:nodes, :client_id, :protobuffs_backend, :authentication, :max_retries, :connect_timeout, :read_timeout, :write_timeout] | Node::VALID_OPTIONS

# Network errors.
NETWORK_ERRORS = [
Expand All @@ -48,6 +48,7 @@ class Client
Errno::ENETDOWN,
Errno::ENETRESET,
Errno::ENETUNREACH,
Errno::ETIMEDOUT,
SocketError,
SystemCallError,
Riak::ProtobuffsFailedHeader,
Expand Down Expand Up @@ -76,6 +77,15 @@ class Client
# @return [Integer] The maximum number of retries in case of NETWORK_ERRORS
attr_accessor :max_retries

# @return [Numeric] The connect timeout, in seconds
attr_reader :connect_timeout

# @return [Numeric] The read timeout, in seconds
attr_reader :read_timeout

# @return [Numeric] The write timeout, in seconds
attr_reader :write_timeout

# Creates a client connection to Riak
# @param [Hash] options configuration options for the client
# @option options [Array] :nodes A list of nodes this client connects to.
Expand All @@ -88,6 +98,9 @@ class Client
# @option options [Fixnum, String] :client_id (rand(MAX_CLIENT_ID)) The internal client ID used by Riak to route responses
# @option options [String, Symbol] :protobuffs_backend (:Beefcake) which Protocol Buffers backend to use
# @option options [Fixnum] :max_retries (2) The maximum number of retries in case of NETWORK_ERRORS
# @option options [Numeric] :connect_timeout (nil) The connect timeout, in seconds
# @option options [Numeric] :read_timeout (nil) The read timeout, in seconds
# @option options [Numeric] :write_timeout (nil) The write timeout, in seconds
# @raise [ArgumentError] raised if any invalid options are given
def initialize(options = {})
if options.include? :port
Expand All @@ -110,6 +123,9 @@ def initialize(options = {})
self.multiget_threads = options[:multiget_threads]
@authentication = options[:authentication] && options[:authentication].symbolize_keys
self.max_retries = options[:max_retries] || 2
@connect_timeout = options[:connect_timeout]
@read_timeout = options[:read_timeout]
@write_timeout = options[:write_timeout]
end

# Is security enabled?
Expand Down
33 changes: 30 additions & 3 deletions lib/riak/client/beefcake/protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@ class Client
class BeefcakeProtobuffsBackend < ProtobuffsBackend
class Protocol
include Riak::Util::Translation
attr_reader :socket
attr_reader :socket, :read_timeout, :write_timeout

def initialize(socket)
# @param [Socket]
# @param [Hash] options
# @option options [Numeric] :read_timeout (nil) The read timeout, in seconds
# @option options [Numeric] :write_timeout (nil) The write timeout, in seconds
def initialize(socket, options = {})
@socket = socket
@read_timeout = options[:read_timeout]
@write_timeout = options[:write_timeout]
end

# Encodes and writes a Riak-formatted message, including protocol buffer
Expand All @@ -32,7 +38,25 @@ def write(code, message = nil)

payload = header + serialized

socket.write payload
if write_timeout
begin
loop do
bytes_written = socket.write_nonblock(payload)
# write_nonblock doesn't guarantee to write all data at once,
# so check if there are bytes left to be written
break if bytes_written >= payload.bytesize
payload.slice!(0, bytes_written)
end
rescue IO::WaitWritable, Errno::EINTR
# wait with the retry until socket is writable again
if !IO.select(nil, [socket], nil, write_timeout)
raise Errno::ETIMEDOUT, 'write timeout'
end
retry
end
else
socket.write(payload)
end
socket.flush
end

Expand All @@ -41,6 +65,9 @@ def write(code, message = nil)
#
# @return [Array<Symbol, String>]
def receive
if read_timeout && !IO.select([socket], nil, nil, read_timeout)
raise Errno::ETIMEDOUT, 'read timeout'
end
header = socket.read 5

raise ProtobuffsFailedHeader.new if header.nil?
Expand Down
14 changes: 8 additions & 6 deletions lib/riak/client/beefcake/socket.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'socket'
require 'openssl'
require 'cert_validator'
require 'riak/client/beefcake/messages'
Expand All @@ -13,21 +14,22 @@ class BeefcakeSocket
# Only create class methods, don't initialize
class << self
def new(host, port, options = {})
return start_tcp_socket(host, port) if options[:authentication].blank?
return start_tls_socket(host, port, options[:authentication])
return start_tcp_socket(host, port, options) if options[:authentication].blank?
return start_tls_socket(host, port, options)
end

private
def start_tcp_socket(host, port)
TCPSocket.new(host, port).tap do |sock|
def start_tcp_socket(host, port, options = {})
Socket.tcp(host, port, connect_timeout: options[:connect_timeout]).tap do |sock|
sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
end
end

def start_tls_socket(host, port, authentication)
def start_tls_socket(host, port, options)
authentication = options[:authentication]
raise Riak::UserConfigurationError.new if authentication[:username]

tcp = start_tcp_socket(host, port)
tcp = start_tcp_socket(host, port, options)
TlsInitiator.new(tcp, host, authentication).tls_socket
end

Expand Down
13 changes: 11 additions & 2 deletions lib/riak/client/beefcake_protobuffs_backend.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ def self.configured?
end

def protocol
p = Protocol.new socket
p = Protocol.new(
socket,
read_timeout: client.read_timeout,
write_timeout: client.write_timeout
)
in_request = false
result = nil
begin
Expand All @@ -45,7 +49,12 @@ def protocol
end

def new_socket
BeefcakeSocket.new @node.host, @node.pb_port, authentication: client.authentication
BeefcakeSocket.new(
@node.host,
@node.pb_port,
authentication: client.authentication,
connect_timeout: client.connect_timeout
)
end

def ping
Expand Down
45 changes: 45 additions & 0 deletions spec/integration/riak/protobuffs/timeouts_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
require 'spec_helper'

describe 'Protocol Buffers', test_client: true do
describe 'timeouts' do
it 'raises error on connect timeout' do
config = test_client_configuration.dup

# unroutable TEST-NET (https://tools.ietf.org/html/rfc5737)
config[:host] = '192.0.2.0'

config[:connect_timeout] = 0.0001
client = Riak::Client.new(config)

expect do
client.ping
end.to raise_error RuntimeError, /Operation timed out/
end

it 'raises error on read timeout' do
config = test_client_configuration.dup
config[:read_timeout] = 0.0001
client = Riak::Client.new(config)

expect do
client.ping
end.to raise_error RuntimeError, /Operation timed out/
end

it 'raises error on write timeout' do
config = test_client_configuration.dup
config[:write_timeout] = 0.0001
client = Riak::Client.new(config)

bucket = client.bucket('timeouts')
first = bucket.new 'first'
# write enough data to grow beyond socket buffer capacity
first.data = SecureRandom.urlsafe_base64(10_000_000)
first.content_type = 'text/plain'

expect do
first.store
end.to raise_error RuntimeError, /Operation timed out/
end
end
end
12 changes: 12 additions & 0 deletions spec/riak/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
expect(client.nodes.first.host).to eq("riak1.basho.com")
end

<<<<<<< HEAD
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops 😉

it "maps port to unset nodes, and does not create localhost node" do
client = Riak::Client.new nodes: [
{host: 'riak1.basho.com'},
Expand All @@ -63,6 +64,17 @@
client = Riak::Client.new :max_retries => 42
expect(client.max_retries).to eq(42)
end

it "accepts timeouts" do
client = Riak::Client.new(
:connect_timeout => 1,
:read_timeout => 2,
:write_timeout => 3
)
expect(client.connect_timeout).to eq(1)
expect(client.read_timeout).to eq(2)
expect(client.write_timeout).to eq(3)
end
end

it "exposes a Stamp object" do
Expand Down