Skip to content

Commit

Permalink
Change to new doc string style
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Aug 23, 2022
1 parent ea112cc commit 1eca145
Showing 1 changed file with 48 additions and 25 deletions.
73 changes: 48 additions & 25 deletions tractor/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,11 @@ def __init__(
arbiter_addr: Optional[tuple[str, int]] = None,
spawn_method: Optional[str] = None
) -> None:
"""This constructor is called in the parent actor **before** the spawning
'''
This constructor is called in the parent actor **before** the spawning
phase (aka before a new process is executed).
"""
'''
self.name = name
self.uid = (name, uid or str(uuid.uuid4()))

Expand All @@ -439,9 +441,6 @@ def __init__(

self.enable_modules = mods
self._mods: dict[str, ModuleType] = {}

# TODO: consider making this a dynamically defined
# @dataclass once we get py3.7
self.loglevel = loglevel

self._arb_addr = (
Expand Down Expand Up @@ -482,22 +481,26 @@ def __init__(
async def wait_for_peer(
self, uid: tuple[str, str]
) -> tuple[trio.Event, Channel]:
"""Wait for a connection back from a spawned actor with a given
'''
Wait for a connection back from a spawned actor with a given
``uid``.
"""
'''
log.runtime(f"Waiting for peer {uid} to connect")
event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait()
log.runtime(f"{uid} successfully connected back to us")
return event, self._peers[uid][-1]

def load_modules(self) -> None:
"""Load allowed RPC modules locally (after fork).
'''
Load allowed RPC modules locally (after fork).
Since this actor may be spawned on a different machine from
the original nursery we need to try and load the local module
code (if it exists).
"""
'''
try:
if self._spawn_method == 'trio':
parent_data = self._parent_main_data
Expand Down Expand Up @@ -949,11 +952,13 @@ async def _serve_forever(
accept_port: int = 0,
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Start the channel server, begin listening for new connections.
'''
Start the channel server, begin listening for new connections.
This will cause an actor to continue living (blocking) until
``cancel_server()`` is called.
"""
'''
self._server_down = trio.Event()
try:
async with trio.open_nursery() as server_n:
Expand All @@ -978,16 +983,19 @@ async def _serve_forever(
self._server_down.set()

def cancel_soon(self) -> None:
"""Cancel this actor asap; can be called from a sync context.
'''
Cancel this actor asap; can be called from a sync context.
Schedules `.cancel()` to be run immediately just like when
cancelled by the parent.
"""
'''
assert self._service_n
self._service_n.start_soon(self.cancel)

async def cancel(self) -> bool:
"""Cancel this actor's runtime.
'''
Cancel this actor's runtime.
The "deterministic" teardown sequence in order is:
- cancel all ongoing rpc tasks by cancel scope
Expand All @@ -996,7 +1004,8 @@ async def cancel(self) -> bool:
- cancel the "service" nursery reponsible for
spawning new rpc tasks
- return control the parent channel message loop
"""
'''
log.cancel(f"{self.uid} is trying to cancel")
self._cancel_called = True

Expand Down Expand Up @@ -1082,9 +1091,11 @@ async def cancel_rpc_tasks(
self,
only_chan: Optional[Channel] = None,
) -> None:
"""Cancel all existing RPC responder tasks using the cancel scope
'''
Cancel all existing RPC responder tasks using the cancel scope
registered for each.
"""
'''
tasks = self._rpc_tasks
if tasks:
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
Expand All @@ -1105,39 +1116,51 @@ async def cancel_rpc_tasks(
await self._ongoing_rpc_tasks.wait()

def cancel_server(self) -> None:
"""Cancel the internal channel server nursery thereby
'''
Cancel the internal channel server nursery thereby
preventing any new inbound connections from being established.
"""
'''
if self._server_n:
log.runtime("Shutting down channel server")
self._server_n.cancel_scope.cancel()

@property
def accept_addr(self) -> Optional[tuple[str, int]]:
"""Primary address to which the channel server is bound.
"""
'''
Primary address to which the channel server is bound.
'''
# throws OSError on failure
return self._listeners[0].socket.getsockname() # type: ignore

def get_parent(self) -> Portal:
"""Return a portal to our parent actor."""
'''
Return a portal to our parent actor.
'''
assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan)

def get_chans(self, uid: tuple[str, str]) -> list[Channel]:
"""Return all channels to the actor with provided uid."""
'''
Return all channels to the actor with provided uid.
'''
return self._peers[uid]

async def _do_handshake(
self,
chan: Channel

) -> tuple[str, str]:
"""Exchange (name, UUIDs) identifiers as the first communication step.
'''
Exchange (name, UUIDs) identifiers as the first communication step.
These are essentially the "mailbox addresses" found in actor model
parlance.
"""
'''
await chan.send(self.uid)
value = await chan.recv()
uid: tuple[str, str] = (str(value[0]), str(value[1]))
Expand Down

0 comments on commit 1eca145

Please sign in to comment.