Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

READY: Add various socket timeouts #270

Merged
merged 11 commits into from
Jun 9, 2016
Merged
2 changes: 1 addition & 1 deletion .rubocop_todo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ Metrics/AbcSize:
# Offense count: 33
# Configuration parameters: CountComments.
Metrics/ClassLength:
Max: 431
Max: 440

# Offense count: 54
Metrics/CyclomaticComplexity:
Expand Down
3 changes: 1 addition & 2 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ namespace :beefcake do

directory 'tmp/riak_pb' => 'tmp' do
cd 'tmp' do
# NB: change this once TS is published
sh "git clone -b end-to-end/timeseries https://github.com/basho/riak_pb.git"
sh "git clone -b 2.1.3.0 https://github.com/basho/riak_pb.git"
end
end
end
18 changes: 17 additions & 1 deletion lib/riak/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Client
HOST_REGEX = /^(?:(?:(?:[a-zA-Z\d](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.)*(?:[a-zA-Z](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.?|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|\[(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|(?:(?:[a-fA-F\d]{1,4}:)*[a-fA-F\d]{1,4})?::(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}))?)\])$/n

# Valid constructor options.
VALID_OPTIONS = [:nodes, :client_id, :protobuffs_backend, :authentication, :max_retries] | Node::VALID_OPTIONS
VALID_OPTIONS = [:nodes, :client_id, :protobuffs_backend, :authentication, :max_retries, :connect_timeout, :read_timeout, :write_timeout] | Node::VALID_OPTIONS

# Network errors.
NETWORK_ERRORS = [
Expand All @@ -48,6 +48,7 @@ class Client
Errno::ENETDOWN,
Errno::ENETRESET,
Errno::ENETUNREACH,
Errno::ETIMEDOUT,
SocketError,
SystemCallError,
Riak::ProtobuffsFailedHeader,
Expand Down Expand Up @@ -76,6 +77,15 @@ class Client
# @return [Integer] The maximum number of retries in case of NETWORK_ERRORS
attr_accessor :max_retries

# @return [Numeric] The connect timeout, in seconds
attr_reader :connect_timeout

# @return [Numeric] The read timeout, in seconds
attr_reader :read_timeout

# @return [Numeric] The write timeout, in seconds
attr_reader :write_timeout

# Creates a client connection to Riak
# @param [Hash] options configuration options for the client
# @option options [Array] :nodes A list of nodes this client connects to.
Expand All @@ -88,6 +98,9 @@ class Client
# @option options [Fixnum, String] :client_id (rand(MAX_CLIENT_ID)) The internal client ID used by Riak to route responses
# @option options [String, Symbol] :protobuffs_backend (:Beefcake) which Protocol Buffers backend to use
# @option options [Fixnum] :max_retries (2) The maximum number of retries in case of NETWORK_ERRORS
# @option options [Numeric] :connect_timeout (nil) The connect timeout, in seconds
# @option options [Numeric] :read_timeout (nil) The read timeout, in seconds
# @option options [Numeric] :write_timeout (nil) The write timeout, in seconds
# @raise [ArgumentError] raised if any invalid options are given
def initialize(options = {})
if options.include? :port
Expand All @@ -110,6 +123,9 @@ def initialize(options = {})
self.multiget_threads = options[:multiget_threads]
@authentication = options[:authentication] && options[:authentication].symbolize_keys
self.max_retries = options[:max_retries] || 2
@connect_timeout = options[:connect_timeout]
@read_timeout = options[:read_timeout]
@write_timeout = options[:write_timeout]
end

# Is security enabled?
Expand Down
33 changes: 30 additions & 3 deletions lib/riak/client/beefcake/protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@ class Client
class BeefcakeProtobuffsBackend < ProtobuffsBackend
class Protocol
include Riak::Util::Translation
attr_reader :socket
attr_reader :socket, :read_timeout, :write_timeout

def initialize(socket)
# @param [Socket]
# @param [Hash] options
# @option options [Numeric] :read_timeout (nil) The read timeout, in seconds
# @option options [Numeric] :write_timeout (nil) The write timeout, in seconds
def initialize(socket, options = {})
@socket = socket
@read_timeout = options[:read_timeout]
@write_timeout = options[:write_timeout]
end

# Encodes and writes a Riak-formatted message, including protocol buffer
Expand All @@ -32,7 +38,25 @@ def write(code, message = nil)

payload = header + serialized

socket.write payload
if write_timeout
begin
loop do
bytes_written = socket.write_nonblock(payload)
# write_nonblock doesn't guarantee to write all data at once,
# so check if there are bytes left to be written
break if bytes_written >= payload.bytesize
payload.slice!(0, bytes_written)
end
rescue IO::WaitWritable, Errno::EINTR
# wait with the retry until socket is writable again
unless IO.select(nil, [socket], nil, write_timeout)
raise Errno::ETIMEDOUT, 'write timeout'
end
retry
end
else
socket.write(payload)
end
socket.flush
end

Expand All @@ -41,6 +65,9 @@ def write(code, message = nil)
#
# @return [Array<Symbol, String>]
def receive
if read_timeout and !IO.select([socket], nil, nil, read_timeout)
raise Errno::ETIMEDOUT, 'read timeout'
end
header = socket.read 5

raise ProtobuffsFailedHeader.new if header.nil?
Expand Down
14 changes: 8 additions & 6 deletions lib/riak/client/beefcake/socket.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'socket'
require 'openssl'
require 'cert_validator'
require 'riak/client/beefcake/messages'
Expand All @@ -13,21 +14,22 @@ class BeefcakeSocket
# Only create class methods, don't initialize
class << self
def new(host, port, options = {})
return start_tcp_socket(host, port) if options[:authentication].blank?
return start_tls_socket(host, port, options[:authentication])
return start_tcp_socket(host, port, options) if options[:authentication].blank?
return start_tls_socket(host, port, options)
end

private
def start_tcp_socket(host, port)
TCPSocket.new(host, port).tap do |sock|
def start_tcp_socket(host, port, options = {})
Socket.tcp(host, port, connect_timeout: options[:connect_timeout]).tap do |sock|
sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
end
end

def start_tls_socket(host, port, authentication)
def start_tls_socket(host, port, options)
authentication = options[:authentication]
raise Riak::UserConfigurationError.new if authentication[:username]

tcp = start_tcp_socket(host, port)
tcp = start_tcp_socket(host, port, options)
TlsInitiator.new(tcp, host, authentication).tls_socket
end

Expand Down
13 changes: 11 additions & 2 deletions lib/riak/client/beefcake_protobuffs_backend.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ def self.configured?
end

def protocol
p = Protocol.new socket
p = Protocol.new(
socket,
read_timeout: client.read_timeout,
write_timeout: client.write_timeout
)
in_request = false
result = nil
begin
Expand All @@ -45,7 +49,12 @@ def protocol
end

def new_socket
BeefcakeSocket.new @node.host, @node.pb_port, authentication: client.authentication
BeefcakeSocket.new(
@node.host,
@node.pb_port,
authentication: client.authentication,
connect_timeout: client.connect_timeout
)
end

def ping
Expand Down
175 changes: 175 additions & 0 deletions spec/integration/riak/protobuffs/timeouts_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
require 'socket'
require 'spec_helper'

require 'riak/client/beefcake/messages'
require 'riak/client/beefcake/protocol'

describe 'Protocol Buffers', test_client: true, integration: true do
describe 'timeouts' do
it 'raises error on connect timeout' do
# unroutable TEST-NET (https://tools.ietf.org/html/rfc5737)
config = {}
config[:host] = '192.0.2.0'
config[:pb_port] = 65535

config[:connect_timeout] = 0.001
client = Riak::Client.new(config)

expect do
client.ping
end.to raise_error RuntimeError, /timed out/
end

it 'raises error on read timeout' do
ok_to_continue = false
quitting = false
port = 0

server = nil
thr = Thread.new {
server = TCPServer.new port
port = server.addr[1]
ok_to_continue = true
loop do
begin
Thread.start(server.accept) do |s|
loop do
p = Riak::Client::BeefcakeProtobuffsBackend::Protocol.new s
begin
msgname, body = p.receive
rescue IOError => e
break if quitting
raise
end
case msgname
when :PingReq
sleep 0.5
p.write :PingResp
else
$stderr.puts("unknown msgname: #{msgname}")
end
end
end
rescue IOError => e
break if quitting
raise
end
end
}

loop do
break if ok_to_continue
sleep 0.1
end
ok_to_continue = false

config = {}
config[:pb_port] = port
config[:client_id] = port
config[:read_timeout] = 0.001
client = Riak::Client.new(config)

max_ping_attempts = 16
ping_count = 0
loop do
begin
client.ping
ping_count += 1
break if ping_count > max_ping_attempts
rescue RuntimeError => e
break if e.message =~ /timed out/
end
sleep 0.5
end

quitting = true
server.close
thr.join

ping_count > max_ping_attempts and fail 'did not see expected timeout!'
end

it 'raises error on write timeout' do
ok_to_continue = false
quitting = false
port = 0

server = nil
thr = Thread.new {
server = TCPServer.new port
port = server.addr[1]
ok_to_continue = true
put_count = 0
loop do
begin
Thread.start(server.accept) do |s|
loop do
p = Riak::Client::BeefcakeProtobuffsBackend::Protocol.new s
begin
msgname, body = p.receive
rescue IOError => e
break if quitting
raise
end
case msgname
when :PingReq
p.write :PingResp
when :GetServerInfoReq
r = Riak::Client::BeefcakeProtobuffsBackend::RpbGetServerInfoResp.new
r.node = '[email protected]'.force_encoding('BINARY')
r.server_version = '2.1.4'.force_encoding('BINARY')
p.write :GetServerInfoResp, r
when :PutReq
r = Riak::Client::BeefcakeProtobuffsBackend::RpbPutResp.new
p.write :PutResp, r
else
$stderr.puts("unknown msgname: #{msgname}")
end
end
end
rescue IOError => e
break if quitting
raise
end
end
}

loop do
break if ok_to_continue
sleep 0.1
end
ok_to_continue = false

config = {}
config[:pb_port] = port
config[:client_id] = port
config[:write_timeout] = 0.001
client = Riak::Client.new(config)

bucket = client.bucket('timeouts')

max_store_attempts = 16
store_count = 0
loop do
begin
obj = bucket.new "obj-#{store_count}"
# write enough data to grow beyond socket buffer capacity
obj.data = SecureRandom.urlsafe_base64(10_000_000)
obj.content_type = 'text/plain'
obj.store
store_count += 1
break if store_count > max_store_attempts
rescue RuntimeError => e
break if e.message =~ /timed out/
end
sleep 0.5
end

quitting = true
server.close
thr.join

store_count > max_store_attempts and fail 'did not see expected timeout!'
end
end
end
13 changes: 12 additions & 1 deletion spec/riak/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
expect(client.client_id).to eq("AAAAAA==")
end

it "creates a client ID if not specified" do
it "creates a client ID if not specified", :integration => true do
expect(Riak::Client.new(pb_port: test_client.nodes.first.pb_port).
client_id).to_not be_nil
end
Expand Down Expand Up @@ -63,6 +63,17 @@
client = Riak::Client.new :max_retries => 42
expect(client.max_retries).to eq(42)
end

it "accepts timeouts" do
client = Riak::Client.new(
:connect_timeout => 1,
:read_timeout => 2,
:write_timeout => 3
)
expect(client.connect_timeout).to eq(1)
expect(client.read_timeout).to eq(2)
expect(client.write_timeout).to eq(3)
end
end

it "exposes a Stamp object" do
Expand Down
Loading