From be707dab54bcbc27d3075d19616d5b99c93d5bb8 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 15 Feb 2024 16:13:39 -0800 Subject: [PATCH] WorkerPool: Wait for previous task in _try_send_to_primary_thread In order to prevent tasks from running in a non-main thread, wait for the previous task inside _try_send_to_primary_thread, then schedule the next task. Add a main_thread_only execmodel to distinguish this new behavior from the existing thread execmodel, since users of the thread execmodel expect that tasks can run in multiple threads concurrently. Closes: https://github.com/pytest-dev/execnet/issues/96 --- doc/basics.rst | 10 +++++----- src/execnet/gateway_base.py | 40 +++++++++++++++++++++++++++++++------ testing/conftest.py | 6 ++++-- testing/test_termination.py | 2 +- testing/test_threadpool.py | 4 ++-- 5 files changed, 46 insertions(+), 16 deletions(-) diff --git a/doc/basics.rst b/doc/basics.rst index aa6dabaf..f0eebd85 100644 --- a/doc/basics.rst +++ b/doc/basics.rst @@ -138,14 +138,14 @@ processes then you often want to call ``group.terminate()`` yourself and specify a larger or not timeout. -threading models: gevent, eventlet, thread -=========================================== +threading models: gevent, eventlet, thread, main_thread_only +==================================================================== .. versionadded:: 1.2 (status: experimental!) -execnet supports "thread", "eventlet" and "gevent" as thread models -on each of the two sides. You need to decide which model to use -before you create any gateways:: +execnet supports "main_thread_only", "thread", "eventlet" and "gevent" +as thread models on each of the two sides. You need to decide which +model to use before you create any gateways:: # content of threadmodel.py import execnet diff --git a/src/execnet/gateway_base.py b/src/execnet/gateway_base.py index c055d64f..66efdfe6 100644 --- a/src/execnet/gateway_base.py +++ b/src/execnet/gateway_base.py @@ -252,7 +252,7 @@ def Event(self): def get_execmodel(backend): if hasattr(backend, "backend"): return backend - if backend == "thread": + if backend in ("thread", "main_thread_only"): return ThreadExecModel() elif backend == "eventlet": return EventletExecModel() @@ -322,7 +322,7 @@ def __init__(self, execmodel, hasprimary=False): self._shuttingdown = False self._waitall_events = [] if hasprimary: - if self.execmodel.backend != "thread": + if self.execmodel.backend not in ("thread", "main_thread_only"): raise ValueError("hasprimary=True requires thread model") self._primary_thread_task_ready = self.execmodel.Event() else: @@ -332,7 +332,7 @@ def integrate_as_primary_thread(self): """integrate the thread with which we are called as a primary thread for executing functions triggered with spawn(). """ - assert self.execmodel.backend == "thread", self.execmodel + assert self.execmodel.backend in ("thread", "main_thread_only"), self.execmodel primary_thread_task_ready = self._primary_thread_task_ready # interacts with code at REF1 while 1: @@ -345,7 +345,11 @@ def integrate_as_primary_thread(self): with self._running_lock: if self._shuttingdown: break - primary_thread_task_ready.clear() + # Only clear if _try_send_to_primary_thread has not + # yet set the next self._primary_thread_task reply + # after waiting for this one to complete. + if reply is self._primary_thread_task: + primary_thread_task_ready.clear() def trigger_shutdown(self): with self._running_lock: @@ -376,6 +380,19 @@ def _try_send_to_primary_thread(self, reply): # wake up primary thread primary_thread_task_ready.set() return True + elif ( + self.execmodel.backend == "main_thread_only" + and self._primary_thread_task is not None + ): + self._primary_thread_task.waitfinish() + self._primary_thread_task = reply + # wake up primary thread (it's okay if this is already set + # because we waited for the previous task to finish above + # and integrate_as_primary_thread will not clear it when + # it enters self._running_lock if it detects that a new + # task is available) + primary_thread_task_ready.set() + return True return False def spawn(self, func, *args, **kwargs): @@ -1106,7 +1123,18 @@ def join(self, timeout=None): class WorkerGateway(BaseGateway): def _local_schedulexec(self, channel, sourcetask): sourcetask = loads_internal(sourcetask) - self._execpool.spawn(self.executetask, (channel, sourcetask)) + if self.execmodel.backend == "main_thread_only": + # TODO: Maybe use something like queue.Queue to queue an asynchronous + # spawn here in order to avoid using another thread. + import threading + + t = threading.Thread( + target=self._execpool.spawn, + args=(self.executetask, (channel, sourcetask)), + ) + t.start() + else: + self._execpool.spawn(self.executetask, (channel, sourcetask)) def _terminate_execution(self): # called from receiverthread @@ -1132,7 +1160,7 @@ def serve(self): def trace(msg): self._trace("[serve] " + msg) - hasprimary = self.execmodel.backend == "thread" + hasprimary = self.execmodel.backend in ("thread", "main_thread_only") self._execpool = WorkerPool(self.execmodel, hasprimary=hasprimary) trace("spawning receiver thread") self._initreceive() diff --git a/testing/conftest.py b/testing/conftest.py index 9ec44d30..166c7c4d 100644 --- a/testing/conftest.py +++ b/testing/conftest.py @@ -124,7 +124,7 @@ def anypython(request): pytest.skip(f"no {name} found") if "execmodel" in request.fixturenames and name != "sys.executable": backend = request.getfixturevalue("execmodel").backend - if backend != "thread": + if backend not in ("thread", "main_thread_only"): pytest.xfail(f"cannot run {backend!r} execmodel with bare {name}") return executable @@ -173,7 +173,9 @@ def gw(request, execmodel, group): return gw -@pytest.fixture(params=["thread", "eventlet", "gevent"], scope="session") +@pytest.fixture( + params=["thread", "main_thread_only", "eventlet", "gevent"], scope="session" +) def execmodel(request): if request.param != "thread": pytest.importorskip(request.param) diff --git a/testing/test_termination.py b/testing/test_termination.py index 282c1979..9cefaf20 100644 --- a/testing/test_termination.py +++ b/testing/test_termination.py @@ -36,7 +36,7 @@ def doit(): def test_endmarker_delivery_on_remote_killterm(makegateway, execmodel): - if execmodel.backend != "thread": + if execmodel.backend not in ("thread", "main_thread_only"): pytest.xfail("test and execnet not compatible to greenlets yet") gw = makegateway("popen") q = execmodel.queue.Queue() diff --git a/testing/test_threadpool.py b/testing/test_threadpool.py index 4d1edd8c..0162e2ea 100644 --- a/testing/test_threadpool.py +++ b/testing/test_threadpool.py @@ -164,7 +164,7 @@ def wait_then_put(): def test_primary_thread_integration(execmodel): - if execmodel.backend != "thread": + if execmodel.backend not in ("thread", "main_thread_only"): with pytest.raises(ValueError): WorkerPool(execmodel=execmodel, hasprimary=True) return @@ -188,7 +188,7 @@ def func(): def test_primary_thread_integration_shutdown(execmodel): - if execmodel.backend != "thread": + if execmodel.backend not in ("thread", "main_thread_only"): pytest.skip("can only run with threading") pool = WorkerPool(execmodel=execmodel, hasprimary=True) queue = execmodel.queue.Queue()