Skip to content

Commit

Permalink
Merge branch 'master' into renovate/pylint-3.x-lockfile
Browse files Browse the repository at this point in the history
  • Loading branch information
dimastbk authored Sep 24, 2024
2 parents 9215f92 + f163708 commit adb27e8
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 36 deletions.
10 changes: 6 additions & 4 deletions pyzeebe/channel/camunda_cloud_channel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional
from typing import Optional

import grpc
from oauthlib import oauth2
Expand All @@ -10,14 +10,15 @@
InvalidCamundaCloudCredentialsError,
InvalidOAuthCredentialsError,
)
from pyzeebe.types import ChannelArgumentType


def create_camunda_cloud_channel(
client_id: str,
client_secret: str,
cluster_id: str,
region: str = "bru-2",
channel_options: Optional[Dict[str, Any]] = None,
channel_options: Optional[ChannelArgumentType] = None,
) -> grpc.aio.Channel:
"""
Create channel connected to a Camunda Cloud cluster
Expand All @@ -27,7 +28,8 @@ def create_camunda_cloud_channel(
client_secret (str): The client secret provided by Camunda Cloud
cluster_id (str): The zeebe cluster id to connect to
region (str): The cluster's region. Defaults to bru-2
channel_options (Optional[Dict], optional): GRPC channel options. See https://grpc.github.io/grpc/python/glossary.html
channel_options (Optional[Dict], optional): GRPC channel options.
See https://grpc.github.io/grpc/python/glossary.html
Returns:
grpc.aio.Channel: A GRPC Channel connected to the Zeebe gateway.
Expand All @@ -40,7 +42,7 @@ def create_camunda_cloud_channel(
return grpc.aio.secure_channel(
f"{cluster_id}.{region}.zeebe.camunda.io:443",
channel_credentials,
options=get_channel_options(channel_options or {}),
options=get_channel_options(channel_options),
)


Expand Down
35 changes: 21 additions & 14 deletions pyzeebe/channel/channel_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,34 @@
https://docs.camunda.io/docs/product-manuals/zeebe/deployment-guide/operations/setting-up-a-cluster/#keep-alive-intervals
"""

from typing import Any, Dict, Optional, Tuple
from typing import Optional

GRPC_CHANNEL_OPTIONS = {"grpc.keepalive_time_ms": 45_000}
from pyzeebe.types import ChannelArgumentType

GRPC_CHANNEL_OPTIONS_DEFAULT: ChannelArgumentType = (("grpc.keepalive_time_ms", 45_000),)

def get_channel_options(options: Optional[Dict[str, Any]] = None) -> Tuple[Tuple[str, Any], ...]:

def get_channel_options(options: Optional[ChannelArgumentType] = None) -> ChannelArgumentType:
"""
Convert options dict to tuple in expected format for creating the gRPC channel
Get default channel options for creating the gRPC channel.
Args:
options (Dict[str, Any]): A key/value representation of `gRPC channel arguments_`.
options (Optional[ChannelArgumentType]): A tuple of gRPC channel arguments tuple.
e.g. (("grpc.keepalive_time_ms", 45_000),)
Default: None (will use library defaults)
See https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments
Returns:
Tuple[Tuple[str, Any], ...]: Options for the gRPC channel
.. _gRPC channel arguments:
https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments
ChannelArgumentType: Options for the gRPC channel
"""
if options:
options = {**GRPC_CHANNEL_OPTIONS, **options}
else:
options = GRPC_CHANNEL_OPTIONS
return tuple((k, v) for k, v in options.items())
if options is not None:
existing = set()
_options = []

for a, b in (*options, *GRPC_CHANNEL_OPTIONS_DEFAULT):
if a not in existing: # NOTE: Remove duplicates, fist one wins
existing.add(a)
_options.append((a, b))

return tuple(_options) # (*options, GRPC_CHANNEL_OPTIONS)
return GRPC_CHANNEL_OPTIONS_DEFAULT
8 changes: 5 additions & 3 deletions pyzeebe/channel/insecure_channel.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
from typing import Any, Dict, Optional
from typing import Optional

import grpc

