diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 72903aea..d99d30a2 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -29,6 +29,9 @@ This library adheres to `Semantic Versioning 2.0 `_.
- Fixed ``to_process.run_sync()`` failing to initialize if ``__main__.__file__`` pointed
to a file in a nonexistent directory
(`#696 `_)
+- Fixed ``AssertionError: feed_data after feed_eof`` on asyncio when a subprocess is
+ closed early, before its output has been read
+ (`#490 `_)
- Fixed ``TaskInfo.has_pending_cancellation()`` on asyncio not respecting shielded
scopes (`#771 `_; PR by @gschaffner)
- Fixed ``SocketStream.receive()`` returning ``bytearray`` instead of ``bytes`` when
diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py
index b88b9cc1..97edb8ad 100644
--- a/src/anyio/_backends/_asyncio.py
+++ b/src/anyio/_backends/_asyncio.py
@@ -930,7 +930,7 @@ async def receive(self, max_bytes: int = 65536) -> bytes:
raise EndOfStream
async def aclose(self) -> None:
- self._stream.feed_eof()
+ self._stream.set_exception(ClosedResourceError())
await AsyncIOBackend.checkpoint()
diff --git a/tests/test_subprocesses.py b/tests/test_subprocesses.py
index 84c4b4dc..e8020a7c 100644
--- a/tests/test_subprocesses.py
+++ b/tests/test_subprocesses.py
@@ -13,7 +13,13 @@
import pytest
from pytest import FixtureRequest
-from anyio import CancelScope, ClosedResourceError, open_process, run_process
+from anyio import (
+ CancelScope,
+ ClosedResourceError,
+ create_task_group,
+ open_process,
+ run_process,
+)
from anyio.streams.buffered import BufferedByteReceiveStream
pytestmark = pytest.mark.anyio
@@ -289,3 +295,33 @@ async def test_py39_arguments(
pytest.skip(f"the {argname!r} argument is not supported by uvloop yet")
raise
+
+
+async def test_close_early() -> None:
+ """Regression test for #490."""
+ code = dedent("""\
+ import sys
+ for _ in range(100):
+ sys.stdout.buffer.write(bytes(range(256)))
+ """)
+
+ async with await open_process([sys.executable, "-c", code]):
+ pass
+
+
+async def test_close_while_reading() -> None:
+ code = dedent("""\
+ import time
+
+ time.sleep(3)
+ """)
+
+ async with await open_process(
+ [sys.executable, "-c", code]
+ ) as process, create_task_group() as tg:
+ assert process.stdout
+ tg.start_soon(process.stdout.aclose)
+ with pytest.raises(ClosedResourceError):
+ await process.stdout.receive()
+
+ process.terminate()