From ad2366e31c0b05905082bd7b22340eb186128be0 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Fri, 2 Feb 2018 10:37:55 +0200 Subject: [PATCH] Disable keepalives for websockets (#2701) * Get rid of yield_fixture * Don't reschedule keepalive processor immediatelly to prevent 100% CPU load * Fix #1955: Fix 100% CPU usage on HTTP GET and websocket connection just after it --- CHANGES.rst | 6 ++++++ aiohttp/web_protocol.py | 12 ++++++++--- aiohttp/web_ws.py | 2 ++ tests/test_web_protocol.py | 22 +++++++++++++------ tests/test_web_websocket_functional.py | 29 ++++++++++++++++++++++++++ 5 files changed, 62 insertions(+), 9 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index f606df08f9c..9a167d4d9a7 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -14,6 +14,12 @@ Changelog .. towncrier release notes start + +2.3.10 (XXXX-XX-XX) +=================== + +- Fix 100% CPU usage on HTTP GET and websocket connection just after it (#1955) + 2.3.9 (2018-01-16) ================== diff --git a/aiohttp/web_protocol.py b/aiohttp/web_protocol.py index 92d077b89cf..ab77c41504f 100644 --- a/aiohttp/web_protocol.py +++ b/aiohttp/web_protocol.py @@ -80,6 +80,7 @@ class RequestHandler(asyncio.streams.FlowControlMixin, asyncio.Protocol): """ _request_count = 0 _keepalive = False # keep transport open + KEEPALIVE_RESCHEDULE_DELAY = 1 def __init__(self, manager, *, loop=None, keepalive_timeout=75, # NGINX default value is 75 secs @@ -321,6 +322,9 @@ def keep_alive(self, val): :param bool val: new state. """ self._keepalive = val + if self._keepalive_handle: + self._keepalive_handle.cancel() + self._keepalive_handle = None def close(self): """Stop accepting new pipelinig messages and close @@ -352,7 +356,7 @@ def log_exception(self, *args, **kw): self.logger.exception(*args, **kw) def _process_keepalive(self): - if self._force_close: + if self._force_close or not self._keepalive: return next = self._keepalive_time + self._keepalive_timeout @@ -363,8 +367,10 @@ def _process_keepalive(self): self.force_close(send_last_heartbeat=True) return - self._keepalive_handle = self._loop.call_at( - next, self._process_keepalive) + # not all request handlers are done, + # reschedule itself to next second + self._keepalive_handle = self._loop.call_later( + self.KEEPALIVE_RESCHEDULE_DELAY, self._process_keepalive) def pause_reading(self): if not self._reading_paused: diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 1f3b19472f1..1f357802c64 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -133,6 +133,8 @@ def _post_start(self, request, protocol, writer): request._protocol, limit=2 ** 16, loop=self._loop) request.protocol.set_parser(WebSocketReader( self._reader, compress=self._compress)) + # disable HTTP keepalive for WebSocket + request.protocol.keep_alive(False) def can_prepare(self, request): if self._writer is not None: diff --git a/tests/test_web_protocol.py b/tests/test_web_protocol.py index d551436d57e..ba4fb0354f4 100644 --- a/tests/test_web_protocol.py +++ b/tests/test_web_protocol.py @@ -11,7 +11,7 @@ from aiohttp import helpers, http, streams, web -@pytest.yield_fixture +@pytest.fixture def make_srv(loop, manager): srv = None @@ -72,12 +72,12 @@ def handle(request): return wrapper -@pytest.yield_fixture +@pytest.fixture def writer(srv): return http.PayloadWriter(srv.writer, srv._loop) -@pytest.yield_fixture +@pytest.fixture def transport(buf): transport = mock.Mock() @@ -204,7 +204,7 @@ def test_connection_made(make_srv): assert not srv._force_close -def test_connection_made_with_keepaplive(make_srv, transport): +def test_connection_made_with_tcp_keepaplive(make_srv, transport): srv = make_srv() sock = mock.Mock() @@ -214,7 +214,7 @@ def test_connection_made_with_keepaplive(make_srv, transport): socket.SO_KEEPALIVE, 1) -def test_connection_made_without_keepaplive(make_srv): +def test_connection_made_without_tcp_keepaplive(make_srv): srv = make_srv(tcp_keepalive=False) sock = mock.Mock() @@ -260,6 +260,15 @@ def test_srv_keep_alive(srv): assert not srv._keepalive +def test_srv_keep_alive_disable(srv): + handle = srv._keepalive_handle = mock.Mock() + + srv.keep_alive(False) + assert not srv._keepalive + assert srv._keepalive_handle is None + handle.cancel.assert_called_with() + + def test_slow_request(make_srv): with pytest.warns(DeprecationWarning): make_srv(slow_request_timeout=0.01) @@ -583,6 +592,7 @@ def test_handle_500(srv, loop, buf, transport, request_handler): @asyncio.coroutine def test_keep_alive(make_srv, loop, transport, ceil): srv = make_srv(keepalive_timeout=0.05) + srv.KEEPALIVE_RESCHEDULE_DELAY = 0.1 srv.connection_made(transport) srv.keep_alive(True) @@ -600,7 +610,7 @@ def test_keep_alive(make_srv, loop, transport, ceil): assert srv._keepalive_handle is not None assert not transport.close.called - yield from asyncio.sleep(0.1, loop=loop) + yield from asyncio.sleep(0.2, loop=loop) assert transport.close.called assert srv._waiters[0].cancelled diff --git a/tests/test_web_websocket_functional.py b/tests/test_web_websocket_functional.py index a95e1bec5ce..4aa15206cc9 100644 --- a/tests/test_web_websocket_functional.py +++ b/tests/test_web_websocket_functional.py @@ -758,3 +758,32 @@ def handler(request): yield from ws.receive() assert cancelled + + +@asyncio.coroutine +def test_websocket_disable_keepalive(loop, test_client): + @asyncio.coroutine + def handler(request): + ws = web.WebSocketResponse() + if not ws.can_prepare(request): + return web.Response(text='OK') + assert request.protocol._keepalive + yield from ws.prepare(request) + assert not request.protocol._keepalive + assert not request.protocol._keepalive_handle + + yield from ws.send_str('OK') + yield from ws.close() + return ws + + app = web.Application() + app.router.add_route('GET', '/', handler) + client = yield from test_client(app) + + resp = yield from client.get('/') + txt = yield from resp.text() + assert txt == 'OK' + + ws = yield from client.ws_connect('/') + data = yield from ws.receive_str() + assert data == 'OK'