Skip to content

Commit

Permalink
Merge pull request #219 from goodboy/bi_streaming_no_debugger_stuff
Browse files Browse the repository at this point in the history
Initial bi-directional streaming support!
  • Loading branch information
goodboy authored Jul 31, 2021
2 parents 4d530de + 240f591 commit 54d8c93
Show file tree
Hide file tree
Showing 22 changed files with 1,961 additions and 331 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@ jobs:
mypy:
name: 'MyPy'
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v2

- name: Setup python
uses: actions/setup-python@v2
with:
python-version: '3.8'
python-version: '3.9'

- name: Install dependencies
run: pip install -U . --upgrade-strategy eager
run: pip install -U . --upgrade-strategy eager -r requirements-test.txt

- name: Run MyPy check
run: mypy tractor/ --ignore-missing-imports

Expand Down
95 changes: 94 additions & 1 deletion docs/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ Zombie safe: self-destruct a process tree
print('This process tree will self-destruct in 1 sec...')
await trio.sleep(1)
# you could have done this yourself
# raise an error in root actor/process and trigger
# reaping of all minions
raise Exception('Self Destructed')
Expand Down Expand Up @@ -197,6 +198,98 @@ And, yes, there's a built-in crash handling mode B)
We're hoping to add a respawn-from-repl system soon!


SC compatible bi-directional streaming
--------------------------------------
Yes, you saw it here first; we provide 2-way streams
with reliable, transitive setup/teardown semantics.

Our nascent api is remniscent of ``trio.Nursery.start()``
style invocation:

.. code:: python
import trio
import tractor
@tractor.context
async def simple_rpc(
ctx: tractor.Context,
data: int,
) -> None:
'''Test a small ping-pong 2-way streaming server.
'''
# signal to parent that we're up much like
# ``trio_typing.TaskStatus.started()``
await ctx.started(data + 1)
async with ctx.open_stream() as stream:
count = 0
async for msg in stream:
assert msg == 'ping'
await stream.send('pong')
count += 1
else:
assert count == 10
async def main() -> None:
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'rpc_server',
enable_modules=[__name__],
)
# XXX: this syntax requires py3.9
async with (
portal.open_context(
simple_rpc,
data=10,
) as (ctx, sent),
ctx.open_stream() as stream,
):
assert sent == 11
count = 0
# receive msgs using async for style
await stream.send('ping')
async for msg in stream:
assert msg == 'pong'
await stream.send('ping')
count += 1
if count >= 9:
break
# explicitly teardown the daemon-actor
await portal.cancel_actor()
if __name__ == '__main__':
trio.run(main)
See original proposal and discussion in `#53`_ as well
as follow up improvements in `#223`_ that we'd love to
hear your thoughts on!

.. _#53: https://github.com/goodboy/tractor/issues/53
.. _#223: https://github.com/goodboy/tractor/issues/223


Worker poolz are easy peasy
---------------------------
The initial ask from most new users is *"how do I make a worker
Expand Down
72 changes: 72 additions & 0 deletions examples/rpc_bidir_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import trio
import tractor


@tractor.context
async def simple_rpc(

ctx: tractor.Context,
data: int,

) -> None:
'''Test a small ping-pong 2-way streaming server.
'''
# signal to parent that we're up much like
# ``trio_typing.TaskStatus.started()``
await ctx.started(data + 1)

async with ctx.open_stream() as stream:

count = 0
async for msg in stream:

assert msg == 'ping'
await stream.send('pong')
count += 1

else:
assert count == 10


async def main() -> None:

async with tractor.open_nursery() as n:

portal = await n.start_actor(
'rpc_server',
enable_modules=[__name__],
)

# XXX: syntax requires py3.9
async with (

portal.open_context(
simple_rpc, # taken from pytest parameterization
data=10,

) as (ctx, sent),

ctx.open_stream() as stream,
):

assert sent == 11

count = 0
# receive msgs using async for style
await stream.send('ping')

async for msg in stream:
assert msg == 'pong'
await stream.send('ping')
count += 1

if count >= 9:
break

# explicitly teardown the daemon-actor
await portal.cancel_actor()


if __name__ == '__main__':
trio.run(main)
Loading

0 comments on commit 54d8c93

Please sign in to comment.