Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add type hints #417

Merged
merged 1 commit into from
Jun 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 71 additions & 72 deletions poetry.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ oauthlib = "^3.1.0"
requests-oauthlib = ">=1.3.0,<3.0.0"
aiofiles = ">=0.7,<24"
zeebe-grpc = "^8.4.0"
typing-extensions = "^4.5.0"
typing-extensions = "^4.11.0"

[tool.poetry.group.dev.dependencies]
pytest = ">=7.4,<9.0"
Expand All @@ -33,7 +33,7 @@ pytest-grpc = "^0.8.0"
pytest-mock = "^3.11.1"
pylint = ">=2.17.5,<4.0.0"
black = "^23.7.0"
mypy = "^1.4.1"
mypy = "^1.10.0"
coveralls = "^3.3.1"
responses = ">=0.23.2,<0.26.0"
bump2version = "^1.0.1"
Expand All @@ -48,6 +48,8 @@ types-requests-oauthlib = ">=1.3.0,<3.0.0"

[tool.mypy]
python_version = "3.8"
packages = ["pyzeebe"]
strict = true

[[tool.mypy.overrides]]
module = [
Expand All @@ -57,7 +59,6 @@ module = [
]
ignore_missing_imports = true


[tool.pylint.master]
max-line-length = 120
disable = ["C0114", "C0115", "C0116"]
Expand Down
22 changes: 21 additions & 1 deletion pyzeebe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pyzeebe import errors
from pyzeebe.channel import *
from pyzeebe.client.client import ZeebeClient
from pyzeebe.client.sync_client import SyncZeebeClient # type: ignore
from pyzeebe.client.sync_client import SyncZeebeClient
from pyzeebe.credentials.base import CredentialsABC
from pyzeebe.credentials.camunda_identity import CamundaIdentityCredentials
from pyzeebe.credentials.plugins import AuthMetadataPlugin
Expand All @@ -14,3 +14,23 @@
from pyzeebe.task.types import TaskDecorator
from pyzeebe.worker.task_router import ZeebeTaskRouter
from pyzeebe.worker.worker import ZeebeWorker

__all__ = (
"errors",
"create_camunda_cloud_channel",
"create_insecure_channel",
"create_secure_channel",
"ZeebeClient",
"SyncZeebeClient",
"Job",
"JobStatus",
"ExceptionHandler",
"TaskConfig",
"TaskDecorator",
"ZeebeTaskRouter",
"default_exception_handler",
"ZeebeWorker",
"CredentialsABC",
"CamundaIdentityCredentials",
"AuthMetadataPlugin",
)
4 changes: 3 additions & 1 deletion pyzeebe/channel/camunda_cloud_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ def _get_access_token(url: str, client_id: str, client_secret: str, audience: st
},
)
response.raise_for_status()
return response.json()["access_token"]
access_token = response.json()["access_token"]
assert isinstance(access_token, str)
return access_token
except HTTPError as http_error:
raise InvalidOAuthCredentialsError(url=url, client_id=client_id, audience=audience) from http_error

Expand Down
4 changes: 2 additions & 2 deletions pyzeebe/channel/insecure_channel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Optional
from typing import Any, Dict, Optional

import grpc

Expand All @@ -7,7 +7,7 @@


def create_insecure_channel(
hostname: Optional[str] = None, port: Optional[int] = None, channel_options: Optional[Dict] = None
hostname: Optional[str] = None, port: Optional[int] = None, channel_options: Optional[Dict[str, Any]] = None
) -> grpc.aio.Channel:
"""
Create an insecure channel
Expand Down
4 changes: 2 additions & 2 deletions pyzeebe/channel/secure_channel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Optional
from typing import Any, Dict, Optional

import grpc

Expand All @@ -9,7 +9,7 @@
def create_secure_channel(
hostname: Optional[str] = None,
port: Optional[int] = None,
channel_options: Optional[Dict] = None,
channel_options: Optional[Dict[str, Any]] = None,
channel_credentials: Optional[grpc.ChannelCredentials] = None,
) -> grpc.aio.Channel:
"""
Expand Down
21 changes: 13 additions & 8 deletions pyzeebe/client/client.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, Iterable, Optional, Tuple

import grpc
from typing_extensions import deprecated

from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter
from pyzeebe.types import Variables


class ZeebeClient(object):
class ZeebeClient:
"""A zeebe client that can connect to a zeebe instance and perform actions."""

