-
Notifications
You must be signed in to change notification settings - Fork 11
/
json_serialization.py
80 lines (60 loc) · 1.92 KB
/
json_serialization.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import asyncio
import json
import logging
from typing import Any, Dict, Optional
import aiorun
from kstreams import ConsumerRecord, Stream, consts, create_engine, middleware
from kstreams.types import Headers
logger = logging.getLogger(__name__)
class JsonSerializer:
    """Serialize payloads to JSON-encoded bytes."""

    async def serialize(
        self,
        payload: Any,
        headers: Optional[Headers] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        """Return *payload* dumped to a JSON string and encoded to bytes.

        ``headers`` and ``serializer_kwargs`` are accepted but not used by
        this implementation.
        """
        return json.dumps(payload).encode()
class JsonDeserializerMiddleware(middleware.BaseMiddleware):
    """Middleware that JSON-decodes a record's value before passing it on."""

    async def __call__(self, cr: ConsumerRecord):
        """Replace ``cr.value`` with its parsed JSON form, then call the next handler.

        A record whose value is ``None`` is forwarded unchanged.
        """
        raw = cr.value
        if raw is not None:
            # Decode the raw value to text first, then parse it as JSON.
            cr.value = json.loads(raw.decode())
        return await self.next_call(cr)
# Topic name and payload used by the producer and the consumer in this module.
topic = "local--kstreams-json"
data = {"message": "Hello world!"}

# Engine configured to serialize sent values with JsonSerializer.
stream_engine = create_engine(title="my-stream-engine", serializer=JsonSerializer())
@stream_engine.stream(
    topic,
    middlewares=[middleware.Middleware(JsonDeserializerMiddleware)],
    group_id="my-group",
)
async def consume(cr: ConsumerRecord, stream: Stream):
    """Log a consumed record and check that its value equals ``data``."""
    headers, value = cr.headers, cr.value
    logger.info(f"Event consumed: headers: {headers}, value: {value}")
    # The value is compared against the dict that produce() sends.
    assert value == data
async def produce():
    """Send ``data`` to ``topic`` five times, logging the result of each send.

    Sleeps three seconds after every message.
    """
    for _ in range(5):
        # Mark the message as JSON through its content-type header.
        json_headers = {"content-type": consts.APPLICATION_JSON}
        metadata = await stream_engine.send(topic, value=data, headers=json_headers)
        logger.info(f"Message sent: {metadata}")
        await asyncio.sleep(3)
async def main():
    """Start the stream engine, then run the producer to completion."""
    await stream_engine.start()
    await produce()
async def shutdown(
    loop: asyncio.AbstractEventLoop,
):
    """Stop the stream engine; ``loop`` is accepted but not used."""
    await stream_engine.stop()
if __name__ == "__main__":
    # Configure root logging at INFO level before starting the program.
    logging.basicConfig(level=logging.INFO)
    aiorun.run(
        main(),
        shutdown_callback=shutdown,
        stop_on_unhandled_errors=True,
    )