Skip to content

Commit

Permalink
Put back set_dag_runs_state and add DeprecationWarning instead
Browse files Browse the repository at this point in the history
  • Loading branch information
yuqian90 committed May 27, 2021
1 parent a3566e2 commit b251a1d
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 0 deletions.
8 changes: 8 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ not have any effect in an existing deployment where the ``default_pool`` already

Previously this was controlled by `non_pooled_task_slot_count` in `[core]` section, which was not documented.

### `activate_dag_runs` argument of the function `clear_task_instances` is replaced with `dag_run_state`

To achieve the previous default behaviour of `clear_task_instances` with `activate_dag_runs=True`, no change is needed. To achieve the previous behaviour of `activate_dag_runs=False`, pass `dag_run_state=False` instead.

### `dag.set_dag_runs_state` is deprecated

The method `set_dag_runs_state` is no longer needed after a bug fix in PR: [#15382](https://github.com/apache/airflow/pull/15382). This method is now deprecated and will be removed in a future version.

## Airflow 2.1.0

### New "deprecated_api" extra
Expand Down
22 changes: 22 additions & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,28 @@ def topological_sort(self, include_subdag_tasks: bool = False):

return tuple(graph_sorted)

@provide_session
def set_dag_runs_state(
self,
state: str = State.RUNNING,
session: Session = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
dag_ids: List[str] = None,
) -> None:
warnings.warn(
"This method is deprecated and will be removed in a future version.",
DeprecationWarning,
stacklevel=3,
)
dag_ids = dag_ids or [self.dag_id]
query = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
if start_date:
query = query.filter(DagRun.execution_date >= start_date)
if end_date:
query = query.filter(DagRun.execution_date <= end_date)
query.update({DagRun.state: state}, synchronize_session='fetch')

@provide_session
def clear(
self,
Expand Down
14 changes: 14 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,20 @@ def test_normalized_schedule_interval(self, schedule_interval, expected_n_schedu
assert dag.normalized_schedule_interval == expected_n_schedule_interval
assert dag.schedule_interval == schedule_interval

def test_set_dag_runs_state(self):
clear_db_runs()
dag_id = "test_set_dag_runs_state"
dag = DAG(dag_id=dag_id)

for i in range(3):
dag.create_dagrun(run_id=f"test{i}", state=State.RUNNING)

dag.set_dag_runs_state(state=State.NONE)
drs = DagRun.find(dag_id=dag_id)

assert len(drs) == 3
assert all(dr.state == State.NONE for dr in drs)

def test_create_dagrun_run_id_is_generated(self):
dag = DAG(dag_id="run_id_is_generated")
dr = dag.create_dagrun(run_type=DagRunType.MANUAL, execution_date=DEFAULT_DATE, state=State.NONE)
Expand Down

0 comments on commit b251a1d

Please sign in to comment.