
Introduce 'Callbacks Sink' classes for sending callbacks #21301

Merged
merged 12 commits on Feb 22, 2022

Conversation

mhenc
Contributor

@mhenc mhenc commented Feb 3, 2022

This change refactors code to prepare for the DAG Processor separation:

  • route sending callbacks through a dedicated 'PipeCallbacksSink' class. In a follow-up change, I will create a new DatabaseCallbacksSink which will send them to the database.
  • introduce send_callback_to_execute for BaseExecutor, with a default implementation. It can be overridden by Executor subclasses. This method is called from the scheduler job when sending callbacks.

Part of AIP-43
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation
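
For reference, a minimal sketch of the shape this introduces (class names follow the PR and the later rename; the module paths and exact signatures are my approximation, not the merged code):

```python
from multiprocessing.connection import Connection
from typing import Callable

from airflow.callbacks.callback_requests import CallbackRequest


class BaseCallbackSink:
    """Base interface: anything that can accept a callback request for execution."""

    def send(self, callback: CallbackRequest) -> None:
        raise NotImplementedError()


class PipeCallbackSink(BaseCallbackSink):
    """Sends callbacks to the DagFileProcessorManager over a multiprocessing pipe."""

    def __init__(self, get_sink_pipe: Callable[[], Connection]) -> None:
        # Held as a getter rather than the pipe object itself; see the review
        # discussion below about manager restarts recreating the pipes.
        self._get_sink_pipe = get_sink_pipe

    def send(self, callback: CallbackRequest) -> None:
        self._get_sink_pipe().send(callback)
```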

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Feb 3, 2022
@mhenc mhenc changed the title Refactor sending callback to Callbacks Sink classes Refactor sending callbacks to Callbacks Sink classes Feb 3, 2022
@potiuk potiuk added the AIP-43 DAG processor separation AIP-43 label Feb 6, 2022
@mhenc mhenc force-pushed the callbacks_sink branch 2 times, most recently from fa29eeb to 420baf8 on February 10, 2022 13:26
@mhenc mhenc changed the title Refactor sending callbacks to Callbacks Sink classes Refactor sending callbacks to 'Callbacks Sink' classes Feb 10, 2022
@mhenc mhenc force-pushed the callbacks_sink branch 4 times, most recently from 695d7f7 to 67508a1 on February 10, 2022 14:35
@mhenc mhenc changed the title Refactor sending callbacks to 'Callbacks Sink' classes Introduce 'Callbacks Sink' classes for sending callbacks Feb 10, 2022
@mhenc mhenc marked this pull request as ready for review February 10, 2022 14:49
@mhenc
Contributor Author

mhenc commented Feb 10, 2022

cc: @potiuk

@mhenc mhenc force-pushed the callbacks_sink branch 2 times, most recently from 57116bb to b073276 on February 14, 2022 09:02
```python
        :param callback: Callback request to be executed.
        """
        try:
            self._get_sink_pipe().send(callback)
```
Member

Why does this need to be a callable instead of just the value directly?

Contributor Author

If for some reason the DagFileProcessorManager fails, it is restarted by the heartbeat_manager method:
https://github.com/apache/airflow/blob/main/airflow/dag_processing/manager.py#L311

which recreates the pipes:
https://github.com/apache/airflow/blob/main/airflow/dag_processing/manager.py#L137
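
In other words, the sink must hold a getter, not the pipe object itself. A toy illustration (hypothetical stand-in classes, not Airflow code):

```python
class Agent:
    """Stand-in for DagFileProcessorAgent; a restart recreates its pipe."""

    def __init__(self) -> None:
        self._restart()

    def _restart(self) -> None:
        self.pipe = object()  # stands in for a fresh multiprocessing Connection

    def get_pipe(self):
        return self.pipe


agent = Agent()
get_sink_pipe = agent.get_pipe  # what PipeCallbackSink holds
captured_pipe = agent.pipe      # what it would hold if given the value directly

agent._restart()  # heartbeat notices a dead manager and recreates the pipes

assert get_sink_pipe() is agent.pipe    # the getter resolves to the new pipe
assert captured_pipe is not agent.pipe  # a captured value would now be stale
```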

Rename callbacks_sink to callback_sink
@mhenc mhenc requested a review from ashb February 15, 2022 13:27
@mhenc mhenc requested a review from ashb February 18, 2022 09:11
@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Feb 18, 2022
@github-actions

The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@uranusjr
Member

uranusjr commented Feb 18, 2022

Do we need a compatibility shim at airflow.utils.callback_requests?

@ashb
Member

ashb commented Feb 18, 2022

I don't think so -- it's not used by DAG authors, was my thinking.

```diff
@@ -633,7 +634,7 @@ def _process_executor_events(self, session: Session = None) -> int:
                 simple_task_instance=SimpleTaskInstance(ti),
                 msg=msg % (ti, state, ti.state, info),
             )
-            self.processor_agent.send_callback_to_execute(request)
+            self.executor.send_callback(request)
```
Member

Nice. Going through executor is what we discussed :).
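
For context, the executor side is presumably just a thin forwarder to whatever sink the scheduler job configured; a sketch (only the send_callback call above comes from the diff - the method body and the callback_sink attribute are my assumptions):

```python
from typing import Optional

from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest


class BaseExecutor:
    """Only the callback-related slice of the executor base class."""

    callback_sink: Optional[BaseCallbackSink] = None

    def send_callback(self, request: CallbackRequest) -> None:
        # Default implementation: forward to the configured sink. Executor
        # subclasses can override this to route callbacks differently.
        if self.callback_sink is None:
            raise ValueError("Callback sink is not ready on this executor")
        self.callback_sink.send(request)
```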

@potiuk
Member

potiuk commented Feb 20, 2022

Re: the errors - just rebase, @mhenc, to see if it was a broken base. I do not recall a similar error, so it might actually be caused by some circular import - maybe some imports need to be changed to local imports to handle that.

@mhenc mhenc requested review from ashb, potiuk and kaxil February 21, 2022 09:49
@potiuk potiuk merged commit 2a615ab into apache:main Feb 22, 2022
@mhenc mhenc deleted the callbacks_sink branch March 8, 2022 14:38
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Apr 8, 2022
@pingzh
Contributor

pingzh commented Aug 19, 2022

@mhenc do you think we can unify PipeCallbackSink and DatabaseCallbackSink into just DatabaseCallbackSink? I think both serve as an async way to pass callbacks to the processor. DatabaseCallbackSink is durable since callbacks are stored in the db; restarting the scheduler won't lose them.

It will greatly simplify the code. Please let me know your thoughts.

cc @potiuk @ashb @uranusjr @kaxil
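
For readers following along: the durability argument rests on the DB-backed sink persisting each request as a row that survives a scheduler restart, roughly along these lines (a sketch; the model and session helpers are assumptions based on the discussion):

```python
from sqlalchemy.orm import Session

from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.utils.session import NEW_SESSION, provide_session


class DatabaseCallbackSink(BaseCallbackSink):
    """Persists callback requests in the database instead of a pipe."""

    @provide_session
    def send(self, callback: CallbackRequest, session: Session = NEW_SESSION) -> None:
        # Each request becomes a row that the standalone DAG processor later
        # fetches and deletes; a scheduler restart cannot drop it.
        session.add(DbCallbackRequest(callback=callback, priority_weight=10))
```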

@mhenc
Contributor Author

mhenc commented Aug 22, 2022

@pingzh I think the reason we kept PipeCallbackSink was backward compatibility - in non-standalone DagProcessor mode we don't need the DB (we can rely on pipes).
Note that using DatabaseCallbackSink may have some impact on performance (not sure how much) and database usage (additional requests).
But I think we could discuss it.

cc: @potiuk @ashb @uranusjr @kaxil

@pingzh
Contributor

pingzh commented Aug 22, 2022

> @pingzh I think the reason we kept PipeCallbackSink was backward compatibility - in non-standalone DagProcessor mode we don't need the DB (we can rely on pipes). Note that using DatabaseCallbackSink may have some impact on performance (not sure how much) and database usage (additional requests). But I think we could discuss it.
>
> cc: @potiuk @ashb @uranusjr @kaxil

@mhenc thanks for the response. I agree with the concerns. Does it still create the table when using the non-standalone DagProcessor? If yes, I think it is still backward-compatible, since callbacks are still invoked. Using a pipe vs. the db is more of an implementation choice. My concern is that supporting both cases makes the code more complex and testing harder.

@potiuk
Member

potiuk commented Sep 5, 2022

I think it is simply faster. We already know that the database is a bottleneck in a number of cases in our design. When you have a local implementation with pipes, everything is done via memory and it avoids any database access. We made a very similar decision in the case of DB isolation (AIP-44), where it would probably have been "nicer" from a development point of view to have one interface over RPC, but using local calls is (as measured during tests) ca. 10% faster. Here, I think the impact of the DB will be rather bigger, because you not only serialize and send stuff over the network but also persistently store it, so a ~20% performance loss for a local installation when using the DB vs. pipes is something I would expect.

Now the question is what our optimization goal is. You can optimize for performance or for ease of maintenance, but more often than not those two do not go together and you have to choose one of them (the usual thing with optimization - you gain one thing, you lose another).

I thought about it and I think that in our case optimizing for performance rather than ease of maintenance is preferred. It is likely very different when you develop some local customization, even for a big company like Airbnb. Philosophically at least. I have no "hard numbers" to prove it, but the "back-of-the-envelope" calculation you can make indicates that in this case performance is more important.

Why?

When you are developing for one installation (like at Airbnb, i.e. internal development for a single installation), then simplicity of maintenance is likely more important than performance - simply speaking, whatever you gain in performance will be paid back in the salaries of the people who spend time maintaining it (roughly speaking, of course). In this case it all translates to money - you pay either for computing power or for people's time. And when you are developing only for that one company, even for a big installation, the performance gain is not "huge" when we talk about a 10-20% difference. Of course it's not a "total performance" impact - Airflow is much more than that - but similarly the "maintenance" overhead is not "total" either; it is but a tiny fraction of the Airflow code we are talking about.

Overall, I think this is very different when you have 10,000 installations of Airflow out there and you deliver a version that those 10,000 installations are using (of course the 10,000 number is out of the blue, but it roughly reflects the order of magnitude).

If you have one (or even a few) people spending a little more time on maintenance while saving 10% of performance for 10,000 installations out there, the relation between the performance gain and the increased maintenance effort looks very different. The "performance gain" is distributed, of course, while the "maintenance time loss" is centralized, but the net effect is that in our case it's much more important to optimize for performance in general. If only for the environmental effect - less energy used overall, less waste generated - it's worth it.

Of course there are other aspects - how easy it will be to add new features and evolve the mechanism. But in this case, with the right level of abstraction and testing, I don't think this hampers our ability to extend it significantly.

Going with this line of thought - I believe that in most cases where we make such a decision (like AIP-44, for example) performance should be our primary concern. Of course "premature optimization is the root of all evil", but in this case it's quite clear that by making maintenance "a little" more difficult, we gain a substantial performance benefit.
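
To make the back-of-the-envelope concrete (purely illustrative numbers echoing the estimates above, not measurements):

```python
# Hypothetical trade-off arithmetic, using the figures quoted in this thread.
installations = 10_000  # order-of-magnitude guess for deployed Airflows
db_overhead = 0.15      # midpoint of the ~10-20% estimated DB-vs-pipes cost

# Dropping pipes: every local installation pays the DB overhead, forever.
distributed_cost = installations * db_overhead  # ~1500 "installation-equivalents"

# Keeping pipes: a handful of maintainers spend somewhat more time on one codebase.
centralized_cost = "a few maintainers' extra effort, paid once per release"

print(distributed_cost, "vs", centralized_cost)
```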

Labels
  • AIP-43: DAG processor separation
  • area:Scheduler: including HA (high availability) scheduler
  • changelog:skip: Changes that should be skipped from the changelog (CI, tests, etc..)
  • full tests needed: We need to run full set of tests for this PR to merge