Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/async #172

Merged
merged 47 commits into from
Jun 14, 2021
Merged

Feature/async #172

merged 47 commits into from
Jun 14, 2021

Conversation

JonatanMartens
Copy link
Collaborator

Full asyncio support in pyzeebe.

Changes

  • A task can now either be a regular or an async function.
  • Decorators can now be async
  • Exception handlers have to be async
  • All of ZeebeClient's methods are async
  • ZeebeWorker.work is now async

API Updates

New Features

Add support for asyncio tasks.

Deprecations

  1. ZeebeWorker.work is now async
  2. Tasks can now be async (not mandatory)
  3. All of ZeebeClient's methods are now async
  4. Exception handlers must be async
  5. Decorators can now be async (not mandatory)
  6. grpc_internals now uses grpc.aio instead of regular grpc
    • This means that most methods found there are now async.

Enhancements

  1. Extract create_connection_uri and create_channel from ZeebeAdapter
  2. Some tests have undergone a small refactor
  3. Add method connect to ZeebeAdapter to improve testability and solve a grpc.aio loop issue (see commit message)

Checklist

  • Unit tests
  • Documentation

References

Implements #144, #143

JobPoller is a new class in preperation for pyzeebe's move to asyncio.
This commit adds support for asyncio in both workers and clients.
It also adds a new class called JobExecutor, thus simplifying the ZeebeWorker class.
This achieves two things:
1. Allows setup without unexpected connections to zeebe (side effects) in a constructor
2. Solves an issue where if creating a worker and then calling worker.work() later, grpc.aio would use a different event loop
The Asyncmock package adds the AsynMock class to this package, not unittest.mock.
@coveralls
Copy link

coveralls commented May 23, 2021

Pull Request Test Coverage Report for Build 937025768

  • 287 of 326 (88.04%) changed or added relevant lines in 24 files are covered.
  • 9 unchanged lines in 6 files lost coverage.
  • Overall coverage decreased (-2.6%) to 92.944%

Changes Missing Coverage Covered Lines Changed/Added Lines %
pyzeebe/grpc_internals/zeebe_message_adapter.py 4 5 80.0%
pyzeebe/grpc_internals/zeebe_process_adapter.py 22 25 88.0%
pyzeebe/grpc_internals/zeebe_adapter_base.py 24 28 85.71%
pyzeebe/grpc_internals/zeebe_job_adapter.py 16 20 80.0%
pyzeebe/worker/job_executor.py 32 37 86.49%
pyzeebe/worker/job_poller.py 32 40 80.0%
pyzeebe/worker/worker.py 8 22 36.36%
Files with Coverage Reduction New Missed Lines %
pyzeebe/credentials/camunda_cloud_credentials.py 1 90.0%
pyzeebe/grpc_internals/zeebe_adapter_base.py 1 90.74%
pyzeebe/task/task_config.py 1 90.91%
pyzeebe/task/task.py 1 84.62%
pyzeebe/worker/worker.py 1 67.39%
pyzeebe/credentials/oauth_credentials.py 4 86.21%
Totals Coverage Status
Change from base Build 845027740: -2.6%
Covered Lines: 685
Relevant Lines: 737

💛 - Coveralls

@kbakk kbakk self-requested a review June 1, 2021 15:47
README.md Outdated Show resolved Hide resolved
@kbakk
Copy link
Collaborator

kbakk commented Jun 1, 2021

Making the client async only introduces some limitations on where the client can be used.

