Skip to content

Commit

Permalink
Bugfix: Manual DagRun trigger should not skip scheduled runs (#13963)
Browse files Browse the repository at this point in the history
closes #13434

(cherry picked from commit de277c6)
  • Loading branch information
kaxil committed Feb 4, 2021
1 parent 073d0b1 commit 958e258
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
1 change: 0 additions & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1846,7 +1846,6 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=None):
or_(
DagRun.run_type == DagRunType.BACKFILL_JOB,
DagRun.run_type == DagRunType.SCHEDULED,
DagRun.external_trigger.is_(True),
),
)
.group_by(DagRun.dag_id)
Expand Down
63 changes: 63 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3595,6 +3595,69 @@ def test_scheduler_create_dag_runs_does_not_raise_error(self):
"'test_scheduler_create_dag_runs_does_not_raise_error' not found in serialized_dag table"
) in log_output.output[0]

def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self):
"""
Test that externally triggered Dag Runs should not affect (by skipping) next
scheduled DAG runs
"""
dag = DAG(
dag_id='test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run',
start_date=DEFAULT_DATE,
schedule_interval="*/1 * * * *",
max_active_runs=5,
catchup=True,
)

DummyOperator(task_id='dummy', dag=dag, owner='airflow')

session = settings.Session()
dag.clear()
dagbag = DagBag(
dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
include_examples=False,
read_dags_from_db=True,
)
dagbag.bag_dag(dag=dag, root_dag=dag)
# Write to dag and serialized_dag table
dagbag.sync_to_db(session)
dag = dagbag.get_dag(dag.dag_id)

# Verify that dag_model.next_dagrun is equal to next execution_date
dag_model = session.query(DagModel).get(dag.dag_id)
assert dag_model.next_dagrun == DEFAULT_DATE

job = SchedulerJob(subdir=os.devnull)
job.executor = MockExecutor(do_update=False)
job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)

# Verify a DagRun is created with the correct execution_date
# when Scheduler._do_scheduling is run in the Scheduler Loop
job._do_scheduling(session)
dr1 = dag.get_dagrun(DEFAULT_DATE, session)
assert dr1 is not None
assert dr1.state == State.RUNNING

# Verify that dag_model.next_dagrun is set to next execution_date
dag_model = session.query(DagModel).get(dag.dag_id)
assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)

# Trigger the Dag externally
dr = dag.create_dagrun(
state=State.RUNNING,
execution_date=timezone.utcnow(),
run_type=DagRunType.MANUAL,
session=session,
external_trigger=True,
)
assert dr is not None
# Run DAG.bulk_write_to_db -- this is run when in DagFileProcessor.process_file
DAG.bulk_write_to_db([dag], session)

# Test that 'dag_model.next_dagrun' has not been changed because of newly created external
# triggered DagRun.
dag_model = session.query(DagModel).get(dag.dag_id)
assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)

def test_do_schedule_max_active_runs_upstream_failed(self):
"""
Test that tasks in upstream failed don't count as actively running.
Expand Down

0 comments on commit 958e258

Please sign in to comment.