Skip to content

Commit

Permalink
Merge pull request #939 from ashleysommer/keepalive_timeout
Browse files Browse the repository at this point in the history
Split RequestTimeout, ResponseTimeout, and KeepAliveTimeout into different timeouts
  • Loading branch information
r0fls authored Oct 1, 2017
2 parents 2fb4697 + 8eb59ad commit 15fd490
Show file tree
Hide file tree
Showing 7 changed files with 585 additions and 44 deletions.
2 changes: 2 additions & 0 deletions sanic/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,8 @@ def _helper(self, host=None, port=None, debug=False,
'request_handler': self.handle_request,
'error_handler': self.error_handler,
'request_timeout': self.config.REQUEST_TIMEOUT,
'response_timeout': self.config.RESPONSE_TIMEOUT,
'keep_alive_timeout': self.config.KEEP_ALIVE_TIMEOUT,
'request_max_size': self.config.REQUEST_MAX_SIZE,
'keep_alive': self.config.KEEP_ALIVE,
'loop': loop,
Expand Down
8 changes: 8 additions & 0 deletions sanic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,15 @@ def __init__(self, defaults=None, load_env=True, keep_alive=True):
"""
self.REQUEST_MAX_SIZE = 100000000 # 100 megabytes
self.REQUEST_TIMEOUT = 60 # 60 seconds
self.RESPONSE_TIMEOUT = 60 # 60 seconds
self.KEEP_ALIVE = keep_alive
# Apache httpd server default keepalive timeout = 5 seconds
# Nginx server default keepalive timeout = 75 seconds
# Nginx performance tuning guidelines uses keepalive = 15 seconds
# IE client hard keepalive limit = 60 seconds
# Firefox client hard keepalive limit = 115 seconds

self.KEEP_ALIVE_TIMEOUT = 5 # 5 seconds
self.WEBSOCKET_MAX_SIZE = 2 ** 20 # 1 megabytes
self.WEBSOCKET_MAX_QUEUE = 32
self.GRACEFUL_SHUTDOWN_TIMEOUT = 15.0 # 15 sec
Expand Down
14 changes: 14 additions & 0 deletions sanic/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ class ServerError(SanicException):
pass


@add_status_code(503)
class ServiceUnavailable(SanicException):
"""The server is currently unavailable (because it is overloaded or
down for maintenance). Generally, this is a temporary state."""
pass


class URLBuildError(ServerError):
pass

Expand All @@ -170,6 +177,13 @@ def __init__(self, message, path, relative_url):

@add_status_code(408)
class RequestTimeout(SanicException):
"""The Web server (running the Web site) thinks that there has been too
long an interval of time between 1) the establishment of an IP
connection (socket) between the client and the server and
2) the receipt of any data on that socket, so the server has dropped
the connection. The socket connection has actually been lost - the Web
server has 'timed out' on that particular socket connection.
"""
pass


Expand Down
137 changes: 111 additions & 26 deletions sanic/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from sanic.response import HTTPResponse
from sanic.request import Request
from sanic.exceptions import (
RequestTimeout, PayloadTooLarge, InvalidUsage, ServerError)
RequestTimeout, PayloadTooLarge, InvalidUsage, ServerError,
ServiceUnavailable)

current_time = None

Expand Down Expand Up @@ -63,16 +64,19 @@ class HttpProtocol(asyncio.Protocol):
# request params
'parser', 'request', 'url', 'headers',
# request config
'request_handler', 'request_timeout', 'request_max_size',
'request_class', 'is_request_stream', 'router',
'request_handler', 'request_timeout', 'response_timeout',
'keep_alive_timeout', 'request_max_size', 'request_class',
'is_request_stream', 'router',
# enable or disable access log / error log purpose
'has_log',
# connection management
'_total_request_size', '_timeout_handler', '_last_communication_time',
'_is_stream_handler')
'_total_request_size', '_request_timeout_handler',
'_response_timeout_handler', '_keep_alive_timeout_handler',
'_last_request_time', '_last_response_time', '_is_stream_handler')

def __init__(self, *, loop, request_handler, error_handler,
signal=Signal(), connections=set(), request_timeout=60,
response_timeout=60, keep_alive_timeout=15,
request_max_size=None, request_class=None, has_log=True,
keep_alive=True, is_request_stream=False, router=None,
state=None, debug=False, **kwargs):
Expand All @@ -89,13 +93,18 @@ def __init__(self, *, loop, request_handler, error_handler,
self.request_handler = request_handler
self.error_handler = error_handler
self.request_timeout = request_timeout
self.response_timeout = response_timeout
self.keep_alive_timeout = keep_alive_timeout
self.request_max_size = request_max_size
self.request_class = request_class or Request
self.is_request_stream = is_request_stream
self._is_stream_handler = False
self._total_request_size = 0
self._timeout_handler = None
self._request_timeout_handler = None
self._response_timeout_handler = None
self._keep_alive_timeout_handler = None
self._last_request_time = None
self._last_response_time = None
self._request_handler_task = None
self._request_stream_task = None
self._keep_alive = keep_alive
Expand All @@ -118,22 +127,32 @@ def keep_alive(self):

def connection_made(self, transport):
self.connections.add(self)
self._timeout_handler = self.loop.call_later(
self.request_timeout, self.connection_timeout)
self._request_timeout_handler = self.loop.call_later(
self.request_timeout, self.request_timeout_callback)
self.transport = transport
self._last_request_time = current_time

def connection_lost(self, exc):
self.connections.discard(self)
self._timeout_handler.cancel()

def connection_timeout(self):
# Check if
if self._request_timeout_handler:
self._request_timeout_handler.cancel()
if self._response_timeout_handler:
self._response_timeout_handler.cancel()
if self._keep_alive_timeout_handler:
self._keep_alive_timeout_handler.cancel()

def request_timeout_callback(self):
# See the docstring in the RequestTimeout exception, to see
# exactly what this timeout is checking for.
# Check if elapsed time since request initiated exceeds our
# configured maximum request timeout value
time_elapsed = current_time - self._last_request_time
if time_elapsed < self.request_timeout:
time_left = self.request_timeout - time_elapsed
self._timeout_handler = (
self.loop.call_later(time_left, self.connection_timeout))
self._request_timeout_handler = (
self.loop.call_later(time_left,
self.request_timeout_callback)
)
else:
if self._request_stream_task:
self._request_stream_task.cancel()
Expand All @@ -144,6 +163,36 @@ def connection_timeout(self):
except RequestTimeout as exception:
self.write_error(exception)

def response_timeout_callback(self):
# Check if elapsed time since response was initiated exceeds our
# configured maximum request timeout value
time_elapsed = current_time - self._last_request_time
if time_elapsed < self.response_timeout:
time_left = self.response_timeout - time_elapsed
self._response_timeout_handler = (
self.loop.call_later(time_left,
self.response_timeout_callback)
)
else:
try:
raise ServiceUnavailable('Response Timeout')
except ServiceUnavailable as exception:
self.write_error(exception)

def keep_alive_timeout_callback(self):
# Check if elapsed time since last response exceeds our configured
# maximum keep alive timeout value
time_elapsed = current_time - self._last_response_time
if time_elapsed < self.keep_alive_timeout:
time_left = self.keep_alive_timeout - time_elapsed
self._keep_alive_timeout_handler = (
self.loop.call_later(time_left,
self.keep_alive_timeout_callback)
)
else:
log.info('KeepAlive Timeout. Closing connection.')
self.transport.close()

# -------------------------------------------- #
# Parsing
# -------------------------------------------- #
Expand Down Expand Up @@ -206,6 +255,11 @@ def on_headers_complete(self):
method=self.parser.get_method().decode(),
transport=self.transport
)
# Remove any existing KeepAlive handler here,
# It will be recreated if required on the new request.
if self._keep_alive_timeout_handler:
self._keep_alive_timeout_handler.cancel()
self._keep_alive_timeout_handler = None
if self.is_request_stream:
self._is_stream_handler = self.router.is_stream_handler(
self.request)
Expand All @@ -221,6 +275,11 @@ def on_body(self, body):
self.request.body.append(body)

def on_message_complete(self):
# Entire request (headers and whole body) is received.
# We can cancel and remove the request timeout handler now.
if self._request_timeout_handler:
self._request_timeout_handler.cancel()
self._request_timeout_handler = None
if self.is_request_stream and self._is_stream_handler:
self._request_stream_task = self.loop.create_task(
self.request.stream.put(None))
Expand All @@ -229,6 +288,9 @@ def on_message_complete(self):
self.execute_request_handler()

def execute_request_handler(self):
self._response_timeout_handler = self.loop.call_later(
self.response_timeout, self.response_timeout_callback)
self._last_request_time = current_time
self._request_handler_task = self.loop.create_task(
self.request_handler(
self.request,
Expand All @@ -242,12 +304,15 @@ def write_response(self, response):
"""
Writes response content synchronously to the transport.
"""
if self._response_timeout_handler:
self._response_timeout_handler.cancel()
self._response_timeout_handler = None
try:
keep_alive = self.keep_alive
self.transport.write(
response.output(
self.request.version, keep_alive,
self.request_timeout))
self.keep_alive_timeout))
if self.has_log:
netlog.info('', extra={
'status': response.status,
Expand Down Expand Up @@ -275,7 +340,10 @@ def write_response(self, response):
if not keep_alive:
self.transport.close()
else:
self._last_request_time = current_time
self._keep_alive_timeout_handler = self.loop.call_later(
self.keep_alive_timeout,
self.keep_alive_timeout_callback)
self._last_response_time = current_time
self.cleanup()

async def stream_response(self, response):
Expand All @@ -284,12 +352,14 @@ async def stream_response(self, response):
the transport to the response so the response consumer can
write to the response as needed.
"""