The worker would typically be run in a dedicated worker process (I'd imagine), however the client may be used in a synchronous function (e.g. a Flask route).

I think we should offer a synchronous client in addition to an async client for this reason. The synchronous client can be a wrapper around the async client.

Copy link
Collaborator

@kbakk kbakk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments (only 1/3 through the review, continuing later)

pyzeebe/__init__.py Outdated Show resolved Hide resolved
@@ -0,0 +1,10 @@
from typing import Awaitable, Callable, Dict, TypeVar, Union
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this module be prefixed with _ to indicate it's internal?

pyzeebe/function_tools/dict_tools.py Show resolved Hide resolved
Copy link
Collaborator

@kbakk kbakk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Truly amazing work! Well done!

Have read through most of the changes, but there's a lot I don't fully understand here, and simply a lot of changes/additions, so I won't be able to review everything. I'll dive some more into the parts I'm more familiar with, and will add comments on them as well.

I think releasing this as a beta, and letting people test this, could be a good idea (maybe update the README in master to inform that there's effort going on, and we would like people to review or try it out).

pyzeebe/grpc_internals/zeebe_adapter_base.py Outdated Show resolved Hide resolved
pyzeebe/worker/task_router.py Show resolved Hide resolved
@kbakk
Copy link
Collaborator

kbakk commented Jun 2, 2021

A comment on the tests - they're blazing fast now! ✨🚀✨ 👏

Copy link
Collaborator

@kbakk kbakk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more input, after having tested.

I notice that pyzeebe/credentials/oauth_credentials.py is still using the synchronous requests module.

I came across this https://docs.authlib.org/en/latest/client/httpx.html#httpx-oauth-2-0 which could help solve that hurdle.


An issue I managed to consistently reproduce was the following:

  1. Create a basic worker that only prints the input variable
  2. Create a simple BPMN with 1 task, deploy 33 jobs on the broker - I used this oneliner: for i in {1..33} ; do echo "Submitting $i..." && zbctl-1.0.0 --insecure create instance "Process_1n8eewe" --variables "{\"num\": $i}" ; done
  3. Run the worker (important: it has to be started after the jobs have been deployed)

For me, the worker processes the 32 first jobs, then hangs.

I'm suspecting that it's getting RESOURCE_EXCHAUSTED but I'm not able to verify

pyzeebe/grpc_internals/zeebe_process_adapter.py Outdated Show resolved Hide resolved
tests/integration/integration_test.py Show resolved Hide resolved
pyzeebe/worker/job_executor.py Outdated Show resolved Hide resolved
pyzeebe/task/task_builder.py Outdated Show resolved Hide resolved
@JonatanMartens
Copy link
Collaborator Author

Making the client async only introduces some limitations on where the client can be used.

The worker would typically be run in a dedicated worker process (I'd imagine), however the client may be used in a synchronous function (e.g. a Flask route).

I think we should offer a synchronous client in addition to an async client for this reason. The synchronous client can be a wrapper around the async client.

I prefer to keep one interface. Adding a SyncZeebeClient (or whatever we would call it) would create a 'second' interface.

In practice, this class would just wrap all of ZeebeClient's methods using asyncio.run. This is something that the user can do just as easily, so I see no reason to include it in the library.

@JonatanMartens
Copy link
Collaborator Author

Truly amazing work! Well done!

Have read through most of the changes, but there's a lot I don't fully understand here, and simply a lot of changes/additions, so I won't be able to review everything. I'll dive some more into the parts I'm more familiar with, and will add comments on them as well.

I think releasing this as a beta, and letting people test this, could be a good idea (maybe update the README in master to inform that there's effort going on, and we would like people to review or try it out).

Good idea!

Co-authored-by: Kristoffer Bakkejord <[email protected]>
@kbakk
Copy link
Collaborator

kbakk commented Jun 4, 2021

I prefer to keep one interface. Adding a SyncZeebeClient (or whatever we would call it) would create a 'second' interface.

It's a second use case (use in synchronous context), so I think it may warrant that.

In practice, this class would just wrap all of ZeebeClient's methods using asyncio.run. This is something that the user can do just as easily, so I see no reason to include it in the library.

I think that if several users are to perform certain types of setup in a general way, it should be part of the library. Else they will have to reinvent the wheel, adding tests, possibly distributring it as a package (if it's used in different applications).

According to the Python 2020 developer survey, 89 % used a web framework without async support (Flask and Django - maybe Django was introducing async around then, but I assume most users didn't start using that feature right away). From this, I'd say running synchronously is the primary way this will be used.

Also, 100 % of the current use of ZeebeClient today is synchronous ;)

Another take on this is to keep the synchronous parts of the client that exists today.

Copy link
Collaborator

@kbakk kbakk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well done, looks good.

A minor suggestion/comment ony.

async def test_raises_on_invalid_process(self, zeebe_adapter: ZeebeProcessAdapter):
with patch("builtins.open") as mock_open:
mock_open.return_value = BytesIO()
open_mock: MagicMock
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer using the tmp_path fixture, when validating file reads/writes.

https://docs.pytest.org/en/6.2.x/tmpdir.html#the-tmp-path-fixture

@JonatanMartens JonatanMartens merged commit f746940 into pre-release/3.0.0 Jun 14, 2021
@JonatanMartens JonatanMartens deleted the feature/async-worker branch June 14, 2021 20:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
3.0.0 Will be released in Zeebe 3.0.0 breaking change help wanted Extra attention is needed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Asyncio ZeebeClient support Asyncio ZeebeWorker support Pyzeebe 3.0.0
3 participants