Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/async #172

Merged
merged 47 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
56a55a3
Add job poller class
JonatanMartens Apr 27, 2021
06dd393
Fix request timeout being passed as job timeout
JonatanMartens Apr 29, 2021
bc2cc03
Add pytest-asyncio to dev dependencies
JonatanMartens May 4, 2021
09687e2
Remove old zeebe_workflow_adapter file
JonatanMartens May 4, 2021
53bfbab
Add asyncio support for workers and clients
JonatanMartens May 4, 2021
03e950a
Turn exception handlers async
JonatanMartens May 7, 2021
60fec95
Add support for async decorators
JonatanMartens May 7, 2021
3367d88
Connect to zeebe after ZeebeAdapter creation
JonatanMartens May 8, 2021
1239536
Add pyzeebe version 3.x.x to version table
JonatanMartens May 8, 2021
fee0516
Update README to include async examples
JonatanMartens May 8, 2021
7baae29
Update documentation to include async support
JonatanMartens May 8, 2021
aa55d76
Update examples to include async support
JonatanMartens May 8, 2021
5db4520
Optimize imports
JonatanMartens May 8, 2021
b7ddcd6
Optimize imports for tests
JonatanMartens May 8, 2021
d6a7c95
Add asyncmock to dev dependencies
JonatanMartens May 8, 2021
f046e94
Import mocks from mock package instead of unittest
JonatanMartens May 15, 2021
33cce84
Lock pipenv after rebase
JonatanMartens May 15, 2021
d6855c4
Fix tests after rebase
JonatanMartens May 18, 2021
2020439
Reintroduce task_state and a limit of concurrent jobs
JonatanMartens May 23, 2021
d54e4c2
Add job to task_state after polling
JonatanMartens May 23, 2021
963df27
Bump pytest-asyncio to 0.15.1
JonatanMartens May 23, 2021
907a587
Change activate_jobs return type to AsyncGenerator
JonatanMartens May 23, 2021
8533ef0
Change asyncify_decorators return type to TaskConfig
JonatanMartens May 23, 2021
8fe593f
Remove returned job in execute_one_job
JonatanMartens May 23, 2021
1ed0a62
Bump zeebe version to 1.0.0
JonatanMartens May 23, 2021
6b990e4
Add disconnect method to ZeebeAdapterBase
JonatanMartens May 23, 2021
ea83a4c
Use one worker and client accross all integration tests
JonatanMartens May 23, 2021
e0e4cc7
Fix unexpected name argument in asyncio.create_task
JonatanMartens May 23, 2021
825c330
Bump version to 3.0.0
JonatanMartens May 28, 2021
682b530
Add type annotation to jobs_queue to satisfy mypy
JonatanMartens May 29, 2021
12d1fb1
Split function tools from task_builder
JonatanMartens May 29, 2021
1a70fac
Sort imports
JonatanMartens May 29, 2021
b9eb2bd
Preserve function signature when converting to dict function
JonatanMartens Jun 4, 2021
9e7f3de
Remove duplicate job subscription to task_state
JonatanMartens Jun 4, 2021
de9ab9d
Change logger.warn to logger.warning
JonatanMartens Jun 4, 2021
de17a2f
Clarify comment in README example
JonatanMartens Jun 4, 2021
ae96240
Use aiofiles instead to open files
JonatanMartens Jun 6, 2021
f2c5535
Merge branch 'feature/async-worker' of github.com:camunda-community-h…
JonatanMartens Jun 6, 2021
e6b6c64
Add SyncZeebeClient for backwards compatability
JonatanMartens Jun 12, 2021
ac0bec9
Fix path to exceptions in docs
JonatanMartens Jun 12, 2021
b5fa169
Ignore type hints in sync client
JonatanMartens Jun 12, 2021
3e3d24f
Bump version from 3.0.0 to 3.0.0rc1
JonatanMartens Jun 12, 2021
b991e51
Stop worker in integration tests
JonatanMartens Jun 12, 2021
3336d61
Move unrelated functions from zeebe_adapter_base
JonatanMartens Jun 12, 2021
0ba5588
Add type hints to ZeebeAdapterBase tests
JonatanMartens Jun 12, 2021
90e2df4
Fix imports
JonatanMartens Jun 14, 2021
516e476
Add import checking to ci using isort
JonatanMartens Jun 14, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ asyncmock = "~=0.4.2"
oauthlib = "~=3.1.0"
requests-oauthlib = "~=1.3.0"
zeebe-grpc = "~=1.0.0"
aiofiles = "~=0.7.0"

