StreamingResponse driven by a TaskGroup

I’d like to be able to return a streaming response driven by a TaskGroup, e.g.:

async def news_and_weather(request: HttpRequest) -> StreamingHttpResponse:
    async def gen():
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx = anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                async for msg in rx:
                    yield msg  # yield in async generator!! illegal inside TaskGroup!
    return StreamingHttpResponse(gen())

However, this doesn’t work: the yield sits inside a plain async generator rather than a context manager, and calling aclosing() on that async generator is not sufficient to let a TaskGroup cancel itself and catch the resulting cancellation error.

Instead, I’d like to be able to return an instance of a new class, StreamingCmgrHttpResponse:

async def news_and_weather(request: HttpRequest) -> StreamingCmgrHttpResponse:
    @contextlib.asynccontextmanager
    async def acmgr_gen():
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx = anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                yield rx  # yield inside asynccontextmanager, permitted inside TaskGroup

    return StreamingCmgrHttpResponse(acmgr_gen())

Hi @graingert — what’s the question here?

As long as your class does the right thing and responds True to response.streaming, you should be good to go, no?

ASGIHandler does this:

            async with aclosing(aiter(response)) as content:
                async for part in content:
                     ...

What else would your example need to be able to work?

No, this won’t work, because there’s nowhere for the asynccontextmanager to collect the asyncio.CancelledError.

The code here: django/django/core/handlers/asgi.py at a2f7b3a6a04c8c46c38040c9d9d5bdc6298bd714 · django/django · GitHub
needs something extra, like this:

        if response.streaming_cmgr:
            # - Consume via `__aiter__` and not `streaming_content` directly, to
            #   allow mapping of a sync iterator.
            # - Use aclosing() when consuming aiter. See
            #   https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e
            async with response.streaming_cmgr_content as content:
                async for part in content:
                    for chunk, _ in self.chunk_bytes(part):
                        await send(
                            {
                                "type": "http.response.body",
                                "body": chunk,
                                # Ignore "more" as there may be more parts; instead,
                                # use an empty final closing message with False.
                                "more_body": True,
                            }
                        )

We could have sync and async versions of this, but the sync one would be less useful, as there’s no easy way for a sync context manager to raise an exception into its body like an async one can.

Sorry @graingert — it’s early :coffee: — I’m not quite following…

All you’ve effectively done there is remove the aclosing(), no? :thinking: (Assuming .streaming_cmgr_content is just a tweaked streaming_content.)

This may go easier if you create a branch on your fork with some test cases showing the behaviour?

Is the question, “You can’t wrap a task group in an async generator, so can we adjust ASGIHandler to handle task groups?”

If so, yes, probably.

That.

Yep! I’ve got a branch up now.

Well, it doesn’t have any test cases yet.

Here it is, with just two basic end-to-end tests: add support for streaming with TaskGroups by graingert · Pull Request #19364 · django/django · GitHub. It will probably need a gzip test and a test with the TestClient too.

OK, thanks. Let me have a play. :yoyo: