
Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks (when max_active_runs is reached) #13407

Closed
soltanianalytics opened this issue Dec 31, 2020 · 11 comments
Labels
affected_version:2.0 · area:Scheduler · kind:bug · pending-response · priority:low
Milestone

Comments

@soltanianalytics
Contributor

Apache Airflow version: 2.0, LocalExecutor

Environment: Docker on Windows 10 with WSL using image apache/airflow:2.0.0-python3.8

What happened:

Situation:

  • There is a DAG, say mydag, with
    • catchup=True
    • max_active_runs=1
  • Let's say there are two DAG runs, t=0 and t=1
  • The first task of the DAG is a sensor that senses whether the previous DAG run was successful
  • Now, t=0 gets run, tasks are scheduled, and a task in t=0 fails
  • Then, t=1 gets run, and the first task (the sensor) cannot find a successful previous run, so it keeps sensing
  • Now I clear the failed task in t=0 and expect it to run again, as it did in Airflow 1.x
  • It doesn't; instead, the scheduler logs the following:
scheduler_1  | [2020-12-31 15:25:32,770] {scheduler_job.py:1667} INFO - DAG mydag already has 1 active runs, not queuing any tasks for run 2020-12-26 05:00:00+00:00 [note: this is t=0]

Thus, t=0 never finishes, t=1 never senses a finished run, and any t=n with n>1 also has no chance of ever succeeding.

One alternative would be to remove the max_active_runs constraint, but that is not feasible: it would create hundreds of DAG runs at once, which is a complete performance killer.
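
For context, the DAG in question looks roughly like the sketch below. This is illustrative only: the issue only states that the first task is a sensor on the previous run, so the ExternalTaskSensor pointing at the same DAG, the task names, the dates, and the schedule are assumptions, not my production code.

from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils import timezone

with DAG(
    dag_id="mydag",
    start_date=timezone.datetime(2020, 12, 20),
    schedule_interval="@daily",
    catchup=True,          # backfill all missed runs
    max_active_runs=1,     # only one run may be active at a time
) as dag:
    # First task: wait until the previous run of this same DAG has finished
    # its last task; execution_delta points one schedule interval back.
    wait_for_previous = ExternalTaskSensor(
        task_id="wait_for_previous_run",
        external_dag_id="mydag",
        external_task_id="do_work",
        execution_delta=timedelta(days=1),
    )

    do_work = DummyOperator(task_id="do_work")

    wait_for_previous >> do_work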

What you expected to happen:

As with previous airflow versions, I would expect that the cleared tasks get scheduled again, which they don't.

Why this happens:

tl;dr: Ultimately, this happens because Airflow queries TaskInstance (TI) instead of DagRun (DR) here: https://github.com/apache/airflow/blob/v2-0-stable/airflow/jobs/scheduler_job.py#L1499-L1509

_do_scheduling() runs _schedule_dag_run() once for each dag_id and passes in the set of active DAG runs as an argument, here: https://github.com/apache/airflow/blob/v2-0-stable/airflow/jobs/scheduler_job.py#L1515. The tasks that should be queued are not queued because their DAG runs are not in that set of active DAG runs, even though they are in fact running. The reason is that https://github.com/apache/airflow/blob/v2-0-stable/airflow/jobs/scheduler_job.py#L1499-L1509 builds the set from the TaskInstances of each DAG run and their execution dates instead of from the DagRuns themselves; since the tasks were successful or failed and then cleared, they are filtered out of the query. If you replace TI with DR in that query, this should work fine and fix this issue without breaking anything that currently works.
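
To make the suggested change concrete, here is a rough sketch of the two queries. This paraphrases the linked scheduler code rather than quoting it; session and dag_ids are placeholders for the corresponding objects inside _do_scheduling().

from airflow.models import DagRun as DR, TaskInstance as TI
from airflow.utils.state import State

# Current logic (roughly): a run counts as "active" if it still has unfinished
# TaskInstances, so a run whose tasks all finished and were then cleared drops
# out of this set even though the DagRun itself is back in the RUNNING state.
active_runs = session.query(TI.dag_id, TI.execution_date).filter(
    TI.dag_id.in_(dag_ids),
    TI.state.notin_(list(State.finished)),
)

# Proposed logic: look at the DagRun table directly, so every run whose state
# is RUNNING is treated as active, regardless of the state of its tasks.
active_runs = session.query(DR.dag_id, DR.execution_date).filter(
    DR.dag_id.in_(dag_ids),
    DR.state == State.RUNNING,
)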

How to reproduce it:

You don't need the sensor logic I described above to reproduce this behavior. While I didn't do this myself, the following should reproduce it:

  • Create a DAG mydag with catchup=True and max_active_runs=1
  • Give it just a dummy task, and let it run a couple of times so you have a couple of successful DAG runs
  • Pause the DAG*
  • Clear a couple of tasks in dag runs that were successful
  • Run this snippet to see the result of the query with TI and DR, respectively:
from airflow import models, settings
from airflow.utils.state import State

TI = models.TaskInstance
DR = models.DagRun
dag_id = "mydag"
session = settings.Session()

# Current scheduler logic: a run counts as active if it has unfinished TaskInstances
result = "\n\nactive DAG runs according to current code logic:"
for data_tuple in session.query(TI.dag_id, TI.execution_date).filter(
    TI.dag_id.in_([dag_id]), TI.state.notin_(list(State.finished))
):
    result += "\n\t" + str(data_tuple)

# Proposed logic: a run counts as active if the DagRun itself is in the RUNNING state
result += "\n\nactive DAG runs according to my proposed code logic:"
for data_tuple in session.query(DR.dag_id, DR.execution_date).filter(
    DR.dag_id.in_([dag_id]), DR.state.in_([State.RUNNING])
):
    result += "\n\t" + str(data_tuple)

print(result, "\n")

*Pausing the DAG only prevents your Airflow instance from working through the DAG runs one by one; you would not need to pause if your DAG has a sensor that senses the success of the previous DAG run, like mine does.

I will be creating a PR with the suggested fix shortly.

@soltanianalytics soltanianalytics added the kind:bug label Dec 31, 2020

@soltanianalytics
Contributor Author

I did some more reading (mainly #1442 and https://issues.apache.org/jira/browse/AIRFLOW-137). I see now that using TI was entirely intentional: the scheduler is currently expected to ignore DagRuns that were re-set to running by clearing tasks, in order to avoid violating max_active_runs, until the number of active runs drops below that limit again. My issue with this is that

  1. The scheduler does not schedule tasks in DagRuns which are, in fact, running
  2. When a user clears tasks, the user wants those tasks to be scheduled; therefore I think exceeding max_active_runs in this situation (as in my use case) is deliberate and a feature, not a bug

But from #1442 I can see that a user might also want to run only specific tasks across a large number of DagRuns, while executing tasks in at most max_active_runs DagRuns at a time. Arguably, when using backfill or catchup=True in general, I would expect to be able to rely on tasks being executed in order of execution_date, because that is also the order in which they run when my Airflow installation is simply running normally. Thus, I think a second alternative is to keep the abovementioned logic but adjust it so that only tasks in the first max_active_runs DagRuns, ordered by execution_date, are run; see the sketch below.
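
A rough sketch of what I mean by this alternative (illustrative only; session, dag_id, and max_active_runs are placeholders, not the actual scheduler variables):

from airflow.models import DagRun as DR
from airflow.utils.state import State

# Of all currently RUNNING runs of this DAG, only the oldest max_active_runs
# (by execution_date) would be allowed to have tasks queued; younger running
# runs would have to wait until the older ones finish.
runs_allowed_to_progress = (
    session.query(DR)
    .filter(DR.dag_id == dag_id, DR.state == State.RUNNING)
    .order_by(DR.execution_date.asc())
    .limit(max_active_runs)
    .all()
)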

I will create a second PR with this alternative approach.

soltanianalytics pushed a commit to soltanianalytics/airflow that referenced this issue Jan 1, 2021
@turbaszek turbaszek added the area:Scheduler label Jan 2, 2021
@turbaszek
Member

CC @ashb

soltanianalytics pushed a commit to soltanianalytics/airflow that referenced this issue Jan 2, 2021
soltanianalytics pushed a commit to soltanianalytics/airflow that referenced this issue Jan 2, 2021
soltanianalytics pushed a commit to soltanianalytics/airflow that referenced this issue Jan 2, 2021
soltanianalytics pushed a commit to soltanianalytics/airflow that referenced this issue Jan 2, 2021
@vikramkoka vikramkoka added the affected_version:2.0 label Jan 16, 2021
@kaxil kaxil changed the title Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks (when max_active_runs is reached) Jan 28, 2021
@kaxil kaxil added the priority:medium and priority:low labels and removed the priority:medium label Jan 28, 2021
@kaxil kaxil modified the milestones: Airflow 2.0.1, Airflow 2.0.2 Feb 4, 2021
kaxil pushed a commit that referenced this issue Mar 5, 2021
ashb pushed a commit that referenced this issue Mar 19, 2021
@ashb ashb removed this from the Airflow 2.0.2 milestone Apr 22, 2021
@ashb ashb added this to the Airflow 2.0.3 milestone Apr 22, 2021
@ashb ashb modified the milestones: Airflow 2.0.3, Airflow 2.1.1 May 7, 2021
@kaxil kaxil modified the milestones: Airflow 2.1.1, Airflow 2.2 Jun 22, 2021
@ephraimbuddy
Contributor

@soltanianalytics please can you test this on 2.1.3? I was not able to reproduce it.

@soltanianalytics
Contributor Author

The behavior has indeed changed since 2.0.0, and while not perfect, I can deal with it for now - closing the ticket

@ephraimbuddy
Contributor

The behavior has indeed changed since 2.0.0, and while not perfect, I can deal with it for now - closing the ticket

Please can you provide more context on "not perfect"? I'm happy to look into this issue if you can explain more, thanks.

@soltanianalytics
Contributor Author

My core use case is re-running DAG runs that are not currently running.

So I have a DAG with max_active_runs=1 and catchup=True. This DAG depends on the previous DagRun being successful. I implement this logic via a sensor that senses the success of the last task of the previous DagRun. If the previous DagRun failed, the current one keeps sensing into the abyss, and might fail after some time too. At that point I might have n DagRuns that I want to re-run, where n can be in the dozens. If I simply clear the tasks of all the runs I want to repeat, they may not execute in order; but because every run except the oldest starts with a sensor that only succeeds once the previous DagRun has succeeded, this only works properly if the DagRuns are executed in chronological order.

I can make that happen if I let the currently active DagRun fail before clearing tasks and setting the DagRun states to running. Most of the time, that should do the trick. If not, I'll just delete all relevant DagRuns and they'll be re-created in chronological order.
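
For completeness, a rough sketch of the clearing step of that workaround, done after the currently active run has failed. The same can be done through the UI; the DAG id and dates below are placeholders, and in Airflow 2 clearing the tasks also sets the corresponding DagRuns back to the running state.

from airflow import models
from airflow.utils import timezone

# Load the DAG and clear all task instances in the affected window;
# the cleared DagRuns are re-set to running as part of the clear.
dag = models.DagBag().get_dag("mydag")
dag.clear(
    start_date=timezone.datetime(2020, 12, 20),
    end_date=timezone.datetime(2020, 12, 26),
)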

@ephraimbuddy
Contributor

Thanks @soltanianalytics

@soltanianalytics
Contributor Author

If I don't let the current one fail first, or if Airflow otherwise has a hiccup, it will simply never schedule the "correct" DagRuns because of max_active_runs. (My understanding of max_active_runs is that it should only apply when creating running DagRuns, and that once DagRuns are running, Airflow should schedule their tasks irrespective of max_active_runs; however, others seem to disagree with this interpretation.)

@ephraimbuddy
Contributor

Actually, the problem with max_active_runs is that the line of code below:

dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)

doesn't get distinct dag_ids. For example:

dag1 has 8 queued DagRuns, dag2 has 5, and dag3 has 6.

The code would get all 8 DagRuns from dag1 (if it has the closest dates), then 2 DagRuns from dag2, and nothing from dag3.
The correct behavior would be, for each DAG, to get its queued DagRuns and set them to running as long as max_active_runs is not reached.
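
To illustrate the behavior being described, here is a sketch only, not the scheduler's actual code; max_active_runs is hard-coded as a placeholder, whereas the real scheduler reads it per DAG.

from collections import defaultdict

from airflow import models, settings
from airflow.utils.state import State

DR = models.DagRun
session = settings.Session()
max_active_runs = 1  # placeholder; the scheduler reads this per DAG

# Group queued runs per DAG instead of taking them globally by date,
# so one DAG with many queued runs cannot starve the others.
queued_by_dag = defaultdict(list)
for run in (
    session.query(DR)
    .filter(DR.state == State.QUEUED)
    .order_by(DR.execution_date.asc())
):
    queued_by_dag[run.dag_id].append(run)

for dag_id, runs in queued_by_dag.items():
    running = (
        session.query(DR)
        .filter(DR.dag_id == dag_id, DR.state == State.RUNNING)
        .count()
    )
    # Promote queued runs only while the DAG stays under its limit.
    for run in runs[: max(0, max_active_runs - running)]:
        run.state = State.RUNNING

session.commit()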

@soltanianalytics
Contributor Author

soltanianalytics commented Aug 30, 2021

Note that in the issue I described above, all DagRuns are already running, but their tasks are not scheduled.