[pipenv]
allow_prereleases = true
154 changes: 81 additions & 73 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def example_task(input: str) -> dict:
async def another_example_task(name: str) -> dict: # Tasks can also be async
return {"output": f"Hello world, {name} from async task!"}

asyncio.run(worker.work()) # Now every time that a task with type example is called example_task will be called
asyncio.run(worker.work()) # Now every time that a task with type `example` or `example2` is called, the corresponding function will be called
```

Stop a worker:
Expand Down
4 changes: 4 additions & 0 deletions docs/client_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ Client Reference
.. autoclass:: pyzeebe.ZeebeClient
:members:
:undoc-members:

.. autoclass:: pyzeebe.SyncZeebeClient
:members:
:undoc-members:
8 changes: 4 additions & 4 deletions docs/exceptions.rst → docs/errors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ All ``pyzeebe`` exceptions inherit from :py:class:`PyZeebeError`

.. autoexception:: pyzeebe.errors.MessageAlreadyExistsError

.. autoexception:: pyzeebe.exceptions.ProcessDefinitionNotFoundError
.. autoexception:: pyzeebe.errors.ProcessDefinitionNotFoundError

.. autoexception:: pyzeebe.exceptions.ProcessInstanceNotFoundError
.. autoexception:: pyzeebe.errors.ProcessInstanceNotFoundError

.. autoexception:: pyzeebe.exceptions.ProcessDefinitionHasNoStartEventError
.. autoexception:: pyzeebe.errors.ProcessDefinitionHasNoStartEventError

.. autoexception:: pyzeebe.exceptions.ProcessInvalidError
.. autoexception:: pyzeebe.errors.ProcessInvalidError

.. autoexception:: pyzeebe.errors.InvalidJSONError

Expand Down
2 changes: 1 addition & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,4 @@ Table Of Contents
Client <client>
Worker <worker>
Decorators <decorators>
Exceptions <exceptions>
Exceptions <errors>
4 changes: 2 additions & 2 deletions pyzeebe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from pyzeebe import errors
from pyzeebe.client.client import ZeebeClient
from pyzeebe.credentials.camunda_cloud_credentials import \
CamundaCloudCredentials
from pyzeebe.client.sync_client import SyncZeebeClient # type: ignore
from pyzeebe.credentials.camunda_cloud_credentials import CamundaCloudCredentials
from pyzeebe.credentials.oauth_credentials import OAuthCredentials
from pyzeebe.job.job import Job
from pyzeebe.job.job_status import JobStatus
Expand Down
31 changes: 31 additions & 0 deletions pyzeebe/client/sync_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# type: ignore

import asyncio
from typing import Dict, List, Tuple

from pyzeebe import ZeebeClient
from pyzeebe.credentials.base_credentials import BaseCredentials


class SyncZeebeClient(ZeebeClient):
def __init__(self, hostname: str = None, port: int = None, credentials: BaseCredentials = None,
secure_connection: bool = False, max_connection_retries: int = 10):
super().__init__(hostname, port, credentials,
secure_connection, max_connection_retries)
self.loop = asyncio.get_event_loop()

def run_process(self, bpmn_process_id: str, variables: Dict = None, version: int = -1) -> int:
return self.loop.run_until_complete(super().run_process(bpmn_process_id, variables, version))

def run_process_with_result(self, bpmn_process_id: str, variables: Dict = None, version: int = -1,
timeout: int = 0, variables_to_fetch: List[str] = None) -> Tuple[int, Dict]:
return self.loop.run_until_complete(super().run_process_with_result(bpmn_process_id, variables, version, timeout, variables_to_fetch))

def cancel_process_instance(self, process_instance_key: int) -> int:
return self.loop.run_until_complete(super().cancel_process_instance(process_instance_key))

def deploy_process(self, *process_file_path: str) -> None:
return self.loop.run_until_complete(super().deploy_process(*process_file_path))

def publish_message(self, name: str, correlation_key: str, variables: Dict = None, time_to_live_in_milliseconds: int = 60000, message_id: str = None) -> None:
return self.loop.run_until_complete(super().publish_message(name, correlation_key, variables, time_to_live_in_milliseconds, message_id))
3 changes: 3 additions & 0 deletions pyzeebe/function_tools/dict_tools.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import functools

from pyzeebe.function_tools import AsyncFunction, DictFunction


def convert_to_dict_function(single_value_function: AsyncFunction, variable_name: str) -> DictFunction:
@functools.wraps(single_value_function)
async def inner_fn(*args, **kwargs):
JonatanMartens marked this conversation as resolved.
Show resolved Hide resolved
return {variable_name: await single_value_function(*args, **kwargs)}

Expand Down
Loading