Skip to content

Commit

Permalink
Introduce RESTARTING state (#16681)
Browse files Browse the repository at this point in the history
closes: #16680

This PR makes sure that when a user clears a running task, the task does not fail. Instead it is killed and retried gracefully.

This is done by introducing a new state called RESTARTING. As the name suggests, a TaskInstance is set to this state when it is cleared while running. Most places handle RESTARTING the same way SHUTDOWN is handled, except TaskInstance.is_eligible_to_retry, where a task in this state is always treated as eligible for retry.
  • Loading branch information
yuqian90 authored Jul 31, 2021
1 parent 00d823e commit 0e0e887
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 8 deletions.
4 changes: 4 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ If you are using DAGs Details API endpoint, use `max_active_tasks` instead of `c

When marking a task success/failed in Graph View, its downstream tasks that are in failed/upstream_failed state are automatically cleared.

### Clearing a running task sets its state to `RESTARTING`

Previously, clearing a running task set its state to `SHUTDOWN`; the task was then killed and went into the `FAILED` state. After [#16681](https://github.com/apache/airflow/pull/16681), clearing a running task sets its state to `RESTARTING`. The task is killed and becomes eligible for retry without going into the `FAILED` state.

### Remove `TaskInstance.log_filepath` attribute

This method returned incorrect values for a long time, because it did not take into account the different
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def heartbeat(self, only_if_necessary: bool = False):
session.merge(self)
previous_heartbeat = self.latest_heartbeat

if self.state == State.SHUTDOWN:
if self.state in State.terminating_states:
self.kill()

# Figure out how long to sleep for
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ def task_instance_scheduling_decisions(self, session: Session = None) -> TISched
schedulable_tis: List[TI] = []
changed_tis = False

tis = list(self.get_task_instances(session=session, state=State.task_states + (State.SHUTDOWN,)))
tis = list(self.get_task_instances(session=session, state=State.task_states))
self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis))
for ti in tis:
try:
Expand Down
11 changes: 9 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ def clear_task_instances(
for ti in tis:
if ti.state == State.RUNNING:
if ti.job_id:
ti.state = State.SHUTDOWN
# If a task is cleared when running, set its state to RESTARTING so that
# the task is terminated and becomes eligible for retry.
ti.state = State.RESTARTING
job_ids.append(ti.job_id)
else:
task_id = ti.task_id
Expand Down Expand Up @@ -211,7 +213,7 @@ def clear_task_instances(
from airflow.jobs.base_job import BaseJob

for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all():
job.state = State.SHUTDOWN
job.state = State.RESTARTING

if activate_dag_runs is not None:
warnings.warn(
Expand Down Expand Up @@ -1519,6 +1521,11 @@ def handle_failure_with_callback(

def is_eligible_to_retry(self):
    """
    Return whether this task instance is eligible for retry.

    A task instance in the RESTARTING state is always eligible. Otherwise it is
    eligible only when its task has retries configured and ``try_number`` has not
    exceeded ``max_tries``.

    :return: ``True`` when the task instance may be retried, ``False`` otherwise.
    :rtype: bool
    """
    if self.state == State.RESTARTING:
        # If a task is cleared when running, it goes into RESTARTING state and is always
        # eligible for retry
        return True

    # Coerce to bool so callers always get True/False rather than the raw
    # ``retries`` value (e.g. 0 or None) leaking out of the ``and`` expression.
    return bool(self.task.retries and self.try_number <= self.max_tries)

@provide_session
Expand Down
11 changes: 10 additions & 1 deletion airflow/utils/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class TaskInstanceState(str, Enum):
QUEUED = "queued" # Executor has enqueued the task
RUNNING = "running" # Task is executing
SUCCESS = "success" # Task completed
SHUTDOWN = "shutdown" # External request to shut down
SHUTDOWN = "shutdown" # External request to shut down (e.g. marked failed when running)
RESTARTING = "restarting" # External request to restart (e.g. cleared when running)
FAILED = "failed" # Task errored out
UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor
Expand Down Expand Up @@ -84,6 +85,7 @@ class State:
SCHEDULED = TaskInstanceState.SCHEDULED
QUEUED = TaskInstanceState.QUEUED
SHUTDOWN = TaskInstanceState.SHUTDOWN
RESTARTING = TaskInstanceState.RESTARTING
UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
Expand All @@ -105,6 +107,7 @@ class State:
TaskInstanceState.RUNNING: 'lime',
TaskInstanceState.SUCCESS: 'green',
TaskInstanceState.SHUTDOWN: 'blue',
TaskInstanceState.RESTARTING: 'violet',
TaskInstanceState.FAILED: 'red',
TaskInstanceState.UP_FOR_RETRY: 'gold',
TaskInstanceState.UP_FOR_RESCHEDULE: 'turquoise',
Expand Down Expand Up @@ -159,6 +162,7 @@ def color_fg(cls, state):
TaskInstanceState.RUNNING,
TaskInstanceState.SENSING,
TaskInstanceState.SHUTDOWN,
TaskInstanceState.RESTARTING,
TaskInstanceState.UP_FOR_RETRY,
TaskInstanceState.UP_FOR_RESCHEDULE,
]
Expand All @@ -182,6 +186,11 @@ def color_fg(cls, state):
A list of states indicating that a task or dag is a success state.
"""

terminating_states = frozenset([TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING])
"""
A list of states indicating that a task is being terminated.
"""


class PokeState:
"""Static class with poke states constants used in smart operator."""
Expand Down
4 changes: 4 additions & 0 deletions airflow/www/static/css/graph.css
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ g.node.shutdown rect {
stroke: blue;
}

g.node.restarting rect {
stroke: violet;
}

g.node.upstream_failed rect {
stroke: orange;
}
Expand Down
4 changes: 4 additions & 0 deletions airflow/www/static/css/tree.css
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ rect.shutdown {
fill: blue;
}

rect.restarting {
fill: violet;
}

rect.upstream_failed {
fill: orange;
}
Expand Down
2 changes: 1 addition & 1 deletion airflow/www/static/js/graph.js
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ function getNodeState(nodeId, tis) {
// In this order, if any of these states appeared in childrenStates, return it as
// the group state.
const priority = ['failed', 'upstream_failed', 'up_for_retry', 'up_for_reschedule',
'queued', 'scheduled', 'sensing', 'running', 'shutdown', 'removed',
'queued', 'scheduled', 'sensing', 'running', 'shutdown', 'restarting', 'removed',
'no_status', 'success', 'skipped'];

return priority.find((state) => childrenStates.has(state)) || 'no_status';
Expand Down
2 changes: 2 additions & 0 deletions docs/apache-airflow/concepts/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ The possible states for a Task Instance are:
* ``queued``: The task has been assigned to an Executor and is awaiting a worker
* ``running``: The task is running on a worker (or on a local/synchronous executor)
* ``success``: The task finished running without errors
* ``shutdown``: The task was externally requested to shut down when it was running
* ``restarting``: The task was externally requested to restart when it was running
* ``failed``: The task had an error during execution and failed to run
* ``skipped``: The task was skipped due to branching, LatestOnly, or similar.
* ``upstream_failed``: An upstream task failed and the :ref:`Trigger Rule <concepts:trigger-rules>` says we needed it
Expand Down
Binary file modified docs/apache-airflow/img/task_lifecycle_diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1432,7 +1432,7 @@ def test_clear_set_dagrun_state_for_parent_dag(self, dag_run_state):

@parameterized.expand(
[(state, State.NONE) for state in State.task_states if state != State.RUNNING]
+ [(State.RUNNING, State.SHUTDOWN)]
+ [(State.RUNNING, State.RESTARTING)]
) # type: ignore
def test_clear_dag(self, ti_state_begin, ti_state_end: Optional[str]):
dag_id = 'test_clear_dag'
Expand Down
2 changes: 1 addition & 1 deletion tests/www/views/test_views_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def test_home(capture_templates, admin_client):
val_state_color_mapping = (
'const STATE_COLOR = {"failed": "red", '
'"null": "lightblue", "queued": "gray", '
'"removed": "lightgrey", "running": "lime", '
'"removed": "lightgrey", "restarting": "violet", "running": "lime", '
'"scheduled": "tan", "sensing": "lightseagreen", '
'"shutdown": "blue", "skipped": "pink", '
'"success": "green", "up_for_reschedule": "turquoise", '
Expand Down

0 comments on commit 0e0e887

Please sign in to comment.