Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Catch cancelled error status from gateway #215

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions pyzeebe/grpc_internals/zeebe_adapter_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ def _should_retry(self):
async def _common_zeebe_grpc_errors(self, rpc_error: grpc.aio.AioRpcError):
if self.is_error_status(rpc_error, grpc.StatusCode.RESOURCE_EXHAUSTED):
raise ZeebeBackPressureError()
elif self.is_error_status(rpc_error, grpc.StatusCode.UNAVAILABLE):
elif self.is_error_status(
rpc_error, grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.CANCELLED
):
self._current_connection_retries += 1
if not self._should_retry():
await self._close()
Expand All @@ -45,8 +47,10 @@ async def _common_zeebe_grpc_errors(self, rpc_error: grpc.aio.AioRpcError):
raise rpc_error

@staticmethod
def is_error_status(rpc_error: grpc.aio.AioRpcError, status_code: grpc.StatusCode):
return rpc_error.code() == status_code
def is_error_status(
rpc_error: grpc.aio.AioRpcError, *status_codes: grpc.StatusCode
):
return rpc_error.code() in status_codes

async def _close(self):
try:
Expand Down
138 changes: 70 additions & 68 deletions tests/unit/grpc_internals/zeebe_adapter_base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,75 +7,77 @@
from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase


def test_should_retry_no_current_retries(zeebe_adapter: ZeebeAdapterBase):
zeebe_adapter._max_connection_retries = 1
assert zeebe_adapter._should_retry()
class TestShouldRetry:
def test_returns_true_when_no_current_retries(
self, zeebe_adapter: ZeebeAdapterBase
):
zeebe_adapter._max_connection_retries = 1
assert zeebe_adapter._should_retry()


def test_should_retry_current_retries_over_max(zeebe_adapter: ZeebeAdapterBase):
zeebe_adapter._max_connection_retries = 1
zeebe_adapter._current_connection_retries = 1
assert not zeebe_adapter._should_retry()


@pytest.mark.asyncio
async def test_common_zeebe_grpc_error_internal(zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(
grpc.StatusCode.INTERNAL, None, None
)
with pytest.raises(ZeebeInternalError):
await zeebe_adapter._common_zeebe_grpc_errors(error)


@pytest.mark.asyncio
async def test_common_zeebe_grpc_error_back_pressure(zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(
grpc.StatusCode.RESOURCE_EXHAUSTED, None, None
)
with pytest.raises(ZeebeBackPressureError):
await zeebe_adapter._common_zeebe_grpc_errors(error)


@pytest.mark.asyncio
async def test_common_zeebe_grpc_error_gateway_unavailable(zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(
grpc.StatusCode.UNAVAILABLE, None, None
)
with pytest.raises(ZeebeGatewayUnavailableError):
await zeebe_adapter._common_zeebe_grpc_errors(error)


@pytest.mark.asyncio
async def test_common_zeebe_grpc_error_unkown_error(zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(
"FakeGrpcStatus", None, None
)
with pytest.raises(grpc.aio.AioRpcError):
await zeebe_adapter._common_zeebe_grpc_errors(error)


@pytest.mark.asyncio
async def test_close_after_retried_unavailable(zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(
grpc.StatusCode.UNAVAILABLE, None, None
)

zeebe_adapter._close = AsyncMock()
zeebe_adapter._max_connection_retries = 1
with pytest.raises(ZeebeGatewayUnavailableError):
await zeebe_adapter._common_zeebe_grpc_errors(error)

zeebe_adapter._close.assert_called_once()
def test_returns_false_when_current_retries_over_max(
self, zeebe_adapter: ZeebeAdapterBase
):
zeebe_adapter._max_connection_retries = 1
zeebe_adapter._current_connection_retries = 1
assert not zeebe_adapter._should_retry()


@pytest.mark.asyncio
async def test_close_after_retried_internal(zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(
grpc.StatusCode.INTERNAL, None, None
)
zeebe_adapter._close = AsyncMock()
zeebe_adapter._max_connection_retries = 1
with pytest.raises(ZeebeInternalError):
await zeebe_adapter._common_zeebe_grpc_errors(error)

zeebe_adapter._close.assert_called_once()
class TestCommonZeebeGrpcErrors:
async def test_raises_internal_error_on_internal_error_status(
self, zeebe_adapter: ZeebeAdapterBase
):
error = grpc.aio.AioRpcError(grpc.StatusCode.INTERNAL, None, None)
with pytest.raises(ZeebeInternalError):
await zeebe_adapter._common_zeebe_grpc_errors(error)

async def test_raises_back_pressure_error_on_resource_exhausted(
self, zeebe_adapter: ZeebeAdapterBase
):
error = grpc.aio.AioRpcError(grpc.StatusCode.RESOURCE_EXHAUSTED, None, None)
with pytest.raises(ZeebeBackPressureError):
await zeebe_adapter._common_zeebe_grpc_errors(error)

async def test_raises_gateway_unavailable_on_unavailable_status(
self,
zeebe_adapter: ZeebeAdapterBase,
):
error = grpc.aio.AioRpcError(grpc.StatusCode.UNAVAILABLE, None, None)
with pytest.raises(ZeebeGatewayUnavailableError):
await zeebe_adapter._common_zeebe_grpc_errors(error)

async def test_raises_gateway_unavailable_on_cancelled_status(
self,
zeebe_adapter: ZeebeAdapterBase,
):
error = grpc.aio.AioRpcError(grpc.StatusCode.CANCELLED, None, None)

with pytest.raises(ZeebeGatewayUnavailableError):
await zeebe_adapter._common_zeebe_grpc_errors(error)

async def test_reraises_unkown_error(
self,
zeebe_adapter: ZeebeAdapterBase,
):
error = grpc.aio.AioRpcError("FakeGrpcStatus", None, None)
with pytest.raises(grpc.aio.AioRpcError):
await zeebe_adapter._common_zeebe_grpc_errors(error)

async def test_closes_after_retries_exceeded(self, zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(grpc.StatusCode.UNAVAILABLE, None, None)

zeebe_adapter._close = AsyncMock()
zeebe_adapter._max_connection_retries = 1
with pytest.raises(ZeebeGatewayUnavailableError):
await zeebe_adapter._common_zeebe_grpc_errors(error)

zeebe_adapter._close.assert_called_once()

async def test_closes_after_internal_error(self, zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(grpc.StatusCode.INTERNAL, None, None)
zeebe_adapter._close = AsyncMock()
zeebe_adapter._max_connection_retries = 1
with pytest.raises(ZeebeInternalError):
await zeebe_adapter._common_zeebe_grpc_errors(error)

zeebe_adapter._close.assert_called_once()