Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for send client disconnect to HTTP #2732

Merged
merged 4 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions starlette/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from starlette.background import BackgroundTask
from starlette.concurrency import iterate_in_threadpool
from starlette.datastructures import URL, Headers, MutableHeaders
from starlette.requests import ClientDisconnect
from starlette.types import Receive, Scope, Send


Expand Down Expand Up @@ -249,14 +250,22 @@ async def stream_response(self, send: Send) -> None:
await send({"type": "http.response.body", "body": b"", "more_body": False})

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
async with anyio.create_task_group() as task_group:
spec_version = tuple(map(int, scope.get("asgi", {}).get("spec_version", "2.0").split(".")))

async def wrap(func: typing.Callable[[], typing.Awaitable[None]]) -> None:
await func()
task_group.cancel_scope.cancel()
if spec_version >= (2, 4):
try:
await self.stream_response(send)
except OSError:
raise ClientDisconnect()
else:
async with anyio.create_task_group() as task_group:

async def wrap(func: typing.Callable[[], typing.Awaitable[None]]) -> None:
await func()
task_group.cancel_scope.cancel()

task_group.start_soon(wrap, partial(self.stream_response, send))
await wrap(partial(self.listen_for_disconnect, receive))
task_group.start_soon(wrap, partial(self.stream_response, send))
await wrap(partial(self.listen_for_disconnect, receive))

if self.background is not None:
await self.background()
Expand Down
37 changes: 35 additions & 2 deletions tests/test_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
import time
from http.cookies import SimpleCookie
from pathlib import Path
from typing import Any, AsyncIterator, Iterator
from typing import Any, AsyncGenerator, AsyncIterator, Iterator

import anyio
import pytest

from starlette import status
from starlette.background import BackgroundTask
from starlette.datastructures import Headers
from starlette.requests import Request
from starlette.requests import ClientDisconnect, Request
from starlette.responses import FileResponse, JSONResponse, RedirectResponse, Response, StreamingResponse
from starlette.testclient import TestClient
from starlette.types import Message, Receive, Scope, Send
Expand Down Expand Up @@ -542,6 +542,39 @@ async def stream_indefinitely() -> AsyncIterator[bytes]:
assert not cancel_scope.cancel_called, "Content streaming should stop itself."


@pytest.mark.anyio
async def test_streaming_response_on_client_disconnects() -> None:
chunks = bytearray()
streamed = False

async def receive_disconnect() -> Message:
raise NotImplementedError

async def send(message: Message) -> None:
nonlocal streamed
if message["type"] == "http.response.body":
if not streamed:
chunks.extend(message.get("body", b""))
streamed = True
else:
raise OSError

async def stream_indefinitely() -> AsyncGenerator[bytes, None]:
while True:
await anyio.sleep(0)
yield b"chunk"

stream = stream_indefinitely()
response = StreamingResponse(content=stream)

with anyio.move_on_after(1) as cancel_scope:
with pytest.raises(ClientDisconnect):
await response({"asgi": {"spec_version": "2.4"}}, receive_disconnect, send)
assert not cancel_scope.cancel_called, "Content streaming should stop itself."
assert chunks == b"chunk"
await stream.aclose()


README = """\
# BáiZé

Expand Down