Skip to content

Commit

Permalink
Add 2-way streaming example to readme and scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Jul 31, 2021
1 parent 69bbf6a commit a35b938
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 3 deletions.
72 changes: 72 additions & 0 deletions NEWS.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
=========
Changelog
=========

tractor 0.1.0a1 (2021-08-01)
============================

Experiments and WIPs
--------------------
- Initial optional ``msgspec`` serialization support in
`#214 <https://github.com/goodboy/tractor/pull/214>`
which should hopefully land by next release.

- Improved "infect ``asyncio``" cross-loop task cancellation
and error propagation by vastly simplifying the approach.
We may end up just going fully ``anyio`` in the medium term
despite all this work.


Improved Documentation
----------------------
.. todo: hyperlinks in this one
- Updated our readme to include more (and better) examples (with
matching multi-terminal process monitoring shell commands) as well as
added many more examples to the repo set.
- Added a readme "actors under the hood" section in an effort to guard against
suggestitons for chaning the API away from ``trio``'s
*tasks-as-functions* style.


Trivial/Internal Changes
------------------------
- Added a new ``TransportClosed`` internal exception/signal for catching
TCP channel gentle closes instead of silently falling through the
message handler loop via an async generator ``return```.


Deprecations and Removals
-------------------------
- Dropped support for invoking sync functions in other actors/processes
since you can always wrap a sync function from an async one.
Users can instead consider using ``trio-parallel`` which is a project
specifically geared for purely synchronous calls in sub-processes.

- Deprecated our ``tractor.run()`` entrypoint; the runtime is now either
started implicitly in first actor nursery use or via an explicit call
to


Features
--------
- Updated our uni-directional streaming API to require a context manager style
``async Portal.stream_from(target) as stream:`` which explicitly
determines when to stop a stream in the calling (portal opening) actor.

- Improved the ``multiprocessing`` backend process reaping durning
actor nursery exit, particulary during cancellation scenarios that
previously resulted in hangs.


tractor 0.1.0a0 (2021-02-28)
============================

..
TODO: fill out the details of the initial feature set in some TLDR form
Features
--------
- ``trio`` based process spawner (using ``subprocess``)
- initial multi-process debugging with ``pdb++``
- windows support using both ``trio`` and ``multiprocessing`` spawn
backends
95 changes: 94 additions & 1 deletion docs/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ Zombie safe: self-destruct a process tree
print('This process tree will self-destruct in 1 sec...')
await trio.sleep(1)
# you could have done this yourself
# raise an error in root actor/process and trigger
# reaping of all minions
raise Exception('Self Destructed')
Expand Down Expand Up @@ -197,6 +198,98 @@ And, yes, there's a built-in crash handling mode B)
We're hoping to add a respawn-from-repl system soon!


SC compatible bi-directional streaming
--------------------------------------
Yes, you saw it here first; we provide 2-way streams
with reliable, transitive setup/teardown semantics.

Our nascent api is remniscent of ``trio.Nursery.start()``
style invocation:

.. code:: python
import trio
import tractor
@tractor.context
async def simple_rpc(
ctx: tractor.Context,
data: int,
) -> None:
'''Test a small ping-pong 2-way streaming server.
'''
# signal to parent that we're up much like
# ``trio_typing.TaskStatus.started()``
await ctx.started(data + 1)
async with ctx.open_stream() as stream:
count = 0
async for msg in stream:
assert msg == 'ping'
await stream.send('pong')
count += 1
else:
assert count == 10
async def main() -> None:
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'rpc_server',
enable_modules=[__name__],
)
# XXX: this syntax requires py3.9
async with (
portal.open_context(
simple_rpc,
data=10,
) as (ctx, sent),
ctx.open_stream() as stream,
):
assert sent == 11
count = 0
# receive msgs using async for style
await stream.send('ping')
async for msg in stream:
assert msg == 'pong'
await stream.send('ping')
count += 1
if count >= 9:
break
# explicitly teardown the daemon-actor
await portal.cancel_actor()
if __name__ == '__main__':
trio.run(main)
See original proposal and discussion in `#53`_ as well
as follow up improvements in `#223`_ that we'd love to
hear your thoughts on!

.. _#53: https://github.com/goodboy/tractor/issues/53
.. _#223: https://github.com/goodboy/tractor/issues/223


Worker poolz are easy peasy
---------------------------
The initial ask from most new users is *"how do I make a worker
Expand Down
72 changes: 72 additions & 0 deletions examples/rpc_bidir_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import trio
import tractor


@tractor.context
async def simple_rpc(

ctx: tractor.Context,
data: int,

) -> None:
'''Test a small ping-pong 2-way streaming server.
'''
# signal to parent that we're up much like
# ``trio_typing.TaskStatus.started()``
await ctx.started(data + 1)

async with ctx.open_stream() as stream:

count = 0
async for msg in stream:

assert msg == 'ping'
await stream.send('pong')
count += 1

else:
assert count == 10


async def main() -> None:

async with tractor.open_nursery() as n:

portal = await n.start_actor(
'rpc_server',
enable_modules=[__name__],
)

# XXX: syntax requires py3.9
async with (

portal.open_context(
simple_rpc, # taken from pytest parameterization
data=10,

) as (ctx, sent),

ctx.open_stream() as stream,
):

assert sent == 11

count = 0
# receive msgs using async for style
await stream.send('ping')

async for msg in stream:
assert msg == 'pong'
await stream.send('ping')
count += 1

if count >= 9:
break

# explicitly teardown the daemon-actor
await portal.cancel_actor()


if __name__ == '__main__':
trio.run(main)
8 changes: 6 additions & 2 deletions tests/test_docs_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ def run(script_code):
if '__' not in f
and f[0] != '_'
and 'debugging' not in p[0]
],
and 'debugging' not in p[0]],
ids=lambda t: t[1],
)
def test_example(run_example_in_subproc, example_script):
Expand All @@ -98,6 +98,10 @@ def test_example(run_example_in_subproc, example_script):
test_example``.
"""
ex_file = os.path.join(*example_script)

if 'rpc_bidir_streaming' in ex_file and not sys.version >= (3, 9):
pytest.skip("2-way streaming example requires py3.9 async with syntax")

with open(ex_file, 'r') as ex:
code = ex.read()

Expand Down

0 comments on commit a35b938

Please sign in to comment.