Skip to content

Commit

Permalink
refactor: use abstract class for DI Middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
woile committed Nov 27, 2024
1 parent da9cb25 commit e6be4ca
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
{
"machine_info": {
"node": "mac.home",
"processor": "arm",
"machine": "arm64",
"python_compiler": "Clang 16.0.6 ",
"python_implementation": "CPython",
"python_implementation_version": "3.11.10",
"python_version": "3.11.10",
"python_build": [
"main",
"Sep 7 2024 01:03:31"
],
"release": "24.1.0",
"system": "Darwin",
"cpu": {
"python_version": "3.11.10.final.0 (64 bit)",
"cpuinfo_version": [
9,
0,
0
],
"cpuinfo_version_string": "9.0.0",
"arch": "ARM_8",
"bits": 64,
"count": 12,
"arch_string_raw": "arm64",
"brand_raw": "Apple M3 Pro"
}
},
"commit_info": {
"id": "da9cb25b0f84535841637d318f379657f4a1f632",
"time": "2024-11-27T12:34:46+01:00",
"author_time": "2024-11-27T12:34:46+01:00",
"dirty": true,
"project": "kstreams",
"branch": "fix/generic-consumer-record"
},
"benchmarks": [
{
"group": null,
"name": "test_startup_and_processing_single_consumer_record",
"fullname": "tests/test_benchmarks.py::test_startup_and_processing_single_consumer_record",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 4.629199975170195e-05,
"max": 0.010518291994230822,
"mean": 0.00011184708857215268,
"stddev": 0.00014957945906818188,
"rounds": 5163,
"median": 0.00010833298438228667,
"iqr": 5.845900159329176e-05,
"q1": 7.954100146889687e-05,
"q3": 0.00013800000306218863,
"iqr_outliers": 15,
"stddev_outliers": 14,
"outliers": "14;15",
"ld15iqr": 4.629199975170195e-05,
"hd15iqr": 0.00023312499979510903,
"ops": 8940.77809951127,
"total": 0.5774665182980243,
"iterations": 1
}
},
{
"group": null,
"name": "test_startup_and_inject_all",
"fullname": "tests/test_benchmarks.py::test_startup_and_inject_all",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 4.620800609700382e-05,
"max": 0.017021667008521035,
"mean": 0.00021773011284045453,
"stddev": 0.00022200387249880765,
"rounds": 13762,
"median": 0.00021172950800973922,
"iqr": 0.00016691599739715457,
"q1": 0.0001298749994020909,
"q3": 0.0002967909967992455,
"iqr_outliers": 17,
"stddev_outliers": 39,
"outliers": "39;17",
"ld15iqr": 4.620800609700382e-05,
"hd15iqr": 0.0005522920109797269,
"ops": 4592.84196822498,
"total": 2.996401812910335,
"iterations": 1
}
},
{
"group": null,
"name": "test_consume_many",
"fullname": "tests/test_benchmarks.py::test_consume_many",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 0.0007156250067055225,
"max": 0.000864708999870345,
"mean": 0.0007345807816986966,
"stddev": 1.2547058035859212e-05,
"rounds": 1027,
"median": 0.0007324170146603137,
"iqr": 9.614763257559389e-06,
"q1": 0.0007281249927473255,
"q3": 0.0007377397560048848,
"iqr_outliers": 60,
"stddev_outliers": 154,
"outliers": "154;60",
"ld15iqr": 0.0007156250067055225,
"hd15iqr": 0.0007525830005761236,
"ops": 1361.320667398253,
"total": 0.7544144628045615,
"iterations": 1
}
}
],
"datetime": "2024-11-27T13:29:43.583338+00:00",
"version": "5.1.0"
}
24 changes: 22 additions & 2 deletions kstreams/middleware/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import typing

from kstreams import types
from kstreams.streams_utils import StreamErrorPolicy
from kstreams.streams_utils import StreamErrorPolicy, UDFType

if typing.TYPE_CHECKING:
from kstreams import Stream, StreamEngine # pragma: no cover
Expand All @@ -14,6 +14,10 @@


class MiddlewareProtocol(typing.Protocol):
next_call: types.NextMiddlewareCall
send: types.Send
stream: "Stream"

