Skip to content

Commit

Permalink
Cleaner process management (#2811)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahopkins authored Dec 7, 2023
1 parent 00f2af2 commit 4499d2c
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 18 deletions.
14 changes: 14 additions & 0 deletions sanic/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2383,6 +2383,20 @@ def ack(self) -> None:
if hasattr(self, "multiplexer"):
self.multiplexer.ack()

def set_serving(self, serving: bool) -> None:
"""Set the serving state of the application.
This method is used to set the serving state of the application.
It is used internally by Sanic and should not typically be called
manually.
Args:
serving (bool): Whether the application is serving.
"""
self.state.is_running = serving
if hasattr(self, "multiplexer"):
self.multiplexer.set_serving(serving)

async def _server_event(
self,
concern: str,
Expand Down
22 changes: 21 additions & 1 deletion sanic/mixins/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from pathlib import Path
from socket import SHUT_RDWR, socket
from ssl import SSLContext
from time import sleep
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -60,6 +61,7 @@
from sanic.server import try_use_uvloop
from sanic.server.async_server import AsyncioServer
from sanic.server.events import trigger_events
from sanic.server.goodbye import get_goodbye
from sanic.server.loop import try_windows_loop
from sanic.server.protocols.http_protocol import HttpProtocol
from sanic.server.protocols.websocket_protocol import WebSocketProtocol
Expand Down Expand Up @@ -1146,7 +1148,6 @@ def serve(
app.router.reset()
app.signal_router.reset()

sync_manager.shutdown()
for sock in socks:
try:
sock.shutdown(SHUT_RDWR)
Expand All @@ -1158,12 +1159,31 @@ def serve(
loop.close()
cls._cleanup_env_vars()
cls._cleanup_apps()

limit = 100
while cls._get_process_states(worker_state):
sleep(0.1)
limit -= 1
if limit <= 0:
error_logger.warning(
"Worker shutdown timed out. "
"Some processes may still be running."
)
break
sync_manager.shutdown()
unix = kwargs.get("unix")
if unix:
remove_unix_socket(unix)
logger.debug(get_goodbye())
if exit_code:
os._exit(exit_code)

@staticmethod
def _get_process_states(worker_state) -> List[str]:
return [
state for s in worker_state.values() if (state := s.get("state"))
]

@classmethod
def serve_single(cls, primary: Optional[Sanic] = None) -> None:
"""Serve a single process of a Sanic application.
Expand Down
31 changes: 31 additions & 0 deletions sanic/server/goodbye.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# flake8: noqa: E501

import random
import sys


# fmt: off
ascii_phrases = {
'Farewell', 'Bye', 'See you later', 'Take care', 'So long', 'Adieu', 'Cheerio',
'Goodbye', 'Adios', 'Au revoir', 'Arrivederci', 'Sayonara', 'Auf Wiedersehen',
'Do svidaniya', 'Annyeong', 'Tot ziens', 'Ha det', 'Selamat tinggal',
'Hasta luego', 'Nos vemos', 'Salut', 'Ciao', 'A presto',
'Dag', 'Tot later', 'Vi ses', 'Sampai jumpa',
}

non_ascii_phrases = {
'Tschüss', 'Zài jiàn', 'Bāi bāi', 'Míngtiān jiàn', 'Adeus', 'Tchau', 'Até logo',
'Hejdå', 'À bientôt', 'Bis später', 'Adjø',
'じゃね', 'またね', '안녕히 계세요', '잘 가', 'שלום',
'להתראות', 'مع السلامة', 'إلى اللقاء', 'وداعاً', 'अलविदा',
'फिर मिलेंगे',
}

all_phrases = ascii_phrases | non_ascii_phrases
# fmt: on


def get_goodbye() -> str: # pragma: no cover
is_utf8 = sys.stdout.encoding.lower() == "utf-8"
phrases = all_phrases if is_utf8 else ascii_phrases
return random.choice(list(phrases)) # nosec: B311
16 changes: 8 additions & 8 deletions sanic/server/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,15 @@ def _setup_system_signals(
register_sys_signals: bool,
loop: asyncio.AbstractEventLoop,
) -> None: # no cov
# Ignore SIGINT when run_multiple
if run_multiple:
signal_func(SIGINT, SIG_IGN)
os.environ["SANIC_WORKER_PROCESS"] = "true"

signal_func(SIGINT, SIG_IGN)
signal_func(SIGTERM, SIG_IGN)
os.environ["SANIC_WORKER_PROCESS"] = "true"
# Register signals for graceful termination
if register_sys_signals:
if OS_IS_WINDOWS:
ctrlc_workaround_for_windows(app)
else:
for _signal in [SIGTERM] if run_multiple else [SIGINT, SIGTERM]:
for _signal in [SIGINT, SIGTERM]:
loop.add_signal_handler(
_signal, partial(app.stop, terminate=False)
)
Expand All @@ -180,8 +178,6 @@ def _run_server_forever(loop, before_stop, after_stop, cleanup, unix):
try:
server_logger.info("Starting worker [%s]", pid)
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
server_logger.info("Stopping worker [%s]", pid)

Expand All @@ -193,6 +189,7 @@ def _run_server_forever(loop, before_stop, after_stop, cleanup, unix):
loop.run_until_complete(after_stop())
remove_unix_socket(unix)
loop.close()
server_logger.info("Worker complete [%s]", pid)


def _serve_http_1(
Expand Down Expand Up @@ -296,8 +293,11 @@ def _cleanup():
else:
conn.abort()

app.set_serving(False)

_setup_system_signals(app, run_multiple, register_sys_signals, loop)
loop.run_until_complete(app._server_event("init", "after"))
app.set_serving(True)
_run_server_forever(
loop,
partial(app._server_event, "shutdown", "before"),
Expand Down
6 changes: 6 additions & 0 deletions sanic/worker/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def run(self):
self.monitor()
self.join()
self.terminate()
self.cleanup()

def start(self):
"""Start the worker processes."""
Expand Down Expand Up @@ -182,6 +183,11 @@ def terminate(self):
for process in self.processes:
process.terminate()

def cleanup(self):
"""Cleanup the worker processes."""
for process in self.processes:
process.exit()

def restart(
self,
process_names: Optional[List[str]] = None,
Expand Down
18 changes: 18 additions & 0 deletions sanic/worker/multiplexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,24 @@ def ack(self):
"state": ProcessState.ACKED.name,
}

def set_serving(self, serving: bool) -> None:
"""Set the worker to serving.
Args:
serving (bool): Whether the worker is serving.
"""
self._state._state[self.name] = {
**self._state._state[self.name],
"serving": serving,
}

def exit(self):
"""Run cleanup at worker exit."""
try:
del self._state._state[self.name]
except ConnectionRefusedError:
logger.debug("Monitor process has already exited.")

def restart(
self,
name: str = "",
Expand Down
15 changes: 14 additions & 1 deletion sanic/worker/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ def join(self):
self.set_state(ProcessState.JOINED)
self._current_process.join()

def exit(self):
limit = 100
while self.is_alive() and limit > 0:
sleep(0.1)
limit -= 1

if not self.is_alive():
try:
del self.worker_state[self.name]
except ConnectionRefusedError:
logger.debug("Monitor process has already exited.")
except KeyError:
logger.debug("Could not find worker state to delete.")

def terminate(self):
if self.state is not ProcessState.TERMINATED:
logger.debug(
Expand All @@ -79,7 +93,6 @@ def terminate(self):
self.set_state(ProcessState.TERMINATED, force=True)
try:
os.kill(self.pid, SIGINT)
del self.worker_state[self.name]
except (KeyError, AttributeError, ProcessLookupError):
...

Expand Down
2 changes: 1 addition & 1 deletion tests/test_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def raised_ceiling():
# Chrome, Firefox:
# Content-Disposition: form-data; name="foo%22;bar\"; filename="😀"
'form-data; name="foo%22;bar\\"; filename="😀"',
("form-data", {"name": 'foo";bar\\', "filename": "😀"})
("form-data", {"name": 'foo";bar\\', "filename": "😀"}),
# cgi: ('form-data', {'name': 'foo%22;bar"; filename="😀'})
# werkzeug (pre 2.3.0): ('form-data', {'name': 'foo%22;bar"; filename='})
),
Expand Down
4 changes: 2 additions & 2 deletions tests/test_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ def test_stack_trace_on_not_found(app, static_file_directory, caplog):
assert counter[("sanic.root", logging.INFO)] == 10
assert counter[("sanic.root", logging.ERROR)] == 0
assert counter[("sanic.error", logging.ERROR)] == 0
assert counter[("sanic.server", logging.INFO)] == 2
assert counter[("sanic.server", logging.INFO)] == 3


def test_no_stack_trace_on_not_found(app, static_file_directory, caplog):
Expand All @@ -539,7 +539,7 @@ async def file_not_found(request, exception):
assert counter[("sanic.root", logging.INFO)] == 10
assert counter[("sanic.root", logging.ERROR)] == 0
assert counter[("sanic.error", logging.ERROR)] == 0
assert counter[("sanic.server", logging.INFO)] == 2
assert counter[("sanic.server", logging.INFO)] == 3
assert response.text == "No file: /static/non_existing_file.file"


Expand Down
2 changes: 1 addition & 1 deletion tests/typing/samples/request_custom_sanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class CustomConfig(Config):

@app.get("/")
async def handler(
request: Request[Sanic[CustomConfig, SimpleNamespace], SimpleNamespace]
request: Request[Sanic[CustomConfig, SimpleNamespace], SimpleNamespace],
):
reveal_type(request.ctx)
reveal_type(request.app)
4 changes: 3 additions & 1 deletion tests/worker/test_reloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ def start(self):
monkeypatch.setattr(threading.Thread, "start", orig)


def test_reloader_triggers_start_stop_listeners(app: Sanic, app_loader: AppLoader):
def test_reloader_triggers_start_stop_listeners(
app: Sanic, app_loader: AppLoader
):
results = []

@app.reload_process_start
Expand Down
7 changes: 4 additions & 3 deletions tests/worker/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ def test_run_server_forever(remove_unix_socket: Mock, do_cleanup: bool):
after_stop.return_value = Mock()
unix = Mock()

_run_server_forever(
loop, before_stop, after_stop, cleanup if do_cleanup else None, unix
)
with pytest.raises(KeyboardInterrupt):
_run_server_forever(
loop, before_stop, after_stop, cleanup if do_cleanup else None, unix
)

loop.run_forever.assert_called_once_with()
loop.run_until_complete.assert_has_calls(
Expand Down

0 comments on commit 4499d2c

Please sign in to comment.