Skip to content

Commit

Permalink
More refinements and proper typing
Browse files Browse the repository at this point in the history
- drop unneeded (and commented) internal cs allocating bits.
- bypass all task manager stuff if no generator is provided by the
  caller; i.e. just call `.start_soon()` as normal.
- fix `Generator` typing.
- add some prints around task manager.
- wrap in `TaskOutcome.lowlevel_task: Task`.
  • Loading branch information
goodboy committed May 19, 2023
1 parent 2a6ea78 commit e4d4434
Showing 1 changed file with 40 additions and 36 deletions.
76 changes: 40 additions & 36 deletions tractor/trionics/_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
contextmanager as cm,
nullcontext,
)
from typing import ContextManager
from typing import (
Generator,
Any,
)

from outcome import (
Outcome,
Expand All @@ -47,6 +50,7 @@ class TaskOutcome(Struct):
to the eventual boxed result/value or raised exception.
'''
lowlevel_task: Task
_exited: Event = trio.Event() # as per `trio.Runner.task_exited()`
_outcome: Outcome | None = None # as per `outcome.Outcome`
_result: Any | None = None # the eventual maybe-returned-value
Expand Down Expand Up @@ -105,7 +109,7 @@ class ScopePerTaskNursery(Struct):
tuple[CancelScope, Outcome]
] = {}

scope_manager: ContextManager | None = None
scope_manager: Generator[Any, Outcome, None] | None = None

async def start_soon(
self,
Expand Down Expand Up @@ -133,16 +137,13 @@ async def start_soon(
if sm is None:
return n.start_soon(async_fn, *args, name=None)

# per_task_cs = CancelScope()
new_task: Task | None = None
to_return: tuple[Any] | None = None

# NOTE: what do we enforce as a signature for the
# `@task_scope_manager` here?
mngr = sm(
nursery=n,
# scope=per_task_cs,
)
mngr = sm(nursery=n)

async def _start_wrapped_in_scope(
task_status: TaskStatus[
tuple[CancelScope, Task]
Expand All @@ -153,13 +154,6 @@ async def _start_wrapped_in_scope(
# TODO: this was working before?!
# nonlocal to_return

task = trio.lowlevel.current_task()
# self._scopes[per_task_cs] = task

# NOTE: we actually don't need this since the user can
# just to it themselves inside mngr!
# with per_task_cs:

# execute up to the first yield
try:
to_return: tuple[Any] = next(mngr)
Expand Down Expand Up @@ -206,13 +200,6 @@ async def _start_wrapped_in_scope(
return to_return


# TODO: you could wrap your output task handle in this?
# class TaskHandle(Struct):
# task: Task
# cs: CancelScope
# outcome: TaskOutcome


# TODO: maybe just make this a generator with a single yield that also
# delivers a value (of some std type) from the yield expression?
# @trio.task_scope_manager
Expand All @@ -224,33 +211,52 @@ def add_task_handle_and_crash_handling(
# they want below?
# scope: CancelScope,

) -> Generator[None, list[Any]]:
) -> Generator[
Any,
Outcome,
None,
]:
'''
A customizable, user defined "task scope manager".
task_outcome = TaskOutcome()
With this specially crafted single-yield generator function you can
add more granular controls around every task spawned by `trio` B)
'''
# if you need it you can ask trio for the task obj
task: Task = trio.lowlevel.current_task()
print(f'Spawning task: {task.name}')

# yields back when task is terminated, cancelled, returns.
# User defined "task handle" for more granular supervision
# of each spawned task as needed for their particular usage.
task_outcome = TaskOutcome(task)

# NOTE: if wanted the user could wrap the output task handle however
# they want!
# class TaskHandle(Struct):
# task: Task
# cs: CancelScope
# outcome: TaskOutcome

# this yields back when the task is terminated, cancelled or returns.
try:
# XXX: wait, this isn't doing anything right since we'd have to
# manually activate this scope using something like:
# `task._activate_cancel_status(cs._cancel_status)` ??
# oh wait, but `.__enter__()` does all that already?
with CancelScope() as cs:

# the yielded value(s) here are what are returned to the
# nursery's `.start_soon()` caller B)
lowlevel_outcome: Outcome = yield (task_outcome, cs)
task_outcome._set_outcome(lowlevel_outcome)

# Adds "crash handling" from `pdbp` by entering
# a REPL on std errors.
except Exception as err:
# Adds "crash handling" from `pdbp` by entering
# a REPL on std errors.
print(f'{task.name} crashed, entering debugger!')
pdbp.xpm()
raise

finally:
print(f'{task.name} Exitted')


@acm
async def open_nursery(
Expand All @@ -264,11 +270,6 @@ async def open_nursery(
)


async def sleep_then_err():
await trio.sleep(1)
assert 0


async def sleep_then_return_val(val: str):
await trio.sleep(0.2)
return val
Expand Down Expand Up @@ -309,7 +310,10 @@ async def main():
print(f'{res} -> GOT EXPECTED TASK VALUE')

await trio.sleep(0.6)
print('Cancelling and waiting for CRASH..')
print(
'Cancelling and waiting on {err_outcome.lowlevel_task} '
'to CRASH..'
)
cs.cancel()

trio.run(main)

0 comments on commit e4d4434

Please sign in to comment.