From 7a67da154e077bf4b248beb882f1d454e144ac35 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 29 Dec 2018 14:43:44 +0200 Subject: [PATCH 1/4] Rewrite worker --- aiohttp/worker.py | 45 ++++++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/aiohttp/worker.py b/aiohttp/worker.py index 4681b47eadb..0657b48a5dd 100644 --- a/aiohttp/worker.py +++ b/aiohttp/worker.py @@ -6,7 +6,7 @@ import signal import sys from types import FrameType -from typing import Any, Optional # noqa +from typing import Any, Awaitable, Callable, Optional, Union # noqa from gunicorn.config import AccessLogFormat as GunicornAccessLogFormat from gunicorn.workers import base @@ -14,6 +14,7 @@ from aiohttp import web from .helpers import set_result +from .web_app import Application from .web_log import AccessLogger try: @@ -37,7 +38,6 @@ class GunicornWebWorker(base.Worker): def __init__(self, *args: Any, **kw: Any) -> None: # pragma: no cover super().__init__(*args, **kw) - self._runner = None # type: Optional[web.AppRunner] self._task = None # type: Optional[asyncio.Task[None]] self.exit_code = 0 self._notify_waiter = None # type: Optional[asyncio.Future[bool]] @@ -52,35 +52,42 @@ def init_process(self) -> None: super().init_process() def run(self) -> None: - access_log = self.log.access_log if self.cfg.accesslog else None - params = dict( - logger=self.log, - keepalive_timeout=self.cfg.keepalive, - access_log=access_log, - access_log_format=self._get_valid_log_format( - self.cfg.access_log_format)) - if asyncio.iscoroutinefunction(self.wsgi): # type: ignore - self.wsgi = self.loop.run_until_complete( - self.wsgi()) # type: ignore - self._runner = web.AppRunner(self.wsgi, **params) - self.loop.run_until_complete(self._runner.setup()) - self._task = self.loop.create_task(self._run()) + self._task = self.loop.create_task(self._run(self.wsgi)) try: # ignore all finalization problems self.loop.run_until_complete(self._task) except Exception as error: self.log.exception(error) if sys.version_info >= (3, 6): - if hasattr(self.loop, 'shutdown_asyncgens'): - self.loop.run_until_complete(self.loop.shutdown_asyncgens()) + self.loop.run_until_complete(self.loop.shutdown_asyncgens()) self.loop.close() sys.exit(self.exit_code) - async def _run(self) -> None: + async def _run( + self, + app: Union[Application, Callable[[], Awaitable[Application]]] + ) -> None: + if isinstance(app, Application): + real_app = app + elif asyncio.iscoroutinefunction(app): + real_app = await app() + else: + raise RuntimeError("wsgi app should be either Application or " + "async function returning Application, got {}" + .format(app)) + access_log = self.log.access_log if self.cfg.accesslog else None + runner = web.AppRunner(real_app, + logger=self.log, + keepalive_timeout=self.cfg.keepalive, + access_log=access_log, + access_log_format=self._get_valid_log_format( + self.cfg.access_log_format)) + await runner.setup() + ctx = self._create_ssl_context(self.cfg) if self.cfg.is_ssl else None - runner = self._runner + runner = runner assert runner is not None server = runner.server assert server is not None From 453dc482fcd217fa902896af2932184712d831a0 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 30 Dec 2018 01:33:28 +0200 Subject: [PATCH 2/4] Work on --- aiohttp/worker.py | 17 +++++++---------- tests/test_worker.py | 41 ++--------------------------------------- 2 files changed, 9 insertions(+), 49 deletions(-) diff --git a/aiohttp/worker.py b/aiohttp/worker.py index 0657b48a5dd..7fd750b429f 100644 --- a/aiohttp/worker.py +++ b/aiohttp/worker.py @@ -52,7 +52,7 @@ def init_process(self) -> None: super().init_process() def run(self) -> None: - self._task = self.loop.create_task(self._run(self.wsgi)) + self._task = self.loop.create_task(self._run()) try: # ignore all finalization problems self.loop.run_until_complete(self._task) @@ -64,20 +64,17 @@ def run(self) -> None: sys.exit(self.exit_code) - async def _run( - self, - app: Union[Application, Callable[[], Awaitable[Application]]] - ) -> None: - if isinstance(app, Application): - real_app = app - elif asyncio.iscoroutinefunction(app): - real_app = await app() + async def _run(self) -> None: + if isinstance(self.wsgi, Application): + app = self.wsgi + elif asyncio.iscoroutinefunction(self.wsgi): + app = await self.wsgi() else: raise RuntimeError("wsgi app should be either Application or " "async function returning Application, got {}" .format(app)) access_log = self.log.access_log if self.cfg.accesslog else None - runner = web.AppRunner(real_app, + runner = web.AppRunner(app, logger=self.log, keepalive_timeout=self.cfg.keepalive, access_log=access_log, diff --git a/tests/test_worker.py b/tests/test_worker.py index d78662e0074..22a6fd56189 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -42,7 +42,8 @@ def __init__(self): self.wsgi = web.Application() -class AsyncioWorker(BaseTestWorker, base_worker.GunicornWebWorker): # type: ignore # noqa +class AsyncioWorker(BaseTestWorker, # type: ignore + base_worker.GunicornWebWorker): pass @@ -197,15 +198,11 @@ async def test__run_ok_parent_changed(worker, loop, worker.cfg.max_requests = 0 worker.cfg.is_ssl = False - worker._runner = web.AppRunner(worker.wsgi) - await worker._runner.setup() - await worker._run() worker.notify.assert_called_with() worker.log.info.assert_called_with("Parent changed, shutting down: %s", worker) - assert worker._runner.server is None async def test__run_exc(worker, loop, aiohttp_unused_port) -> None: @@ -223,9 +220,6 @@ async def test__run_exc(worker, loop, aiohttp_unused_port) -> None: worker.cfg.max_requests = 0 worker.cfg.is_ssl = False - worker._runner = web.AppRunner(worker.wsgi) - await worker._runner.setup() - def raiser(): waiter = worker._notify_waiter worker.alive = False @@ -235,37 +229,6 @@ def raiser(): await worker._run() worker.notify.assert_called_with() - assert worker._runner.server is None - - -async def test__run_ok_max_requests_exceeded(worker, loop, - aiohttp_unused_port): - skip_if_no_dict(loop) - - worker.ppid = os.getppid() - worker.alive = True - worker.servers = {} - sock = socket.socket() - addr = ('localhost', aiohttp_unused_port()) - sock.bind(addr) - worker.sockets = [sock] - worker.log = mock.Mock() - worker.loop = loop - worker.cfg.access_log_format = ACCEPTABLE_LOG_FORMAT - worker.cfg.max_requests = 10 - worker.cfg.is_ssl = False - - worker._runner = web.AppRunner(worker.wsgi) - await worker._runner.setup() - worker._runner.server.requests_count = 30 - - await worker._run() - - worker.notify.assert_called_with() - worker.log.info.assert_called_with("Max requests, shutting down: %s", - worker) - - assert worker._runner.server is None def test__create_ssl_context_without_certs_and_ciphers(worker) -> None: From 6589bc6804a12c9021b9ed4395eee45e0e8d4e1d Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 30 Dec 2018 01:39:40 +0200 Subject: [PATCH 3/4] Add changenote --- CHANGES/3471.bugfix | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 CHANGES/3471.bugfix diff --git a/CHANGES/3471.bugfix b/CHANGES/3471.bugfix new file mode 100644 index 00000000000..98e28e68465 --- /dev/null +++ b/CHANGES/3471.bugfix @@ -0,0 +1,2 @@ +Use the same task for app initialization and web server handling in gunicorn workers. +It allows to use Python3.7 context vars smoothly. From 88525c313898084a0f43ac02878aa4854af657ae Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 30 Dec 2018 17:56:07 +0200 Subject: [PATCH 4/4] Improve coverage --- aiohttp/worker.py | 6 +++--- tests/test_worker.py | 29 +++++++++++++++++++++++------ 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/aiohttp/worker.py b/aiohttp/worker.py index 7fd750b429f..73ba6e38f69 100644 --- a/aiohttp/worker.py +++ b/aiohttp/worker.py @@ -56,8 +56,8 @@ def run(self) -> None: try: # ignore all finalization problems self.loop.run_until_complete(self._task) - except Exception as error: - self.log.exception(error) + except Exception: + self.log.exception("Exception in gunicorn worker") if sys.version_info >= (3, 6): self.loop.run_until_complete(self.loop.shutdown_asyncgens()) self.loop.close() @@ -72,7 +72,7 @@ async def _run(self) -> None: else: raise RuntimeError("wsgi app should be either Application or " "async function returning Application, got {}" - .format(app)) + .format(self.wsgi)) access_log = self.log.access_log if self.cfg.accesslog else None runner = web.AppRunner(app, logger=self.log, diff --git a/tests/test_worker.py b/tests/test_worker.py index 22a6fd56189..27c75248774 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -9,7 +9,6 @@ import pytest from aiohttp import web -from aiohttp.test_utils import make_mocked_coro base_worker = pytest.importorskip('aiohttp.worker') @@ -49,7 +48,8 @@ class AsyncioWorker(BaseTestWorker, # type: ignore PARAMS = [AsyncioWorker] if uvloop is not None: - class UvloopWorker(BaseTestWorker, base_worker.GunicornUVLoopWebWorker): # type: ignore # noqa + class UvloopWorker(BaseTestWorker, # type: ignore + base_worker.GunicornUVLoopWebWorker): pass PARAMS.append(UvloopWorker) @@ -79,12 +79,13 @@ def test_run(worker, loop) -> None: worker.log = mock.Mock() worker.cfg = mock.Mock() worker.cfg.access_log_format = ACCEPTABLE_LOG_FORMAT + worker.cfg.is_ssl = False + worker.sockets = [] worker.loop = loop - worker._run = make_mocked_coro(None) with pytest.raises(SystemExit): worker.run() - assert worker._run.called + worker.log.exception.assert_not_called() assert loop.is_closed() @@ -92,6 +93,8 @@ def test_run_async_factory(worker, loop) -> None: worker.log = mock.Mock() worker.cfg = mock.Mock() worker.cfg.access_log_format = ACCEPTABLE_LOG_FORMAT + worker.cfg.is_ssl = False + worker.sockets = [] app = worker.wsgi async def make_app(): @@ -99,10 +102,24 @@ async def make_app(): worker.wsgi = make_app worker.loop = loop - worker._run = make_mocked_coro(None) + worker.alive = False + with pytest.raises(SystemExit): + worker.run() + worker.log.exception.assert_not_called() + assert loop.is_closed() + + +def test_run_not_app(worker, loop) -> None: + worker.log = mock.Mock() + worker.cfg = mock.Mock() + worker.cfg.access_log_format = ACCEPTABLE_LOG_FORMAT + + worker.loop = loop + worker.wsgi = "not-app" + worker.alive = False with pytest.raises(SystemExit): worker.run() - assert worker._run.called + worker.log.exception.assert_called_with('Exception in gunicorn worker') assert loop.is_closed()