Skip to content

Commit

Permalink
WorkerPool: Wait for previous task in _try_send_to_primary_thread
Browse files Browse the repository at this point in the history
In order to prevent tasks from running in a non-main thread,
wait for the previous task inside _try_send_to_primary_thread,
then schedule the next task.

Closes: pytest-dev#96
  • Loading branch information
zmedico committed Feb 15, 2024
1 parent 372168e commit 8b37937
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions src/execnet/gateway_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,11 @@ def integrate_as_primary_thread(self):
with self._running_lock:
if self._shuttingdown:
break
primary_thread_task_ready.clear()
# Only clear if _try_send_to_primary_thread has not
# yet set the next self._primary_thread_task reply
# after waiting for this one to complete.
if reply is self._primary_thread_task:
primary_thread_task_ready.clear()

def trigger_shutdown(self):
with self._running_lock:
Expand All @@ -371,11 +375,19 @@ def _try_send_to_primary_thread(self, reply):
# note that we should be called with _running_lock hold
primary_thread_task_ready = self._primary_thread_task_ready
if primary_thread_task_ready is not None:
if not primary_thread_task_ready.is_set():
self._primary_thread_task = reply
# wake up primary thread
primary_thread_task_ready.set()
return True
if (
primary_thread_task_ready.is_set()
and self._primary_thread_task is not None
):
self._primary_thread_task.waitfinish()
self._primary_thread_task = reply
# wake up primary thread (it's okay if this is already set
# because we waited for the previous task to finish above
# and integrate_as_primary_thread will not clear it when
# it enters self._running_lock if it detects that a new
# task is available)
primary_thread_task_ready.set()
return True
return False

def spawn(self, func, *args, **kwargs):
Expand Down

0 comments on commit 8b37937

Please sign in to comment.