Devhawk/async #127
)

@classmethod
async def get_workflow_status_async(
For async methods like this (and all the other `DBOS` methods with the `_async` suffix), we have talked about moving them to a separate `AsyncDBOS` type. Is that still the desired approach, @chuck-dbos @kraftp @qianl15?
I think yes
    workflow_uuid,
    status,
    reset_recovery_attempts,
)
The case above looks like it could be changed to build the statement in a shared function and then run that statement either async or sync against the connection.
Yes, I suspect most if not all of the `_some_func` / `some_func_sync` / `some_func_async` pattern methods could be implemented that way. I can change these to work that way if we'd rather.
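A minimal sketch of that idea, using only the standard library so it runs on its own. The table layout, the function names, and the use of `sqlite3` with `asyncio.to_thread` as a stand-in for an async driver are all assumptions for illustration; the PR itself works against SQLAlchemy engines.

```python
import asyncio
import sqlite3
from typing import Optional, Tuple


def build_status_query(workflow_uuid: str) -> Tuple[str, Tuple[str]]:
    """Build the statement once; both execution paths reuse it."""
    sql = "SELECT status FROM workflow_status WHERE workflow_uuid = ?"
    return sql, (workflow_uuid,)


def get_status_sync(conn: sqlite3.Connection, workflow_uuid: str) -> Optional[str]:
    sql, params = build_status_query(workflow_uuid)
    row = conn.execute(sql, params).fetchone()
    return row[0] if row else None


async def get_status_async(conn: sqlite3.Connection, workflow_uuid: str) -> Optional[str]:
    sql, params = build_status_query(workflow_uuid)
    # Stand-in for an async driver: run the same statement in a worker thread.
    row = await asyncio.to_thread(lambda: conn.execute(sql, params).fetchone())
    return row[0] if row else None


def make_demo_connection() -> sqlite3.Connection:
    """Create an in-memory database with one workflow row (demo data only)."""
    # check_same_thread=False lets the worker thread reuse this connection.
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    conn.execute(
        "CREATE TABLE workflow_status (workflow_uuid TEXT PRIMARY KEY, status TEXT)"
    )
    conn.execute("INSERT INTO workflow_status VALUES (?, ?)", ("wf-1", "PENDING"))
    conn.commit()
    return conn
```

Only the execution differs between the two paths; the statement text and parameters live in one place.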
    self, workflow_uuid: str
) -> Optional[WorkflowStatusInternal]:
    async with self.async_engine.begin() as c:
        return await c.run_sync(self._get_workflow_status, workflow_uuid)
There are a lot of these where `await session.execute` could be tried.
actual_timeout = await self.sleep_async(
    workflow_uuid, timeout_function_id, timeout_seconds, skip_sleep=True
)
condition.wait(timeout=actual_timeout)
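So... this is not good if the code that is supposed to do the notification runs on the same event loop, because the blocking `condition.wait` call stalls the loop and the notification can never arrive: that is a deadlock. It probably does not run there, but it is fiddly to reason about.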
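To make the concern concrete, here is a small standalone sketch (not code from this PR; the helper name `ticks_during_wait` is made up). It counts how often a sibling task gets to run while a coroutine waits on a `threading.Condition`, once by calling the blocking wait directly on the loop thread and once by offloading it with `asyncio.to_thread`.

```python
import asyncio
import threading


async def ticks_during_wait(offload: bool) -> int:
    """Count how often a sibling task runs while we wait 0.2 s on a threading.Condition."""
    condition = threading.Condition()
    ticks = 0
    done = False

    async def ticker() -> None:
        nonlocal ticks
        while not done:
            ticks += 1
            await asyncio.sleep(0.01)

    def blocking_wait() -> None:
        with condition:
            # Nobody notifies in this demo, so this returns after the timeout.
            condition.wait(timeout=0.2)

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # let the ticker run once before we start waiting
    if offload:
        await asyncio.to_thread(blocking_wait)  # the loop keeps running other tasks
    else:
        blocking_wait()  # blocks the event loop thread itself
    done = True
    await task
    return ticks
```

When the wait runs directly on the loop thread, the ticker does not advance at all during the wait; a notifier living on the same loop would be starved in the same way. Offloading the wait (or using an asyncio-native primitive) lets other tasks keep running.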
        == oldest_entry_cte.c.created_at_epoch_ms,
    )
    .returning(SystemSchema.notifications.c.message)
)
Can this statement code be shared?
yes, will fix
)

payload = f"{target_uuid}::{key}"
condition = threading.Condition()
asyncio.Condition?
I was worried about having different condition classes for the sync and async versions. I will investigate further.
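For comparison, a sketch of what an `asyncio.Condition`-based wait could look like. The `inbox` dict, the function names, and the key format are illustrative assumptions; this is not the notification map used in the PR.

```python
import asyncio
from typing import Dict, Optional


async def wait_for_notification(
    condition: asyncio.Condition, inbox: Dict[str, str], key: str, timeout: float
) -> Optional[str]:
    """Wait until `key` shows up in `inbox`, or return None after `timeout` seconds."""
    async with condition:
        try:
            # condition.wait_for releases the lock while suspended, so the
            # notifier can run on the same event loop.
            await asyncio.wait_for(condition.wait_for(lambda: key in inbox), timeout)
        except asyncio.TimeoutError:
            return None
        return inbox[key]


async def notify(
    condition: asyncio.Condition, inbox: Dict[str, str], key: str, message: str
) -> None:
    async with condition:
        inbox[key] = message
        condition.notify_all()


async def demo(send: bool, timeout: float) -> Optional[str]:
    condition = asyncio.Condition()
    inbox: Dict[str, str] = {}
    key = "wf-1::greeting"  # mirrors the f"{target_uuid}::{key}" payload shape
    waiter = asyncio.create_task(wait_for_notification(condition, inbox, key, timeout))
    await asyncio.sleep(0.01)  # give the waiter time to start waiting
    if send:
        await notify(condition, inbox, key, "hello")
    return await waiter
```

The trade-off raised above still applies: `asyncio.Condition` is not thread-safe, so a notifier running on another thread would need to hand off to the loop (for example via `loop.call_soon_threadsafe`) rather than call `notify_all` directly.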
)


def start_workflow(
Is there a start_workflow_async that I missed?
There is not. `start_workflow` is already async-ish in that it returns a `WorkflowHandle`. It seemed odd to have a version that returned an `Awaitable[WorkflowHandle]`.
Yes, but that is what is necessary to allow the event loop to proceed while the database call to record the workflow status is occurring.
Yeah, it's 100% necessary
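A sketch of the shape being discussed, under loudly stated assumptions: `SketchWorkflowHandle`, `_record_status`, and the in-memory `_status_table` are hypothetical stand-ins, and the `asyncio.sleep` call merely simulates the database write. The point is only that awaiting the status write inside `start_workflow_async` lets the event loop run other tasks during it, which is why the function returns an awaitable handle.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable, Dict, Tuple

_status_table: Dict[str, str] = {}  # hypothetical stand-in for the status table


class SketchWorkflowHandle:
    """Hypothetical, simplified handle; not the library's WorkflowHandle."""

    def __init__(self, workflow_uuid: str, task: "asyncio.Task[Any]") -> None:
        self.workflow_uuid = workflow_uuid
        self._task = task

    async def get_result(self) -> Any:
        return await self._task


async def _record_status(workflow_uuid: str, status: str) -> None:
    # Simulated database write; awaiting it yields control to the event loop.
    await asyncio.sleep(0.01)
    _status_table[workflow_uuid] = status


async def start_workflow_async(
    func: Callable[..., Awaitable[Any]], *args: Any
) -> SketchWorkflowHandle:
    workflow_uuid = str(uuid.uuid4())
    await _record_status(workflow_uuid, "PENDING")  # non-blocking for the loop

    async def _run() -> Any:
        result = await func(*args)
        await _record_status(workflow_uuid, "SUCCESS")
        return result

    return SketchWorkflowHandle(workflow_uuid, asyncio.create_task(_run()))


async def add(a: int, b: int) -> int:
    return a + b


async def demo() -> Tuple[str, int, str]:
    handle = await start_workflow_async(add, 2, 3)
    status_at_start = _status_table[handle.workflow_uuid]
    result = await handle.get_result()
    return status_at_start, result, _status_table[handle.workflow_uuid]
```

Callers would write `handle = await start_workflow_async(...)` and later `await handle.get_result()`, so there are two await points: one for recording the start, one for the result.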
@@ -0,0 +1,52 @@
import uuid
I think we need a LOT more async tests than this. In particular, we need tests exercising every part of the async API just like for the sync API: because there's so much code duplication, we need to test both sides of the code.
Depending on our API, we may also need tests for workflows that mix async and sync steps. If we don't want to support those, we have to deliberately design our API to make that impossible, for example by having a `DBOS` and an `AsyncDBOS`.
Agreed, though I'm not sure separating sync vs. async DBOS will make it impossible.
Add support for async workflow/step/transaction methods
Also moves internal code to the `_core` package to indicate items that should not be directly imported.
Fixes #112