Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/async #172

Merged
merged 47 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
56a55a3
Add job poller class
JonatanMartens Apr 27, 2021
06dd393
Fix request timeout being passed as job timeout
JonatanMartens Apr 29, 2021
bc2cc03
Add pytest-asyncio to dev dependencies
JonatanMartens May 4, 2021
09687e2
Remove old zeebe_workflow_adapter file
JonatanMartens May 4, 2021
53bfbab
Add asyncio support for workers and clients
JonatanMartens May 4, 2021
03e950a
Turn exception handlers async
JonatanMartens May 7, 2021
60fec95
Add support for async decorators
JonatanMartens May 7, 2021
3367d88
Connect to zeebe after ZeebeAdapter creation
JonatanMartens May 8, 2021
1239536
Add pyzeebe version 3.x.x to version table
JonatanMartens May 8, 2021
fee0516
Update README to include async examples
JonatanMartens May 8, 2021
7baae29
Update documentation to include async support
JonatanMartens May 8, 2021
aa55d76
Update examples to include async support
JonatanMartens May 8, 2021
5db4520
Optimize imports
JonatanMartens May 8, 2021
b7ddcd6
Optimize imports for tests
JonatanMartens May 8, 2021
d6a7c95
Add asyncmock to dev dependencies
JonatanMartens May 8, 2021
f046e94
Import mocks from mock package instead of unittest
JonatanMartens May 15, 2021
33cce84
Lock pipenv after rebase
JonatanMartens May 15, 2021
d6855c4
Fix tests after rebase
JonatanMartens May 18, 2021
2020439
Reintroduce task_state and a limit of concurrent jobs
JonatanMartens May 23, 2021
d54e4c2
Add job to task_state after polling
JonatanMartens May 23, 2021
963df27
Bump pytest-asyncio to 0.15.1
JonatanMartens May 23, 2021
907a587
Change activate_jobs return type to AsyncGenerator
JonatanMartens May 23, 2021
8533ef0
Change asyncify_decorators return type to TaskConfig
JonatanMartens May 23, 2021
8fe593f
Remove returned job in execute_one_job
JonatanMartens May 23, 2021
1ed0a62
Bump zeebe version to 1.0.0
JonatanMartens May 23, 2021
6b990e4
Add disconnect method to ZeebeAdapterBase
JonatanMartens May 23, 2021
ea83a4c
Use one worker and client accross all integration tests
JonatanMartens May 23, 2021
e0e4cc7
Fix unexpected name argument in asyncio.create_task
JonatanMartens May 23, 2021
825c330
Bump version to 3.0.0
JonatanMartens May 28, 2021
682b530
Add type annotation to jobs_queue to satisfy mypy
JonatanMartens May 29, 2021
12d1fb1
Split function tools from task_builder
JonatanMartens May 29, 2021
1a70fac
Sort imports
JonatanMartens May 29, 2021
b9eb2bd
Preserve function signature when converting to dict function
JonatanMartens Jun 4, 2021
9e7f3de
Remove duplicate job subscription to task_state
JonatanMartens Jun 4, 2021
de9ab9d
Change logger.warn to logger.warning
JonatanMartens Jun 4, 2021
de17a2f
Clarify comment in README example
JonatanMartens Jun 4, 2021
ae96240
Use aiofiles instead to open files
JonatanMartens Jun 6, 2021
f2c5535
Merge branch 'feature/async-worker' of github.com:camunda-community-h…
JonatanMartens Jun 6, 2021
e6b6c64
Add SyncZeebeClient for backwards compatability
JonatanMartens Jun 12, 2021
ac0bec9
Fix path to exceptions in docs
JonatanMartens Jun 12, 2021
b5fa169
Ignore type hints in sync client
JonatanMartens Jun 12, 2021
3e3d24f
Bump version from 3.0.0 to 3.0.0rc1
JonatanMartens Jun 12, 2021
b991e51
Stop worker in integration tests
JonatanMartens Jun 12, 2021
3336d61
Move unrelated functions from zeebe_adapter_base
JonatanMartens Jun 12, 2021
0ba5588
Add type hints to ZeebeAdapterBase tests
JonatanMartens Jun 12, 2021
90e2df4
Fix imports
JonatanMartens Jun 14, 2021
516e476
Add import checking to ci using isort
JonatanMartens Jun 14, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test-zeebe-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
zeebe-version: [ "1.0.0-rc3" ]
zeebe-version: [ "1.0.0" ]

