From b251a1d57b2057545a90a9dc1a27ec5b46d92337 Mon Sep 17 00:00:00 2001 From: Qian Yu Date: Thu, 27 May 2021 21:40:24 +0800 Subject: [PATCH] Put back set_dag_runs_state and add DeprecationWarning instead --- UPDATING.md | 8 ++++++++ airflow/models/dag.py | 22 ++++++++++++++++++++++ tests/models/test_dag.py | 14 ++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/UPDATING.md b/UPDATING.md index c7a97dfa2228d..928cfae08252e 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -92,6 +92,14 @@ not have any effect in an existing deployment where the ``default_pool`` already Previously this was controlled by `non_pooled_task_slot_count` in `[core]` section, which was not documented. +### `activate_dag_runs` argument of the function `clear_task_instances` is replaced with `dag_run_state` + +To achieve the previous default behaviour of `clear_task_instances` with `activate_dag_runs=True`, no change is needed. To achieve the previous behaviour of `activate_dag_runs=False`, pass `dag_run_state=False` instead. + +### `dag.set_dag_runs_state` is deprecated + +The method `set_dag_runs_state` is no longer needed after a bug fix in PR: [#15382](https://github.com/apache/airflow/pull/15382). This method is now deprecated and will be removed in a future version. + ## Airflow 2.1.0 ### New "deprecated_api" extra diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 92be819adb529..ffe3f5bf5e21d 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1117,6 +1117,28 @@ def topological_sort(self, include_subdag_tasks: bool = False): return tuple(graph_sorted) + @provide_session + def set_dag_runs_state( + self, + state: str = State.RUNNING, + session: Session = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + dag_ids: List[str] = None, + ) -> None: + warnings.warn( + "This method is deprecated and will be removed in a future version.", + DeprecationWarning, + stacklevel=3, + ) + dag_ids = dag_ids or [self.dag_id] + query = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)) + if start_date: + query = query.filter(DagRun.execution_date >= start_date) + if end_date: + query = query.filter(DagRun.execution_date <= end_date) + query.update({DagRun.state: state}, synchronize_session='fetch') + @provide_session def clear( self, diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index bd2cdb13fdfff..bd9fdb7155d62 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -1224,6 +1224,20 @@ def test_normalized_schedule_interval(self, schedule_interval, expected_n_schedu assert dag.normalized_schedule_interval == expected_n_schedule_interval assert dag.schedule_interval == schedule_interval + def test_set_dag_runs_state(self): + clear_db_runs() + dag_id = "test_set_dag_runs_state" + dag = DAG(dag_id=dag_id) + + for i in range(3): + dag.create_dagrun(run_id=f"test{i}", state=State.RUNNING) + + dag.set_dag_runs_state(state=State.NONE) + drs = DagRun.find(dag_id=dag_id) + + assert len(drs) == 3 + assert all(dr.state == State.NONE for dr in drs) + def test_create_dagrun_run_id_is_generated(self): dag = DAG(dag_id="run_id_is_generated") dr = dag.create_dagrun(run_type=DagRunType.MANUAL, execution_date=DEFAULT_DATE, state=State.NONE)