Async generators and group_send

Hey,

I am trying to stream an LLM response to a group with multiple consumers using async generators. I'm using RedisChannelLayer and running the app with uvicorn.

Versions:

channels: 4.0.0
django: 4.2.13
channels-redis: 4.2.0

Simplified example:

import asyncio
from channels.generic.websocket import AsyncJsonWebsocketConsumer

class MyConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        await self.channel_layer.group_add("chat", self.channel_name)
        await self.accept()  # complete the WebSocket handshake

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard("chat", self.channel_name)

    async def receive_json(self, content, **kwargs):
        async for chunk in stream_response():
            await self.channel_layer.group_send(
                "chat",
                {
                    "type": "send_to_client",
                    **chunk,
                },
            )

    async def send_to_client(self, message):
        await self.send_json(message)

async def stream_response():
    response = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
    text = ""
    for chunk in response.split(" "):
        text = f"{text} {chunk}".strip()  # keep the spaces between words
        await asyncio.sleep(0.1)
        yield {"action": "message_chunk", "content": text}

The issue is that while the generator is producing, the consumer that is running receive_json does not process the messages sent with group_send. It only forwards the chunks to its client all at once, after the generator has finished.

Is it possible to send a message with group_send in a way that’s not blocked by processing the generator?

A hacky workaround I could think of is to call both send_json and group_send in the generator to ensure all the consumers get the messages in time, and just have a way to handle duplicated messages - but that seems less than ideal.

Thanks!

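You could try running the streaming loop with asyncio.create_task inside receive_json, so that the handler returns right away and the consumer is free to process the group messages while the generator is still producing.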
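
To illustrate why that might help, here is a minimal, Channels-free sketch. It assumes (as far as I understand the Channels consumer loop) that a consumer handles its incoming messages one at a time and awaits each handler before picking up the next one. The names inbox, run_consumer and spawn_task are made up for the illustration and are not part of the Channels API.

```python
import asyncio

WORDS = ["Lorem", "ipsum", "dolor"]


async def stream_response():
    # Stand-in for the LLM stream: one word every 10 ms.
    for word in WORDS:
        await asyncio.sleep(0.01)
        yield word


async def run_consumer(spawn_task: bool) -> list:
    """Model one consumer: a queue of incoming messages, handled one at a time."""
    inbox = asyncio.Queue()  # hypothetical stand-in for the consumer's channel
    events = []              # records what happened, in order
    background = set()       # strong references so tasks are not garbage collected

    async def produce():
        async for chunk in stream_response():
            events.append(f"produced {chunk}")
            await inbox.put(("chunk", chunk))  # plays the role of group_send
        await inbox.put(("stop", None))

    async def handle(kind, payload):
        if kind == "receive":  # plays the role of receive_json
            if spawn_task:
                task = asyncio.create_task(produce())
                background.add(task)
                task.add_done_callback(background.discard)
            else:
                await produce()  # handler does not return until the stream ends
        elif kind == "chunk":  # plays the role of send_to_client
            events.append(f"delivered {payload}")

    await inbox.put(("receive", None))
    while True:  # the dispatch loop awaits each handler in turn
        kind, payload = await inbox.get()
        if kind == "stop":
            break
        await handle(kind, payload)
    return events


if __name__ == "__main__":
    print(asyncio.run(run_consumer(spawn_task=False)))
    print(asyncio.run(run_consumer(spawn_task=True)))
```

With spawn_task=False every "produced ..." entry comes before the first "delivered ..." entry, which matches the behaviour described in the question. With spawn_task=True they alternate. In the real consumer this would probably mean moving the async for loop into a helper coroutine, starting it with asyncio.create_task from receive_json, keeping a reference to the task, and cancelling it in disconnect. Exceptions raised inside the task will not propagate to receive_json, so they need to be handled or logged inside the helper.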