Skip to content

Commit

Permalink
Try out msgspec in our msgpack stream channel
Browse files Browse the repository at this point in the history
Can only really use an encoder currently since there is no streaming api
in `msgspec` as of currently. See jcrist/msgspec#27.

Not sure if any encoding speedups are currently noticeable especially
without any validation going on yet XD.

First experiments toward #196
  • Loading branch information
goodboy committed Jun 2, 2021
1 parent 770a9e5 commit 1954f05
Showing 1 changed file with 24 additions and 3 deletions.
27 changes: 24 additions & 3 deletions tractor/_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
Inter-process comms abstractions
"""
import typing
from typing import Any, Tuple, Optional
from typing import Any, Tuple, Optional, Callable
from functools import partial

import msgpack
import msgspec
import trio
from async_generator import asynccontextmanager

Expand All @@ -21,16 +22,32 @@
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)


ms_decode = msgspec.Encoder().encode


class MsgpackStream:
"""A ``trio.SocketStream`` delivering ``msgpack`` formatted data.
"""
def __init__(self, stream: trio.SocketStream) -> None:
def __init__(
self,
stream: trio.SocketStream,
serialize: Callable = Unpacker(
raw=False,
use_list=False,
).feed,
deserialize: Callable = msgpack.dumps,

) -> None:

self.stream = stream
assert self.stream.socket

# should both be IP sockets
lsockname = stream.socket.getsockname()
assert isinstance(lsockname, tuple)
self._laddr = lsockname[:2]

rsockname = stream.socket.getpeername()
assert isinstance(rsockname, tuple)
self._raddr = rsockname[:2]
Expand All @@ -45,6 +62,7 @@ async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
raw=False,
use_list=False,
)
# decoder = msgspec.Decoder() #dict[str, Any])
while True:
try:
data = await self.stream.receive_some(2**10)
Expand All @@ -57,6 +75,7 @@ async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
log.debug(f"Stream connection {self.raddr} was closed")
return

# yield decoder.decode(data)
unpacker.feed(data)
for packet in unpacker:
yield packet
Expand All @@ -73,7 +92,9 @@ def raddr(self) -> Tuple[Any, ...]:
async def send(self, data: Any) -> None:
async with self._send_lock:
return await self.stream.send_all(
msgpack.dumps(data, use_bin_type=True))
# msgpack.dumps(data, use_bin_type=True))
ms_decode(data)
)

async def recv(self) -> Any:
return await self._agen.asend(None)
Expand Down

0 comments on commit 1954f05

Please sign in to comment.