def __init__(
self,
*,
Expand Down Expand Up @@ -45,6 +49,10 @@ def __repr__(self) -> str:


class BaseMiddleware:
next_call: types.NextMiddlewareCall
send: types.Send
stream: "Stream"

def __init__(
self,
*,
Expand Down Expand Up @@ -92,7 +100,7 @@ async def __call__(self, cr: types.ConsumerRecord) -> typing.Any:

async def cleanup_policy(self, exc: Exception) -> None:
"""
Execute clenup policicy according to the Stream configuration.
Execute cleanup policy according to the Stream configuration.
At this point we are inside the asyncio.Lock `is_processing`
as an event is being processed and an exeption has occured.
Expand Down Expand Up @@ -145,3 +153,15 @@ async def cleanup_policy(self, exc: Exception) -> None:
await self.engine.stop()
await self.stream.is_processing.acquire()
signal.raise_signal(signal.SIGTERM)


class BaseDependcyMiddleware(MiddlewareProtocol, typing.Protocol):
"""Base class for Dependency Injection Middleware.
`get_type` is used to identify the way to call the user defined function,
whether to use DI or not.
On top of that, this middleware helps **avoid circular dependencies**.
"""

def get_type(self) -> UDFType: ...
3 changes: 3 additions & 0 deletions kstreams/middleware/udf_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ def __init__(self, *args, **kwargs) -> None:
]
self.type: UDFType = setup_type(self.params)

def get_type(self) -> UDFType:
return self.type

def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List:
# NOTE: When `no typing` support is deprecated then this can
# be more eficient as the CR will be always there.
Expand Down
13 changes: 8 additions & 5 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@

from kstreams import TopicPartition
from kstreams.exceptions import BackendNotSet
from kstreams.middleware.middleware import ExceptionMiddleware
from kstreams.middleware.middleware import (
BaseDependcyMiddleware,
ExceptionMiddleware,
Middleware,
)
from kstreams.structs import TopicPartitionOffset

from .backends.kafka import Kafka
from .clients import Consumer
from .middleware import Middleware, udf_middleware
from .rebalance_listener import RebalanceListener
from .serializers import Deserializer
from .streams_utils import StreamErrorPolicy, UDFType
Expand Down Expand Up @@ -172,7 +175,7 @@ def __init__(
self.seeked_initial_offsets = False
self.rebalance_listener = rebalance_listener
self.middlewares = middlewares or []
self.udf_handler: typing.Optional[udf_middleware.UdfHandler] = None
self.udf_handler: typing.Optional[BaseDependcyMiddleware] = None
self.topics = [topics] if isinstance(topics, str) else topics
self.subscribe_by_pattern = subscribe_by_pattern
self.error_policy = error_policy
Expand Down Expand Up @@ -345,7 +348,7 @@ async def start(self) -> None:
self.running = True

if self.udf_handler is not None:
if self.udf_handler.type == UDFType.NO_TYPING:
if self.udf_handler.get_type() == UDFType.NO_TYPING:
# deprecated use case
msg = (
"Streams with `async for in` loop approach are deprecated.\n"
Expand Down Expand Up @@ -439,7 +442,7 @@ async def __anext__(self) -> ConsumerRecord:

if (
self.udf_handler is not None
and self.udf_handler.type == UDFType.NO_TYPING
and self.udf_handler.get_type() == UDFType.NO_TYPING
):
return cr
return await self.func(cr)
Expand Down
2 changes: 1 addition & 1 deletion scripts/bench-compare
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ if [ -d '.venv' ] ; then
fi

# Commented out until after merge, so there will be date to compare with.
${PREFIX}pytest tests/test_benchmarks.py --benchmark-compare --benchmark-compare-fail=mean:5%
${PREFIX}pytest tests/test_benchmarks.py --benchmark-compare --benchmark-compare-fail=mean:10%
18 changes: 18 additions & 0 deletions tests/test_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@
in one second (calculated as 1 / Mean).
- Rounds: The number of times the test was run.
- Iterations: Number of iterations per round.
Performance may be affected by:
- Power-saving modes
- CPU frequency scaling
- Background Processes
To get accurate results, run benchmarks on a dedicated machine with no other
applications running.
## Profiling
Profile and visualize your code with `py-spy`:
```python
pip install py-spy
sudo py-spy record -o profile.svg -- python tests/test_benchmarks.py
```
"""

from typing import Callable, List
Expand Down

0 comments on commit e6be4ca

Please sign in to comment.