From 8f1162b006ff596621bd58a7f05a14d647c2fe46 Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 30 Aug 2024 15:42:14 +0200 Subject: [PATCH 1/6] Guarantee Nanny.running is set on failure --- distributed/nanny.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 7a14ee6576..0572bb48d0 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -758,13 +758,14 @@ async def start(self) -> Status: await self.process.terminate() self.status = Status.failed raise + finally: + self.running.set() if not msg: return self.status self.worker_address = msg["address"] self.worker_dir = msg["dir"] assert self.worker_address self.status = Status.running - self.running.set() return self.status From ebad5d56921d039ef7947c6e6fa310da5ded59c3 Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 30 Aug 2024 16:02:02 +0200 Subject: [PATCH 2/6] fix timeout --- distributed/nanny.py | 41 ++++++++++++++++++--------------- distributed/tests/test_nanny.py | 31 +++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 18 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 0572bb48d0..a1020b5faa 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -745,19 +745,22 @@ async def start(self) -> Status: os.environ.update(self.pre_spawn_env) try: - await self.process.start() - except OSError: - logger.exception("Nanny failed to start process", exc_info=True) - # NOTE: doesn't wait for process to terminate, just for terminate signal to be sent - await self.process.terminate() - self.status = Status.failed - try: - msg = await self._wait_until_connected(uid) - except Exception: - # NOTE: doesn't wait for process to terminate, just for terminate signal to be sent - await self.process.terminate() - self.status = Status.failed - raise + try: + await self.process.start() + except OSError: + # This can only happen if the actual process creation failed, e.g. + # multiprocessing.Process.start failed. 
This is not tested! + logger.exception("Nanny failed to start process", exc_info=True) + # NOTE: doesn't wait for process to terminate, just for terminate signal to be sent + await self.process.terminate() + self.status = Status.failed + try: + msg = await self._wait_until_connected(uid) + except Exception: + # NOTE: doesn't wait for process to terminate, just for terminate signal to be sent + await self.process.terminate() + self.status = Status.failed + raise finally: self.running.set() if not msg: @@ -766,6 +769,7 @@ async def start(self) -> Status: self.worker_dir = msg["dir"] assert self.worker_address self.status = Status.running + self.running.set() return self.status @@ -831,11 +835,6 @@ async def kill( """ deadline = time() + timeout - if self.status == Status.stopped: - return - if self.status == Status.stopping: - await self.stopped.wait() - return # If the process is not properly up it will not watch the closing queue # and we may end up leaking this process # Therefore wait for it to be properly started before killing it @@ -843,10 +842,16 @@ async def kill( await self.running.wait() assert self.status in ( + Status.stopping, Status.running, Status.failed, # process failed to start, but hasn't been joined yet Status.closing_gracefully, ), self.status + if self.status == Status.stopped: + return + if self.status == Status.stopping: + await self.stopped.wait() + return self.status = Status.stopping logger.info("Nanny asking worker to close. 
Reason: %s", reason) diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 52073c13d2..052f955511 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -582,6 +582,37 @@ async def test_worker_start_exception(s): assert logs.getvalue().count("ValueError: broken") == 1, logs.getvalue() +@gen_cluster(nthreads=[]) +async def test_worker_start_exception_while_killing(s): + nanny = Nanny(s.address, worker_class=BrokenWorker) + + async def try_to_kill_nanny(): + while not nanny.process or nanny.process.status not in ( + Status.starting, # this is what we want + # Status.failed # we might've missed it already + ): + await asyncio.sleep(0) + await nanny.kill() + + kill_task = asyncio.create_task(try_to_kill_nanny()) + with captured_logger(logger="distributed.nanny", level=logging.WARNING) as logs: + with raises_with_cause( + RuntimeError, + "Nanny failed to start", + RuntimeError, + "BrokenWorker failed to start", + ): + async with nanny: + pass + await kill_task + assert nanny.status == Status.failed + # ^ NOTE: `Nanny.close` sets it to `closed`, then `Server.start._close_on_failure` sets it to `failed` + assert nanny.process is None + assert "Restarting worker" not in logs.getvalue() + # Avoid excessive spewing. (It's also printed once extra within the subprocess, which is okay.) 
+ assert logs.getvalue().count("ValueError: broken") == 1, logs.getvalue() + + @gen_cluster(nthreads=[]) async def test_failure_during_worker_initialization(s): with captured_logger(logger="distributed.nanny", level=logging.WARNING) as logs: From fe0a339ad2588b56ff73cbef0df3be33a1ee654d Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 2 Sep 2024 10:42:31 +0200 Subject: [PATCH 3/6] allow stopped as well --- distributed/nanny.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/nanny.py b/distributed/nanny.py index a1020b5faa..2f28436ecf 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -843,6 +843,7 @@ async def kill( assert self.status in ( Status.stopping, + Status.stopped, Status.running, Status.failed, # process failed to start, but hasn't been joined yet Status.closing_gracefully, From e5fb18d304d1a530f48ae8884fb83b1d007cbbdf Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 2 Sep 2024 15:06:05 +0200 Subject: [PATCH 4/6] shield kill --- distributed/nanny.py | 4 ++-- distributed/tests/test_nanny.py | 40 +++++++++++++++++++++++++-------- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 2f28436ecf..859b9f22dc 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -516,7 +516,7 @@ async def _(): await self.instantiate() try: - await wait_for(_(), timeout) + await wait_for(asyncio.shield(_()), timeout) except asyncio.TimeoutError: logger.error( f"Restart timed out after {timeout}s; returning before finished" @@ -769,7 +769,6 @@ async def start(self) -> Status: self.worker_dir = msg["dir"] assert self.worker_address self.status = Status.running - self.running.set() return self.status @@ -804,6 +803,7 @@ def mark_stopped(self): msg = self._death_message(self.process.pid, r) logger.info(msg) self.status = Status.stopped + self.running.clear() self.stopped.set() # Release resources self.process.close() diff --git a/distributed/tests/test_nanny.py 
b/distributed/tests/test_nanny.py index 052f955511..c3e473a7aa 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -208,25 +208,47 @@ async def test_scheduler_file(): s.stop() -@pytest.mark.xfail( - os.environ.get("MINDEPS") == "true", - reason="Timeout errors with mindeps environment", -) -@gen_cluster(client=True, Worker=Nanny, nthreads=[("127.0.0.1", 2)]) -async def test_nanny_timeout(c, s, a): +@gen_cluster(client=True, Worker=Nanny, nthreads=[("", 1)]) +async def test_nanny_restart(c, s, a): + x = await c.scatter(123) + assert await c.submit(lambda: 1) == 1 + + await a.restart() + + while x.status != "cancelled": + await asyncio.sleep(0.1) + + assert await c.submit(lambda: 1) == 1 + + +@gen_cluster(client=True, Worker=Nanny, nthreads=[("", 1)]) +async def test_nanny_restart_timeout(c, s, a): x = await c.scatter(123) with captured_logger( logging.getLogger("distributed.nanny"), level=logging.ERROR ) as logger: - await a.restart(timeout=0.1) + await a.restart(timeout=0) out = logger.getvalue() assert "timed out" in out.lower() - start = time() while x.status != "cancelled": await asyncio.sleep(0.1) - assert time() < start + 7 + + assert await c.submit(lambda: 1) == 1 + + +@gen_cluster(client=True, Worker=Nanny, nthreads=[("", 1)]) +async def test_nanny_restart_timeout_stress(c, s, a): + x = await c.scatter(123) + restarts = [a.restart(timeout=random.random() * 0.1) for _ in range(100)] + await asyncio.gather(*restarts) + + while x.status != "cancelled": + await asyncio.sleep(0.1) + + assert await c.submit(lambda: 1) == 1 + assert len(s.workers) == 1 @gen_cluster( From 54db439dcf458388c5dcb8945304cc1101d9ac59 Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 2 Sep 2024 15:08:25 +0200 Subject: [PATCH 5/6] random up to one second --- distributed/tests/test_nanny.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index c3e473a7aa..ed62dede5a 100644 
--- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -241,7 +241,7 @@ async def test_nanny_restart_timeout(c, s, a): @gen_cluster(client=True, Worker=Nanny, nthreads=[("", 1)]) async def test_nanny_restart_timeout_stress(c, s, a): x = await c.scatter(123) - restarts = [a.restart(timeout=random.random() * 0.1) for _ in range(100)] + restarts = [a.restart(timeout=random.random()) for _ in range(100)] await asyncio.gather(*restarts) while x.status != "cancelled": From a32525303cd7c9c4c4f08050b14de0c7356497f4 Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 2 Sep 2024 15:10:49 +0200 Subject: [PATCH 6/6] review comment --- distributed/tests/test_nanny.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index ed62dede5a..b05b7dc90c 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -609,10 +609,7 @@ async def test_worker_start_exception_while_killing(s): nanny = Nanny(s.address, worker_class=BrokenWorker) async def try_to_kill_nanny(): - while not nanny.process or nanny.process.status not in ( - Status.starting, # this is what we want - # Status.failed # we might've missed it already - ): + while not nanny.process or nanny.process.status != Status.starting: await asyncio.sleep(0) await nanny.kill()