From 958e25836ffa78273aa195865bc456dceee06351 Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Sat, 30 Jan 2021 12:02:53 +0000
Subject: [PATCH] Bugfix: Manual DagRun trigger should not skip scheduled runs (#13963)

closes https://github.com/apache/airflow/issues/13434

(cherry picked from commit de277c69e7909cf0d563bbd542166397523ebbe0)
---
NOTE(review): the line structure of this patch was restored from a
whitespace-collapsed copy. Added/removed content is unchanged; the indentation
of hunk context lines was inferred -- confirm against the target tree (or apply
with whitespace-tolerant options) before relying on it.

 airflow/models/dag.py            |  1 -
 tests/jobs/test_scheduler_job.py | 63 ++++++++++++++++++++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index dfc2827f20cf6..95712a2cbe7c7 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1846,7 +1846,6 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=None):
                 or_(
                     DagRun.run_type == DagRunType.BACKFILL_JOB,
                     DagRun.run_type == DagRunType.SCHEDULED,
-                    DagRun.external_trigger.is_(True),
                 ),
             )
             .group_by(DagRun.dag_id)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 6b638061bb2bc..595da25006c02 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3595,6 +3595,69 @@ def test_scheduler_create_dag_runs_does_not_raise_error(self):
                 "'test_scheduler_create_dag_runs_does_not_raise_error' not found in serialized_dag table"
             ) in log_output.output[0]
 
+    def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self):
+        """
+        Test that externally triggered Dag Runs should not affect (by skipping) next
+        scheduled DAG runs
+        """
+        dag = DAG(
+            dag_id='test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run',
+            start_date=DEFAULT_DATE,
+            schedule_interval="*/1 * * * *",
+            max_active_runs=5,
+            catchup=True,
+        )
+
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()
+        dag.clear()
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        # Write to dag and serialized_dag table
+        dagbag.sync_to_db(session)
+        dag = dagbag.get_dag(dag.dag_id)
+
+        # Verify that dag_model.next_dagrun is equal to next execution_date
+        dag_model = session.query(DagModel).get(dag.dag_id)
+        assert dag_model.next_dagrun == DEFAULT_DATE
+
+        job = SchedulerJob(subdir=os.devnull)
+        job.executor = MockExecutor(do_update=False)
+        job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        # Verify a DagRun is created with the correct execution_date
+        # when Scheduler._do_scheduling is run in the Scheduler Loop
+        job._do_scheduling(session)
+        dr1 = dag.get_dagrun(DEFAULT_DATE, session)
+        assert dr1 is not None
+        assert dr1.state == State.RUNNING
+
+        # Verify that dag_model.next_dagrun is set to next execution_date
+        dag_model = session.query(DagModel).get(dag.dag_id)
+        assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
+
+        # Trigger the Dag externally
+        dr = dag.create_dagrun(
+            state=State.RUNNING,
+            execution_date=timezone.utcnow(),
+            run_type=DagRunType.MANUAL,
+            session=session,
+            external_trigger=True,
+        )
+        assert dr is not None
+        # Run DAG.bulk_write_to_db -- this is run when in DagFileProcessor.process_file
+        DAG.bulk_write_to_db([dag], session)
+
+        # Test that 'dag_model.next_dagrun' has not been changed because of newly created external
+        # triggered DagRun.
+        dag_model = session.query(DagModel).get(dag.dag_id)
+        assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
+
     def test_do_schedule_max_active_runs_upstream_failed(self):
         """
         Test that tasks in upstream failed don't count as actively running.