Merge pull request #300 from goodboy/msgpack_lists_by_default
Use lists by default like `msgspec`, update to latest `msgspec` and `msgpack` releases
goodboy authored Feb 15, 2022
2 parents 26bebc4 + 76a0492 commit 6e5590d
Showing 6 changed files with 76 additions and 50 deletions.
3 changes: 3 additions & 0 deletions nooz/300.misc.rst
@@ -0,0 +1,3 @@
Update to and pin latest `msgpack` (1.0.3) and `msgspec` (0.4.0)
both of which required adjustments for backwards-incompatible API
tweaks.
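
For context, here is a minimal sketch (illustration only, not part of the diff) of the `msgpack-python` 1.0.x behaviour these pins track: arrays decode to Python lists by default (`use_list=True`) and non-string map keys are rejected by default (`strict_map_key=True`), so tuple uids no longer round-trip unchanged. The uid and sockaddr values below are made up.

```python
# Illustration only; assumes msgpack-python >= 1.0.
import msgpack

# Arrays decode to lists under the 1.0.x defaults, so a tuple uid comes
# back as a list after an IPC round-trip:
packed = msgpack.packb(('root', 'some-uuid'))
assert msgpack.unpackb(packed) == ['root', 'some-uuid']

# Non-string map keys are rejected by default, which is why a tuple-keyed
# registry dict can no longer be shipped as-is:
packed_map = msgpack.packb({('root', 'some-uuid'): ('127.0.0.1', 1616)})
try:
    msgpack.unpackb(packed_map)
except ValueError:
    print("tuple map keys need strict_map_key=False (or plain string keys)")
```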
4 changes: 2 additions & 2 deletions setup.py
@@ -56,13 +56,13 @@
'pdbpp',

# serialization
'msgpack',
'msgpack>=1.0.3',

],
extras_require={

# serialization
'msgspec': ["msgspec >= 0.3.2'; python_version >= '3.9'"],
'msgspec': ['msgspec >= "0.4.0"'],

},
tests_require=['pytest'],
25 changes: 19 additions & 6 deletions tests/test_discovery.py
@@ -116,11 +116,26 @@ async def stream_from(portal):
print(value)


async def unpack_reg(actor_or_portal):
'''
Get and unpack a "registry" RPC request from the "arbiter" registry
system.
'''
if getattr(actor_or_portal, 'get_registry', None):
msg = await actor_or_portal.get_registry()
else:
msg = await actor_or_portal.run_from_ns('self', 'get_registry')

return {tuple(key.split('.')): val for key, val in msg.items()}


async def spawn_and_check_registry(
arb_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
with_streaming: bool = False,

) -> None:

async with tractor.open_root_actor(
@@ -134,13 +149,11 @@ async def spawn_and_check_registry(
assert not actor.is_arbiter

if actor.is_arbiter:

async def get_reg():
return await actor.get_registry()

extra = 1 # arbiter is local root actor
get_reg = partial(unpack_reg, actor)

else:
get_reg = partial(portal.run_from_ns, 'self', 'get_registry')
get_reg = partial(unpack_reg, portal)
extra = 2 # local root actor + remote arbiter

# ensure current actor is registered
@@ -266,7 +279,7 @@ async def close_chans_before_nursery(
):
async with tractor.get_arbiter(*arb_addr) as aportal:
try:
get_reg = partial(aportal.run_from_ns, 'self', 'get_registry')
get_reg = partial(unpack_reg, aportal)

async with tractor.open_nursery() as tn:
portal1 = await tn.start_actor(
75 changes: 42 additions & 33 deletions tractor/_actor.py
@@ -27,7 +27,7 @@
import inspect
import uuid
import typing
from typing import List, Tuple, Any, Optional, Union
from typing import Any, Optional, Union
from types import ModuleType
import sys
import os
@@ -199,7 +199,9 @@ async def _invoke(
assert chan.uid
ctx = actor._contexts.pop((chan.uid, cid))
if ctx:
log.runtime(f'Context entrypoint for {func} was terminated:\n{ctx}')
log.runtime(
f'Context entrypoint for {func} was terminated:\n{ctx}'
)

assert cs
if cs.cancelled_caught:
@@ -368,10 +370,10 @@ def __init__(
self,
name: str,
*,
enable_modules: List[str] = [],
enable_modules: list[str] = [],
uid: str = None,
loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None,
arbiter_addr: Optional[tuple[str, int]] = None,
spawn_method: Optional[str] = None
) -> None:
"""This constructor is called in the parent actor **before** the spawning
@@ -421,25 +423,25 @@ def __init__(

# (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[
Tuple[Channel, str],
Tuple[trio.CancelScope, typing.Callable, trio.Event]
tuple[Channel, str],
tuple[trio.CancelScope, typing.Callable, trio.Event]
] = {}

# map {actor uids -> Context}
self._contexts: dict[
Tuple[Tuple[str, str], str],
tuple[tuple[str, str], str],
Context
] = {}

self._listeners: List[trio.abc.Listener] = []
self._listeners: list[trio.abc.Listener] = []
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[
Tuple[Any, Any, Any, Any, Any]] = None
tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa

async def wait_for_peer(
self, uid: Tuple[str, str]
) -> Tuple[trio.Event, Channel]:
self, uid: tuple[str, str]
) -> tuple[trio.Event, Channel]:
"""Wait for a connection back from a spawned actor with a given
``uid``.
"""
@@ -1010,8 +1012,8 @@ async def _process_messages(

async def _from_parent(
self,
parent_addr: Optional[Tuple[str, int]],
) -> Tuple[Channel, Optional[Tuple[str, int]]]:
parent_addr: Optional[tuple[str, int]],
) -> tuple[Channel, Optional[tuple[str, int]]]:
try:
# Connect back to the parent actor and conduct initial
# handshake. From this point on if we error, we
@@ -1024,7 +1026,7 @@ async def _from_parent(
# Initial handshake: swap names.
await self._do_handshake(chan)

accept_addr: Optional[Tuple[str, int]] = None
accept_addr: Optional[tuple[str, int]] = None

if self._spawn_method == "trio":
# Receive runtime state from our parent
@@ -1066,7 +1068,7 @@

async def _async_main(
self,
accept_addr: Optional[Tuple[str, int]] = None,
accept_addr: Optional[tuple[str, int]] = None,

# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
@@ -1075,7 +1077,7 @@ async def _async_main(
# change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as
# a subactor.
parent_addr: Optional[Tuple[str, int]] = None,
parent_addr: Optional[tuple[str, int]] = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,

) -> None:
@@ -1261,7 +1263,7 @@ async def _serve_forever(
handler_nursery: trio.Nursery,
*,
# (host, port) to bind for channel server
accept_host: Tuple[str, int] = None,
accept_host: tuple[str, int] = None,
accept_port: int = 0,
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
@@ -1273,7 +1275,7 @@ async def _serve_forever(
self._server_down = trio.Event()
try:
async with trio.open_nursery() as server_n:
l: List[trio.abc.Listener] = await server_n.start(
l: list[trio.abc.Listener] = await server_n.start(
partial(
trio.serve_tcp,
self._stream_handler,
@@ -1427,7 +1429,7 @@ def cancel_server(self) -> None:
self._server_n.cancel_scope.cancel()

@property
def accept_addr(self) -> Optional[Tuple[str, int]]:
def accept_addr(self) -> Optional[tuple[str, int]]:
"""Primary address to which the channel server is bound.
"""
# throws OSError on failure
@@ -1438,23 +1440,23 @@ def get_parent(self) -> Portal:
assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan)

def get_chans(self, uid: Tuple[str, str]) -> List[Channel]:
def get_chans(self, uid: tuple[str, str]) -> list[Channel]:
"""Return all channels to the actor with provided uid."""
return self._peers[uid]

async def _do_handshake(
self,
chan: Channel

) -> Tuple[str, str]:
) -> tuple[str, str]:
"""Exchange (name, UUIDs) identifiers as the first communication step.
These are essentially the "mailbox addresses" found in actor model
parlance.
"""
await chan.send(self.uid)
value = await chan.recv()
uid: Tuple[str, str] = (str(value[0]), str(value[1]))
uid: tuple[str, str] = (str(value[0]), str(value[1]))

if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
@@ -1483,14 +1485,14 @@ class Arbiter(Actor):
def __init__(self, *args, **kwargs):

self._registry: dict[
Tuple[str, str],
Tuple[str, int],
tuple[str, str],
tuple[str, int],
] = {}
self._waiters = {}

super().__init__(*args, **kwargs)

async def find_actor(self, name: str) -> Optional[Tuple[str, int]]:
async def find_actor(self, name: str) -> Optional[tuple[str, int]]:
for uid, sockaddr in self._registry.items():
if name in uid:
return sockaddr
@@ -1499,25 +1501,31 @@ async def find_actor(self, name: str) -> Optional[tuple[str, int]]:

async def get_registry(
self
) -> dict[Tuple[str, str], Tuple[str, int]]:
'''Return current name registry.

) -> dict[str, tuple[str, int]]:
'''
Return current name registry.
This method is async to allow for cross-actor invocation.
'''
# NOTE: requires ``strict_map_key=False`` to the msgpack
# unpacker since we have tuples as keys (note this makes the
# arbiter susceptible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return self._registry
return {'.'.join(key): val for key, val in self._registry.items()}

async def wait_for_actor(
self,
name: str,
) -> List[Tuple[str, int]]:
'''Wait for a particular actor to register.

) -> list[tuple[str, int]]:
'''
Wait for a particular actor to register.
This is a blocking call if no actor by the provided name is currently
registered.
'''
sockaddrs = []

@@ -1536,8 +1544,8 @@

async def register_actor(
self,
uid: Tuple[str, str],
sockaddr: Tuple[str, int]
uid: tuple[str, str],
sockaddr: tuple[str, int]

) -> None:
uid = name, uuid = (str(uid[0]), str(uid[1]))
@@ -1552,7 +1560,8 @@

async def unregister_actor(
self,
uid: Tuple[str, str]
uid: tuple[str, str]

) -> None:
uid = (str(uid[0]), str(uid[1]))
self._registry.pop(uid)
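
Taken together, the `unpack_reg` test helper above and this `get_registry` change imply a simple string round-trip for registry keys now that tuples arrive as lists over the wire; a rough sketch (the uids and sockaddrs are made up):

```python
# Rough sketch of the registry-key round-trip; values are made up.
registry = {
    ('root', 'deadbeef'): ('127.0.0.1', 1616),
    ('worker', 'cafebabe'): ('127.0.0.1', 1617),
}

# Arbiter side (``Arbiter.get_registry``): join tuple uids into plain
# strings so the msg only uses msgpack-friendly ``str`` map keys.
wire_msg = {'.'.join(uid): sockaddr for uid, sockaddr in registry.items()}

# Client/test side (``unpack_reg``): split the dotted strings back into
# tuple uids.
assert {tuple(k.split('.')): v for k, v in wire_msg.items()} == registry
```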
15 changes: 8 additions & 7 deletions tractor/_ipc.py
@@ -96,7 +96,8 @@ def raddr(self) -> Tuple[str, int]:


class MsgpackTCPStream:
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
'''
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgpack-python``.
'''
@@ -120,12 +121,12 @@ def __init__(
self.drained: list[dict] = []

async def _iter_packets(self) -> AsyncGenerator[dict, None]:
"""Yield packets from the underlying stream.
"""
'''
Yield packets from the underlying stream.
'''
unpacker = msgpack.Unpacker(
raw=False,
use_list=False,
strict_map_key=False
)
while True:
try:
@@ -222,8 +223,8 @@ def __init__(
self.prefix_size = prefix_size

# TODO: struct aware messaging coders
self.encode = msgspec.Encoder().encode
self.decode = msgspec.Decoder().decode # dict[str, Any])
self.encode = msgspec.msgpack.Encoder().encode
self.decode = msgspec.msgpack.Decoder().decode # dict[str, Any])

async def _iter_packets(self) -> AsyncGenerator[dict, None]:
'''Yield packets from the underlying stream.
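
The `msgspec` hunk above tracks the 0.4.0 reorganization that moved the msgpack codec into a `msgspec.msgpack` submodule; a minimal sketch of the new call sites (assuming `msgspec >= 0.4.0`):

```python
# Minimal sketch of the msgspec >= 0.4.0 msgpack codec API (illustration only).
import msgspec

encode = msgspec.msgpack.Encoder().encode
decode = msgspec.msgpack.Decoder().decode  # untyped: decodes to builtins

wire = encode({'cmd': 'ping', 'args': [1, 2, 3]})
assert decode(wire) == {'cmd': 'ping', 'args': [1, 2, 3]}
```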
4 changes: 2 additions & 2 deletions tractor/trionics/_mngrs.py
@@ -71,7 +71,7 @@ async def gather_contexts(

mngrs: Sequence[AsyncContextManager[T]],

) -> AsyncGenerator[tuple[T, ...], None]:
) -> AsyncGenerator[tuple[Optional[T], ...], None]:
'''
Concurrently enter a sequence of async context managers, each in
a separate ``trio`` task and deliver the unwrapped values in the
@@ -84,7 +84,7 @@ async def gather_contexts(
entered and exited cancellation just works.
'''
unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)
unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs)

all_entered = trio.Event()
parent_exit = trio.Event()
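
The typing tweak above follows from the fact that `{}.fromkeys(...)` seeds every slot with `None` until the corresponding manager has actually been entered; a tiny illustration:

```python
# Why the yielded values are Optional[T]: every slot starts out as None.
mngrs = ['cm-a', 'cm-b']  # stand-ins for async context managers
unwrapped = {}.fromkeys(id(m) for m in mngrs)
assert all(v is None for v in unwrapped.values())  # nothing entered yet
```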