container: python:3.6

Expand Down
5 changes: 4 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ pyzeebe = {editable = true, path = "."}
sphinx = "~=3.5.2"
sphinx-rtd-theme = "*"
pytest-mock = "*"
pytest-asyncio = "~=0.15.1"
asyncmock = "~=0.4.2"

[packages]
oauthlib = "~=3.1.0"
requests-oauthlib = "~=1.3.0"
zeebe-grpc = "~=1.0.0rc2"
zeebe-grpc = "~=1.0.0"
aiofiles = "~=0.7.0"

[pipenv]
allow_prereleases = true
508 changes: 264 additions & 244 deletions Pipfile.lock

Large diffs are not rendered by default.

28 changes: 17 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Zeebe version support:

| Pyzeebe version | Tested Zeebe versions |
| :-------------: | ---------------------- |
| 3.x.x | 1.0.0 |
| 2.x.x | 0.23, 0.24, 0.25, 0.26 |
| 1.x.x | 0.23, 0.24 |

Expand All @@ -34,33 +35,38 @@ For full documentation please visit: https://pyzeebe.readthedocs.io/en/stable/
The `ZeebeWorker` class uses threading to get and run jobs.

```python
import asyncio

from pyzeebe import ZeebeWorker, Job


def on_error(exception: Exception, job: Job):
async def on_error(exception: Exception, job: Job):
"""
on_error will be called when the task fails
"""
print(exception)
job.set_error_status(f"Failed to handle job {job}. Error: {str(exception)}")
await job.set_error_status(f"Failed to handle job {job}. Error: {str(exception)}")



worker = ZeebeWorker(hostname="<zeebe_host>", port=26500) # Create a zeebe worker

@worker.task(task_type="example", exception_handler=on_error)
def example_task(input: str):
def example_task(input: str) -> dict:
return {"output": f"Hello world, {input}!"}


worker.work() # Now every time that a task with type example is called example_task will be called
@worker.task(task_type="example2", exception_handler=on_error)
async def another_example_task(name: str) -> dict: # Tasks can also be async
return {"output": f"Hello world, {name} from async task!"}

asyncio.run(worker.work()) # Now every time that a task with type `example` or `example2` is called, the corresponding function will be called
```

Stop a worker:

```python
zeebe_worker.work() # Worker will begin working
zeebe_worker.stop() # Stops worker after all running jobs have been completed
await zeebe_worker.stop() # Stops worker after all running jobs have been completed
```

### Client
Expand All @@ -72,22 +78,22 @@ from pyzeebe import ZeebeClient
zeebe_client = ZeebeClient(hostname="localhost", port=26500)

# Run a Zeebe process instance
process_instance_key = zeebe_client.run_process(bpmn_process_id="My zeebe process", variables={})
process_instance_key = await zeebe_client.run_process(bpmn_process_id="My zeebe process", variables={})