def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = 10):
def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = 10) -> None:
"""
Args:
grpc_channel (grpc.aio.Channel): GRPC Channel connected to a Zeebe gateway
Expand All @@ -19,7 +20,11 @@ def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int =
self.zeebe_adapter = ZeebeAdapter(grpc_channel, max_connection_retries)

async def run_process(
self, bpmn_process_id: str, variables: Optional[Dict] = None, version: int = -1, tenant_id: Optional[str] = None
self,
bpmn_process_id: str,
variables: Optional[Variables] = None,
version: int = -1,
tenant_id: Optional[str] = None,
) -> int:
"""
Run process
Expand Down Expand Up @@ -50,12 +55,12 @@ async def run_process(
async def run_process_with_result(
self,
bpmn_process_id: str,
variables: Optional[Dict] = None,
variables: Optional[Variables] = None,
version: int = -1,
timeout: int = 0,
variables_to_fetch: Optional[List[str]] = None,
variables_to_fetch: Optional[Iterable[str]] = None,
tenant_id: Optional[str] = None,
) -> Tuple[int, Dict]:
) -> Tuple[int, Dict[str, Any]]:
"""
Run process and wait for the result.

Expand Down Expand Up @@ -153,7 +158,7 @@ async def publish_message(
self,
name: str,
correlation_key: str,
variables: Optional[Dict] = None,
variables: Optional[Variables] = None,
time_to_live_in_milliseconds: int = 60000,
message_id: Optional[str] = None,
tenant_id: Optional[str] = None,
Expand Down
13 changes: 7 additions & 6 deletions pyzeebe/client/sync_client.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import asyncio
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

import grpc
from typing_extensions import deprecated

from pyzeebe import ZeebeClient
from pyzeebe.types import Variables


class SyncZeebeClient:
def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = 10):
def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = 10) -> None:
self.loop = asyncio.get_event_loop()
self.client = ZeebeClient(grpc_channel, max_connection_retries)

def run_process(
self,
bpmn_process_id: str,
variables: Optional[Dict] = None,
variables: Optional[Variables] = None,
version: int = -1,
tenant_id: Optional[str] = None,
) -> int:
Expand All @@ -24,12 +25,12 @@ def run_process(
def run_process_with_result(
self,
bpmn_process_id: str,
variables: Optional[Dict] = None,
variables: Optional[Variables] = None,
version: int = -1,
timeout: int = 0,
variables_to_fetch: Optional[List[str]] = None,
tenant_id: Optional[str] = None,
) -> Tuple[int, Dict]:
) -> Tuple[int, Dict[str, Any]]:
return self.loop.run_until_complete(
self.client.run_process_with_result(
bpmn_process_id, variables, version, timeout, variables_to_fetch, tenant_id
Expand All @@ -50,7 +51,7 @@ def publish_message(
self,
name: str,
correlation_key: str,
variables: Optional[Dict] = None,
variables: Optional[Variables] = None,
time_to_live_in_milliseconds: int = 60000,
message_id: Optional[str] = None,
tenant_id: Optional[str] = None,
Expand Down
2 changes: 1 addition & 1 deletion pyzeebe/credentials/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pyzeebe.credentials.base import CredentialsABC


class AuthMetadataPlugin(grpc.AuthMetadataPlugin):
class AuthMetadataPlugin(grpc.AuthMetadataPlugin): # type: ignore[misc]
"""Custom authentication plugin with exception catching.

Args:
Expand Down
4 changes: 2 additions & 2 deletions pyzeebe/errors/credentials_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@


class InvalidOAuthCredentialsError(PyZeebeError):
def __init__(self, url: str, client_id: str, audience: str):
def __init__(self, url: str, client_id: str, audience: str) -> None:
super().__init__(
f"Invalid OAuth credentials supplied for {url} with audience {audience} and client id {client_id}"
)


class InvalidCamundaCloudCredentialsError(PyZeebeError):
def __init__(self, client_id: str, cluster_id: str):
def __init__(self, client_id: str, cluster_id: str) -> None:
super().__init__(f"Invalid credentials supplied for cluster {cluster_id} with client {client_id}")
6 changes: 3 additions & 3 deletions pyzeebe/errors/job_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


