Skip to content

Commit

Permalink
Restore mutex without compromising performance
Browse files Browse the repository at this point in the history
  • Loading branch information
temeddix committed Nov 18, 2023
1 parent 1e26094 commit b6b8cb4
Showing 1 changed file with 95 additions and 52 deletions.
147 changes: 95 additions & 52 deletions qasync/_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class _IocpProactor(windows_events.IocpProactor):
def __init__(self):
self.__events = []
super(_IocpProactor, self).__init__()
self._lock = QtCore.QMutex()

def select(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
Expand All @@ -71,36 +72,60 @@ def select(self, timeout=None):
return tmp

def close(self):
if self._iocp is None:
# already closed
return

# Cancel remaining registered operations.
for fut, ov, obj, callback in list(self._cache.values()):
if fut.cancelled():
# Nothing to do with cancelled futures
pass
elif isinstance(fut, windows_events._WaitCancelFuture):
# _WaitCancelFuture must not be cancelled
pass
else:
try:
fut.cancel()
except OSError as exc:
if self._loop is not None:
context = {
"message": "Cancelling a future failed",
"exception": exc,
"future": fut,
}
if fut._source_traceback:
context["source_traceback"] = fut._source_traceback
self._loop.call_exception_handler(context)

self._results = []

_winapi.CloseHandle(self._iocp)
self._iocp = None
self._logger.debug("Closing")
super(_IocpProactor, self).close()

# Wrap all I/O submission methods to acquire the internal lock first; listed
# in the order they appear in the base class source code.

def recv(self, conn, nbytes, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recv(conn, nbytes, flags)

def recv_into(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recv_into(conn, buf, flags)

def recvfrom(self, conn, nbytes, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recvfrom(conn, nbytes, flags)

def recvfrom_into(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recvfrom_into(conn, buf, flags)

def sendto(self, conn, buf, flags=0, addr=None):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).sendto(conn, buf, flags, addr)

def send(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).send(conn, buf, flags)

def accept(self, listener):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).accept(listener)

def connect(self, conn, address):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).connect(conn, address)

def sendfile(self, sock, file, offset, count):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).sendfile(sock, file, offset, count)

def accept_pipe(self, pipe):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).accept_pipe(pipe)

# connect_pipe() does not actually use the delayed completion machinery.

# This takes care of wait_for_handle() too.
def _wait_for_handle(self, handle, timeout, _is_cancel):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self)._wait_for_handle(
handle, timeout, _is_cancel
)

def _poll(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
Expand All @@ -116,33 +141,48 @@ def _poll(self, timeout=None):
raise ValueError("timeout too big")

while True:
# self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
# ms, threading.get_ident()))
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break
ms = 0

err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
continue

if obj in self._stopped_serving:
f.cancel()
# Futures might already be resolved or cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))

# Remove unregistered futures
for ov in self._unregistered:
self._cache.pop(ov.address, None)
self._unregistered.clear()
with QtCore.QMutexLocker(self._lock):
err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
self._logger.debug(
"GetQueuedCompletionStatus() returned an unexpected "
+ "event: err=%s transferred=%s key=%#x address=%#x",
err,
transferred,
key,
address,
)
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
continue

if obj in self._stopped_serving:
# FIXME: Seems like this will ultimately call Future.cancel(),
# which then would end up calling self._loop.call_soon() on the
# poller thread, which is not thread-safe.
f.cancel()
# Don't call the callback if _register() already read the result or
# if the overlapped has been cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))

# FIXME: Need to check logic around self._unregistered here: Do we rely
# on no new events being registered between the GetQueuedCompletionStatus
# call and this?
with QtCore.QMutexLocker(self._lock):
# Remove unregistered futures
for ov in self._unregistered:
self._cache.pop(ov.address, None)
self._unregistered.clear()


@with_logger
Expand All @@ -153,9 +193,11 @@ def __init__(self, proactor, parent):
self.__stop = False
self.__proactor = proactor
self.__sig_events = parent.sig_events
self.__semaphore = QtCore.QSemaphore()

def start(self):
super().start()
self.__semaphore.acquire()

def stop(self):
self.__stop = True
Expand All @@ -164,6 +206,7 @@ def stop(self):

def run(self):
self._logger.debug("Thread started")
self.__semaphore.release()

while not self.__stop:
events = self.__proactor.select(0.01)
Expand Down

0 comments on commit b6b8cb4

Please sign in to comment.