# Run a process and receive the result
process_instance_key, process_result = zeebe_client.run_process_with_result(
process_instance_key, process_result = await zeebe_client.run_process_with_result(
bpmn_process_id="My zeebe process",
timeout=10000
)

# Deploy a BPMN process definition
zeebe_client.deploy_process("process.bpmn")
await zeebe_client.deploy_process("process.bpmn")

# Cancel a running process
zeebe_client.cancel_process_instance(process_instance_key=12345)
await zeebe_client.cancel_process_instance(process_instance_key=12345)

# Publish message
zeebe_client.publish_message(name="message_name", correlation_key="some_id")
await zeebe_client.publish_message(name="message_name", correlation_key="some_id")

```

Expand Down
10 changes: 5 additions & 5 deletions docs/client_quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ To change connection retries:

.. code-block:: python

worker = ZeebeClient(max_connection_retries=1) # Will only accept one failure and disconnect upon the second
client = ZeebeClient(max_connection_retries=1) # Will only accept one failure and disconnect upon the second


This means the client will disconnect upon two consecutive failures. Each time the client connects successfully the counter is reset.
Expand Down Expand Up @@ -68,7 +68,7 @@ Run a Zeebe process instance

.. code-block:: python

process_instance_key = client.run_process("bpmn_process_id")
process_instance_key = await client.run_process("bpmn_process_id")


Run a process with result
Expand All @@ -78,7 +78,7 @@ To run a process and receive the result directly:

.. code-block:: python

process_instance_key, result = client.run_process_with_result("bpmn_process_id")
process_instance_key, result = await client.run_process_with_result("bpmn_process_id")

# result will be a dict

Expand All @@ -88,12 +88,12 @@ Deploy a process

.. code-block:: python

client.deploy_process("process_file.bpmn")
await client.deploy_process("process_file.bpmn")


Publish a message
-----------------

.. code-block:: python

client.publish_message(name="message_name", correlation_key="correlation_key")
await client.publish_message(name="message_name", correlation_key="correlation_key")
4 changes: 4 additions & 0 deletions docs/client_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ Client Reference
.. autoclass:: pyzeebe.ZeebeClient
:members:
:undoc-members:

.. autoclass:: pyzeebe.SyncZeebeClient
:members:
:undoc-members:
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
author = 'Jonatan Martens'

# The full version, including alpha/beta/rc tags
release = '2.3.1'
release = '3.0.0'

# -- General configuration ---------------------------------------------------

Expand Down Expand Up @@ -59,6 +59,6 @@
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']

version = "2.3.1"
version = "3.0.0"

master_doc = 'index'
13 changes: 11 additions & 2 deletions docs/decorators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
Decorators
==========

A ``pyzeebe`` decorator is a function that receives a :py:class:`Job` instance and returns a :py:class:`Job`.
A ``pyzeebe`` decorator is an async/sync function that receives a :py:class:`Job` instance and returns a :py:class:`Job`.

.. code-block:: python

Callable[[Job], Job]
Union[
Callable[[Job], Job],
Callable[[Job], Awaitable[Job]]
]

An example decorator:

Expand All @@ -16,6 +19,12 @@ An example decorator:
logging.info(job)
return job

# Or:

async def logging_decorator(job: Job) -> Job:
await async_logger.info(job)
return job

If a decorator raises an :class:`Exception` ``pyzeebe`` will just ignore it and continue the task/other decorators.

Task Decorators
Expand Down
8 changes: 4 additions & 4 deletions docs/exceptions.rst → docs/errors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ All ``pyzeebe`` exceptions inherit from :py:class:`PyZeebeError`

.. autoexception:: pyzeebe.errors.MessageAlreadyExistsError

.. autoexception:: pyzeebe.exceptions.ProcessDefinitionNotFoundError
.. autoexception:: pyzeebe.errors.ProcessDefinitionNotFoundError

.. autoexception:: pyzeebe.exceptions.ProcessInstanceNotFoundError
.. autoexception:: pyzeebe.errors.ProcessInstanceNotFoundError

.. autoexception:: pyzeebe.exceptions.ProcessDefinitionHasNoStartEventError
.. autoexception:: pyzeebe.errors.ProcessDefinitionHasNoStartEventError

.. autoexception:: pyzeebe.exceptions.ProcessInvalidError
.. autoexception:: pyzeebe.errors.ProcessInvalidError

.. autoexception:: pyzeebe.errors.InvalidJSONError

Expand Down
10 changes: 5 additions & 5 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ Creating a worker
worker = ZeebeWorker()

@worker.task(task_type="my_task")
def my_task(x: int):
async def my_task(x: int):
return {"y": x + 1}

worker.work()
await worker.work()

Creating a client

Expand All @@ -37,10 +37,10 @@ Creating a client

client = ZeebeClient()

client.run_process("my_process")
await client.run_process("my_process")

# Run process with variables:
client.run_process("my_process", variables={"x": 0})
await client.run_process("my_process", variables={"x": 0})


Dependencies
Expand All @@ -62,4 +62,4 @@ Table Of Contents
Client <client>
Worker <worker>
Decorators <decorators>
Exceptions <exceptions>
Exceptions <errors>
13 changes: 10 additions & 3 deletions docs/worker_quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@ Create and start a worker

.. code-block:: python

import asyncio

from pyzeebe import ZeebeWorker


worker = ZeebeWorker()


@worker.task(task_type="my_task")
def my_task(x: int):
async def my_task(x: int):
return {"y": x + 1}

worker.work()
asyncio.run(worker.work())


Worker connection options
Expand Down Expand Up @@ -86,6 +88,11 @@ To add a task to the worker:
.. code-block:: python

@worker.task(task_type="my_task")
def my_task(x: int):
async def my_task(x: int):
return {"y": x + 1}

# Or using a non-async function:

@worker.task(task_type="my_task")
def second_task(x: int):
return {"y": x + 1}
2 changes: 1 addition & 1 deletion docs/worker_taskrouter.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Creating a task with a router is the exact same process as wiht a :py:class:`Zee
.. code-block:: python

@router.task(task_type="my_task")
def my_task(x: int):
async def my_task(x: int):
return {"y": x + 1}


Expand Down
37 changes: 32 additions & 5 deletions docs/worker_tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,43 @@ To create a task you must first create a :py:class:`ZeebeWorker` or :py:class:`Z
.. code-block:: python

@worker.task(task_type="my_task")
def my_task():
async def my_task():
return {}

This is a task that does nothing. It receives no parameters and also doesn't return any.


.. note::

While this task indeed returns a python dictionary, it doesn't return anything to Zeebe. Do do that we have to fill the dictionary.
While this task indeed returns a python dictionary, it doesn't return anything to Zeebe. To do that we have to fill the dictionary with values.


Async/Sync Tasks
----------------

Tasks can be regular or async functions. If given a regular function, pyzeebe will convert it into an async one by running `asyncio.run_in_executor`

.. note::

Make sure not to call any blocking function in an async task. This would slow the entire worker down.

Do:

.. code-block:: python

@worker.task(task_type="my_task")
def my_task():
time.sleep(10) # Blocking call
return {}

Don't:

.. code-block:: python

@worker.task(task_type="my_task")
async def my_task():
time.sleep(10) # Blocking call
return {}

Task Exception Handler
----------------------
Expand All @@ -30,7 +57,7 @@ An exception handler's signature:

.. code-block:: python

Callable[[Exception, Job], None]
Callable[[Exception, Job], Awaitable[None]]

In other words: an exception handler is a function that receives an :class:`Exception` and :py:class:`Job` instance (a pyzeebe class).

Expand All @@ -43,9 +70,9 @@ To add an exception handler to a task:
from pyzeebe import Job


def my_exception_handler(exception: Exception, job: Job) -> None:
async def my_exception_handler(exception: Exception, job: Job) -> None:
print(exception)
job.set_failure_status(message=str(exception))
await job.set_failure_status(message=str(exception))


@worker.task(task_type="my_task", exception_handler=my_exception_handler)
Expand Down
12 changes: 6 additions & 6 deletions examples/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pyzeebe import ZeebeClient, CamundaCloudCredentials
from pyzeebe import CamundaCloudCredentials, ZeebeClient

# Create a zeebe client without credentials
zeebe_client = ZeebeClient(hostname="localhost", port=26500)
Expand All @@ -13,20 +13,20 @@
zeebe_client = ZeebeClient(credentials=camunda_cloud_credentials)

# Run a Zeebe instance process
process_instance_key = zeebe_client.run_process(
process_instance_key = await zeebe_client.run_process(
bpmn_process_id="My zeebe process", variables={})

# Run a Zeebe process instance and receive the result
process_instance_key, process_result = zeebe_client.run_process_with_result(
process_instance_key, process_result = await zeebe_client.run_process_with_result(
bpmn_process_id="My zeebe process",
timeout=10000
) # Will wait 10000 milliseconds (10 seconds)

# Deploy a bpmn process definition
zeebe_client.deploy_process("process.bpmn")
await zeebe_client.deploy_process("process.bpmn")

# Cancel a running process
zeebe_client.cancel_process_instance(process_instance_key=12345)
await zeebe_client.cancel_process_instance(process_instance_key=12345)

# Publish message
zeebe_client.publish_message(name="message_name", correlation_key="some_id")
await zeebe_client.publish_message(name="message_name", correlation_key="some_id")
Loading