diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 2c02b79bfbd..da68e61a986 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3003,7 +3003,7 @@ async def test_log_remove_worker(c, s, a, b): events = {topic: [ev for _, ev in evs] for topic, evs in s.get_events().items()} for evs in events.values(): for ev in evs: - if ev["action"] == "retire-workers": + if ev.get("action", None) == "retire-workers": for k in ("retired", "could-not-retire"): ev[k] = {addr: "snip" for addr in ev[k]} if "stimulus_id" in ev: # Strip timestamp @@ -3083,6 +3083,7 @@ async def test_log_remove_worker(c, s, a, b): "worker": b.address, }, ], + "worker-get-client": [{"client": c.id, "timeout": 5, "worker": b.address}], } diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index f5e72225c3c..a7f8c236b31 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -363,3 +363,56 @@ def long_running(): assert len(res) == 2 assert res[a.address] > 25 assert res[b.address] > 25 + + +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_log_event(c, s, a): + # Run a task that spawns a worker client + def f(x): + with worker_client(timeout=10, separate_thread=True) as wc: + x = wc.submit(inc, x) + y = wc.submit(double, x) + result = x.result() + y.result() + return result + + future = c.submit(f, 1) + result = await future + assert result == 6 + + # Ensure a corresponding event is logged + for topic in ["worker-get-client", "worker-client"]: + events = [msg for t, msg in s.get_events().items() if t == topic] + assert len(events) == 1 + assert events[0][0][1] == { + "worker": a.address, + "timeout": 10, + "client": c.id, + } + + +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_log_event_implicit(c, s, a): + # Run a task that spawns a worker client + def f(x): + x = delayed(inc)(x) + y = delayed(double)(x) + result = x.compute() + y.compute() + return result + + future = c.submit(f, 1) + result = await future + assert result == 6 + + # Ensure a corresponding event is logged + events = [ + msg for topic, msg in s.get_events().items() if topic == "worker-get-client" + ] + assert len(events) == 1 + assert events[0][0][1] == { + "worker": a.address, + "timeout": 5, + "client": c.id, + } + # Do not log a `worker-client` since this client was created implicitly + events = [msg for topic, msg in s.get_events().items() if topic == "worker-client"] + assert len(events) == 0 diff --git a/distributed/worker.py b/distributed/worker.py index 18ef0aca86c..bb1c0637740 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2609,6 +2609,14 @@ def _get_client(self, timeout: float | None = None) -> Client: if not asynchronous: assert self._client.status == "running" + self.log_event( + "worker-get-client", + { + "client": self._client.id, + "timeout": timeout, + }, + ) + return self._client def get_current_task(self) -> Key: diff --git a/distributed/worker_client.py b/distributed/worker_client.py index 355156206d0..86965d4cf79 100644 --- a/distributed/worker_client.py +++ b/distributed/worker_client.py @@ -53,6 +53,13 @@ def worker_client(timeout=None, separate_thread=True): worker = get_worker() client = get_client(timeout=timeout) + worker.log_event( + "worker-client", + { + "client": client.id, + "timeout": timeout, + }, + ) with contextlib.ExitStack() as stack: if separate_thread: try: