Fix bug allowing task instances to survive when dagrun_timeout is exceeded #14321
It seems odd to me that this failed -- the only logs for the failure indicate a timeout in the CI? Unless I'm missing something which is highly possible.
I have re-run the tests again -- it might be a one-off failure
Looks like that was the case :)
    .filter(TI.state.in_(State.unfinished))
)
for task_instance in unfinished_task_instances:
    task_instance.state = State.SKIPPED
I wonder what was the old behaviour in 1.10.x -- skip vs failed
We should add a test for it too.
cc @ashb
I believe setting the task instance state to failed will have the same effect. It just has to be a state in State.finished. I chose skipped, because I figured the tasks didn’t even get a chance to run.
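To make the point about finished states concrete, here is a minimal, self-contained sketch of the idea. It is not the Airflow implementation: the state names mirror Airflow's, but `FINISHED`, `UNFINISHED`, `TaskInstance`, `DagRun`, and `fail_timed_out_run` are hypothetical stand-ins. When a run times out, every task instance still in an unfinished state is moved to a finished one (skipped by default; failed would serve the same purpose).

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical stand-ins for Airflow's State.finished / State.unfinished sets.
FINISHED = frozenset({"success", "failed", "skipped"})
UNFINISHED = frozenset({"none", "scheduled", "queued", "running", "up_for_retry"})


@dataclass
class TaskInstance:
    task_id: str
    state: str = "none"


@dataclass
class DagRun:
    run_id: str
    state: str = "running"
    task_instances: List[TaskInstance] = field(default_factory=list)


def fail_timed_out_run(run: DagRun, final_ti_state: str = "skipped") -> None:
    """Fail the run and move every unfinished task instance to a finished state."""
    if final_ti_state not in FINISHED:
        raise ValueError(f"{final_ti_state!r} is not a finished state")
    run.state = "failed"
    for ti in run.task_instances:
        # Task instances that already finished keep their original state.
        if ti.state in UNFINISHED:
            ti.state = final_ti_state
```

Passing `final_ti_state="failed"` instead of the default gives the alternative discussed above; either way no task instance of the failed run is left in an unfinished state.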
Sounds good. Can you please add a test to cover this case?
Sure thing!
@kaxil Would it be bad to use time.sleep to force a timeout in the test? I'm not sure there's a way to otherwise indicate a dag run has timed out.
E.g:
def test_do_schedule_max_active_runs_dag_timed_out(self):
    """Test that tasks are set to a finished state when their DAG times out"""
    dag = DAG(
        dag_id='test_max_active_run_with_dag_timed_out',
        start_date=DEFAULT_DATE,
        schedule_interval='@once',
        max_active_runs=1,
        catchup=True,
    )
    dag.dagrun_timeout = datetime.timedelta(seconds=1)
    with dag:
        task1 = BashOperator(
            task_id='task1',
            bash_command=' for((i=1;i<=600;i+=1)); do sleep "$i"; done',
        )
    session = settings.Session()
    dagbag = DagBag(
        dag_folder=os.devnull,
        include_examples=False,
        read_dags_from_db=True,
    )
    dagbag.bag_dag(dag=dag, root_dag=dag)
    dagbag.sync_to_db(session=session)
    run1 = dag.create_dagrun(
        run_type=DagRunType.SCHEDULED,
        execution_date=DEFAULT_DATE,
        state=State.RUNNING,
        session=session,
    )
    run1_ti = run1.get_task_instance(task1.task_id, session)
    run1_ti.state = State.RUNNING
    sleep(1)
    run2 = dag.create_dagrun(
        run_type=DagRunType.SCHEDULED,
        execution_date=DEFAULT_DATE + timedelta(seconds=10),
        state=State.RUNNING,
        session=session,
    )
    dag.sync_to_db(session=session)
    job = SchedulerJob(subdir=os.devnull)
    job.executor = MockExecutor()
    job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
    num_queued = job._do_scheduling(session)
    assert run1.state == State.FAILED
    assert run1_ti.state == State.SKIPPED
    assert run2.state == State.RUNNING
    num_queued = job._do_scheduling(session)
    run2_ti = run2.get_task_instance(task1.task_id, session)
    assert run2_ti.state == State.QUEUED
Test seems to be successful with a 1 second sleep. Just pushed a commit including a test for this scenario
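For comparison, a test can sometimes avoid sleeping by backdating the run's start date or by injecting a fixed "now". The sketch below assumes the timeout check compares the run's start date against the current time minus `dagrun_timeout`; the helper `has_timed_out` and its `now` parameter are hypothetical and only illustrate that comparison, they are not Airflow's code.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def has_timed_out(
    start_date: datetime,
    dagrun_timeout: Optional[timedelta],
    now: Optional[datetime] = None,
) -> bool:
    """Return True if a run started at start_date has exceeded dagrun_timeout."""
    if dagrun_timeout is None:
        # No timeout configured: the run can never time out.
        return False
    # Allow the caller (for example a test) to inject a fixed clock.
    now = now or datetime.now(timezone.utc)
    return start_date < now - dagrun_timeout


# Hypothetical usage in a test: backdate the start date instead of sleeping.
fixed_now = datetime(2021, 1, 1, tzinfo=timezone.utc)
backdated_start = fixed_now - timedelta(seconds=2)
timed_out = has_timed_out(backdated_start, timedelta(seconds=1), now=fixed_now)
```

Whether this is practical for the scheduler test depends on how the run's start date is set when the run is created, so the one-second sleep may still be the simpler choice.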
@kaxil I think the CI failed due to a transient error again...
The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
Task instances were not terminated when dagrun_timeout was exceeded. New DAG runs could still be created, but task instances within those new runs would never be instantiated. As alluded to in #13407, the query to determine if a DAG is active is based on task instances being unfinished. Since task instances were not being set to a State.finished state when their DAG run was set to the FAILED state, the scheduler continued to pick up the DAG as RUNNING, and tasks in new DAG runs would never be scheduled.

closes: #12912
related: #13407
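
The blocking behaviour described above can be illustrated with a small, self-contained simulation. It is a sketch under assumptions, not the scheduler's actual query: `UNFINISHED`, `count_active_runs`, and `may_schedule` are hypothetical names, and a run counts as active whenever it still owns an unfinished task instance.

```python
from typing import Dict, List

# Hypothetical stand-in for Airflow's State.unfinished set.
UNFINISHED = frozenset({"none", "scheduled", "queued", "running", "up_for_retry"})


def count_active_runs(ti_states_by_run: Dict[str, List[str]]) -> int:
    """Count runs that still own at least one unfinished task instance."""
    return sum(
        1
        for states in ti_states_by_run.values()
        if any(state in UNFINISHED for state in states)
    )


def may_schedule(
    run_id: str, ti_states_by_run: Dict[str, List[str]], max_active_runs: int
) -> bool:
    """Allow scheduling for run_id only if the other active runs leave room."""
    others = {rid: s for rid, s in ti_states_by_run.items() if rid != run_id}
    return count_active_runs(others) < max_active_runs


# Before the fix: run1 has timed out, but its task instance is still "running".
before_fix = {"run1": ["running"], "run2": ["none"]}
# After the fix: the timed-out run's task instance was moved to "skipped".
after_fix = {"run1": ["skipped"], "run2": ["none"]}
```

With `max_active_runs=1`, `may_schedule("run2", before_fix, 1)` is false because the stale task instance keeps run1 counted as active, while `may_schedule("run2", after_fix, 1)` is true.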