from pyzeebe.channel.channel_options import get_channel_options
from pyzeebe.channel.utils import create_address
from pyzeebe.types import ChannelArgumentType


def create_insecure_channel(
hostname: Optional[str] = None, port: Optional[int] = None, channel_options: Optional[Dict[str, Any]] = None
hostname: Optional[str] = None, port: Optional[int] = None, channel_options: Optional[ChannelArgumentType] = None
) -> grpc.aio.Channel:
"""
Create an insecure channel
Args:
hostname (Optional[str], optional): Zeebe gateway hostname
port (Optional[int], optional): Zeebe gateway port
channel_options (Optional[Dict], optional): GRPC channel options. See https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments
channel_options (Optional[Dict], optional): GRPC channel options.
See https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments
Returns:
grpc.aio.Channel: A GRPC Channel connected to the Zeebe gateway.
Expand Down
11 changes: 7 additions & 4 deletions pyzeebe/channel/secure_channel.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from typing import Any, Dict, Optional
from typing import Optional

import grpc

from pyzeebe.channel.channel_options import get_channel_options
from pyzeebe.channel.utils import create_address
from pyzeebe.types import ChannelArgumentType


def create_secure_channel(
hostname: Optional[str] = None,
port: Optional[int] = None,
channel_options: Optional[Dict[str, Any]] = None,
channel_options: Optional[ChannelArgumentType] = None,
channel_credentials: Optional[grpc.ChannelCredentials] = None,
) -> grpc.aio.Channel:
"""
Expand All @@ -18,8 +19,10 @@ def create_secure_channel(
Args:
hostname (Optional[str], optional): Zeebe gateway hostname
port (Optional[int], optional): Zeebe gateway port
channel_options (Optional[Dict], optional): GRPC channel options. See https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments
channel_credentials (Optional[grpc.ChannelCredentials]): Channel credentials to use. Will use grpc.ssl_channel_credentials() if not provided.
channel_options (Optional[Dict], optional): GRPC channel options.
See https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments
channel_credentials (Optional[grpc.ChannelCredentials]): Channel credentials to use.
Will use grpc.ssl_channel_credentials() if not provided.
Returns:
grpc.aio.Channel: A GRPC Channel connected to the Zeebe gateway.
Expand Down
4 changes: 3 additions & 1 deletion pyzeebe/types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Any, Mapping
from typing import Any, Mapping, Sequence, Tuple

from typing_extensions import TypeAlias

Headers: TypeAlias = Mapping[str, Any]
Variables: TypeAlias = Mapping[str, Any]

ChannelArgumentType: TypeAlias = Sequence[Tuple[str, Any]]
20 changes: 10 additions & 10 deletions tests/unit/channel/channel_options_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,23 @@


def test_get_channel_options_returns_tuple_of_tuple_with_options():
assert get_channel_options() == (("grpc.keepalive_time_ms", 45000),)
assert get_channel_options() == (("grpc.keepalive_time_ms", 45_000),)


def test_overrides_default_values_if_provided():
grpc_options = {"grpc.keepalive_time_ms": 4000}
grpc_options = (("grpc.keepalive_time_ms", 4_000),)

assert get_channel_options(grpc_options) == (("grpc.keepalive_time_ms", 4000),)
assert get_channel_options(grpc_options) == (("grpc.keepalive_time_ms", 4_000),)


def test_adds_custom_options():
grpc_options = {
"grpc.keepalive_timeout_ms": 120000,
"grpc.http2.min_time_between_pings_ms": 60000,
}
grpc_options = (
("grpc.keepalive_timeout_ms", 120_000),
("grpc.http2.min_time_between_pings_ms", 60_000),
)

assert get_channel_options(grpc_options) == (
("grpc.keepalive_time_ms", 45000),
("grpc.keepalive_timeout_ms", 120000),
("grpc.http2.min_time_between_pings_ms", 60000),
("grpc.keepalive_timeout_ms", 120_000),
("grpc.http2.min_time_between_pings_ms", 60_000),
("grpc.keepalive_time_ms", 45_000),
)

0 comments on commit adb27e8

Please sign in to comment.