class ActivateJobsRequestInvalidError(PyZeebeError):
def __init__(self, task_type: str, worker: str, timeout: int, max_jobs_to_activate: int):
def __init__(self, task_type: str, worker: str, timeout: int, max_jobs_to_activate: int) -> None:
msg = "Failed to activate jobs. Reasons:"
if task_type == "" or task_type is None:
msg = msg + "task_type is empty, "
Expand All @@ -17,12 +17,12 @@ def __init__(self, task_type: str, worker: str, timeout: int, max_jobs_to_activa


class JobAlreadyDeactivatedError(PyZeebeError):
def __init__(self, job_key: int):
def __init__(self, job_key: int) -> None:
super().__init__(f"Job {job_key} was already stopped (Completed/Failed/Error)")
self.job_key = job_key


class JobNotFoundError(PyZeebeError):
def __init__(self, job_key: int):
def __init__(self, job_key: int) -> None:
super().__init__(f"Job {job_key} not found")
self.job_key = job_key
16 changes: 10 additions & 6 deletions pyzeebe/function_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from typing import Awaitable, Callable, Dict, TypeVar, Union
from __future__ import annotations

Parameters = TypeVar("Parameters")
from typing import Any, Awaitable, Callable, Dict, Optional, TypeVar, Union

from typing_extensions import ParamSpec

Parameters = ParamSpec("Parameters")
ReturnType = TypeVar("ReturnType")

SyncFunction = Callable[[Parameters], ReturnType]
AsyncFunction = Callable[[Parameters], Awaitable[ReturnType]]
Function = Union[SyncFunction, AsyncFunction]
SyncFunction = Callable[Parameters, ReturnType]
AsyncFunction = Callable[Parameters, Awaitable[ReturnType]]
Function = Union[SyncFunction[Parameters, ReturnType], AsyncFunction[Parameters, ReturnType]]

DictFunction = Callable[[Parameters], Awaitable[Dict]]
DictFunction = Callable[Parameters, Awaitable[Optional[Dict[str, Any]]]]
22 changes: 14 additions & 8 deletions pyzeebe/function_tools/async_tools.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,37 @@
from __future__ import annotations

import asyncio
import functools
from typing import List
from typing import Any, Iterable, List, TypeVar

from typing_extensions import ParamSpec, TypeIs

from pyzeebe.function_tools import AsyncFunction, Function, SyncFunction

P = ParamSpec("P")
R = TypeVar("R")


def asyncify_all_functions(functions: List[Function]) -> List[AsyncFunction]:
async_functions = []
def asyncify_all_functions(functions: Iterable[Function[..., Any]]) -> List[AsyncFunction[..., Any]]:
async_functions: list[AsyncFunction[..., Any]] = []
for function in functions:
if not is_async_function(function):
async_functions.append(asyncify(function))
else:
# Mypy doesn't correctly understand that this is an async function
async_functions.append(function) # type: ignore
async_functions.append(function)
return async_functions


def asyncify(task_function: SyncFunction) -> AsyncFunction:
def asyncify(task_function: SyncFunction[P, R]) -> AsyncFunction[P, R]:
@functools.wraps(task_function)
async def async_function(*args, **kwargs):
async def async_function(*args: P.args, **kwargs: P.kwargs) -> R:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, functools.partial(task_function, *args, **kwargs))

return async_function


def is_async_function(function: Function) -> bool:
def is_async_function(function: Function[P, R]) -> TypeIs[AsyncFunction[P, R]]:
# Not using inspect.iscoroutinefunction here because it doens't handle AsyncMock well
# See: https://bugs.python.org/issue40573
return asyncio.iscoroutinefunction(function)
10 changes: 8 additions & 2 deletions pyzeebe/function_tools/dict_tools.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import functools
from typing import Any, Dict, TypeVar

from typing_extensions import ParamSpec

from pyzeebe.function_tools import AsyncFunction, DictFunction

P = ParamSpec("P")
R = TypeVar("R")


def convert_to_dict_function(single_value_function: AsyncFunction, variable_name: str) -> DictFunction:
def convert_to_dict_function(single_value_function: AsyncFunction[P, R], variable_name: str) -> DictFunction[P]:
@functools.wraps(single_value_function)
async def inner_fn(*args, **kwargs):
async def inner_fn(*args: P.args, **kwargs: P.kwargs) -> Dict[str, Any]:
return {variable_name: await single_value_function(*args, **kwargs)}

return inner_fn
8 changes: 5 additions & 3 deletions pyzeebe/function_tools/parameter_tools.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

import inspect
from typing import List, Optional
from typing import Any, List, Optional

from pyzeebe.function_tools import Function
from pyzeebe.job.job import Job


def get_parameters_from_function(task_function: Function) -> Optional[List[str]]:
def get_parameters_from_function(task_function: Function[..., Any]) -> Optional[List[str]]:
function_signature = inspect.signature(task_function)
for _, parameter in function_signature.parameters.items():
if parameter.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
Expand All @@ -20,7 +22,7 @@ def get_parameters_from_function(task_function: Function) -> Optional[List[str]]
return [param.name for param in function_signature.parameters.values() if param.annotation != Job]


def get_job_parameter_name(function: Function) -> Optional[str]:
def get_job_parameter_name(function: Function[..., Any]) -> Optional[str]:
function_signature = inspect.signature(function)
params = list(function_signature.parameters.values())
for param in params:
Expand Down
2 changes: 1 addition & 1 deletion pyzeebe/grpc_internals/grpc_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import grpc


def is_error_status(rpc_error: grpc.aio.AioRpcError, *status_codes: grpc.StatusCode):
def is_error_status(rpc_error: grpc.aio.AioRpcError, *status_codes: grpc.StatusCode) -> bool:
return rpc_error.code() in status_codes
Loading
Loading