if self._response_timeout_handler:
self._response_timeout_handler.cancel()
self._response_timeout_handler = None
try:
keep_alive = self.keep_alive
response.transport = self.transport
await response.stream(
self.request.version, keep_alive, self.request_timeout)
self.request.version, keep_alive, self.keep_alive_timeout)
if self.has_log:
netlog.info('', extra={
'status': response.status,
Expand Down Expand Up @@ -317,10 +387,18 @@ async def stream_response(self, response):
if not keep_alive:
self.transport.close()
else:
self._last_request_time = current_time
self._keep_alive_timeout_handler = self.loop.call_later(
self.keep_alive_timeout,
self.keep_alive_timeout_callback)
self._last_response_time = current_time
self.cleanup()

def write_error(self, exception):
# An error _is_ a response.
# Don't throw a response timeout, when a response _is_ given.
if self._response_timeout_handler:
self._response_timeout_handler.cancel()
self._response_timeout_handler = None
response = None
try:
response = self.error_handler.response(self.request, exception)
Expand All @@ -332,8 +410,9 @@ def write_error(self, exception):
self.request.ip if self.request else 'Unknown'))
except Exception as e:
self.bail_out(
"Writing error failed, connection closed {}".format(repr(e)),
from_error=True)
"Writing error failed, connection closed {}".format(
repr(e)), from_error=True
)
finally:
if self.has_log:
extra = dict()
Expand Down Expand Up @@ -369,6 +448,9 @@ def bail_out(self, message, from_error=False):
log.error(message)

