From 4499d2c47e0ff54c36def0636226e7a2159e2386 Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Thu, 7 Dec 2023 11:23:01 +0200 Subject: [PATCH] Cleaner process management (#2811) --- sanic/app.py | 14 +++++++++ sanic/mixins/startup.py | 22 +++++++++++++- sanic/server/goodbye.py | 31 ++++++++++++++++++++ sanic/server/runners.py | 16 +++++----- sanic/worker/manager.py | 6 ++++ sanic/worker/multiplexer.py | 18 ++++++++++++ sanic/worker/process.py | 15 +++++++++- tests/test_headers.py | 2 +- tests/test_static.py | 4 +-- tests/typing/samples/request_custom_sanic.py | 2 +- tests/worker/test_reloader.py | 4 ++- tests/worker/test_runner.py | 7 +++-- 12 files changed, 123 insertions(+), 18 deletions(-) create mode 100644 sanic/server/goodbye.py diff --git a/sanic/app.py b/sanic/app.py index d6f104cb70..856a951b32 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -2383,6 +2383,20 @@ def ack(self) -> None: if hasattr(self, "multiplexer"): self.multiplexer.ack() + def set_serving(self, serving: bool) -> None: + """Set the serving state of the application. + + This method is used to set the serving state of the application. + It is used internally by Sanic and should not typically be called + manually. + + Args: + serving (bool): Whether the application is serving. + """ + self.state.is_running = serving + if hasattr(self, "multiplexer"): + self.multiplexer.set_serving(serving) + async def _server_event( self, concern: str, diff --git a/sanic/mixins/startup.py b/sanic/mixins/startup.py index 371be6d2c3..40c410ac90 100644 --- a/sanic/mixins/startup.py +++ b/sanic/mixins/startup.py @@ -27,6 +27,7 @@ from pathlib import Path from socket import SHUT_RDWR, socket from ssl import SSLContext +from time import sleep from typing import ( TYPE_CHECKING, Any, @@ -60,6 +61,7 @@ from sanic.server import try_use_uvloop from sanic.server.async_server import AsyncioServer from sanic.server.events import trigger_events +from sanic.server.goodbye import get_goodbye from sanic.server.loop import try_windows_loop from sanic.server.protocols.http_protocol import HttpProtocol from sanic.server.protocols.websocket_protocol import WebSocketProtocol @@ -1146,7 +1148,6 @@ def serve( app.router.reset() app.signal_router.reset() - sync_manager.shutdown() for sock in socks: try: sock.shutdown(SHUT_RDWR) @@ -1158,12 +1159,31 @@ def serve( loop.close() cls._cleanup_env_vars() cls._cleanup_apps() + + limit = 100 + while cls._get_process_states(worker_state): + sleep(0.1) + limit -= 1 + if limit <= 0: + error_logger.warning( + "Worker shutdown timed out. " + "Some processes may still be running." + ) + break + sync_manager.shutdown() unix = kwargs.get("unix") if unix: remove_unix_socket(unix) + logger.debug(get_goodbye()) if exit_code: os._exit(exit_code) + @staticmethod + def _get_process_states(worker_state) -> List[str]: + return [ + state for s in worker_state.values() if (state := s.get("state")) + ] + @classmethod def serve_single(cls, primary: Optional[Sanic] = None) -> None: """Serve a single process of a Sanic application. diff --git a/sanic/server/goodbye.py b/sanic/server/goodbye.py new file mode 100644 index 0000000000..efef8ad52e --- /dev/null +++ b/sanic/server/goodbye.py @@ -0,0 +1,31 @@ +# flake8: noqa: E501 + +import random +import sys + + +# fmt: off +ascii_phrases = { + 'Farewell', 'Bye', 'See you later', 'Take care', 'So long', 'Adieu', 'Cheerio', + 'Goodbye', 'Adios', 'Au revoir', 'Arrivederci', 'Sayonara', 'Auf Wiedersehen', + 'Do svidaniya', 'Annyeong', 'Tot ziens', 'Ha det', 'Selamat tinggal', + 'Hasta luego', 'Nos vemos', 'Salut', 'Ciao', 'A presto', + 'Dag', 'Tot later', 'Vi ses', 'Sampai jumpa', +} + +non_ascii_phrases = { + 'Tschüss', 'Zài jiàn', 'Bāi bāi', 'Míngtiān jiàn', 'Adeus', 'Tchau', 'Até logo', + 'Hejdå', 'À bientôt', 'Bis später', 'Adjø', + 'じゃね', 'またね', '안녕히 계세요', '잘 가', 'שלום', + 'להתראות', 'مع السلامة', 'إلى اللقاء', 'وداعاً', 'अलविदा', + 'फिर मिलेंगे', +} + +all_phrases = ascii_phrases | non_ascii_phrases +# fmt: on + + +def get_goodbye() -> str: # pragma: no cover + is_utf8 = sys.stdout.encoding.lower() == "utf-8" + phrases = all_phrases if is_utf8 else ascii_phrases + return random.choice(list(phrases)) # nosec: B311 diff --git a/sanic/server/runners.py b/sanic/server/runners.py index 72c7a035bd..6de82935a2 100644 --- a/sanic/server/runners.py +++ b/sanic/server/runners.py @@ -159,17 +159,15 @@ def _setup_system_signals( register_sys_signals: bool, loop: asyncio.AbstractEventLoop, ) -> None: # no cov - # Ignore SIGINT when run_multiple - if run_multiple: - signal_func(SIGINT, SIG_IGN) - os.environ["SANIC_WORKER_PROCESS"] = "true" - + signal_func(SIGINT, SIG_IGN) + signal_func(SIGTERM, SIG_IGN) + os.environ["SANIC_WORKER_PROCESS"] = "true" # Register signals for graceful termination if register_sys_signals: if OS_IS_WINDOWS: ctrlc_workaround_for_windows(app) else: - for _signal in [SIGTERM] if run_multiple else [SIGINT, SIGTERM]: + for _signal in [SIGINT, SIGTERM]: loop.add_signal_handler( _signal, partial(app.stop, terminate=False) ) @@ -180,8 +178,6 @@ def _run_server_forever(loop, before_stop, after_stop, cleanup, unix): try: server_logger.info("Starting worker [%s]", pid) loop.run_forever() - except KeyboardInterrupt: - pass finally: server_logger.info("Stopping worker [%s]", pid) @@ -193,6 +189,7 @@ def _run_server_forever(loop, before_stop, after_stop, cleanup, unix): loop.run_until_complete(after_stop()) remove_unix_socket(unix) loop.close() + server_logger.info("Worker complete [%s]", pid) def _serve_http_1( @@ -296,8 +293,11 @@ def _cleanup(): else: conn.abort() + app.set_serving(False) + _setup_system_signals(app, run_multiple, register_sys_signals, loop) loop.run_until_complete(app._server_event("init", "after")) + app.set_serving(True) _run_server_forever( loop, partial(app._server_event, "shutdown", "before"), diff --git a/sanic/worker/manager.py b/sanic/worker/manager.py index 1a0da96c0c..c3f9dd0401 100644 --- a/sanic/worker/manager.py +++ b/sanic/worker/manager.py @@ -154,6 +154,7 @@ def run(self): self.monitor() self.join() self.terminate() + self.cleanup() def start(self): """Start the worker processes.""" @@ -182,6 +183,11 @@ def terminate(self): for process in self.processes: process.terminate() + def cleanup(self): + """Cleanup the worker processes.""" + for process in self.processes: + process.exit() + def restart( self, process_names: Optional[List[str]] = None, diff --git a/sanic/worker/multiplexer.py b/sanic/worker/multiplexer.py index b877ccc179..cf2054c5b9 100644 --- a/sanic/worker/multiplexer.py +++ b/sanic/worker/multiplexer.py @@ -39,6 +39,24 @@ def ack(self): "state": ProcessState.ACKED.name, } + def set_serving(self, serving: bool) -> None: + """Set the worker to serving. + + Args: + serving (bool): Whether the worker is serving. + """ + self._state._state[self.name] = { + **self._state._state[self.name], + "serving": serving, + } + + def exit(self): + """Run cleanup at worker exit.""" + try: + del self._state._state[self.name] + except ConnectionRefusedError: + logger.debug("Monitor process has already exited.") + def restart( self, name: str = "", diff --git a/sanic/worker/process.py b/sanic/worker/process.py index 83b3556b52..a66b517ea3 100644 --- a/sanic/worker/process.py +++ b/sanic/worker/process.py @@ -67,6 +67,20 @@ def join(self): self.set_state(ProcessState.JOINED) self._current_process.join() + def exit(self): + limit = 100 + while self.is_alive() and limit > 0: + sleep(0.1) + limit -= 1 + + if not self.is_alive(): + try: + del self.worker_state[self.name] + except ConnectionRefusedError: + logger.debug("Monitor process has already exited.") + except KeyError: + logger.debug("Could not find worker state to delete.") + def terminate(self): if self.state is not ProcessState.TERMINATED: logger.debug( @@ -79,7 +93,6 @@ def terminate(self): self.set_state(ProcessState.TERMINATED, force=True) try: os.kill(self.pid, SIGINT) - del self.worker_state[self.name] except (KeyError, AttributeError, ProcessLookupError): ... diff --git a/tests/test_headers.py b/tests/test_headers.py index fbf13d25bf..3c07a98f53 100644 --- a/tests/test_headers.py +++ b/tests/test_headers.py @@ -57,7 +57,7 @@ def raised_ceiling(): # Chrome, Firefox: # Content-Disposition: form-data; name="foo%22;bar\"; filename="😀" 'form-data; name="foo%22;bar\\"; filename="😀"', - ("form-data", {"name": 'foo";bar\\', "filename": "😀"}) + ("form-data", {"name": 'foo";bar\\', "filename": "😀"}), # cgi: ('form-data', {'name': 'foo%22;bar"; filename="😀'}) # werkzeug (pre 2.3.0): ('form-data', {'name': 'foo%22;bar"; filename='}) ), diff --git a/tests/test_static.py b/tests/test_static.py index a8df59ab2b..2ee41bfa99 100644 --- a/tests/test_static.py +++ b/tests/test_static.py @@ -520,7 +520,7 @@ def test_stack_trace_on_not_found(app, static_file_directory, caplog): assert counter[("sanic.root", logging.INFO)] == 10 assert counter[("sanic.root", logging.ERROR)] == 0 assert counter[("sanic.error", logging.ERROR)] == 0 - assert counter[("sanic.server", logging.INFO)] == 2 + assert counter[("sanic.server", logging.INFO)] == 3 def test_no_stack_trace_on_not_found(app, static_file_directory, caplog): @@ -539,7 +539,7 @@ async def file_not_found(request, exception): assert counter[("sanic.root", logging.INFO)] == 10 assert counter[("sanic.root", logging.ERROR)] == 0 assert counter[("sanic.error", logging.ERROR)] == 0 - assert counter[("sanic.server", logging.INFO)] == 2 + assert counter[("sanic.server", logging.INFO)] == 3 assert response.text == "No file: /static/non_existing_file.file" diff --git a/tests/typing/samples/request_custom_sanic.py b/tests/typing/samples/request_custom_sanic.py index bdacd92bbe..cda4ae36fb 100644 --- a/tests/typing/samples/request_custom_sanic.py +++ b/tests/typing/samples/request_custom_sanic.py @@ -13,7 +13,7 @@ class CustomConfig(Config): @app.get("/") async def handler( - request: Request[Sanic[CustomConfig, SimpleNamespace], SimpleNamespace] + request: Request[Sanic[CustomConfig, SimpleNamespace], SimpleNamespace], ): reveal_type(request.ctx) reveal_type(request.app) diff --git a/tests/worker/test_reloader.py b/tests/worker/test_reloader.py index a0082014b2..5900a43bdf 100644 --- a/tests/worker/test_reloader.py +++ b/tests/worker/test_reloader.py @@ -155,7 +155,9 @@ def start(self): monkeypatch.setattr(threading.Thread, "start", orig) -def test_reloader_triggers_start_stop_listeners(app: Sanic, app_loader: AppLoader): +def test_reloader_triggers_start_stop_listeners( + app: Sanic, app_loader: AppLoader +): results = [] @app.reload_process_start diff --git a/tests/worker/test_runner.py b/tests/worker/test_runner.py index f54978d1a7..8a087e0e24 100644 --- a/tests/worker/test_runner.py +++ b/tests/worker/test_runner.py @@ -35,9 +35,10 @@ def test_run_server_forever(remove_unix_socket: Mock, do_cleanup: bool): after_stop.return_value = Mock() unix = Mock() - _run_server_forever( - loop, before_stop, after_stop, cleanup if do_cleanup else None, unix - ) + with pytest.raises(KeyboardInterrupt): + _run_server_forever( + loop, before_stop, after_stop, cleanup if do_cleanup else None, unix + ) loop.run_forever.assert_called_once_with() loop.run_until_complete.assert_has_calls(