Skip to content

Commit

Permalink
pythongh-109370: Support closing Connection and PipeConnection from o…
Browse files Browse the repository at this point in the history
…ther thread
  • Loading branch information
serhiy-storchaka committed Sep 14, 2023
1 parent 1f885df commit a384425
Showing 1 changed file with 81 additions and 26 deletions.
107 changes: 81 additions & 26 deletions Lib/multiprocessing/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.
WSA_OPERATION_ABORTED = 995

_mmap_counter = itertools.count()

Expand Down Expand Up @@ -130,12 +129,13 @@ def __init__(self, handle, readable=True, writable=True):
# XXX should we use util.Finalize instead of a __del__?

def __del__(self):
if self._handle is not None:
self._close()
handle = self._handle
if handle is not None:
self._close(handle)

def _check_closed(self):
if self._handle is None:
raise OSError("handle is closed")
raise OSError(errno.EPIPE, "handle is closed")

def _check_readable(self):
if not self._readable:
Expand Down Expand Up @@ -174,11 +174,10 @@ def fileno(self):

def close(self):
"""Close the connection"""
if self._handle is not None:
try:
self._close()
finally:
self._handle = None
handle = self._handle
if handle is not None:
self._handle = None
self._close(handle)

def send_bytes(self, buf, offset=0, size=None):
"""Send the bytes data from a bytes-like object"""
Expand Down Expand Up @@ -275,19 +274,27 @@ class PipeConnection(_ConnectionBase):
_got_empty_message = False
_send_ov = None

def _close(self, _CloseHandle=_winapi.CloseHandle):
def _close(self, handle, *, _CloseHandle=_winapi.CloseHandle):
ov = self._send_ov
if ov is not None:
# Interrupt WaitForMultipleObjects() in _send_bytes()
ov.cancel()
_CloseHandle(self._handle)
_CloseHandle(handle)

def _send_bytes(self, buf):
if self._send_ov is not None:
# A connection should only be used by a single thread
raise ValueError("concurrent send_bytes() calls "
"are not supported")
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
try:
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
self._send_ov = ov
try:
if err == _winapi.ERROR_IO_PENDING:
Expand All @@ -300,7 +307,7 @@ def _send_bytes(self, buf):
finally:
self._send_ov = None
nwritten, err = ov.GetOverlappedResult(True)
if err == WSA_OPERATION_ABORTED:
if err == _winapi.ERROR_OPERATION_ABORTED:
# close() was called by another thread while
# WaitForMultipleObjects() was waiting for the overlapped
# operation.
Expand All @@ -315,8 +322,16 @@ def _recv_bytes(self, maxsize=None):
else:
bsize = 128 if maxsize is None else min(maxsize, 128)
try:
ov, err = _winapi.ReadFile(self._handle, bsize,
overlapped=True)
try:
ov, err = _winapi.ReadFile(self._handle, bsize,
overlapped=True)
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
try:
if err == _winapi.ERROR_IO_PENDING:
waitres = _winapi.WaitForMultipleObjects(
Expand All @@ -341,20 +356,44 @@ def _recv_bytes(self, maxsize=None):
raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

def _poll(self, timeout):
if (self._got_empty_message or
_winapi.PeekNamedPipe(self._handle)[0] != 0):
return True
try:
if (self._got_empty_message or
_winapi.PeekNamedPipe(self._handle)[0] != 0):
return True
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
return bool(wait([self], timeout))

def _get_more_data(self, ov, maxsize):
buf = ov.getbuffer()
f = io.BytesIO()
f.write(buf)
left = _winapi.PeekNamedPipe(self._handle)[1]
try:
left = _winapi.PeekNamedPipe(self._handle)[1]
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
assert left > 0
if maxsize is not None and len(buf) + left > maxsize:
self._bad_message_length()
ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
try:
ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
rbytes, err = ov.GetOverlappedResult(True)
assert err == 0
assert rbytes == left
Expand All @@ -369,20 +408,28 @@ class Connection(_ConnectionBase):
"""

if _winapi:
def _close(self, _close=_multiprocessing.closesocket):
_close(self._handle)
def _close(self, handle, *, _close=_multiprocessing.closesocket):
_close(handle)
_write = _multiprocessing.send
_read = _multiprocessing.recv
else:
def _close(self, _close=os.close):
_close(self._handle)
def _close(self, handle, *, _close=os.close):
_close(handle)
_write = os.write
_read = os.read

def _send(self, buf, write=_write):
remaining = len(buf)
while True:
n = write(self._handle, buf)
try:
n = write(self._handle, buf)
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
remaining -= n
if remaining == 0:
break
Expand All @@ -393,7 +440,15 @@ def _recv(self, size, read=_read):
handle = self._handle
remaining = size
while remaining > 0:
chunk = read(handle, remaining)
try:
chunk = read(handle, remaining)
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
n = len(chunk)
if n == 0:
if remaining == size:
Expand Down

0 comments on commit a384425

Please sign in to comment.