Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ems to bidir streaming #190

Merged
merged 18 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 112 additions & 60 deletions piker/_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ async def open_pikerd(
assert _services is None

# XXX: this may open a root actor as well
async with tractor.open_root_actor(
async with (
tractor.open_root_actor(

# passed through to ``open_root_actor``
arbiter_addr=_tractor_kwargs['arbiter_addr'],
name=_root_dname,
Expand All @@ -113,10 +115,10 @@ async def open_pikerd(
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
# enable_modules=[__name__],
enable_modules=_root_modules,

) as _, tractor.open_nursery() as actor_nursery:
) as _,
tractor.open_nursery() as actor_nursery,
):
async with trio.open_nursery() as service_nursery:

# setup service mngr singleton instance
Expand All @@ -137,6 +139,7 @@ async def open_pikerd(
async def maybe_open_runtime(
loglevel: Optional[str] = None,
**kwargs,

) -> None:
"""
Start the ``tractor`` runtime (a root actor) if none exists.
Expand All @@ -159,6 +162,7 @@ async def maybe_open_runtime(
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
**kwargs,

) -> Union[tractor._portal.Portal, Services]:
"""If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
Expand Down Expand Up @@ -197,6 +201,67 @@ async def maybe_open_pikerd(
]


class Brokerd:
locks = defaultdict(trio.Lock)


@asynccontextmanager
async def maybe_spawn_daemon(

service_name: str,
spawn_func: Callable,
spawn_args: dict[str, Any],
loglevel: Optional[str] = None,
**kwargs,

) -> tractor.Portal:
"""
If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.

"""
if loglevel:
get_console_log(loglevel)

# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[service_name]
await lock.acquire()

# attach to existing brokerd if possible
async with tractor.find_actor(service_name) as portal:
if portal is not None:
lock.release()
yield portal
return

# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(

loglevel=loglevel,
**kwargs,

) as pikerd_portal:

if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
# await spawn_brokerd(brokername, loglevel=loglevel)
goodboy marked this conversation as resolved.
Show resolved Hide resolved
await spawn_func(**spawn_args)

else:
await pikerd_portal.run(
spawn_func,
**spawn_args,
)

async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal


async def spawn_brokerd(

brokername: str,
Expand All @@ -205,8 +270,6 @@ async def spawn_brokerd(

) -> tractor._portal.Portal:

from .data import _setup_persistent_brokerd

log.info(f'Spawning {brokername} broker daemon')

brokermod = get_brokermod(brokername)
Expand All @@ -226,13 +289,9 @@ async def spawn_brokerd(
**tractor_kwargs
)

# TODO: so i think this is the perfect use case for supporting
# a cross-actor async context manager api instead of this
# shoort-and-forget task spawned in the root nursery, we'd have an
# async exit stack that we'd register the `portal.open_context()`
# call with and then have the ability to unwind the call whenevs.

# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd

await _services.open_remote_ctx(
portal,
_setup_persistent_brokerd,
Expand All @@ -242,68 +301,31 @@ async def spawn_brokerd(
return dname


class Brokerd:
locks = defaultdict(trio.Lock)


@asynccontextmanager
async def maybe_spawn_brokerd(

brokername: str,
loglevel: Optional[str] = None,
**kwargs,

) -> tractor._portal.Portal:
"""
If no ``brokerd.{brokername}`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.

"""
if loglevel:
get_console_log(loglevel)

dname = f'brokerd.{brokername}'

# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[brokername]
await lock.acquire()
) -> tractor.Portal:
'''Helper to spawn a brokerd service.

# attach to existing brokerd if possible
async with tractor.find_actor(dname) as portal:
if portal is not None:
lock.release()
yield portal
return
'''
async with maybe_spawn_daemon(

# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
f'brokerd.{brokername}',
spawn_func=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
) as pikerd_portal:

if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_brokerd(brokername, loglevel=loglevel)

else:
await pikerd_portal.run(
spawn_brokerd,
brokername=brokername,
loglevel=loglevel,
)

async with tractor.wait_for_actor(dname) as portal:
lock.release()
yield portal
) as portal:
yield portal


async def spawn_emsd(

brokername: str,
loglevel: Optional[str] = None,
**extra_tractor_kwargs

Expand All @@ -314,10 +336,10 @@ async def spawn_emsd(
"""
log.info('Spawning emsd')

# TODO: raise exception when _services == None?
global _services
assert _services

await _services.actor_n.start_actor(
portal = await _services.actor_n.start_actor(
'emsd',
enable_modules=[
'piker.clearing._ems',
Expand All @@ -327,4 +349,34 @@ async def spawn_emsd(
debug_mode=_services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)

# non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd

await _services.open_remote_ctx(
portal,
_setup_persistent_emsd,
)

return 'emsd'


@asynccontextmanager
async def maybe_open_emsd(

brokername: str,
loglevel: Optional[str] = None,
**kwargs,

) -> tractor._portal.Portal: # noqa

async with maybe_spawn_daemon(

'emsd',
spawn_func=spawn_emsd,
spawn_args={'loglevel': loglevel},
loglevel=loglevel,
**kwargs,

) as portal:
yield portal
Loading