Skip to content

Commit

Permalink
Fix dag.clear() to set multiple dags to running when necessary (#15382)
Browse files Browse the repository at this point in the history
closes: #14260
related: #9824

When clearing task across dags using ExternalTaskMarker the dag state of the external DagRun is not set to active. So cleared tasks in the external dag will not automatically start if the DagRun is a Failed or Succeeded state.

Two changes are made to fix the issue:

Make clear_task_instances set DagRuns' state to dag_run_state for all the affected DagRuns.
The filter for DagRun in clear_task_instances is fixed too. Previously, it made an assumption that execution_dates for all the dag_ids are the same, which is not always correct.
test_external_task_marker_clear_activate is added to make sure the fix does the right thing.

(cherry picked from commit 2bca8a5)
  • Loading branch information
yuqian90 committed May 29, 2021
1 parent 285791d commit bdf9bc2
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 30 deletions.
10 changes: 10 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ https://developers.google.com/style/inclusive-documentation
-->

## Master

### `activate_dag_runs` argument of the function `clear_task_instances` is replaced with `dag_run_state`

To achieve the previous default behaviour of `clear_task_instances` with `activate_dag_runs=True`, no change is needed. To achieve the previous behaviour of `activate_dag_runs=False`, pass `dag_run_state=False` instead.

### `dag.set_dag_runs_state` is deprecated

The method `set_dag_runs_state` is no longer needed after a bug fix in PR: [#15382](https://github.com/apache/airflow/pull/15382). This method is now deprecated and will be removed in a future version.

## Airflow 2.1.0

### New "deprecated_api" extra
Expand Down
12 changes: 1 addition & 11 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,8 @@ def post_clear_task_instances(dag_id: str, session=None):
task_instances = dag.clear(get_tis=True, **data)
if not data["dry_run"]:
clear_task_instances(
task_instances,
session,
dag=dag,
activate_dag_runs=False, # We will set DagRun state later.
task_instances, session, dag=dag, dag_run_state=State.RUNNING if reset_dag_runs else False
)
if reset_dag_runs:
dag.set_dag_runs_state(
session=session,
start_date=data["start_date"],
end_date=data["end_date"],
state=State.RUNNING,
)
task_instances = task_instances.join(
DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
).add_column(DR.run_id)
Expand Down
21 changes: 8 additions & 13 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,11 @@ def set_dag_runs_state(
end_date: Optional[datetime] = None,
dag_ids: List[str] = None,
) -> None:
warnings.warn(
"This method is deprecated and will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
dag_ids = dag_ids or [self.dag_id]
query = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
if start_date:
Expand Down Expand Up @@ -1172,7 +1177,8 @@ def clear(
:type include_subdags: bool
:param include_parentdag: Clear tasks in the parent dag of the subdag.
:type include_parentdag: bool
:param dag_run_state: state to set DagRun to
:param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
be changed.
:param dry_run: Find the tasks to clear but don't clear them.
:type dry_run: bool
:param session: The sqlalchemy session to use
Expand All @@ -1193,20 +1199,17 @@ def clear(
"""
TI = TaskInstance
tis = session.query(TI)
dag_ids = []
if include_subdags:
# Crafting the right filter for dag_id and task_ids combo
conditions = []
for dag in self.subdags + [self]:
conditions.append((TI.dag_id == dag.dag_id) & TI.task_id.in_(dag.task_ids))
dag_ids.append(dag.dag_id)
tis = tis.filter(or_(*conditions))
else:
tis = session.query(TI).filter(TI.dag_id == self.dag_id)
tis = tis.filter(TI.task_id.in_(self.task_ids))

if include_parentdag and self.is_subdag and self.parent_dag is not None:
dag_ids.append(self.parent_dag.dag_id)
p_dag = self.parent_dag.sub_dag(
task_ids_or_regex=r"^{}$".format(self.dag_id.split('.')[1]),
include_upstream=False,
Expand Down Expand Up @@ -1340,15 +1343,7 @@ def clear(
tis,
session,
dag=self,
activate_dag_runs=False, # We will set DagRun state later.
)

self.set_dag_runs_state(
session=session,
start_date=start_date,
end_date=end_date,
state=dag_run_state,
dag_ids=dag_ids,
dag_run_state=dag_run_state,
)
else:
count = 0
Expand Down
19 changes: 13 additions & 6 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def set_error_file(error_file: str, error: Union[str, Exception]) -> None:
def clear_task_instances(
tis,
session,
activate_dag_runs=True,
dag_run_state: str = State.RUNNING,
dag=None,
):
"""
Expand All @@ -142,7 +142,8 @@ def clear_task_instances(
:param tis: a list of task instances
:param session: current session
:param activate_dag_runs: flag to check for active dag run
:param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
be changed.
:param dag: DAG object
"""
job_ids = []
Expand Down Expand Up @@ -204,19 +205,25 @@ def clear_task_instances(
for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all(): # noqa
job.state = State.SHUTDOWN

if activate_dag_runs and tis:
if dag_run_state is not False and tis:
from airflow.models.dagrun import DagRun # Avoid circular import

dates_by_dag_id = defaultdict(set)
for instance in tis:
dates_by_dag_id[instance.dag_id].add(instance.execution_date)

drs = (
session.query(DagRun)
.filter(
DagRun.dag_id.in_({ti.dag_id for ti in tis}),
DagRun.execution_date.in_({ti.execution_date for ti in tis}),
or_(
and_(DagRun.dag_id == dag_id, DagRun.execution_date.in_(dates))
for dag_id, dates in dates_by_dag_id.items()
)
)
.all()
)
for dr in drs:
dr.state = State.RUNNING
dr.state = dag_run_state
dr.start_date = timezone.utcnow()


Expand Down
96 changes: 96 additions & 0 deletions tests/sensors/test_external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,53 @@ def dag_bag_ext():
return dag_bag


@pytest.fixture
def dag_bag_parent_child():
"""
Create a DagBag with two DAGs looking like this. task_1 of child_dag_1 on day 1 depends on
task_0 of parent_dag_0 on day 1. Therefore, when task_0 of parent_dag_0 on day 1 and day 2
are cleared, parent_dag_0 DagRuns need to be set to running on both days, but child_dag_1
only needs to be set to running on day 1.
day 1 day 2
parent_dag_0 task_0 task_0
|
|
v
child_dag_1 task_1 task_1
"""
dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False)

day_1 = DEFAULT_DATE

dag_0 = DAG("parent_dag_0", start_date=day_1, schedule_interval=None)
task_0 = ExternalTaskMarker(
task_id="task_0",
external_dag_id="child_dag_1",
external_task_id="task_1",
execution_date=day_1.isoformat(),
recursion_depth=3,
dag=dag_0,
)

dag_1 = DAG("child_dag_1", start_date=day_1, schedule_interval=None)
_ = ExternalTaskSensor(
task_id="task_1",
external_dag_id=dag_0.dag_id,
external_task_id=task_0.task_id,
execution_date_fn=lambda execution_date: day_1 if execution_date == day_1 else [],
mode='reschedule',
dag=dag_1,
)

for dag in [dag_0, dag_1]:
dag_bag.bag_dag(dag=dag, root_dag=dag)

return dag_bag


def run_tasks(dag_bag, execution_date=DEFAULT_DATE):
"""
Run all tasks in the DAGs in the given dag_bag. Return the TaskInstance objects as a dict
Expand Down Expand Up @@ -464,6 +511,55 @@ def test_external_task_marker_transitive(dag_bag_ext):
assert_ti_state_equal(ti_b_3, State.NONE)


# pylint: disable=redefined-outer-name
def test_external_task_marker_clear_activate(dag_bag_parent_child):
"""
Test clearing tasks across DAGs and make sure the right DagRuns are activated.
"""
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType

dag_bag = dag_bag_parent_child
day_1 = DEFAULT_DATE
day_2 = DEFAULT_DATE + timedelta(days=1)

run_tasks(dag_bag, execution_date=day_1)
run_tasks(dag_bag, execution_date=day_2)

with create_session() as session:
for dag in dag_bag.dags.values():
for execution_date in [day_1, day_2]:
dagrun = dag.create_dagrun(
State.RUNNING, execution_date, run_type=DagRunType.MANUAL, session=session
)
dagrun.set_state(State.SUCCESS)
session.add(dagrun)

session.commit()

# Assert that dagruns of all the affected dags are set to SUCCESS before tasks are cleared.
for dag in dag_bag.dags.values():
for execution_date in [day_1, day_2]:
dagrun = dag.get_dagrun(execution_date=execution_date)
assert dagrun.state == State.SUCCESS

dag_0 = dag_bag.get_dag("parent_dag_0")
task_0 = dag_0.get_task("task_0")
clear_tasks(dag_bag, dag_0, task_0, start_date=day_1, end_date=day_2)

# Assert that dagruns of all the affected dags are set to RUNNING after tasks are cleared.
# Unaffected dagruns should be left as SUCCESS.
dagrun_0_1 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_1)
dagrun_0_2 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_2)
dagrun_1_1 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_1)
dagrun_1_2 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_2)

assert dagrun_0_1.state == State.RUNNING
assert dagrun_0_2.state == State.RUNNING
assert dagrun_1_1.state == State.RUNNING
assert dagrun_1_2.state == State.SUCCESS


def test_external_task_marker_future(dag_bag_ext):
"""
Test clearing tasks with no end_date. This is the case when users clear tasks with
Expand Down

0 comments on commit bdf9bc2

Please sign in to comment.