Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(task-processor): catch all exceptions #3737

Merged
merged 1 commit into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions api/task_processor/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import traceback
from threading import Thread

from django.db import DatabaseError
from django.utils import timezone

from task_processor.processor import run_recurring_tasks, run_tasks
Expand Down Expand Up @@ -36,14 +35,14 @@ def run_iteration(self) -> None:
try:
run_tasks(self.queue_pop_size)
run_recurring_tasks(self.queue_pop_size)
except DatabaseError as e:
except Exception as e:
# To prevent task threads from dying if they get an error retrieving the tasks from the
# database this will allow the thread to continue trying to retrieve tasks if it can
# successfully re-establish a connection to the database.
# TODO: is this also what is causing tasks to get stuck as locked? Can we unlock
# tasks here?

logger.error("Received database error retrieving tasks: %s.", e)
logger.error("Received error retrieving tasks: %s.", e)
logger.debug(traceback.format_exc())

def stop(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
import logging
from typing import Type

import pytest
from django.db import DatabaseError
from pytest_django.fixtures import SettingsWrapper
from pytest_mock import MockerFixture

from task_processor.threads import TaskRunner
from tests.unit.task_processor.conftest import GetTaskProcessorCaplog


def test_task_runner_is_resilient_to_database_errors(
@pytest.mark.parametrize(
"exception_class, exception_message",
[(DatabaseError, "Database error"), (Exception, "Generic error")],
)
def test_task_runner_is_resilient_to_errors(
db: None,
mocker: MockerFixture,
get_task_processor_caplog: GetTaskProcessorCaplog,
settings: SettingsWrapper,
exception_class: Type[Exception],
exception_message: str,
) -> None:
# Given
caplog = get_task_processor_caplog(logging.DEBUG)

task_runner = TaskRunner()
mocker.patch(
"task_processor.threads.run_tasks", side_effect=DatabaseError("Database error")
"task_processor.threads.run_tasks",
side_effect=exception_class(exception_message),
)

# When
Expand All @@ -31,7 +38,7 @@ def test_task_runner_is_resilient_to_database_errors(
assert caplog.records[0].levelno == logging.ERROR
assert (
caplog.records[0].message
== "Received database error retrieving tasks: Database error."
== f"Received error retrieving tasks: {exception_message}."
)

assert caplog.records[1].levelno == logging.DEBUG
Expand Down