diff --git a/UPDATING.md b/UPDATING.md index 4777b0e10b794..b18dff578e76c 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -133,6 +133,10 @@ If you are using DAGs Details API endpoint, use `max_active_tasks` instead of `c When marking a task success/failed in Graph View, its downstream tasks that are in failed/upstream_failed state are automatically cleared. +### Clearing a running task sets its state to `RESTARTING` + +Previously, clearing a running task set its state to `SHUTDOWN`. The task was then killed and went into the `FAILED` state. After [#16681](https://github.com/apache/airflow/pull/16681), clearing a running task sets its state to `RESTARTING`. The task is then eligible for retry without going into the `FAILED` state. + ### Remove `TaskInstance.log_filepath` attribute This method returned incorrect values for a long time, because it did not take into account the different diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py index 18893f2110a4f..745f248fc4da0 100644 --- a/airflow/jobs/base_job.py +++ b/airflow/jobs/base_job.py @@ -202,7 +202,7 @@ def heartbeat(self, only_if_necessary: bool = False): session.merge(self) previous_heartbeat = self.latest_heartbeat - if self.state == State.SHUTDOWN: + if self.state in State.terminating_states: self.kill() # Figure out how long to sleep for diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index e2057d5a34af0..1fdd26cd0a611 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -477,7 +477,7 @@ def task_instance_scheduling_decisions(self, session: Session = None) -> TISched schedulable_tis: List[TI] = [] changed_tis = False - tis = list(self.get_task_instances(session=session, state=State.task_states + (State.SHUTDOWN,))) + tis = list(self.get_task_instances(session=session, state=State.task_states)) self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis)) for ti in tis: try: diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 
6fb437b012b68..11fa590fc213d 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -158,7 +158,9 @@ def clear_task_instances( for ti in tis: if ti.state == State.RUNNING: if ti.job_id: - ti.state = State.SHUTDOWN + # If a task is cleared when running, set its state to RESTARTING so that + # the task is terminated and becomes eligible for retry. + ti.state = State.RESTARTING job_ids.append(ti.job_id) else: task_id = ti.task_id @@ -211,7 +213,7 @@ def clear_task_instances( from airflow.jobs.base_job import BaseJob for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all(): - job.state = State.SHUTDOWN + job.state = State.RESTARTING if activate_dag_runs is not None: warnings.warn( @@ -1519,6 +1521,11 @@ def handle_failure_with_callback( def is_eligible_to_retry(self): """Is task instance is eligible for retry""" + if self.state == State.RESTARTING: + # If a task is cleared when running, it goes into RESTARTING state and is always + # eligible for retry + return True + return self.task.retries and self.try_number <= self.max_tries @provide_session diff --git a/airflow/utils/state.py b/airflow/utils/state.py index e95b4095da2b2..f408c94b0d3df 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -39,7 +39,8 @@ class TaskInstanceState(str, Enum): QUEUED = "queued" # Executor has enqueued the task RUNNING = "running" # Task is executing SUCCESS = "success" # Task completed - SHUTDOWN = "shutdown" # External request to shut down + SHUTDOWN = "shutdown" # External request to shut down (e.g. marked failed when running) + RESTARTING = "restarting" # External request to restart (e.g. 
cleared when running) FAILED = "failed" # Task errored out UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor @@ -84,6 +85,7 @@ class State: SCHEDULED = TaskInstanceState.SCHEDULED QUEUED = TaskInstanceState.QUEUED SHUTDOWN = TaskInstanceState.SHUTDOWN + RESTARTING = TaskInstanceState.RESTARTING UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED @@ -105,6 +107,7 @@ class State: TaskInstanceState.RUNNING: 'lime', TaskInstanceState.SUCCESS: 'green', TaskInstanceState.SHUTDOWN: 'blue', + TaskInstanceState.RESTARTING: 'violet', TaskInstanceState.FAILED: 'red', TaskInstanceState.UP_FOR_RETRY: 'gold', TaskInstanceState.UP_FOR_RESCHEDULE: 'turquoise', @@ -159,6 +162,7 @@ def color_fg(cls, state): TaskInstanceState.RUNNING, TaskInstanceState.SENSING, TaskInstanceState.SHUTDOWN, + TaskInstanceState.RESTARTING, TaskInstanceState.UP_FOR_RETRY, TaskInstanceState.UP_FOR_RESCHEDULE, ] @@ -182,6 +186,11 @@ def color_fg(cls, state): A list of states indicating that a task or dag is a success state. """ + terminating_states = frozenset([TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING]) + """ + A list of states indicating that a task is being terminated (external shutdown or restart request). 
+ """ + class PokeState: """Static class with poke states constants used in smart operator.""" diff --git a/airflow/www/static/css/graph.css b/airflow/www/static/css/graph.css index f4c7b942c5a77..ce76df7dfcc7b 100644 --- a/airflow/www/static/css/graph.css +++ b/airflow/www/static/css/graph.css @@ -148,6 +148,10 @@ g.node.shutdown rect { stroke: blue; } +g.node.restarting rect { + stroke: violet; +} + g.node.upstream_failed rect { stroke: orange; } diff --git a/airflow/www/static/css/tree.css b/airflow/www/static/css/tree.css index 05b2c81de9973..c17cf0a6eb65b 100644 --- a/airflow/www/static/css/tree.css +++ b/airflow/www/static/css/tree.css @@ -67,6 +67,10 @@ rect.shutdown { fill: blue; } +rect.restarting { + fill: violet; +} + rect.upstream_failed { fill: orange; } diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js index d3d913b6dd27a..5b1ffee5406e3 100644 --- a/airflow/www/static/js/graph.js +++ b/airflow/www/static/js/graph.js @@ -536,7 +536,7 @@ function getNodeState(nodeId, tis) { // In this order, if any of these states appeared in childrenStates, return it as // the group state. 
const priority = ['failed', 'upstream_failed', 'up_for_retry', 'up_for_reschedule', - 'queued', 'scheduled', 'sensing', 'running', 'shutdown', 'removed', + 'queued', 'scheduled', 'sensing', 'running', 'shutdown', 'restarting', 'removed', 'no_status', 'success', 'skipped']; return priority.find((state) => childrenStates.has(state)) || 'no_status'; diff --git a/docs/apache-airflow/concepts/tasks.rst b/docs/apache-airflow/concepts/tasks.rst index d4a6608c7c539..f481baba60485 100644 --- a/docs/apache-airflow/concepts/tasks.rst +++ b/docs/apache-airflow/concepts/tasks.rst @@ -70,6 +70,8 @@ The possible states for a Task Instance are: * ``queued``: The task has been assigned to an Executor and is awaiting a worker * ``running``: The task is running on a worker (or on a local/synchronous executor) * ``success``: The task finished running without errors +* ``shutdown``: The task was externally requested to shut down when it was running +* ``restarting``: The task was externally requested to restart when it was running * ``failed``: The task had an error during execution and failed to run * ``skipped``: The task was skipped due to branching, LatestOnly, or similar. 
* ``upstream_failed``: An upstream task failed and the :ref:`Trigger Rule <concepts:trigger-rules>` says we needed it diff --git a/docs/apache-airflow/img/task_lifecycle_diagram.png b/docs/apache-airflow/img/task_lifecycle_diagram.png index ad0bd9ecf49ec..810942fc74001 100644 Binary files a/docs/apache-airflow/img/task_lifecycle_diagram.png and b/docs/apache-airflow/img/task_lifecycle_diagram.png differ diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 7cbda6690c8b6..726c07bfa1d9d 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -1432,7 +1432,7 @@ def test_clear_set_dagrun_state_for_parent_dag(self, dag_run_state): @parameterized.expand( [(state, State.NONE) for state in State.task_states if state != State.RUNNING] - + [(State.RUNNING, State.SHUTDOWN)] + + [(State.RUNNING, State.RESTARTING)] ) # type: ignore def test_clear_dag(self, ti_state_begin, ti_state_end: Optional[str]): dag_id = 'test_clear_dag' diff --git a/tests/www/views/test_views_base.py b/tests/www/views/test_views_base.py index 84a1ea6b13231..537f661e588c5 100644 --- a/tests/www/views/test_views_base.py +++ b/tests/www/views/test_views_base.py @@ -58,7 +58,7 @@ def test_home(capture_templates, admin_client): val_state_color_mapping = ( 'const STATE_COLOR = {"failed": "red", ' '"null": "lightblue", "queued": "gray", ' - '"removed": "lightgrey", "running": "lime", ' + '"removed": "lightgrey", "restarting": "violet", "running": "lime", ' '"scheduled": "tan", "sensing": "lightseagreen", ' '"shutdown": "blue", "skipped": "pink", ' '"success": "green", "up_for_reschedule": "turquoise", '