def cleanup(self):
"""This is called when KeepAlive feature is used,
it resets the connection in order for it to be able
to handle receiving another request on the same connection."""
self.parser = None
self.request = None
self.url = None
Expand Down Expand Up @@ -423,12 +505,13 @@ def trigger_events(events, loop):

def serve(host, port, request_handler, error_handler, before_start=None,
after_start=None, before_stop=None, after_stop=None, debug=False,
request_timeout=60, ssl=None, sock=None, request_max_size=None,
reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100,
request_timeout=60, response_timeout=60, keep_alive_timeout=60,
ssl=None, sock=None, request_max_size=None, reuse_port=False,
loop=None, protocol=HttpProtocol, backlog=100,
register_sys_signals=True, run_async=False, connections=None,
signal=Signal(), request_class=None, has_log=True, keep_alive=True,
is_request_stream=False, router=None, websocket_max_size=None,
websocket_max_queue=None, state=None,
signal=Signal(), request_class=None, has_log=True,
keep_alive=True, is_request_stream=False, router=None,
websocket_max_size=None, websocket_max_queue=None, state=None,
graceful_shutdown_timeout=15.0):
"""Start asynchronous HTTP Server on an individual process.
Expand Down Expand Up @@ -476,6 +559,8 @@ def serve(host, port, request_handler, error_handler, before_start=None,
request_handler=request_handler,
error_handler=error_handler,
request_timeout=request_timeout,
response_timeout=response_timeout,
keep_alive_timeout=keep_alive_timeout,
request_max_size=request_max_size,
request_class=request_class,
has_log=has_log,
Expand Down
Loading

0 comments on commit 15fd490

Please sign in to comment.