From 2979e03148540d4d16e797ebb1999da58843d87d Mon Sep 17 00:00:00 2001 From: Ashley Sommer Date: Mon, 11 Sep 2017 17:17:33 +1000 Subject: [PATCH 1/4] WIP - Split RequestTimeout, ResponseTimout, and KeepAliveTimeout into different timeouts, with different callbacks. --- sanic/app.py | 2 + sanic/config.py | 8 ++ sanic/exceptions.py | 14 +++ sanic/server.py | 138 ++++++++++++++++++++++++------ tests/test_keep_alive_timeout.py | 142 +++++++++++++++++++++++++++++++ tests/test_request_timeout.py | 93 ++++++++++++++++---- tests/test_response_timeout.py | 38 +++++++++ 7 files changed, 392 insertions(+), 43 deletions(-) create mode 100644 tests/test_keep_alive_timeout.py create mode 100644 tests/test_response_timeout.py diff --git a/sanic/app.py b/sanic/app.py index 20c02a5c98..4a2ea01ca9 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -745,6 +745,8 @@ def _helper(self, host=None, port=None, debug=False, 'request_handler': self.handle_request, 'error_handler': self.error_handler, 'request_timeout': self.config.REQUEST_TIMEOUT, + 'response_timeout': self.config.RESPONSE_TIMEOUT, + 'keep_alive_timeout': self.config.KEEP_ALIVE_TIMEOUT, 'request_max_size': self.config.REQUEST_MAX_SIZE, 'keep_alive': self.config.KEEP_ALIVE, 'loop': loop, diff --git a/sanic/config.py b/sanic/config.py index 0c2cc701d2..560fa2ec10 100644 --- a/sanic/config.py +++ b/sanic/config.py @@ -125,7 +125,15 @@ def __init__(self, defaults=None, load_env=True, keep_alive=True): """ self.REQUEST_MAX_SIZE = 100000000 # 100 megabytes self.REQUEST_TIMEOUT = 60 # 60 seconds + self.RESPONSE_TIMEOUT = 60 # 60 seconds self.KEEP_ALIVE = keep_alive + # Apache httpd server default keepalive timeout = 5 seconds + # Nginx server default keepalive timeout = 75 seconds + # Nginx performance tuning guidelines uses keepalive timeout = 15 seconds + # IE client hard keepalive limit = 60 seconds + # Firefox client hard keepalive limit = 115 seconds + + self.KEEP_ALIVE_TIMEOUT = 5 # 5 seconds self.WEBSOCKET_MAX_SIZE = 2 ** 20 # 1 megabytes self.WEBSOCKET_MAX_QUEUE = 32 self.GRACEFUL_SHUTDOWN_TIMEOUT = 15.0 # 15 sec diff --git a/sanic/exceptions.py b/sanic/exceptions.py index 9663ea7cf5..e2d808f79e 100644 --- a/sanic/exceptions.py +++ b/sanic/exceptions.py @@ -155,6 +155,13 @@ class ServerError(SanicException): pass +@add_status_code(503) +class ServiceUnavailable(SanicException): + """The server is currently unavailable (because it is overloaded or + down for maintenance). Generally, this is a temporary state.""" + pass + + class URLBuildError(ServerError): pass @@ -170,6 +177,13 @@ def __init__(self, message, path, relative_url): @add_status_code(408) class RequestTimeout(SanicException): + """The Web server (running the Web site) thinks that there has been too + long an interval of time between 1) the establishment of an IP + connection (socket) between the client and the server and + 2) the receipt of any data on that socket, so the server has dropped + the connection. The socket connection has actually been lost - the Web + server has 'timed out' on that particular socket connection. + """ pass diff --git a/sanic/server.py b/sanic/server.py index f62ba654d6..bcef8a9107 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -28,7 +28,8 @@ from sanic.response import HTTPResponse from sanic.request import Request from sanic.exceptions import ( - RequestTimeout, PayloadTooLarge, InvalidUsage, ServerError) + RequestTimeout, PayloadTooLarge, InvalidUsage, ServerError, + ServiceUnavailable) current_time = None @@ -63,16 +64,19 @@ class HttpProtocol(asyncio.Protocol): # request params 'parser', 'request', 'url', 'headers', # request config - 'request_handler', 'request_timeout', 'request_max_size', - 'request_class', 'is_request_stream', 'router', + 'request_handler', 'request_timeout', 'response_timeout', + 'keep_alive_timeout', 'request_max_size', 'request_class', + 'is_request_stream', 'router', # enable or disable access log / error log purpose 'has_log', # connection management - '_total_request_size', '_timeout_handler', '_last_communication_time', - '_is_stream_handler') + '_total_request_size', '_request_timeout_handler', + '_response_timeout_handler', '_keep_alive_timeout_handler', + '_last_request_time', '_last_response_time', '_is_stream_handler') def __init__(self, *, loop, request_handler, error_handler, signal=Signal(), connections=set(), request_timeout=60, + response_timeout=60, keep_alive_timeout=15, request_max_size=None, request_class=None, has_log=True, keep_alive=True, is_request_stream=False, router=None, state=None, debug=False, **kwargs): @@ -89,13 +93,18 @@ def __init__(self, *, loop, request_handler, error_handler, self.request_handler = request_handler self.error_handler = error_handler self.request_timeout = request_timeout + self.response_timeout = response_timeout + self.keep_alive_timeout = keep_alive_timeout self.request_max_size = request_max_size self.request_class = request_class or Request self.is_request_stream = is_request_stream self._is_stream_handler = False self._total_request_size = 0 - self._timeout_handler = None + self._request_timeout_handler = None + self._response_timeout_handler = None + self._keep_alive_timeout_handler = None self._last_request_time = None + self._last_response_time = None self._request_handler_task = None self._request_stream_task = None self._keep_alive = keep_alive @@ -118,22 +127,32 @@ def keep_alive(self): def connection_made(self, transport): self.connections.add(self) - self._timeout_handler = self.loop.call_later( - self.request_timeout, self.connection_timeout) + self._request_timeout_handler = self.loop.call_later( + self.request_timeout, self.request_timeout_callback) self.transport = transport self._last_request_time = current_time def connection_lost(self, exc): self.connections.discard(self) - self._timeout_handler.cancel() - - def connection_timeout(self): - # Check if + if self._request_timeout_handler: + self._request_timeout_handler.cancel() + if self._response_timeout_handler: + self._response_timeout_handler.cancel() + if self._keep_alive_timeout_handler: + self._keep_alive_timeout_handler.cancel() + + def request_timeout_callback(self): + # See the docstring in the RequestTimeout exception, to see + # exactly what this timeout is checking for. + # Check if elapsed time since request initiated exceeds our + # configured maximum request timeout value time_elapsed = current_time - self._last_request_time if time_elapsed < self.request_timeout: time_left = self.request_timeout - time_elapsed - self._timeout_handler = ( - self.loop.call_later(time_left, self.connection_timeout)) + self._request_timeout_handler = ( + self.loop.call_later(time_left, + self.request_timeout_callback) + ) else: if self._request_stream_task: self._request_stream_task.cancel() @@ -144,6 +163,37 @@ def connection_timeout(self): except RequestTimeout as exception: self.write_error(exception) + def response_timeout_callback(self): + # Check if elapsed time since response was initiated exceeds our + # configured maximum request timeout value + time_elapsed = current_time - self._last_request_time + if time_elapsed < self.response_timeout: + time_left = self.response_timeout - time_elapsed + self._response_timeout_handler = ( + self.loop.call_later(time_left, + self.response_timeout_callback) + ) + else: + try: + raise ServiceUnavailable('Response Timeout') + except ServiceUnavailable as exception: + self.write_error(exception) + + def keep_alive_timeout_callback(self): + # Check if elapsed time since last response exceeds our configured + # maximum keep alive timeout value + time_elapsed = current_time - self._last_response_time + if time_elapsed < self.keep_alive_timeout: + time_left = self.keep_alive_timeout - time_elapsed + self._keep_alive_timeout_handler = ( + self.loop.call_later(time_left, + self.keep_alive_timeout_callback) + ) + else: + log.info('KeepAlive Timeout. Closing connection.') + self.transport.close() + + # -------------------------------------------- # # Parsing # -------------------------------------------- # @@ -204,6 +254,11 @@ def on_headers_complete(self): method=self.parser.get_method().decode(), transport=self.transport ) + # Remove any existing KeepAlive handler here, + # It will be recreated if required on the new request. + if self._keep_alive_timeout_handler: + self._keep_alive_timeout_handler.cancel() + self._keep_alive_timeout_handler = None if self.is_request_stream: self._is_stream_handler = self.router.is_stream_handler( self.request) @@ -219,6 +274,11 @@ def on_body(self, body): self.request.body.append(body) def on_message_complete(self): + # Entire request (headers and whole body) is received. + # We can cancel and remove the request timeout handler now. + if self._request_timeout_handler: + self._request_timeout_handler.cancel() + self._request_timeout_handler = None if self.is_request_stream and self._is_stream_handler: self._request_stream_task = self.loop.create_task( self.request.stream.put(None)) @@ -227,6 +287,9 @@ def on_message_complete(self): self.execute_request_handler() def execute_request_handler(self): + self._response_timeout_handler = self.loop.call_later( + self.response_timeout, self.response_timeout_callback) + self._last_request_time = current_time self._request_handler_task = self.loop.create_task( self.request_handler( self.request, @@ -240,12 +303,15 @@ def write_response(self, response): """ Writes response content synchronously to the transport. """ + if self._response_timeout_handler: + self._response_timeout_handler.cancel() + self._response_timeout_handler = None try: keep_alive = self.keep_alive self.transport.write( response.output( self.request.version, keep_alive, - self.request_timeout)) + self.keep_alive_timeout)) if self.has_log: netlog.info('', extra={ 'status': response.status, @@ -273,7 +339,10 @@ def write_response(self, response): if not keep_alive: self.transport.close() else: - self._last_request_time = current_time + self._keep_alive_timeout_handler = self.loop.call_later( + self.keep_alive_timeout, + self.keep_alive_timeout_callback) + self._last_response_time = current_time self.cleanup() async def stream_response(self, response): @@ -282,12 +351,14 @@ async def stream_response(self, response): the transport to the response so the response consumer can write to the response as needed. """ - + if self._response_timeout_handler: + self._response_timeout_handler.cancel() + self._response_timeout_handler = None try: keep_alive = self.keep_alive response.transport = self.transport await response.stream( - self.request.version, keep_alive, self.request_timeout) + self.request.version, keep_alive, self.keep_alive_timeout) if self.has_log: netlog.info('', extra={ 'status': response.status, @@ -315,10 +386,18 @@ async def stream_response(self, response): if not keep_alive: self.transport.close() else: - self._last_request_time = current_time + self._keep_alive_timeout_handler = self.loop.call_later( + self.keep_alive_timeout, + self.keep_alive_timeout_callback) + self._last_response_time = current_time self.cleanup() def write_error(self, exception): + # An error _is_ a response. + # Don't throw a response timeout, when a response _is_ given. + if self._response_timeout_handler: + self._response_timeout_handler.cancel() + self._response_timeout_handler = None response = None try: response = self.error_handler.response(self.request, exception) @@ -330,8 +409,9 @@ def write_error(self, exception): self.request.ip if self.request else 'Unknown')) except Exception as e: self.bail_out( - "Writing error failed, connection closed {}".format(repr(e)), - from_error=True) + "Writing error failed, connection closed {}".format( + repr(e)), from_error=True + ) finally: if self.has_log: extra = dict() @@ -367,6 +447,9 @@ def bail_out(self, message, from_error=False): log.error(message) def cleanup(self): + """This is called when KeepAlive feature is used, + it resets the connection in order for it to be able + to handle receiving another request on the same connection.""" self.parser = None self.request = None self.url = None @@ -421,12 +504,13 @@ def trigger_events(events, loop): def serve(host, port, request_handler, error_handler, before_start=None, after_start=None, before_stop=None, after_stop=None, debug=False, - request_timeout=60, ssl=None, sock=None, request_max_size=None, - reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100, + request_timeout=60, response_timeout=60, keep_alive_timeout=60, + ssl=None, sock=None, request_max_size=None, reuse_port=False, + loop=None, protocol=HttpProtocol, backlog=100, register_sys_signals=True, run_async=False, connections=None, - signal=Signal(), request_class=None, has_log=True, keep_alive=True, - is_request_stream=False, router=None, websocket_max_size=None, - websocket_max_queue=None, state=None, + signal=Signal(), request_class=None, has_log=True, + keep_alive=True, is_request_stream=False, router=None, + websocket_max_size=None, websocket_max_queue=None, state=None, graceful_shutdown_timeout=15.0): """Start asynchronous HTTP Server on an individual process. @@ -474,6 +558,8 @@ def serve(host, port, request_handler, error_handler, before_start=None, request_handler=request_handler, error_handler=error_handler, request_timeout=request_timeout, + response_timeout=response_timeout, + keep_alive_timeout=keep_alive_timeout, request_max_size=request_max_size, request_class=request_class, has_log=has_log, diff --git a/tests/test_keep_alive_timeout.py b/tests/test_keep_alive_timeout.py new file mode 100644 index 0000000000..2803014492 --- /dev/null +++ b/tests/test_keep_alive_timeout.py @@ -0,0 +1,142 @@ +from json import JSONDecodeError +from sanic import Sanic +from time import sleep as sync_sleep +import asyncio +from sanic.response import text +from sanic.config import Config +import aiohttp +from aiohttp import TCPConnector +from sanic.testing import SanicTestClient, HOST, PORT + + +class ReuseableTCPConnector(TCPConnector): + def __init__(self, *args, **kwargs): + super(ReuseableTCPConnector, self).__init__(*args, **kwargs) + self.conn = None + + @asyncio.coroutine + def connect(self, req): + if self.conn: + return self.conn + conn = yield from super(ReuseableTCPConnector, self).connect(req) + self.conn = conn + return conn + + def close(self): + return super(ReuseableTCPConnector, self).close() + + +class ReuseableSanicTestClient(SanicTestClient): + def __init__(self, app): + super(ReuseableSanicTestClient, self).__init__(app) + self._tcp_connector = None + self._session = None + + def _sanic_endpoint_test( + self, method='get', uri='/', gather_request=True, + debug=False, server_kwargs={}, + *request_args, **request_kwargs): + results = [None, None] + exceptions = [] + + if gather_request: + def _collect_request(request): + if results[0] is None: + results[0] = request + + self.app.request_middleware.appendleft(_collect_request) + + @self.app.listener('after_server_start') + async def _collect_response(sanic, loop): + try: + response = await self._local_request( + method, uri, *request_args, + **request_kwargs) + results[-1] = response + except Exception as e: + log.error( + 'Exception:\n{}'.format(traceback.format_exc())) + exceptions.append(e) + self.app.stop() + + server = self.app.create_server(host=HOST, debug=debug, port=PORT, **server_kwargs) + self.app.listeners['after_server_start'].pop() + + if exceptions: + raise ValueError( + "Exception during request: {}".format(exceptions)) + + if gather_request: + try: + request, response = results + return request, response + except: + raise ValueError( + "Request and response object expected, got ({})".format( + results)) + else: + try: + return results[-1] + except: + raise ValueError( + "Request object expected, got ({})".format(results)) + + async def _local_request(self, method, uri, cookies=None, *args, + **kwargs): + if uri.startswith(('http:', 'https:', 'ftp:', 'ftps://' '//')): + url = uri + else: + url = 'http://{host}:{port}{uri}'.format( + host=HOST, port=PORT, uri=uri) + if self._session: + session = self._session + else: + if self._tcp_connector: + conn = self._tcp_connector + else: + conn = ReuseableTCPConnector(verify_ssl=False) + self._tcp_connector = conn + session = aiohttp.ClientSession(cookies=cookies, + connector=conn) + self._session = session + + async with getattr(session, method.lower())( + url, *args, **kwargs) as response: + try: + response.text = await response.text() + except UnicodeDecodeError: + response.text = None + + try: + response.json = await response.json() + except (JSONDecodeError, + UnicodeDecodeError, + aiohttp.ClientResponseError): + response.json = None + + response.body = await response.read() + return response + + +Config.KEEP_ALIVE_TIMEOUT = 30 +Config.KEEP_ALIVE = True +keep_alive_timeout_app = Sanic('test_request_timeout') + + +@keep_alive_timeout_app.route('/1') +async def handler(request): + return text('OK') + + +def test_keep_alive_timeout(): + client = ReuseableSanicTestClient(keep_alive_timeout_app) + headers = { + 'Connection': 'keep-alive' + } + request, response = client.get('/1', headers=headers) + assert response.status == 200 + #sync_sleep(2) + request, response = client.get('/1') + assert response.status == 200 + + diff --git a/tests/test_request_timeout.py b/tests/test_request_timeout.py index 404aec123f..e6c1f657f2 100644 --- a/tests/test_request_timeout.py +++ b/tests/test_request_timeout.py @@ -1,38 +1,97 @@ +from json import JSONDecodeError from sanic import Sanic import asyncio from sanic.response import text from sanic.exceptions import RequestTimeout from sanic.config import Config +import aiohttp +from aiohttp import TCPConnector +from sanic.testing import SanicTestClient, HOST, PORT -Config.REQUEST_TIMEOUT = 1 -request_timeout_app = Sanic('test_request_timeout') -request_timeout_default_app = Sanic('test_request_timeout_default') +class DelayableTCPConnector(TCPConnector): + class DelayableHttpRequest(object): + def __new__(cls, req, delay): + cls = super(DelayableTCPConnector.DelayableHttpRequest, cls).\ + __new__(cls) + cls.req = req + cls.delay = delay + return cls -@request_timeout_app.route('/1') -async def handler_1(request): - await asyncio.sleep(2) - return text('OK') + def __getattr__(self, item): + return getattr(self.req, item) + def send(self, *args, **kwargs): + if self.delay and self.delay > 0: + _ = yield from asyncio.sleep(self.delay) + self.req.send(*args, **kwargs) -@request_timeout_app.exception(RequestTimeout) -def handler_exception(request, exception): - return text('Request Timeout from error_handler.', 408) + def __init__(self, *args, **kwargs): + _post_connect_delay = kwargs.pop('post_connect_delay', 0) + _pre_request_delay = kwargs.pop('pre_request_delay', 0) + super(DelayableTCPConnector, self).__init__(*args, **kwargs) + self._post_connect_delay = _post_connect_delay + self._pre_request_delay = _pre_request_delay + @asyncio.coroutine + def connect(self, req): + req = DelayableTCPConnector.\ + DelayableHttpRequest(req, self._pre_request_delay) + conn = yield from super(DelayableTCPConnector, self).connect(req) + if self._post_connect_delay and self._post_connect_delay > 0: + _ = yield from asyncio.sleep(self._post_connect_delay) + return conn -def test_server_error_request_timeout(): - request, response = request_timeout_app.test_client.get('/1') - assert response.status == 408 - assert response.text == 'Request Timeout from error_handler.' + +class DelayableSanicTestClient(SanicTestClient): + def __init__(self, app, request_delay=1): + super(DelayableSanicTestClient, self).__init__(app) + self._request_delay = request_delay + + async def _local_request(self, method, uri, cookies=None, *args, + **kwargs): + if uri.startswith(('http:', 'https:', 'ftp:', 'ftps://' '//')): + url = uri + else: + url = 'http://{host}:{port}{uri}'.format( + host=HOST, port=PORT, uri=uri) + + conn = DelayableTCPConnector(pre_request_delay=self._request_delay, + verify_ssl=False) + async with aiohttp.ClientSession( + cookies=cookies, connector=conn) as session: + # Insert a delay after creating the connection + # But before sending the request. + + async with getattr(session, method.lower())( + url, *args, **kwargs) as response: + try: + response.text = await response.text() + except UnicodeDecodeError: + response.text = None + + try: + response.json = await response.json() + except (JSONDecodeError, + UnicodeDecodeError, + aiohttp.ClientResponseError): + response.json = None + + response.body = await response.read() + return response + + +Config.REQUEST_TIMEOUT = 1 +request_timeout_default_app = Sanic('test_request_timeout_default') @request_timeout_default_app.route('/1') -async def handler_2(request): - await asyncio.sleep(2) +async def handler(request): return text('OK') def test_default_server_error_request_timeout(): - request, response = request_timeout_default_app.test_client.get('/1') + client = DelayableSanicTestClient(request_timeout_default_app, 2) + request, response = client.get('/1') assert response.status == 408 assert response.text == 'Error: Request Timeout' diff --git a/tests/test_response_timeout.py b/tests/test_response_timeout.py new file mode 100644 index 0000000000..bf55a42ecc --- /dev/null +++ b/tests/test_response_timeout.py @@ -0,0 +1,38 @@ +from sanic import Sanic +import asyncio +from sanic.response import text +from sanic.exceptions import ServiceUnavailable +from sanic.config import Config + +Config.RESPONSE_TIMEOUT = 1 +response_timeout_app = Sanic('test_response_timeout') +response_timeout_default_app = Sanic('test_response_timeout_default') + + +@response_timeout_app.route('/1') +async def handler_1(request): + await asyncio.sleep(2) + return text('OK') + + +@response_timeout_app.exception(ServiceUnavailable) +def handler_exception(request, exception): + return text('Response Timeout from error_handler.', 503) + + +def test_server_error_response_timeout(): + request, response = response_timeout_app.test_client.get('/1') + assert response.status == 503 + assert response.text == 'Response Timeout from error_handler.' + + +@response_timeout_default_app.route('/1') +async def handler_2(request): + await asyncio.sleep(2) + return text('OK') + + +def test_default_server_error_response_timeout(): + request, response = response_timeout_default_app.test_client.get('/1') + assert response.status == 503 + assert response.text == 'Error: Response Timeout' From 1a74accd65cfe6a1c52d3697ab4552285fcd95c9 Mon Sep 17 00:00:00 2001 From: Ashley Sommer Date: Tue, 12 Sep 2017 13:09:42 +1000 Subject: [PATCH 2/4] finished the keepalive_timeout tests --- tests/test_keep_alive_timeout.py | 164 ++++++++++++++++++++++++++----- 1 file changed, 138 insertions(+), 26 deletions(-) diff --git a/tests/test_keep_alive_timeout.py b/tests/test_keep_alive_timeout.py index 2803014492..12e1629dfb 100644 --- a/tests/test_keep_alive_timeout.py +++ b/tests/test_keep_alive_timeout.py @@ -4,6 +4,7 @@ import asyncio from sanic.response import text from sanic.config import Config +from sanic import server import aiohttp from aiohttp import TCPConnector from sanic.testing import SanicTestClient, HOST, PORT @@ -12,33 +13,40 @@ class ReuseableTCPConnector(TCPConnector): def __init__(self, *args, **kwargs): super(ReuseableTCPConnector, self).__init__(*args, **kwargs) - self.conn = None + self.old_proto = None @asyncio.coroutine def connect(self, req): - if self.conn: - return self.conn - conn = yield from super(ReuseableTCPConnector, self).connect(req) - self.conn = conn - return conn - - def close(self): - return super(ReuseableTCPConnector, self).close() + new_conn = yield from super(ReuseableTCPConnector, self)\ + .connect(req) + if self.old_proto is not None: + if self.old_proto != new_conn.protocol: + raise RuntimeError( + "We got a new connection, wanted the same one!") + self.old_proto = new_conn.protocol + return new_conn class ReuseableSanicTestClient(SanicTestClient): - def __init__(self, app): + def __init__(self, app, loop=None): super(ReuseableSanicTestClient, self).__init__(app) + if loop is None: + loop = asyncio.get_event_loop() + self._loop = loop + self._server = None self._tcp_connector = None self._session = None + # Copied from SanicTestClient, but with some changes to reuse the + # same loop for the same app. def _sanic_endpoint_test( self, method='get', uri='/', gather_request=True, debug=False, server_kwargs={}, *request_args, **request_kwargs): + loop = self._loop results = [None, None] exceptions = [] - + do_kill_server = request_kwargs.pop('end_server', False) if gather_request: def _collect_request(request): if results[0] is None: @@ -47,26 +55,53 @@ def _collect_request(request): self.app.request_middleware.appendleft(_collect_request) @self.app.listener('after_server_start') - async def _collect_response(sanic, loop): + async def _collect_response(loop): try: + if do_kill_server: + request_kwargs['end_session'] = True response = await self._local_request( method, uri, *request_args, **request_kwargs) results[-1] = response except Exception as e: - log.error( - 'Exception:\n{}'.format(traceback.format_exc())) + import traceback + traceback.print_tb(e.__traceback__) exceptions.append(e) - self.app.stop() + #Don't stop here! self.app.stop() + + if self._server is not None: + _server = self._server + else: + _server_co = self.app.create_server(host=HOST, debug=debug, + port=PORT, **server_kwargs) + + server.trigger_events( + self.app.listeners['before_server_start'], loop) - server = self.app.create_server(host=HOST, debug=debug, port=PORT, **server_kwargs) + try: + loop._stopping = False + http_server = loop.run_until_complete(_server_co) + except Exception as e: + raise e + self._server = _server = http_server + server.trigger_events( + self.app.listeners['after_server_start'], loop) self.app.listeners['after_server_start'].pop() + if do_kill_server: + try: + _server.close() + self._server = None + loop.run_until_complete(_server.wait_closed()) + self.app.stop() + except Exception as e: + exceptions.append(e) if exceptions: raise ValueError( "Exception during request: {}".format(exceptions)) if gather_request: + self.app.request_middleware.pop() try: request, response = results return request, response @@ -81,20 +116,29 @@ async def _collect_response(sanic, loop): raise ValueError( "Request object expected, got ({})".format(results)) + # Copied from SanicTestClient, but with some changes to reuse the + # same TCPConnection and the sane ClientSession more than once. + # Note, you cannot use the same session if you are in a _different_ + # loop, so the changes above are required too. async def _local_request(self, method, uri, cookies=None, *args, **kwargs): + request_keepalive = kwargs.pop('request_keepalive', + Config.KEEP_ALIVE_TIMEOUT) if uri.startswith(('http:', 'https:', 'ftp:', 'ftps://' '//')): url = uri else: url = 'http://{host}:{port}{uri}'.format( host=HOST, port=PORT, uri=uri) + do_kill_session = kwargs.pop('end_session', False) if self._session: session = self._session else: if self._tcp_connector: conn = self._tcp_connector else: - conn = ReuseableTCPConnector(verify_ssl=False) + conn = ReuseableTCPConnector(verify_ssl=False, + keepalive_timeout= + request_keepalive) self._tcp_connector = conn session = aiohttp.ClientSession(cookies=cookies, connector=conn) @@ -115,28 +159,96 @@ async def _local_request(self, method, uri, cookies=None, *args, response.json = None response.body = await response.read() - return response + if do_kill_session: + session.close() + self._session = None + return response -Config.KEEP_ALIVE_TIMEOUT = 30 +Config.KEEP_ALIVE_TIMEOUT = 2 Config.KEEP_ALIVE = True -keep_alive_timeout_app = Sanic('test_request_timeout') +keep_alive_timeout_app_reuse = Sanic('test_ka_timeout_reuse') +keep_alive_app_client_timeout = Sanic('test_ka_client_timeout') +keep_alive_app_server_timeout = Sanic('test_ka_server_timeout') + + +@keep_alive_timeout_app_reuse.route('/1') +async def handler1(request): + return text('OK') -@keep_alive_timeout_app.route('/1') -async def handler(request): +@keep_alive_app_client_timeout.route('/1') +async def handler2(request): return text('OK') -def test_keep_alive_timeout(): - client = ReuseableSanicTestClient(keep_alive_timeout_app) +@keep_alive_app_server_timeout.route('/1') +async def handler3(request): + return text('OK') + + +def test_keep_alive_timeout_reuse(): + """If the server keep-alive timeout and client keep-alive timeout are + both longer than the delay, the client _and_ server will successfully + reuse the existing connection.""" + loop = asyncio.get_event_loop() + client = ReuseableSanicTestClient(keep_alive_timeout_app_reuse, loop) headers = { 'Connection': 'keep-alive' } request, response = client.get('/1', headers=headers) assert response.status == 200 - #sync_sleep(2) - request, response = client.get('/1') + assert response.text == 'OK' + sync_sleep(1) + request, response = client.get('/1', end_server=True) assert response.status == 200 + assert response.text == 'OK' +def test_keep_alive_client_timeout(): + """If the server keep-alive timeout is longer than the client + keep-alive timeout, client will try to create a new connection here.""" + loop = asyncio.get_event_loop() + client = ReuseableSanicTestClient(keep_alive_app_client_timeout, + loop) + headers = { + 'Connection': 'keep-alive' + } + request, response = client.get('/1', headers=headers, + request_keepalive=1) + assert response.status == 200 + assert response.text == 'OK' + sync_sleep(3) + exception = None + try: + request, response = client.get('/1', end_server=True) + except ValueError as e: + exception = e + assert exception is not None + assert isinstance(exception, ValueError) + assert "got a new connection" in exception.args[0] + + +def test_keep_alive_server_timeout(): + """If the client keep-alive timeout is longer than the server + keep-alive timeout, the client will get a 'Connection reset' error.""" + loop = asyncio.get_event_loop() + client = ReuseableSanicTestClient(keep_alive_app_server_timeout, + loop) + headers = { + 'Connection': 'keep-alive' + } + request, response = client.get('/1', headers=headers, + request_keepalive=5) + assert response.status == 200 + assert response.text == 'OK' + sync_sleep(3) + exception = None + try: + request, response = client.get('/1', end_server=True) + except ValueError as e: + exception = e + assert exception is not None + assert isinstance(exception, ValueError) + assert "Connection reset" in exception.args[0] + From 173f94216a797041b54a8c4a84ca8e530dbdda4b Mon Sep 17 00:00:00 2001 From: Ashley Sommer Date: Tue, 12 Sep 2017 13:40:43 +1000 Subject: [PATCH 3/4] Fixed the delays, and expected responses, in the keepalive_timeout tests --- tests/test_keep_alive_timeout.py | 50 ++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/tests/test_keep_alive_timeout.py b/tests/test_keep_alive_timeout.py index 12e1629dfb..09c51d00ea 100644 --- a/tests/test_keep_alive_timeout.py +++ b/tests/test_keep_alive_timeout.py @@ -1,7 +1,7 @@ from json import JSONDecodeError from sanic import Sanic -from time import sleep as sync_sleep import asyncio +from asyncio import sleep as aio_sleep from sanic.response import text from sanic.config import Config from sanic import server @@ -63,10 +63,8 @@ async def _collect_response(loop): method, uri, *request_args, **request_kwargs) results[-1] = response - except Exception as e: - import traceback - traceback.print_tb(e.__traceback__) - exceptions.append(e) + except Exception as e2: + exceptions.append(e2) #Don't stop here! self.app.stop() if self._server is not None: @@ -81,8 +79,8 @@ async def _collect_response(loop): try: loop._stopping = False http_server = loop.run_until_complete(_server_co) - except Exception as e: - raise e + except Exception as e1: + raise e1 self._server = _server = http_server server.trigger_events( self.app.listeners['after_server_start'], loop) @@ -94,8 +92,8 @@ async def _collect_response(loop): self._server = None loop.run_until_complete(_server.wait_closed()) self.app.stop() - except Exception as e: - exceptions.append(e) + except Exception as e3: + exceptions.append(e3) if exceptions: raise ValueError( "Exception during request: {}".format(exceptions)) @@ -137,11 +135,13 @@ async def _local_request(self, method, uri, cookies=None, *args, conn = self._tcp_connector else: conn = ReuseableTCPConnector(verify_ssl=False, + loop=self._loop, keepalive_timeout= request_keepalive) self._tcp_connector = conn session = aiohttp.ClientSession(cookies=cookies, - connector=conn) + connector=conn, + loop=self._loop) self._session = session async with getattr(session, method.lower())( @@ -191,7 +191,8 @@ def test_keep_alive_timeout_reuse(): """If the server keep-alive timeout and client keep-alive timeout are both longer than the delay, the client _and_ server will successfully reuse the existing connection.""" - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) client = ReuseableSanicTestClient(keep_alive_timeout_app_reuse, loop) headers = { 'Connection': 'keep-alive' @@ -199,7 +200,7 @@ def test_keep_alive_timeout_reuse(): request, response = client.get('/1', headers=headers) assert response.status == 200 assert response.text == 'OK' - sync_sleep(1) + loop.run_until_complete(aio_sleep(1)) request, response = client.get('/1', end_server=True) assert response.status == 200 assert response.text == 'OK' @@ -208,7 +209,8 @@ def test_keep_alive_timeout_reuse(): def test_keep_alive_client_timeout(): """If the server keep-alive timeout is longer than the client keep-alive timeout, client will try to create a new connection here.""" - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) client = ReuseableSanicTestClient(keep_alive_app_client_timeout, loop) headers = { @@ -218,10 +220,11 @@ def test_keep_alive_client_timeout(): request_keepalive=1) assert response.status == 200 assert response.text == 'OK' - sync_sleep(3) + loop.run_until_complete(aio_sleep(2)) exception = None try: - request, response = client.get('/1', end_server=True) + request, response = client.get('/1', end_server=True, + request_keepalive=1) except ValueError as e: exception = e assert exception is not None @@ -231,24 +234,29 @@ def test_keep_alive_client_timeout(): def test_keep_alive_server_timeout(): """If the client keep-alive timeout is longer than the server - keep-alive timeout, the client will get a 'Connection reset' error.""" - loop = asyncio.get_event_loop() + keep-alive timeout, the client will either a 'Connection reset' error + _or_ a new connection. Depending on how the event-loop handles the + broken server connection.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) client = ReuseableSanicTestClient(keep_alive_app_server_timeout, loop) headers = { 'Connection': 'keep-alive' } request, response = client.get('/1', headers=headers, - request_keepalive=5) + request_keepalive=60) assert response.status == 200 assert response.text == 'OK' - sync_sleep(3) + loop.run_until_complete(aio_sleep(3)) exception = None try: - request, response = client.get('/1', end_server=True) + request, response = client.get('/1', request_keepalive=60, + end_server=True) except ValueError as e: exception = e assert exception is not None assert isinstance(exception, ValueError) - assert "Connection reset" in exception.args[0] + assert "Connection reset" in exception.args[0] or \ + "got a new connection" in exception.args[0] From 8eb59ad4dc27261351ea9d1eb7f645ee4f0a9a40 Mon Sep 17 00:00:00 2001 From: Ashley Sommer Date: Wed, 13 Sep 2017 10:18:36 +1000 Subject: [PATCH 4/4] Fixed error where the RequestTimeout test wasn't actually testing the correct behaviour Fixed error where KeepAliveTimeout wasn't being triggered in the test suite, when using uvloop Fixed test cases when using other asyncio loops such as uvloop Fixed Flake8 linting errors --- sanic/config.py | 2 +- sanic/server.py | 1 - tests/test_keep_alive_timeout.py | 13 +++- tests/test_request_timeout.py | 102 +++++++++++++++++++++++++------ 4 files changed, 95 insertions(+), 23 deletions(-) diff --git a/sanic/config.py b/sanic/config.py index 560fa2ec10..de91280f88 100644 --- a/sanic/config.py +++ b/sanic/config.py @@ -129,7 +129,7 @@ def __init__(self, defaults=None, load_env=True, keep_alive=True): self.KEEP_ALIVE = keep_alive # Apache httpd server default keepalive timeout = 5 seconds # Nginx server default keepalive timeout = 75 seconds - # Nginx performance tuning guidelines uses keepalive timeout = 15 seconds + # Nginx performance tuning guidelines uses keepalive = 15 seconds # IE client hard keepalive limit = 60 seconds # Firefox client hard keepalive limit = 115 seconds diff --git a/sanic/server.py b/sanic/server.py index bcef8a9107..eb9864cd45 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -193,7 +193,6 @@ def keep_alive_timeout_callback(self): log.info('KeepAlive Timeout. Closing connection.') self.transport.close() - # -------------------------------------------- # # Parsing # -------------------------------------------- # diff --git a/tests/test_keep_alive_timeout.py b/tests/test_keep_alive_timeout.py index 09c51d00ea..15f6d705a0 100644 --- a/tests/test_keep_alive_timeout.py +++ b/tests/test_keep_alive_timeout.py @@ -20,10 +20,11 @@ def connect(self, req): new_conn = yield from super(ReuseableTCPConnector, self)\ .connect(req) if self.old_proto is not None: - if self.old_proto != new_conn.protocol: + if self.old_proto != new_conn._protocol: raise RuntimeError( "We got a new connection, wanted the same one!") - self.old_proto = new_conn.protocol + print(new_conn.__dict__) + self.old_proto = new_conn._protocol return new_conn @@ -64,6 +65,8 @@ async def _collect_response(loop): **request_kwargs) results[-1] = response except Exception as e2: + import traceback + traceback.print_tb(e2.__traceback__) exceptions.append(e2) #Don't stop here! self.app.stop() @@ -80,6 +83,8 @@ async def _collect_response(loop): loop._stopping = False http_server = loop.run_until_complete(_server_co) except Exception as e1: + import traceback + traceback.print_tb(e1.__traceback__) raise e1 self._server = _server = http_server server.trigger_events( @@ -93,7 +98,9 @@ async def _collect_response(loop): loop.run_until_complete(_server.wait_closed()) self.app.stop() except Exception as e3: - exceptions.append(e3) + import traceback + traceback.print_tb(e3.__traceback__) + exceptions.append(e3) if exceptions: raise ValueError( "Exception during request: {}".format(exceptions)) diff --git a/tests/test_request_timeout.py b/tests/test_request_timeout.py index e6c1f657f2..a1d8a885e1 100644 --- a/tests/test_request_timeout.py +++ b/tests/test_request_timeout.py @@ -1,8 +1,8 @@ from json import JSONDecodeError + from sanic import Sanic import asyncio from sanic.response import text -from sanic.exceptions import RequestTimeout from sanic.config import Config import aiohttp from aiohttp import TCPConnector @@ -10,21 +10,68 @@ class DelayableTCPConnector(TCPConnector): - class DelayableHttpRequest(object): + + class RequestContextManager(object): def __new__(cls, req, delay): - cls = super(DelayableTCPConnector.DelayableHttpRequest, cls).\ + cls = super(DelayableTCPConnector.RequestContextManager, cls).\ __new__(cls) cls.req = req + cls.send_task = None + cls.resp = None + cls.orig_send = getattr(req, 'send') + cls.orig_start = None cls.delay = delay + cls._acting_as = req return cls def __getattr__(self, item): - return getattr(self.req, item) - - def send(self, *args, **kwargs): + acting_as = self._acting_as + return getattr(acting_as, item) + + @asyncio.coroutine + def start(self, connection, read_until_eof=False): + if self.send_task is None: + raise RuntimeError("do a send() before you do a start()") + resp = yield from self.send_task + self.send_task = None + self.resp = resp + self._acting_as = self.resp + self.orig_start = getattr(resp, 'start') + + try: + ret = yield from self.orig_start(connection, + read_until_eof) + except Exception as e: + raise e + return ret + + def close(self): + if self.resp is not None: + self.resp.close() + if self.send_task is not None: + self.send_task.cancel() + + @asyncio.coroutine + def delayed_send(self, *args, **kwargs): + req = self.req if self.delay and self.delay > 0: + #sync_sleep(self.delay) _ = yield from asyncio.sleep(self.delay) - self.req.send(*args, **kwargs) + t = req.loop.time() + print("sending at {}".format(t), flush=True) + conn = next(iter(args)) # first arg is connection + try: + delayed_resp = self.orig_send(*args, **kwargs) + except Exception as e: + return aiohttp.ClientResponse(req.method, req.url) + return delayed_resp + + def send(self, *args, **kwargs): + gen = self.delayed_send(*args, **kwargs) + task = self.req.loop.create_task(gen) + self.send_task = task + self._acting_as = task + return self def __init__(self, *args, **kwargs): _post_connect_delay = kwargs.pop('post_connect_delay', 0) @@ -35,31 +82,37 @@ def __init__(self, *args, **kwargs): @asyncio.coroutine def connect(self, req): - req = DelayableTCPConnector.\ - DelayableHttpRequest(req, self._pre_request_delay) + d_req = DelayableTCPConnector.\ + RequestContextManager(req, self._pre_request_delay) conn = yield from super(DelayableTCPConnector, self).connect(req) if self._post_connect_delay and self._post_connect_delay > 0: - _ = yield from asyncio.sleep(self._post_connect_delay) + _ = yield from asyncio.sleep(self._post_connect_delay, + loop=self._loop) + req.send = d_req.send + t = req.loop.time() + print("Connected at {}".format(t), flush=True) return conn class DelayableSanicTestClient(SanicTestClient): - def __init__(self, app, request_delay=1): + def __init__(self, app, loop, request_delay=1): super(DelayableSanicTestClient, self).__init__(app) self._request_delay = request_delay + self._loop = None async def _local_request(self, method, uri, cookies=None, *args, **kwargs): + if self._loop is None: + self._loop = asyncio.get_event_loop() if uri.startswith(('http:', 'https:', 'ftp:', 'ftps://' '//')): url = uri else: url = 'http://{host}:{port}{uri}'.format( host=HOST, port=PORT, uri=uri) - conn = DelayableTCPConnector(pre_request_delay=self._request_delay, - verify_ssl=False) - async with aiohttp.ClientSession( - cookies=cookies, connector=conn) as session: + verify_ssl=False, loop=self._loop) + async with aiohttp.ClientSession(cookies=cookies, connector=conn, + loop=self._loop) as session: # Insert a delay after creating the connection # But before sending the request. @@ -81,17 +134,30 @@ async def _local_request(self, method, uri, cookies=None, *args, return response -Config.REQUEST_TIMEOUT = 1 +Config.REQUEST_TIMEOUT = 2 request_timeout_default_app = Sanic('test_request_timeout_default') +request_no_timeout_app = Sanic('test_request_no_timeout') @request_timeout_default_app.route('/1') -async def handler(request): +async def handler1(request): + return text('OK') + + +@request_no_timeout_app.route('/1') +async def handler2(request): return text('OK') def test_default_server_error_request_timeout(): - client = DelayableSanicTestClient(request_timeout_default_app, 2) + client = DelayableSanicTestClient(request_timeout_default_app, None, 3) request, response = client.get('/1') assert response.status == 408 assert response.text == 'Error: Request Timeout' + + +def test_default_server_error_request_dont_timeout(): + client = DelayableSanicTestClient(request_no_timeout_app, None, 1) + request, response = client.get('/1') + assert response.status == 200 + assert response.text == 'OK'