Reinstate broadcast subscription of workchain for child processes #4154
Conversation
Force-pushed from d773a9e to dd57b17
This PR is blocked because it needs a new release of
@@ -67,7 +67,7 @@ def __init__(self, poll_interval=0, loop=None, communicator=None, rmq_submit=Fal
         self._plugin_version_provider = PluginVersionProvider()

         if communicator is not None:
-            self._communicator = communicator
+            self._communicator = plumpy.wrap_communicator(communicator, self._loop)
hi @sphuber, as a reminder: since you now wrap the communicator here into a LoopCommunicator, there is no longer any need to add the task subscriber by scheduling the task_receiver callback onto the process event loop with create_task, in aiida/manage/manager.py::Manager::create_daemon_runner.
Thanks a lot for the heads-up @unkcpz. I will have a look and update the PR.
@unkcpz I added this simplification in the first commit of this PR
all good. Thanks!
Force-pushed from dd57b17 to fdcba39
Codecov Report
@@            Coverage Diff             @@
##           develop    #4154      +/-  ##
===========================================
+ Coverage    78.89%   78.90%    +0.02%
===========================================
  Files          467      467
  Lines        34470    34489       +19
===========================================
+ Hits         27191    27211       +20
+ Misses        7279     7278        -1
Continue to review full report at Codecov.
Force-pushed from 4e8d88f to ebc6db9
So, all good except for that callback change. If you can think of an easy way to test the 'double resolution' of the callback, that might be useful, but I don't see it as necessary, so I wouldn't spend much time on it.
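For what it's worth, a rough sketch of such a test (hypothetical names, not the actual `Runner` API) could simply invoke the guarded callback twice, as would happen if both the broadcast subscriber and the polling fallback fire, and assert that the wrapped callback only runs once:

```python
import functools

def test_callback_resolves_only_once():
    calls = []
    resolved = [False]  # mutable flag shared between invocations

    def inline_callback(resolved, *args, **kwargs):
        if resolved[0]:
            return
        resolved[0] = True
        calls.append((args, kwargs))

    callback = functools.partial(inline_callback, resolved)
    callback('broadcast')  # e.g. triggered by the broadcast subscriber
    callback('poll')       # e.g. triggered by the polling fallback

    assert len(calls) == 1
```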
aiida/engine/runners.py (Outdated)
    # The broadcast subscriber has already removed
    pass

broadcast_filter = kiwipy.BroadcastFilter(functools.partial(inline_callback, resolved), sender=pk)
This, unfortunately, doesn't work. Python has this sneaky thing where rebinding a name inside a function never affects the caller's binding, meaning that the `resolved` in the callback is effectively always the original (`False`) boolean each time the function is invoked.
You can test it with this:
res = False

def do(res):
    if res:
        print("Already resolved")
        return
    res = True
    print("Now resolved")

do(res)
do(res)
As ugly as it is, this is one solution:
res = [False]

def do(res):
    if res[0]:
        print("Already resolved")
        return
    res[0] = True
    print("Now resolved")

do(res)
do(res)
But if you don't like it, feel free to use anything where a mutable, non-primitive object serves as the flag. I think at some point I may have created a special Flag class just for this in some of my code. A threading.Event is also a decent choice.
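For completeness, the same example with a `threading.Event` as the flag (a sketch in the spirit of the suggestion above):

```python
import threading

# The Event is mutated in place, so the second invocation actually
# sees that the callback was already resolved.
resolved = threading.Event()

def do(resolved):
    if resolved.is_set():
        print("Already resolved")
        return
    resolved.set()
    print("Now resolved")

do(resolved)
do(resolved)
```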
Force-pushed from 25d6231 to 076849d
This is necessary because otherwise subscribers that use the runner's communicator and receive a message will execute the callback on the communication thread. This should be prevented for two reasons. Firstly, it causes the business logic to be executed on the communication thread, which can then fail to respond in time to heartbeat requests from RabbitMQ when it is under heavy load. Secondly, the communication thread does not share the same session as the main thread, which can cause problems when the callbacks query the database, as it can be out of sync with the session of the main thread. For Django this will most likely go by unnoticed, but SqlAlchemy will raise an exception.

Since the communicator of the runner is now wrapped, the process launcher of the daemon runner no longer has to be added as a task subscriber through a callback; `add_task_subscriber` can be called directly and the `LoopCommunicator` will take care of everything.
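Conceptually, what the wrapped communicator buys is sketched below with plain asyncio and threading (hypothetical names, not the plumpy/kiwipy API): a message that arrives on the communication thread is not handled there, but dispatched onto the runner's event loop, so the business logic and the database access stay on the main thread.

```python
import asyncio
import threading

def on_message_from_broker(loop: asyncio.AbstractEventLoop, body: str) -> None:
    # Runs on the communication thread: do not execute business logic here,
    # only hand the callback over to the runner's event loop.
    loop.call_soon_threadsafe(handle_message, body)

def handle_message(body: str) -> None:
    # Runs on the main thread, which owns the database session.
    print(f'handling {body!r} on {threading.current_thread().name}')

async def main():
    loop = asyncio.get_running_loop()
    communication_thread = threading.Thread(
        target=on_message_from_broker,
        args=(loop, 'continue process'),
        name='communication-thread',
    )
    communication_thread.start()
    communication_thread.join()
    await asyncio.sleep(0)  # give the loop a chance to run the scheduled callback

asyncio.run(main())
```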
Force-pushed from 076849d to bea3cb4
Great, good to go mate.
When a `WorkChain` step submits sub processes, awaitables will be created for them. Only when these awaitables have been resolved, meaning the sub processes have terminated, can the workchain continue to the next step.

The original concept was, for each awaitable, to schedule a callback once the process had reached a terminal state. The callback was supposed to be triggered by having the runner add a broadcast subscriber that would listen for state changes of the sub process. As a fail-safe, a polling mechanism would also check periodically, just in case the broadcast message was missed, to prevent the caller from waiting indefinitely. However, the broadcast subscriber was never added and so the system relied solely on the polling mechanism. This completely undermines the benefits of having an event-based mechanism, so in this commit `Runner.call_on_process_finish` now also registers the broadcast subscriber.

Note that the affected code had some references to `calculation`, which has been generalized to `process`, since this also applies to workflows that might be waited upon. The `CalculationFuture` has been renamed to `ProcessFuture` in a similar vein. It is currently not used, but it could have been used for the problem that this commit solves, so it has been decided to leave it in for now and not remove it entirely.
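Schematically (a plain-asyncio sketch with hypothetical names, not the actual `Runner` code), the mechanism described above combines the event-based path with the polling fail-safe, both funnelled into a callback that is guarded against double resolution:

```python
import asyncio

async def call_on_process_finish_sketch(terminated: asyncio.Event, callback, poll_interval=0.5):
    resolved = [False]

    def inline_callback():
        # Guard against double resolution: broadcast and polling may both fire.
        if resolved[0]:
            return
        resolved[0] = True
        callback()

    async def on_broadcast():
        # Stands in for the broadcast subscriber reacting to the state change.
        await terminated.wait()
        inline_callback()

    async def poll():
        # Fail-safe in case the broadcast message is missed.
        while not resolved[0]:
            await asyncio.sleep(poll_interval)
            if terminated.is_set():
                inline_callback()

    done, pending = await asyncio.wait(
        [asyncio.create_task(on_broadcast()), asyncio.create_task(poll())],
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()

async def main():
    terminated = asyncio.Event()
    asyncio.get_running_loop().call_later(0.1, terminated.set)  # simulate the sub process finishing
    await call_on_process_finish_sketch(terminated, lambda: print('workchain can continue'))

asyncio.run(main())
```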
Force-pushed from bea3cb4 to 7441fb8
In PR aiidateam#4154, the `LoopCommunicator` was used as the runner's communicator. This makes it easy to schedule callbacks onto the correct event loop; however, the `Process` has its own mechanism to do this: in the `_schedule_rpc` method, the process schedules the callback onto its own event loop. Using the `LoopCommunicator` led to the problem that when the controller sends a control message over RPC, for example when calling `controller.pause_process`, the `pause` subscriber is run through `plumpy.futures.create_task`, which is not necessary. `call_on_process_finish` receives a callback that is not expected to be a coroutine; however, in `action_awaitables` we were scheduling `_run_task`, which is a coroutine function, through it. The `call_soon` method of the `Process` object is more appropriate here.
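The distinction is roughly the one sketched below with plain asyncio (hypothetical names, not the plumpy API): a coroutine function cannot simply be handed to a hook that expects a plain callable, it has to be scheduled onto the event loop that owns the process.

```python
import asyncio

async def _run_task():
    # Stands in for the coroutine scheduled in `action_awaitables`.
    print('awaitable resolved, continuing workchain step')

def on_finished(loop: asyncio.AbstractEventLoop) -> None:
    # A plain callback cannot just call `_run_task()`: that only creates a
    # coroutine object without running it. It has to be scheduled on the loop
    # that owns the process, which is what the process's own scheduling
    # mechanism takes care of.
    loop.create_task(_run_task())

async def main():
    on_finished(asyncio.get_running_loop())
    await asyncio.sleep(0)  # let the scheduled task run

asyncio.run(main())
```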
Fixes #4151