-
Notifications
You must be signed in to change notification settings - Fork 192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Transport: AsyncTransport
plugin
#6626
base: main
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #6626 +/- ##
==========================================
+ Coverage 78.00% 78.01% +0.02%
==========================================
Files 563 564 +1
Lines 41766 42504 +738
==========================================
+ Hits 32574 33154 +580
- Misses 9192 9350 +158 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Looks good, just to reiterate most important comments:
Why don't you just use Transport
instead of BlockingTransport
, since you set it one to the other? Now you have redundancy. I feel like this API is clear to me.
_BaseTransport -> Transport -> SshTransport
_BaseTransport -> AsyncTransport -> AsyncSshTransport
Will you make a PR in plumpy there so we can do a new release?
Tests I will review in the separate PR
@@ -119,7 +120,7 @@ pillow==10.1.0 | |||
platformdirs==3.11.0 | |||
plotly==5.17.0 | |||
pluggy==1.3.0 | |||
plumpy==0.22.3 | |||
plumpy@git+https://github.com/khsrali/plumpy.git@allow-async-upload-download#egg=plumpy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will you make a PR there so we can do a new release?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes! Please review here: aiidateam/plumpy#272
utils/dependency_management.py
Outdated
if ( | ||
canonicalize_name(requirement_abstract.name) == canonicalize_name(requirement_concrete.name) | ||
and abstract_contains | ||
): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we remove this before merge? Otherwise it would be good to add some comment what the new if-else does. Hard to understand without context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan to keep it, as it's very useful to pass CI when we make PRs like this, that are hooked to another PR, or branch of other repo with @
The problem is @
is not listed as a valid specifier
in class Specifier
.
This little change, basically, accepts @
as a valid specifier and will check if a hooked dependency is to the same "version" across all files, requirement-xx
and enviroment.yml
, etc...
This way, apart of this nice check, the dependency test fails and it still triggers the main unit tests test-presto
, test-3.xx
for such PRs.. (otherwise it won't)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a few lines of comment to clarify this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is nice, perhaps would be better to separate into standalone PR for visibility.
btw: I started looking into using uv lockfile in #6640, seems like a better strategy than having to wrangle 4 different requirements files. :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we discussed, this feature is already covered in the new PR #6640.
So I keep the changes temporarily for this PR only, and will revert 'utils/dependency_management.py'
before any merge.
src/aiida/transports/transport.py
Outdated
return str(path) | ||
|
||
|
||
class _BaseTransport: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this part of public API? I should use it if I create a new transport plugin? Or should I use Transport
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no this is private. No one should inherent from this except 'AsyncTransport', 'BlockingTransport'.
Only 'AsyncTransport', 'BlockingTransport' are the public ones -- to be used to create a new plugin--
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is problematic to have class like this, take the method get_safe_open_interval
as example.
def get_safe_open_interval(self):
"""Get an interval (in seconds) that suggests how long the user should wait
between consecutive calls to open the transport. This can be used as
a way to get the user to not swamp a limited number of connections, etc.
However it is just advisory.
If returns 0, it is taken that there are no reasons to limit the
frequency of open calls.
In the main class, it returns a default value (>0 for safety), set in
the _DEFAULT_SAFE_OPEN_INTERVAL attribute of the class. Plugins should override it.
:return: The safe interval between calling open, in seconds
:rtype: float
"""
return self._safe_open_interval
It says "Plugins should override it", then what is the point of define the method?
src/aiida/transports/transport.py
Outdated
|
||
|
||
# This is here for backwards compatibility | ||
Transport = BlockingTransport |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if this makes sense to make blocking the default one, especially if you expose both of them in the API. Shouldn't there be a public class for Blocking and Nonblocking transport which one should use to inherit from?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was just for backward compatibility as Giovanni suggested to call the former blocking Transport
, now as, BlockingTransport
@@ -164,7 +167,8 @@ def test_upload_local_copy_list( | |||
calc_info.local_copy_list = [[folder.uuid] + local_copy_list] | |||
|
|||
with node.computer.get_transport() as transport: | |||
execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox) | |||
runner = get_manager().get_runner() | |||
runner.loop.run_until_complete(execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this needed now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because execmanager.upload_calculation
is now a async function.. this way we can call it in a sync test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if you use the old way? The test just passes and continues before finishing the command?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is very tricky to mix up the async programming and sync function, it is in general a very hard problem. This looks to me the runner.loop.run_until_complete
will block the running of the task until it complete so give no benefit after making these methods async. Is the create_task
the correct thing to use?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I just asked Ali offline. This is only for tests and only for test the functionality of the implementation is correct. The async behaviors of four operations working together is not the purpose here.
I am about to finish #6627 which I think can benefit for the tests here as well. Please hold a bit for that. I'll try my best to get that one merge by Wednesday. |
I just followed what @giovannipizzi suggested. But agreed this makes more sense, so I'm gonna apply this changes..
Will do once my performance tests are ready.. |
Note to myself: |
utils/dependency_management.py
Outdated
if ( | ||
canonicalize_name(requirement_abstract.name) == canonicalize_name(requirement_concrete.name) | ||
and abstract_contains | ||
): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some minor changes
Note: |
Checklist:
|
Hi @khsrali, I merge #6640, so it should work now I guess. Can you resolve the conflict and try it again? Thanks. |
Thanks @unkcpz , now I face issues I never had before, lol:
actually I even tried to update the file using 'uv lock', still won't pass.. |
Sorry for the experience. We are now trying
I will push the fix now, but I basically only ran these two commands |
@agoscinski It would be nice to have it merged by the end of this week, because when I come back from holidays, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I give the implementations a first go. I was only checking the test_all_plugins.py
previous time where I also did changes.
TBH, I think the PR still requires some changes.
I personally think the huge inheritance pattern is the evil of a lot of our headaches, here it add more of this. Would you mind to have a read on
- https://python-patterns.guide/gang-of-four/composition-over-inheritance/
- https://realpython.com/python-protocol/?
The protocol can fit for both sync and async function, which means the AsyncTransport
can use the function name without "_async" as suffix. Then inside "daemon/execmanager.py", if the function is sync transport, it runs in the blocking manner in the coroutine, if it is async it is scheduled to the event loop.
For example:
remote_user = await transport.whoami() # instead of await transport.whoami_async()
In aiidateam/plumpy#272, the post https://textual.textualize.io/blog/2023/03/15/no-async-async-with-python/ was mentioned. For the transport, I think the idea can work well to have async usage under the hood and call sync function as well.
But anyway, it is more stylish requests from mine. I think the PR is a great effort to improve the performance with async ssh. I think @khsrali already did the most difficult part of understanding async behavior and benchmark workflow for proof the changes are correct. We can do a pair coding next year to also setter down the interface and stylish disagreement.
@@ -192,7 +192,7 @@ def _get_submit_command(self, submit_script): | |||
directory. | |||
IMPORTANT: submit_script should be already escaped. | |||
""" | |||
submit_command = f'bash {submit_script} > /dev/null 2>&1 & echo $!' | |||
submit_command = f'(bash {submit_script} > /dev/null 2>&1 & echo $!) &' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this change is related, can you move it to another PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Command execution from asyncssh
library required this annotation, otherwise will not await it, therefore this change is related to this PR.
I've checked this change and it has no effect on expected behavior of command execution in paramiko
, so everything is safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you write some comments on why this change does not affect the behavior of bash?
If I have a scheduler that is not direct but still run the bash command will it conflict with asyncssh?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only thing I can see happen since is that the printed PID with echo $!
is now printed after the next command because it is run concurrently now. This could be critical if we would rely on the printed PID, we want to read the PID from the echo command but get a different output. But as far as I checked, we do not rely on the printed PID but retrieve the pid using some long ps command (see _get_joblist_command
).
The gist is I don't think it interferes.
__all__ = ('Transport',) | ||
__all__ = ('AsyncTransport', 'Transport', 'TransportPath') | ||
|
||
TransportPath = Union[str, Path, PurePosixPath] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To deal with generic path typing, it is better to cover more I think:
PathLike = Union[AnyStr, os.PathLike]
In side the function, I'd rather all use pathlib.Path
instead of str
. The reason is we are all move to pathlib.Path
in other module among the code base.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your suggeestion.
str
still has to be supported, because there are plugins that have direct call on transport methods with srt
paths. For example in QE, there exist one or two call. Other plugins I have not checked.
And about covering more types, I'd suggest we do it when a concrete usecase showed up.
AnyStr
also includes bytes
which I believe we don't need.
os.PathLike
is very inclusive, and allows for custom paths, although I agree it's nice, but don't see why we would need that right now.
I defined that way, to be very specific what paths we support.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The type annotation change is not related to the main change here for adding AsyncTransport plugin, can you make it independent PR or commit to make it easy to see what are the main changes that requires for adding async transport?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
separate PR would nicer, but at least mention this in the commit message if not.
src/aiida/transports/transport.py
Outdated
return str(path) | ||
|
||
|
||
class _BaseTransport: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is problematic to have class like this, take the method get_safe_open_interval
as example.
def get_safe_open_interval(self):
"""Get an interval (in seconds) that suggests how long the user should wait
between consecutive calls to open the transport. This can be used as
a way to get the user to not swamp a limited number of connections, etc.
However it is just advisory.
If returns 0, it is taken that there are no reasons to limit the
frequency of open calls.
In the main class, it returns a default value (>0 for safety), set in
the _DEFAULT_SAFE_OPEN_INTERVAL attribute of the class. Plugins should override it.
:return: The safe interval between calling open, in seconds
:rtype: float
"""
return self._safe_open_interval
It says "Plugins should override it", then what is the point of define the method?
Please be aware that failed test of py3.10 can be caused by the changes of this PR.
It says the event loop is closed, it might caused because you use |
Hi @unkcpz , yes they seem to be flaky. Edit: haha, now they pass, lol |
It might work because the timeout is beyond the total testing time (set 40 minutes now) so no test is killed by the timeout. It might be because the
https://pypi.org/project/pytest-timeout/ Further when asking a chatbot
I was not able to verify this statement, but it is easy to test it by using |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I give it another go through after the discussion on the interface.
I have three major requests:
- It requires to understand why the change below code is required for make asyncssh work.
(old)
submit_command = f'bash {submit_script} > /dev/null 2>&1 & echo $!'
(new)
submit_command = f'(bash {submit_script} > /dev/null 2>&1 & echo $!) &'
It is the change in direct scheduler, so what if there are scheduler that use same bash command as direct one, do they all need to adapted?
-
After ♻️ Make
Process.run
async plumpy#272, the aiida-core should adjust therun
method of process to async. I understand it is mainly to support AsyncTransport, but from the usage it is independent of this PR. Since it is a breaking change, it better to be separated as single commit out so easier to debug by commit (if something wrong in the future, we know it is because of transport or because ofrun
method call change).
I think it requires quite less change in the code base, by only change theupload_calculation
to await function and leave the inner function call unchanged. -
The type annotation changes to str, Path is not related to this PR. I'd recommend to do it in a separated PR to future discuss whether we need use more generic type as I recommended
PathLike
or be more specific to beUnion[Path, str, PathPosix]
.
The PR overall looks super nice and I think I was convinced that having *_sync
is a good API design. Just for the engine part, if possible separate it out can make this PR only transport related and we are more confident to move forward.
@khsrali I know it is a bit frustrated that I still have some requests, I also understand that move changes to be independent PR can be tedious. I'd happy to help. If you don't mind I can do separate the PR out for type annotation and engine change. Or we can do a pair coding to do it together.
Thanks @agoscinski, in the doc it also says "This (using thread for timeout) is the surest and most portable method.", so let's give it a try. |
Yes, I rerun and they passed, those were from |
Dear @unkcpz
Like we discussed before in person, this change is required in case a computer is set up with The change is compatible with other
I think this is a good point. I will squash everything into two commits accordingly.
Thank you for nice words, @unkcpz. |
AsyncTransport
pluginAsyncTransport
plugin
c3bf18b
to
28d7971
Compare
The performance numbers look awesome?
Out of curiosity, which Python version have you been using for the performance measurements? |
I used Python |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also mimik test_ssh.py for async. I talked with @khsrali and he will do it in the next PR, since this PR already quite large.
Could you go to the TODOs that you have added and make issues or resolve them?
@@ -192,7 +192,7 @@ def _get_submit_command(self, submit_script): | |||
directory. | |||
IMPORTANT: submit_script should be already escaped. | |||
""" | |||
submit_command = f'bash {submit_script} > /dev/null 2>&1 & echo $!' | |||
submit_command = f'(bash {submit_script} > /dev/null 2>&1 & echo $!) &' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only thing I can see happen since is that the printed PID with echo $!
is now printed after the next command because it is run concurrently now. This could be critical if we would rely on the printed PID, we want to read the PID from the echo command but get a different output. But as far as I checked, we do not rely on the printed PID but retrieve the pid using some long ps command (see _get_joblist_command
).
The gist is I don't think it interferes.
return value | ||
|
||
|
||
class AsyncSshTransport(AsyncTransport): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you tried to class AsyncSshTransport(AsyncTransport, SshTransport):
? That definitely would save a lot of code repetition but I am not 100% sure if that results in some weird behaviour.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Talked with @khsrali. Most function here are actually only temporary copied. They will change in the future to exploit the fact that functions are executed async to be more performant. Also async at the moment is a limited/simplified version of the ssh interface.
__all__ = ('Transport',) | ||
__all__ = ('AsyncTransport', 'Transport', 'TransportPath') | ||
|
||
TransportPath = Union[str, Path, PurePosixPath] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
separate PR would nicer, but at least mention this in the commit message if not.
src/aiida/transports/transport.py
Outdated
def path_to_str(path: TransportPath) -> str: | ||
"""Convert an instance of TransportPath = Union[str, Path, PurePosixPath] instance to a string.""" | ||
# We could check if the path is a Path or PurePosixPath instance, but it's too much overhead. | ||
return str(path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm... like this implemented I don't see the difference in just using str(path)
, is this somehow related to the type checker? But even then I don't understand. Did you want to do an instance check here and forgot it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Talked with @khsrali. We agreed on removing it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
utils/dependency_management.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why changing permission of this devop file is needed? If there is reason behind, better to have a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was a mistake!
thanks for noticing, I'll take it off in the next commit.
5347271
to
2a07b6f
Compare
2a07b6f
to
91ea017
Compare
@agoscinski I applied your review. Also opened an issue for all TODOs as you requested: #6719 |
This PR proposes many changes to make transport tasks asynchronous. This ensures that the daemon won’t be blocked by time-consuming tasks such as uploads, downloads, and similar operations, requested by @giovannipizzi.
Here’s a summary of the main updates:
AsyncSshTransport
with the entry pointcore.ssh_async
.AsyncSshTransport
supports executing custom scripts before connections, which is particularly useful for authentication. 🥇transport.chdir()
andtransport.getcwd()
(merged inTransport
&Engine
: factor outgetcwd()
&chdir()
for compatibility with upcoming async transport #6594).AsyncSshTransport
.Transport
class. Introduces_BaseTransport
,Transport
, andAsyncTransport
as replacements.Transport
, while asynchronous ones should inherit fromAsyncSshTransport
.test_all_plugins.py
to reflect these changes. Unfortunately, existing tests for transport plugins remain minimal and need improvement in a separate PR (TODO).TransportPath
type and upgrades transport plugins to work withUnion[str, Path, PurePosixPath]
.copy_from_remote_to_remote_async
, addressing a previous issue where such tasks blocked the entire daemon.Dependencies: This PR relies on PR 272 in plumpy.
Test Results: Performance Comparisons
When
core.ssh_async
OutperformsIn scenarios where the daemon is blocked by heavy transfer tasks (uploading/downloading/copying large files),
core.ssh_async
shows significant improvement.For example, I submitted two WorkGraphs:
touch file
.The time taken until the submit command is processed (with one daemon running):
core.ssh_async
: Only 4 seconds! 🚀🚀🚀🚀 A major improvement!core.ssh
: 108 seconds (WorkGraph 1 fully completes before processing the second).When
core.ssh_async
andcore.ssh
Are ComparableFor tasks involving both (and many!) uploads and downloads (a common scenario), performance varies slightly depending on the case.
Large Files (~1 GB):
core.ssh_async
performs better due to simultaneous uploads and downloads. In some networks, this can almost double the bandwidth, as demonstrated in the graph below. My bandwidth is 11.8 MB/s but increased to nearly double under favorable conditions:However, under heavy network load, bandwidth may revert to its base level (e.g., 11.8 MB/s):
Test Case: Two WorkGraphs: one uploads 1 GB, the other retrieves 1 GB using
RemoteData
.core.ssh_async
: 120 secondscore.ssh
: 204 secondsSmall Files (Many Small Transfers):
core.ssh_async
: 105 secondscore.ssh
: 65 secondsIn this scenario, the overhead of asynchronous calls seems to outweigh the benefits. We need to discuss the trade-offs and explore possible optimizations. As @agoscinski mentioned, this might be expected, see here async overheads.
--- update on 16.01.2025
Some of these changes has moved to a separate PR Engine: Async run #6708