Skip to content

Commit

Permalink
enh(jobstore): Add logic to deal with django.db.OperationalError.
Browse files Browse the repository at this point in the history
Resolves #145.
  • Loading branch information
jcass77 committed Jun 13, 2021
1 parent a5ab320 commit 4436ddc
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 3 deletions.
8 changes: 8 additions & 0 deletions django_apscheduler/jobstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from django import db
from django.db import transaction, IntegrityError

from django_apscheduler import util
from django_apscheduler.models import DjangoJob, DjangoJobExecution
from django_apscheduler.util import (
get_apscheduler_datetime,
Expand Down Expand Up @@ -200,6 +201,7 @@ def __init__(self, pickle_protocol: int = pickle.HIGHEST_PROTOCOL):
super().__init__()
self.pickle_protocol = pickle_protocol

@util.retry_on_db_operational_error
def lookup_job(self, job_id: str) -> Union[None, AppSchedulerJob]:
try:
job_state = DjangoJob.objects.get(id=job_id).job_state
Expand All @@ -212,6 +214,7 @@ def get_due_jobs(self, now) -> List[AppSchedulerJob]:
dt = get_django_internal_datetime(now)
return self._get_jobs(next_run_time__lte=dt)

@util.retry_on_db_operational_error
def get_next_run_time(self):
try:
job = DjangoJob.objects.filter(next_run_time__isnull=False).earliest(
Expand All @@ -228,6 +231,7 @@ def get_all_jobs(self):

return jobs

@util.retry_on_db_operational_error
def add_job(self, job: AppSchedulerJob):
with transaction.atomic():
try:
Expand All @@ -239,6 +243,7 @@ def add_job(self, job: AppSchedulerJob):
except IntegrityError:
raise ConflictingIdError(job.id)

@util.retry_on_db_operational_error
def update_job(self, job: AppSchedulerJob):
# Acquire lock for update
with transaction.atomic():
Expand All @@ -255,12 +260,14 @@ def update_job(self, job: AppSchedulerJob):
except DjangoJob.DoesNotExist:
raise JobLookupError(job.id)

@util.retry_on_db_operational_error
def remove_job(self, job_id: str):
try:
DjangoJob.objects.get(id=job_id).delete()
except DjangoJob.DoesNotExist:
raise JobLookupError(job_id)

@util.retry_on_db_operational_error
def remove_all_jobs(self):
# Implicit: will also delete all DjangoJobExecutions due to on_delete=models.CASCADE
DjangoJob.objects.all().delete()
Expand All @@ -277,6 +284,7 @@ def _reconstitute_job(self, job_state):

return job

@util.retry_on_db_operational_error
def _get_jobs(self, **filters):
jobs = []
failed_job_ids = set()
Expand Down
3 changes: 2 additions & 1 deletion django_apscheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class DjangoJobExecution(models.Model):
objects = DjangoJobExecutionManager()

@classmethod
@util.retry_on_db_operational_error
def atomic_update_or_create(
cls,
lock,
Expand All @@ -136,7 +137,7 @@ def atomic_update_or_create(
traceback: str = None,
) -> "DjangoJobExecution":
"""
Uses an APScheduler lock to ensures that only one database entry can be created / updated at a time.
Uses an APScheduler lock to ensure that only one database entry can be created / updated at a time.
This keeps django_apscheduler in sync with APScheduler and maintains a 1:1 mapping between APScheduler events
that are triggered and the corresponding DjangoJobExecution model instances that are persisted to the database.
Expand Down
62 changes: 62 additions & 0 deletions django_apscheduler/util.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import logging
from datetime import datetime
from functools import wraps

from apscheduler.schedulers.base import BaseScheduler
from django import db
from django.conf import settings
from django.utils import formats
from django.utils import timezone

logger = logging.getLogger(__name__)


def get_dt_format() -> str:
"""Return the configured format for displaying datetimes in the Django admin views"""
Expand Down Expand Up @@ -45,3 +50,60 @@ def get_apscheduler_datetime(dt: datetime, scheduler: BaseScheduler) -> datetime
return timezone.make_aware(dt, timezone=scheduler.timezone)

return dt


def retry_on_db_operational_error(func):
"""
This decorator can be used to wrap a database-related method so that it will be retried when a
django.db.OperationalError is encountered.
The rationale is that django.db.OperationalError is usually raised when attempting to use an old database
connection that the database backend has since closed. Closing the Django connection as well, and re-trying with
a fresh connection, is usually sufficient to solve the problem.
It is a reluctant workaround for users that persistently have issues with stale database connections (most notably:
2006, 'MySQL server has gone away').
The recommended approach is still to rather install a database connection pooler (like pgbouncer), to take care of
database connection management for you, but the issue has been raised enough times by different individuals that a
workaround is probably justified.
CAUTION: any method that this decorator is applied to MUST be idempotent (i.e. the method can be retried a second
time without any unwanted side effects). If your method performs any actions before the django.db.OperationalError
is raised then those will be repeated. If you don't want that to happen then it would be best to handle the
django.db.OperationalError exception manually and call `db.close_old_connections()` in an appropriate fashion
inside your method instead.
The following list of alternative workarounds were also considered:
1. Calling db.close_old_connections() pre-emptively before the job store executes a DB operation: this would break
Django's standard connection management. For example, if the `CONN_MAX_AGE` setting is set to 0, a new connection
will be required for *every* database operation (as opposed to at the end of every *request* like in the Django
standard). The database overhead, and associated performance penalty, that this approach would impose feel
unreasonable. See: https://docs.djangoproject.com/en/dev/ref/settings/#std:setting-CONN_MAX_AGE.
2. Using a custom QuerySet or database backend that handles django.db.OperationalError automatically: this would
be more convenient than having to decorate individual methods, but it would also break when a DB operation needs
to be re-tried as part of an atomic transaction. See: https://github.com/django/django/pull/2740
3. Pinging the database before each query to see if it is still available: django-apscheduler used to make use of
this approach (see: https://github.com/jcass77/django-apscheduler/blob/9ac06b33d19961da6c36d5ac814d4338beb11309/django_apscheduler/models.py#L16-L51)
Injecting an additional database query, on an arbitrary schedule, seems like an unreasonable thing to do,
especially considering that doing so would probably be unnecessary for users that already make use of a database
connection pooler.
"""

@wraps(func)
def func_wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
except db.OperationalError as e:
logger.warning(
f"DB error executing '{func.__name__}' ({e}). Retrying with a new DB connection..."
)
db.close_old_connections()
result = func(*args, **kwargs)

return result

return func_wrapper
3 changes: 3 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ This changelog is used to track all major changes to django-apscheduler.
- The Django admin page will now show a list of all the manually triggered jobs that could not be completed
before `settings.APSCHEDULER_RUN_NOW_TIMEOUT` seconds elapsed.
- Make more of the string output on the admin page Django-translatable.
- Introduce a `retry_on_db_operational_error` utility decorator for retrying database-related operations when
a `django.db.OperationalError` is encountered (
Resolves [#145](https://github.com/jcass77/django-apscheduler/issues/145)).

## v0.5.2 (2021-01-28)

Expand Down
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
],
keywords="django apscheduler django-apscheduler",
packages=find_packages(exclude=("tests",)),
install_requires=["django>=2.2", "apscheduler>=3.2,<4.0", ],
install_requires=[
"django>=2.2",
"apscheduler>=3.2,<4.0",
],
zip_safe=False,
)
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@
from apscheduler.job import Job
from apscheduler.schedulers.base import BaseScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from django import db
from django.db import transaction

from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJob


def raise_db_operational_error(*args, **kwargs):
"""Helper method for triggering a db.OperationalError as a side effect of executing mocked DB operations"""
raise db.OperationalError("Some DB-related error")


class DummyScheduler(BaseScheduler):
def __init__(self, *args, **kwargs):
super(DummyScheduler, self).__init__(*args, **kwargs)
Expand Down
113 changes: 112 additions & 1 deletion tests/test_jobstores.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import warnings
from datetime import datetime
from unittest import mock

import pytest
from apscheduler import events
from apscheduler.events import JobExecutionEvent, JobSubmissionEvent
from django import db
from django.utils import timezone

from django_apscheduler.jobstores import (
Expand All @@ -12,6 +14,7 @@
register_events,
)
from django_apscheduler.models import DjangoJob, DjangoJobExecution
from tests import conftest
from tests.conftest import DummyScheduler, dummy_job


Expand Down Expand Up @@ -167,7 +170,115 @@ class TestDjangoJobStore:
See 'test_apscheduler_jobstore.py' for details
"""

pass
@pytest.mark.django_db(transaction=True)
def test_lookup_job_does_retry_on_db_operational_error(self, jobstore):
with mock.patch.object(db.connection, "close") as close_mock:
with pytest.raises(db.OperationalError, match="Some DB-related error"):
with mock.patch(
"django_apscheduler.jobstores.DjangoJob.objects.get",
side_effect=conftest.raise_db_operational_error,
):
jobstore.lookup_job("some job")

assert close_mock.call_count == 1

@pytest.mark.django_db(transaction=True)
def test_get_due_jobs_does_retry_on_db_operational_error(self, jobstore):
with mock.patch.object(db.connection, "close") as close_mock:
with pytest.raises(db.OperationalError, match="Some DB-related error"):
with mock.patch(
"django_apscheduler.jobstores.DjangoJob.objects.filter",
side_effect=conftest.raise_db_operational_error,
):
jobstore.get_due_jobs(datetime(2016, 5, 3))

assert close_mock.call_count == 1

@pytest.mark.django_db(transaction=True)
def test_get_next_run_time_does_retry_on_db_operational_error(self, jobstore):
with mock.patch.object(db.connection, "close") as close_mock:
with pytest.raises(db.OperationalError, match="Some DB-related error"):
with mock.patch(
"django_apscheduler.jobstores.DjangoJob.objects.filter",
side_effect=conftest.raise_db_operational_error,
):
jobstore.get_next_run_time()

assert close_mock.call_count == 1

@pytest.mark.django_db(transaction=True)
def test_add_job_does_retry_on_db_operational_error(self, jobstore, create_job):
job = create_job(
func=dummy_job,
trigger="date",
trigger_args={"run_date": datetime(2016, 5, 3)},
id="test",
)

with mock.patch.object(db.connection, "close") as close_mock:
with pytest.raises(db.OperationalError, match="Some DB-related error"):
with mock.patch(
"django_apscheduler.jobstores.DjangoJob.objects.create",
side_effect=conftest.raise_db_operational_error,
):
jobstore.add_job(job)

assert close_mock.call_count == 1

@pytest.mark.django_db(transaction=True)
def test_update_job_does_retry_on_db_operational_error(self, jobstore, create_job):
job = create_job(
func=dummy_job,
trigger="date",
trigger_args={"run_date": datetime(2016, 5, 3)},
id="test",
)

with mock.patch.object(db.connection, "close") as close_mock:
with pytest.raises(db.OperationalError, match="Some DB-related error"):
with mock.patch(
"django_apscheduler.jobstores.DjangoJob.objects.get",
side_effect=conftest.raise_db_operational_error,
):
jobstore.update_job(job)

assert close_mock.call_count == 1

@pytest.mark.django_db(transaction=True)
def test_remove_job_does_retry_on_db_operational_error(self, jobstore):
with mock.patch.object(db.connection, "close") as close_mock:
with pytest.raises(db.OperationalError, match="Some DB-related error"):
with mock.patch(
"django_apscheduler.jobstores.DjangoJob.objects.get",
side_effect=conftest.raise_db_operational_error,
):
jobstore.remove_job("some job")

assert close_mock.call_count == 1

@pytest.mark.django_db(transaction=True)
def test_remove_all_jobs_does_retry_on_db_operational_error(self, jobstore):
with mock.patch.object(db.connection, "close") as close_mock:
with pytest.raises(db.OperationalError, match="Some DB-related error"):
with mock.patch(
"django_apscheduler.jobstores.DjangoJob.objects.all",
side_effect=conftest.raise_db_operational_error,
):
jobstore.remove_all_jobs()

assert close_mock.call_count == 1

@pytest.mark.django_db(transaction=True)
def test_get_jobs_does_retry_on_db_operational_error(self, jobstore):
with mock.patch.object(db.connection, "close") as close_mock:
with pytest.raises(db.OperationalError, match="Some DB-related error"):
with mock.patch(
"django_apscheduler.jobstores.DjangoJob.objects.filter",
side_effect=conftest.raise_db_operational_error,
):
jobstore._get_jobs()

assert close_mock.call_count == 1


@pytest.mark.django_db
Expand Down
32 changes: 32 additions & 0 deletions tests/test_models.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import logging
from datetime import timedelta
from threading import RLock
from unittest import mock

import pytest
from apscheduler import events
from django import db
from django.utils import timezone

from django_apscheduler.models import DjangoJobExecution, DjangoJob
from tests import conftest

logging.basicConfig()

Expand Down Expand Up @@ -139,6 +142,35 @@ def test_atomic_update_or_create_ignores_late_submission_events(
assert ex.duration is None
assert ex.finished is None

@pytest.mark.django_db(transaction=True)
def test_test_atomic_update_or_create_does_retry_on_db_operational_error(
self, request, jobstore
):
now = timezone.now()
job = DjangoJob.objects.create(id="test_job", next_run_time=now)
request.addfinalizer(job.delete)

ex = DjangoJobExecution.objects.create(
job_id=job.id,
run_time=job.next_run_time - timedelta(seconds=5),
status=DjangoJobExecution.SENT,
)

with mock.patch.object(db.connection, "close") as close_mock:
with pytest.raises(db.OperationalError, match="Some DB-related error"):
with mock.patch(
"django_apscheduler.models.DjangoJobExecution.objects.select_for_update",
side_effect=conftest.raise_db_operational_error,
):
DjangoJobExecution.atomic_update_or_create(
RLock(),
ex.job_id,
ex.run_time,
DjangoJobExecution.SUCCESS,
)

assert close_mock.call_count == 1

@pytest.mark.django_db
def test_str(self, request):
now = timezone.now()
Expand Down
Loading

0 comments on commit 4436ddc

Please sign in to comment.