From a384425500d9042b83bb31f0aa315ab28ff7e2dd Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Thu, 14 Sep 2023 12:55:33 +0300 Subject: [PATCH] gh-109370: Support closing Connection and PipeConnection from other thread --- Lib/multiprocessing/connection.py | 107 ++++++++++++++++++++++-------- 1 file changed, 81 insertions(+), 26 deletions(-) diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 7c425a2d8e7034..fa5af0bec0d72a 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -42,7 +42,6 @@ BUFSIZE = 8192 # A very generous timeout when it comes to local connections... CONNECTION_TIMEOUT = 20. -WSA_OPERATION_ABORTED = 995 _mmap_counter = itertools.count() @@ -130,12 +129,13 @@ def __init__(self, handle, readable=True, writable=True): # XXX should we use util.Finalize instead of a __del__? def __del__(self): - if self._handle is not None: - self._close() + handle = self._handle + if handle is not None: + self._close(handle) def _check_closed(self): if self._handle is None: - raise OSError("handle is closed") + raise OSError(errno.EPIPE, "handle is closed") def _check_readable(self): if not self._readable: @@ -174,11 +174,10 @@ def fileno(self): def close(self): """Close the connection""" - if self._handle is not None: - try: - self._close() - finally: - self._handle = None + handle = self._handle + if handle is not None: + self._handle = None + self._close(handle) def send_bytes(self, buf, offset=0, size=None): """Send the bytes data from a bytes-like object""" @@ -275,19 +274,27 @@ class PipeConnection(_ConnectionBase): _got_empty_message = False _send_ov = None - def _close(self, _CloseHandle=_winapi.CloseHandle): + def _close(self, handle, *, _CloseHandle=_winapi.CloseHandle): ov = self._send_ov if ov is not None: # Interrupt WaitForMultipleObjects() in _send_bytes() ov.cancel() - _CloseHandle(self._handle) + _CloseHandle(handle) def _send_bytes(self, buf): if self._send_ov is not None: # A connection should only be used by a single thread raise ValueError("concurrent send_bytes() calls " "are not supported") - ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True) + try: + ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True) + except OSError as err: + if err.errno == errno.EBADF: + raise OSError(errno.EPIPE, "handle is closed") + raise + except TypeError: + self._check_closed() + raise self._send_ov = ov try: if err == _winapi.ERROR_IO_PENDING: @@ -300,7 +307,7 @@ def _send_bytes(self, buf): finally: self._send_ov = None nwritten, err = ov.GetOverlappedResult(True) - if err == WSA_OPERATION_ABORTED: + if err == _winapi.ERROR_OPERATION_ABORTED: # close() was called by another thread while # WaitForMultipleObjects() was waiting for the overlapped # operation. @@ -315,8 +322,16 @@ def _recv_bytes(self, maxsize=None): else: bsize = 128 if maxsize is None else min(maxsize, 128) try: - ov, err = _winapi.ReadFile(self._handle, bsize, - overlapped=True) + try: + ov, err = _winapi.ReadFile(self._handle, bsize, + overlapped=True) + except OSError as err: + if err.errno == errno.EBADF: + raise OSError(errno.EPIPE, "handle is closed") + raise + except TypeError: + self._check_closed() + raise try: if err == _winapi.ERROR_IO_PENDING: waitres = _winapi.WaitForMultipleObjects( @@ -341,20 +356,44 @@ def _recv_bytes(self, maxsize=None): raise RuntimeError("shouldn't get here; expected KeyboardInterrupt") def _poll(self, timeout): - if (self._got_empty_message or - _winapi.PeekNamedPipe(self._handle)[0] != 0): - return True + try: + if (self._got_empty_message or + _winapi.PeekNamedPipe(self._handle)[0] != 0): + return True + except OSError as err: + if err.errno == errno.EBADF: + raise OSError(errno.EPIPE, "handle is closed") + raise + except TypeError: + self._check_closed() + raise return bool(wait([self], timeout)) def _get_more_data(self, ov, maxsize): buf = ov.getbuffer() f = io.BytesIO() f.write(buf) - left = _winapi.PeekNamedPipe(self._handle)[1] + try: + left = _winapi.PeekNamedPipe(self._handle)[1] + except OSError as err: + if err.errno == errno.EBADF: + raise OSError(errno.EPIPE, "handle is closed") + raise + except TypeError: + self._check_closed() + raise assert left > 0 if maxsize is not None and len(buf) + left > maxsize: self._bad_message_length() - ov, err = _winapi.ReadFile(self._handle, left, overlapped=True) + try: + ov, err = _winapi.ReadFile(self._handle, left, overlapped=True) + except OSError as err: + if err.errno == errno.EBADF: + raise OSError(errno.EPIPE, "handle is closed") + raise + except TypeError: + self._check_closed() + raise rbytes, err = ov.GetOverlappedResult(True) assert err == 0 assert rbytes == left @@ -369,20 +408,28 @@ class Connection(_ConnectionBase): """ if _winapi: - def _close(self, _close=_multiprocessing.closesocket): - _close(self._handle) + def _close(self, handle, *, _close=_multiprocessing.closesocket): + _close(handle) _write = _multiprocessing.send _read = _multiprocessing.recv else: - def _close(self, _close=os.close): - _close(self._handle) + def _close(self, handle, *, _close=os.close): + _close(handle) _write = os.write _read = os.read def _send(self, buf, write=_write): remaining = len(buf) while True: - n = write(self._handle, buf) + try: + n = write(self._handle, buf) + except OSError as err: + if err.errno == errno.EBADF: + raise OSError(errno.EPIPE, "handle is closed") + raise + except TypeError: + self._check_closed() + raise remaining -= n if remaining == 0: break @@ -393,7 +440,15 @@ def _recv(self, size, read=_read): handle = self._handle remaining = size while remaining > 0: - chunk = read(handle, remaining) + try: + chunk = read(handle, remaining) + except OSError as err: + if err.errno == errno.EBADF: + raise OSError(errno.EPIPE, "handle is closed") + raise + except TypeError: + self._check_closed() + raise n = len(chunk) if n == 0: if remaining == size: