Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try msgspec #212

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,20 @@
'tractor.testing',
],
install_requires=[

# trio related
'trio>0.8',
'msgpack',
'async_generator',
'tricycle',
'trio_typing',

'colorlog',
'wrapt',
'trio_typing',
'pdbpp',

# serialization
'msgpack',
'msgspec',
],
tests_require=['pytest'],
python_requires=">=3.7",
Expand Down
59 changes: 49 additions & 10 deletions tractor/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
ModuleNotExposed,
is_multi_cancelled,
ContextCancelled,
TransportClosed,
)
from . import _debug
from ._discovery import get_arbiter
Expand Down Expand Up @@ -249,7 +250,7 @@ def __init__(
enable_modules: List[str] = [],
uid: str = None,
loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None,
arbiter_addr: Optional[Tuple[str, int]] = (None, None),
spawn_method: Optional[str] = None
) -> None:
"""This constructor is called in the parent actor **before** the spawning
Expand Down Expand Up @@ -279,7 +280,7 @@ def __init__(
# TODO: consider making this a dynamically defined
# @dataclass once we get py3.7
self.loglevel = loglevel
self._arb_addr = arbiter_addr
self._arb_addr = tuple(arbiter_addr)

# marked by the process spawning backend at startup
# will be None for the parent most process started manually
Expand Down Expand Up @@ -385,7 +386,18 @@ async def _stream_handler(
# send/receive initial handshake response
try:
uid = await self._do_handshake(chan)
except StopAsyncIteration:

except (
trio.BrokenResourceError,
trio.ClosedResourceError,
TransportClosed,
):
# XXX: This may propagate up from ``Channel._aiter_recv()``
# and ``MsgpackStream._inter_packets()`` on a read from the
# stream particularly when the runtime is first starting up
# inside ``open_root_actor()`` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place.
log.warning(f"Channel {chan} failed to handshake")
return

Expand Down Expand Up @@ -529,6 +541,7 @@ async def _process_messages(
# ``scope = Nursery.start()``
task_status.started(loop_cs)
async for msg in chan:

if msg is None: # loop terminate sentinel

log.debug(
Expand Down Expand Up @@ -635,22 +648,39 @@ async def _process_messages(
)
await self.cancel_rpc_tasks(chan)

except (
TransportClosed,
trio.BrokenResourceError,
trio.ClosedResourceError
):
# channels "breaking" is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out
# of the message loop and expect the teardown sequence
# to clean up.
log.error(f"{chan} form {chan.uid} closed abruptly")
# raise

except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke")

except (Exception, trio.MultiError) as err:
# ship any "internal" exception (i.e. one from internal machinery
# not from an rpc task) to parent
log.exception("Actor errored:")
if self._parent_chan:
await self._parent_chan.send(pack_error(err))
raise

# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
raise

except trio.Cancelled:
# debugging only
log.debug(f"Msg loop was cancelled for {chan}")
raise

finally:
# msg debugging for when he machinery is brokey
log.debug(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")
Expand Down Expand Up @@ -690,7 +720,15 @@ async def _from_parent(
_state._runtime_vars.update(rvs)

for attr, value in parent_data.items():
setattr(self, attr, value)

if attr == '_arb_addr':
# XXX: msgspec doesn't support serializing tuples
# so just cash manually here since it's what our
# internals expect.
self._arb_addr = tuple(value)

else:
setattr(self, attr, value)

# Disable sigint handling in children if NOT running in
# debug mode; we shouldn't need it thanks to our
Expand Down Expand Up @@ -1075,10 +1113,10 @@ async def _do_handshake(
parlance.
"""
await chan.send(self.uid)
uid: Tuple[str, str] = await chan.recv()
uid: Tuple[str, str] = tuple(await chan.recv())

if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
# if not isinstance(uid, tuple):
# raise ValueError(f"{uid} is not a valid uid?!")

chan.uid = uid
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
Expand Down Expand Up @@ -1145,8 +1183,9 @@ async def wait_for_actor(
async def register_actor(
self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
) -> None:
uid = tuple(uid)
name, uuid = uid
self._registry[uid] = sockaddr
self._registry[uid] = tuple(sockaddr)

# pop and signal all waiter events
events = self._waiters.pop(name, ())
Expand All @@ -1156,4 +1195,4 @@ async def register_actor(
event.set()

async def unregister_actor(self, uid: Tuple[str, str]) -> None:
self._registry.pop(uid)
self._registry.pop(tuple(uid))
7 changes: 7 additions & 0 deletions tractor/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class ContextCancelled(RemoteActorError):
"Inter-actor task context cancelled itself on the callee side."


class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"


class NoResult(RuntimeError):
"No final result is expected for this actor"

Expand All @@ -66,12 +70,15 @@ def pack_error(exc: BaseException) -> Dict[str, Any]:


def unpack_error(

msg: Dict[str, Any],
chan=None,
err_type=RemoteActorError

) -> Exception:
"""Unpack an 'error' message from the wire
into a local ``RemoteActorError``.

"""
error = msg['error']

Expand Down
Loading