Skip to content

Commit

Permalink
Merge pull request #190 from pikers/ems_to_bidir_streaming
Browse files Browse the repository at this point in the history
Ems to bidir streaming
  • Loading branch information
goodboy authored Jun 10, 2021
2 parents ee65382 + a9cc321 commit 689bc0c
Show file tree
Hide file tree
Showing 10 changed files with 1,343 additions and 671 deletions.
171 changes: 111 additions & 60 deletions piker/_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ async def open_pikerd(
assert _services is None

# XXX: this may open a root actor as well
async with tractor.open_root_actor(
async with (
tractor.open_root_actor(

# passed through to ``open_root_actor``
arbiter_addr=_tractor_kwargs['arbiter_addr'],
name=_root_dname,
Expand All @@ -113,10 +115,10 @@ async def open_pikerd(
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
# enable_modules=[__name__],
enable_modules=_root_modules,

) as _, tractor.open_nursery() as actor_nursery:
) as _,
tractor.open_nursery() as actor_nursery,
):
async with trio.open_nursery() as service_nursery:

# setup service mngr singleton instance
Expand All @@ -137,6 +139,7 @@ async def open_pikerd(
async def maybe_open_runtime(
loglevel: Optional[str] = None,
**kwargs,

) -> None:
"""
Start the ``tractor`` runtime (a root actor) if none exists.
Expand All @@ -159,6 +162,7 @@ async def maybe_open_runtime(
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
**kwargs,

) -> Union[tractor._portal.Portal, Services]:
"""If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
Expand Down Expand Up @@ -197,6 +201,66 @@ async def maybe_open_pikerd(
]


class Brokerd:
locks = defaultdict(trio.Lock)


@asynccontextmanager
async def maybe_spawn_daemon(

service_name: str,
spawn_func: Callable,
spawn_args: dict[str, Any],
loglevel: Optional[str] = None,
**kwargs,

) -> tractor.Portal:
"""
If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
"""
if loglevel:
get_console_log(loglevel)

# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[service_name]
await lock.acquire()

# attach to existing brokerd if possible
async with tractor.find_actor(service_name) as portal:
if portal is not None:
lock.release()
yield portal
return

# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(

loglevel=loglevel,
**kwargs,

) as pikerd_portal:

if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_func(**spawn_args)

else:
await pikerd_portal.run(
spawn_func,
**spawn_args,
)

async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal


async def spawn_brokerd(

brokername: str,
Expand All @@ -205,8 +269,6 @@ async def spawn_brokerd(

) -> tractor._portal.Portal:

from .data import _setup_persistent_brokerd

log.info(f'Spawning {brokername} broker daemon')

brokermod = get_brokermod(brokername)
Expand All @@ -226,13 +288,9 @@ async def spawn_brokerd(
**tractor_kwargs
)

# TODO: so i think this is the perfect use case for supporting
# a cross-actor async context manager api instead of this
# shoort-and-forget task spawned in the root nursery, we'd have an
# async exit stack that we'd register the `portal.open_context()`
# call with and then have the ability to unwind the call whenevs.

# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd

await _services.open_remote_ctx(
portal,
_setup_persistent_brokerd,
Expand All @@ -242,68 +300,31 @@ async def spawn_brokerd(
return dname


class Brokerd:
locks = defaultdict(trio.Lock)


@asynccontextmanager
async def maybe_spawn_brokerd(

brokername: str,
loglevel: Optional[str] = None,
**kwargs,

) -> tractor._portal.Portal:
"""
If no ``brokerd.{brokername}`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
"""
if loglevel:
get_console_log(loglevel)

dname = f'brokerd.{brokername}'

# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[brokername]
await lock.acquire()
) -> tractor.Portal:
'''Helper to spawn a brokerd service.
# attach to existing brokerd if possible
async with tractor.find_actor(dname) as portal:
if portal is not None:
lock.release()
yield portal
return
'''
async with maybe_spawn_daemon(

# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
f'brokerd.{brokername}',
spawn_func=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
) as pikerd_portal:

if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_brokerd(brokername, loglevel=loglevel)

else:
await pikerd_portal.run(
spawn_brokerd,
brokername=brokername,
loglevel=loglevel,
)

async with tractor.wait_for_actor(dname) as portal:
lock.release()
yield portal
) as portal:
yield portal


async def spawn_emsd(

brokername: str,
loglevel: Optional[str] = None,
**extra_tractor_kwargs

Expand All @@ -314,10 +335,10 @@ async def spawn_emsd(
"""
log.info('Spawning emsd')

# TODO: raise exception when _services == None?
global _services
assert _services

await _services.actor_n.start_actor(
portal = await _services.actor_n.start_actor(
'emsd',
enable_modules=[
'piker.clearing._ems',
Expand All @@ -327,4 +348,34 @@ async def spawn_emsd(
debug_mode=_services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)

# non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd

await _services.open_remote_ctx(
portal,
_setup_persistent_emsd,
)

return 'emsd'


@asynccontextmanager
async def maybe_open_emsd(

brokername: str,
loglevel: Optional[str] = None,
**kwargs,

) -> tractor._portal.Portal: # noqa

async with maybe_spawn_daemon(

'emsd',
spawn_func=spawn_emsd,
spawn_args={'loglevel': loglevel},
loglevel=loglevel,
**kwargs,

) as portal:
yield portal
Loading

0 comments on commit 689bc0c

Please sign in to comment.