Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-107219: Fix concurrent.futures terminate_broken() #109244

Merged
merged 2 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,10 @@ def terminate_broken(self, cause):
# https://github.com/python/cpython/issues/94777
self.call_queue._reader.close()

# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
self.call_queue._writer.close()

# clean up resources
self.join_executor_internals()

Expand Down
18 changes: 18 additions & 0 deletions Lib/multiprocessing/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]

import errno
import io
import os
import sys
Expand Down Expand Up @@ -41,6 +42,7 @@
BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.
WSA_OPERATION_ABORTED = 995
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the same as _winapi.ERROR_OPERATION_ABORTED.

Copy link
Member Author

@vstinner vstinner Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I'm confused. I don't recall which doc I was looking to. WriteFile() is documented to return ERROR_OPERATION_ABORTED when it's canceled: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefile


_mmap_counter = itertools.count()

Expand Down Expand Up @@ -271,12 +273,22 @@ class PipeConnection(_ConnectionBase):
with FILE_FLAG_OVERLAPPED.
"""
_got_empty_message = False
_send_ov = None

def _close(self, _CloseHandle=_winapi.CloseHandle):
ov = self._send_ov
if ov is not None:
# Interrupt WaitForMultipleObjects() in _send_bytes()
ov.cancel()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asyncio uses a similar code in ProactorEventLoop:

def _cancel_overlapped(self):
if self._ov is None:
return
try:
self._ov.cancel()
except OSError as exc:
context = {
'message': 'Cancelling an overlapped future failed',
'exception': exc,
'future': self,
}
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
self._ov = None

asyncio uses more advanced code around to handle more cases. For example, in asyncio, the cancel() API is part of the public API.

Here the cancellation is a standard action in the Windows Overlapped API. The cancellation is synchronous, it's easy!

Hopefully, we are not in the very complicated RegisterWaitWithQueue() case! This case requires an asynchronous cancellation which is really complicated to handle: the completion of the cancellation should be awaited!? See this horror story: https://vstinner.github.io/asyncio-proactor-cancellation-from-hell.html

_CloseHandle(self._handle)

def _send_bytes(self, buf):
if self._send_ov is not None:
# A connection should only be used by a single thread
raise ValueError("concurrent send_bytes() calls "
"are not supported")
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
self._send_ov = ov
try:
if err == _winapi.ERROR_IO_PENDING:
waitres = _winapi.WaitForMultipleObjects(
Expand All @@ -286,7 +298,13 @@ def _send_bytes(self, buf):
ov.cancel()
raise
finally:
self._send_ov = None
nwritten, err = ov.GetOverlappedResult(True)
if err == WSA_OPERATION_ABORTED:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What other value can it be? There is assert err == 0 below, so I guess that any error was unexpected.

Could we simply check that err is not zero here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose to write a minimalist change: change at least code as possible. I introduce one new error, I added a check for this error, and that's all. I don't know the code enough to answer to your question. I'm not a multiprocessing or Windows API expert at all :-(

# close() was called by another thread while
# WaitForMultipleObjects() was waiting for the overlapped
# operation.
raise OSError(errno.EPIPE, "handle is closed")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose to raise a BrokenPipeError exception here, since Queue._feed() has a special code path for that to ignore EPIPE errors silently:

except Exception as e:
if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
return

And concurrent.futures uses this code path for its "call queue" which is causing troubles here:

self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
shutdown_lock=self._shutdown_lock,
thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds like we got lucky that callers were handling one thing we could raise! :)

Copy link
Member Author

@vstinner vstinner Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the beginning, I started by adding a new exception. But I chose to reuse the existing code instead. IMO BrokenPipeError perfectly makes sense for a PipeConnection.

assert err == 0
assert nwritten == len(buf)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Fix a race condition in ``concurrent.futures``. When a process in the
process pool was terminated abruptly (while the future was running or
pending), close the connection write end. If the call queue is blocked on
sending bytes to a worker process, closing the connection write end interrupts
the send, so the queue can be closed. Patch by Victor Stinner.