Skip to content

Commit

Permalink
pythongh-109047: concurrent.futures catches exc on add_call_item_to_q…
Browse files Browse the repository at this point in the history
…ueue()

concurrent.futures: The *executor manager thread* now catches
exceptions when adding an item to the *call queue*. During Python
finalization, creating a new thread can now raise RuntimeError. Catch
the exception and call terminate_broken() in this case.

Add test_python_finalization_error() to test_concurrent_futures.

concurrent.futures._ExecutorManagerThread changes:

* terminate_broken() no longer calls shutdown_workers() since the
  queue is no longer working anymore (read and write ends of the
  queue pipe are closed).
* terminate_broken() now terminates child processes.
* wait_result_broken_or_wakeup() now uses the short form of
  traceback.format_exception().
* _ExecutorManagerThread.terminate_broken() now holds shutdown_lock
  to prevent race conditons with ProcessPoolExecutor.submit().

ProcessPoolExecutor changes:

* ProcessPoolExecutor.submit() now starts by checking if the executor
  is broken.

multiprocessing.Queue changes:

* Add _terminate_broken() method.
* _start_thread() sets _thread to None on exception to prevent
  leaking "dangling threads" even if the thread was not started
  yet.
  • Loading branch information
vstinner committed Sep 29, 2023
1 parent 3439cb0 commit 3137689
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 23 deletions.
49 changes: 31 additions & 18 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,12 @@ def run(self):
# Main loop for the executor manager thread.

while True:
self.add_call_item_to_queue()
try:
self.add_call_item_to_queue()
except BaseException as exc:
cause = format_exception(exc)
self.terminate_broken(cause)
return

result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

Expand Down Expand Up @@ -425,8 +430,8 @@ def wait_result_broken_or_wakeup(self):
try:
result_item = result_reader.recv()
is_broken = False
except BaseException as e:
cause = format_exception(type(e), e, e.__traceback__)
except BaseException as exc:
cause = format_exception(exc)

elif wakeup_reader in ready:
is_broken = False
Expand Down Expand Up @@ -463,7 +468,7 @@ def is_shutting_down(self):
return (_global_shutdown or executor is None
or executor._shutdown_thread)

def terminate_broken(self, cause):
def _terminate_broken(self, cause):
# Terminate the executor because it is in a broken state. The cause
# argument can be used to display more information on the error that
# lead the executor into becoming broken.
Expand Down Expand Up @@ -505,17 +510,14 @@ def terminate_broken(self, cause):
for p in self.processes.values():
p.terminate()

# Prevent queue writing to a pipe which is no longer read.
# https://github.com/python/cpython/issues/94777
self.call_queue._reader.close()

# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
if sys.platform == 'win32':
self.call_queue._writer.close()
self.call_queue._terminate_broken()

# clean up resources
self.join_executor_internals()
self._join_executor_internals(broken=True)

def terminate_broken(self, cause):
with self.shutdown_lock:
self._terminate_broken(cause)

def flag_executor_shutting_down(self):
# Flag the executor as shutting down and cancel remaining tasks if
Expand Down Expand Up @@ -558,15 +560,21 @@ def shutdown_workers(self):
break

def join_executor_internals(self):
self.shutdown_workers()
with self.shutdown_lock:
self._join_executor_internals()

def _join_executor_internals(self, broken=False):
if not broken:
self.shutdown_workers()
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
with self.shutdown_lock:
self.thread_wakeup.close()
self.thread_wakeup.close()
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in self.processes.values():
if broken:
p.terminate()
p.join()

def get_n_children_alive(self):
Expand Down Expand Up @@ -773,7 +781,13 @@ def _launch_processes(self):
for _ in range(len(self._processes), self._max_workers):
self._spawn_process()

def _check_broken(self):
if self._broken:
raise BrokenProcessPool(self._broken)

def _spawn_process(self):
self._check_broken()

p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
Expand All @@ -786,8 +800,7 @@ def _spawn_process(self):

def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenProcessPool(self._broken)
self._check_broken()
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
if _global_shutdown:
Expand Down
28 changes: 23 additions & 5 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,20 @@ def cancel_join_thread(self):
except AttributeError:
pass

def _terminate_broken(self):
# Close a Queue on error.

# gh-94777: Prevent queue writing to a pipe which is no longer read.
self._reader.close()

# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
if sys.platform == 'win32':
self.call_queue._writer.close()

self.close()
self.join_thread()

def _start_thread(self):
debug('Queue._start_thread()')

Expand All @@ -169,13 +183,17 @@ def _start_thread(self):
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
name='QueueFeederThread'
name='QueueFeederThread',
daemon=True,
)
self._thread.daemon = True

debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
try:
debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
except:
self._thread = None
raise

if not self._joincancelled:
self._jointhread = Finalize(
Expand Down
26 changes: 26 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import threading
import time
import unittest
from concurrent import futures
Expand Down Expand Up @@ -187,6 +188,31 @@ def test_max_tasks_early_shutdown(self):
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))

def test_python_finalization_error(self):
context = self.get_context()

# gh-109047: Create _ExecutorManagerThread, but block
# QueueFeederThread. Mock the threading.start_new_thread() function
# to inject RuntimeError: simulate the error raised during Python
# finalization.
orig_start_new_thread = threading._start_new_thread
nthread = 0
def mock_start_new_thread(func, *args):
nonlocal nthread
if nthread >= 1:
raise RuntimeError("can't create new thread at "
"interpreter shutdown")
nthread += 1
return orig_start_new_thread(func, *args)

with support.swap_attr(threading, '_start_new_thread',
mock_start_new_thread):
executor = self.executor_type(max_workers=2, mp_context=context)
with executor:
with self.assertRaises(BrokenProcessPool):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()


create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
:mod:`concurrent.futures`: The *executor manager thread* now catches exceptions
when adding an item to the *call queue*. During Python finalization, creating a
new thread can now raise :exc:`RuntimeError`. Catch the exception and call
``terminate_broken()`` in this case. Patch by Victor Stinner.

0 comments on commit 3137689

Please sign in to comment.