Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix edge case where a Linearizer could get stuck #12358

Merged
merged 19 commits into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12358.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug where `Linearizer`s could get stuck if a cancellation were to happen at the wrong time.
6 changes: 5 additions & 1 deletion synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,11 @@ async def _acquire_lock(self, key: Hashable) -> _LinearizerEntry:
#
# This needs to happen while we hold the lock. We could put it on the
# exit path, but that would slow down the uncontended case.
await self._clock.sleep(0)
try:
await self._clock.sleep(0)
except CancelledError:
self._release_lock(key, entry)
raise

return entry

Expand Down
51 changes: 47 additions & 4 deletions tests/util/test_linearizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable, Hashable, Tuple
from typing import Hashable, Tuple

from typing_extensions import Protocol

from twisted.internet import defer, reactor
from twisted.internet.base import ReactorBase
Expand All @@ -25,10 +27,15 @@
from tests import unittest


class UnblockFunction(Protocol):
def __call__(self, pump_reactor: bool = True) -> None:
...


class LinearizerTestCase(unittest.TestCase):
def _start_task(
self, linearizer: Linearizer, key: Hashable
) -> Tuple["Deferred[None]", "Deferred[None]", Callable[[], None]]:
) -> Tuple["Deferred[None]", "Deferred[None]", UnblockFunction]:
"""Starts a task which acquires the linearizer lock, blocks, then completes.

Args:
Expand All @@ -52,11 +59,12 @@ async def task() -> None:

d = defer.ensureDeferred(task())

def unblock() -> None:
def unblock(pump_reactor: bool = True) -> None:
unblock_d.callback(None)
# The next task, if it exists, will acquire the lock and require a kick of
# the reactor to advance.
self._pump()
if pump_reactor:
self._pump()

return d, acquired_d, unblock

Expand Down Expand Up @@ -212,3 +220,38 @@ def test_cancellation(self) -> None:
)
unblock3()
self.successResultOf(d3)

def test_cancellation_during_sleep(self) -> None:
"""Tests cancellation during the sleep just after waiting for a `Linearizer`."""
linearizer = Linearizer()

key = object()

d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
self.assertTrue(acquired_d1.called)

# Create a second task, waiting for the first task.
d2, acquired_d2, _ = self._start_task(linearizer, key)
self.assertFalse(acquired_d2.called)

# Create a third task, waiting for the second task.
d3, acquired_d3, unblock3 = self._start_task(linearizer, key)
self.assertFalse(acquired_d3.called)

# Once the first task completes, cancel the waiting second task while it is
# sleeping just after acquiring the lock.
unblock1(pump_reactor=False)
self.successResultOf(d1)
d2.cancel()
self._pump()

self.assertTrue(d2.called)
self.failureResultOf(d2, CancelledError)

# The third task should continue running.
self.assertTrue(
acquired_d3.called,
"Third task did not get the lock after the second task was cancelled",
)
unblock3()
self.successResultOf(d3)