Skip to content

Commit

Permalink
Fix bug allowing task instances to survive when dagrun_timeout is exceeded (#14321)
Browse files Browse the repository at this point in the history

closes: #12912
related: #13407
  • Loading branch information
RNHTTR authored Mar 5, 2021
1 parent 9f37af2 commit 09327ba
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 3 deletions.
14 changes: 11 additions & 3 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1696,10 +1696,18 @@ def _schedule_dag_run(
and dag.dagrun_timeout
and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
):
dag_run.state = State.FAILED
dag_run.end_date = timezone.utcnow()
self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
dag_run.set_state(State.FAILED)
unfinished_task_instances = (
session.query(TI)
.filter(TI.dag_id == dag_run.dag_id)
.filter(TI.execution_date == dag_run.execution_date)
.filter(TI.state.in_(State.unfinished))
)
for task_instance in unfinished_task_instances:
task_instance.state = State.SKIPPED
session.merge(task_instance)
session.flush()
self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)

# Work out if we should allow creating a new DagRun now?
self._update_dag_next_dagruns([session.query(DagModel).get(dag_run.dag_id)], session)
Expand Down
63 changes: 63 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import unittest
from datetime import timedelta
from tempfile import NamedTemporaryFile, mkdtemp
from time import sleep
from unittest import mock
from unittest.mock import MagicMock, patch
from zipfile import ZipFile
Expand Down Expand Up @@ -3813,6 +3814,68 @@ def test_do_schedule_max_active_runs_upstream_failed(self):
ti = run2.get_task_instance(task1.task_id, session)
assert ti.state == State.QUEUED

def test_do_schedule_max_active_runs_dag_timed_out(self):
    """Check that a timed-out DAG run is failed and its running task is skipped.

    Two runs of a ``max_active_runs=1`` DAG are created. The first run is
    allowed to exceed the DAG's one-second ``dagrun_timeout``; after one
    scheduling pass it must be FAILED with its task instance SKIPPED, while
    the second run is still RUNNING. A second scheduling pass must then
    queue the second run's task.
    """
    dag = DAG(
        dag_id='test_max_active_run_with_dag_timed_out',
        start_date=DEFAULT_DATE,
        schedule_interval='@once',
        max_active_runs=1,
        catchup=True,
    )
    dag.dagrun_timeout = timedelta(seconds=1)

    with dag:
        # The command only needs to look long-running; the test marks the
        # task instance as RUNNING by hand instead of executing it.
        long_task = BashOperator(
            task_id='task1',
            bash_command=' for((i=1;i<=600;i+=1)); do sleep "$i"; done',
        )

    session = settings.Session()

    # Register the DAG through a DagBag and persist it to the database.
    bag = DagBag(
        dag_folder=os.devnull,
        include_examples=False,
        read_dags_from_db=True,
    )
    bag.bag_dag(dag=dag, root_dag=dag)
    bag.sync_to_db(session=session)

    # First run: this is the one expected to time out.
    timed_out_run = dag.create_dagrun(
        run_type=DagRunType.SCHEDULED,
        execution_date=DEFAULT_DATE,
        state=State.RUNNING,
        session=session,
    )
    timed_out_ti = timed_out_run.get_task_instance(long_task.task_id, session)
    timed_out_ti.state = State.RUNNING

    # Let the first run age past the one-second dagrun_timeout.
    sleep(1)

    # Second run, created after the wait.
    # NOTE(review): presumably still within the timeout when scheduling runs -- confirm.
    next_run = dag.create_dagrun(
        run_type=DagRunType.SCHEDULED,
        execution_date=DEFAULT_DATE + timedelta(seconds=10),
        state=State.RUNNING,
        session=session,
    )

    dag.sync_to_db(session=session)

    scheduler = SchedulerJob(subdir=os.devnull)
    scheduler.executor = MockExecutor()
    scheduler.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)

    # First pass: the timed-out run is failed and its task is skipped.
    scheduler._do_scheduling(session)

    assert timed_out_run.state == State.FAILED
    assert timed_out_ti.state == State.SKIPPED
    assert next_run.state == State.RUNNING

    # Second pass: the remaining run's task is expected to be queued.
    scheduler._do_scheduling(session)
    next_ti = next_run.get_task_instance(long_task.task_id, session)
    assert next_ti.state == State.QUEUED

def test_do_schedule_max_active_runs_task_removed(self):
"""Test that tasks in removed state don't count as actively running."""

Expand Down

0 comments on commit 09327ba

Please sign in to comment.