Hey,
I am trying to stream an LLM response to a group with multiple consumers using async generators. I'm using `RedisChannelLayer` and running the app with Uvicorn. Simplified example:
```
channels: 4.0.0
django: 4.2.13
channels-redis: 4.2.0
```
```python
import asyncio

from channels.generic.websocket import AsyncJsonWebsocketConsumer


class MyConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        await self.channel_layer.group_add("chat", self.channel_name)
        await self.accept()

    async def disconnect(self, code):
        await self.channel_layer.group_discard("chat", self.channel_name)

    async def receive_json(self, content, **kwargs):
        async for chunk in stream_response():
            await self.channel_layer.group_send(
                "chat",
                {
                    "type": "send_to_client",
                    **chunk,
                },
            )

    async def send_to_client(self, message):
        await self.send_json(message)


async def stream_response():
    response = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
    text = ""
    for chunk in response.split(" "):
        text += chunk + " "
        await asyncio.sleep(0.1)
        yield {"action": "message_chunk", "content": text}
```
The issue is that while the generator is producing, the active consumer is not processing the messages sent with `group_send`; it only sends all the chunks at once after the generator has finished. Is it possible to send a message with `group_send` in a way that isn't blocked by the handler still iterating the generator?
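I imagine offloading the loop to a background task with `asyncio.create_task` might be what I'm after, so the handler returns immediately, but I'm not sure whether tasks that outlive the handler are safe inside a Channels consumer. Outside of Channels, the idea would be something like this (a plain list stands in for `group_send`, and the names are made up for illustration):

```python
import asyncio


async def stream_response():
    # Stand-in for the LLM stream from my example above.
    for word in "Lorem ipsum dolor".split(" "):
        await asyncio.sleep(0.01)
        yield {"action": "message_chunk", "content": word}


async def main():
    delivered = []

    async def group_send(message):
        # Toy stand-in for channel_layer.group_send.
        delivered.append(message)

    async def handler():
        # Spawn the streaming loop instead of awaiting it, so the
        # "consumer" is free to serve other events while chunks are
        # still being produced.
        async def pump():
            async for chunk in stream_response():
                await group_send(chunk)

        return asyncio.create_task(pump())

    task = await handler()
    # The handler returned before any chunk was delivered...
    assert len(delivered) == 0
    # ...and the chunks arrive as the background task runs.
    await task
    assert len(delivered) == 3
    return delivered


print(asyncio.run(main()))
```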
A hacky workaround I could think of is to call both `send_json` and `group_send` in the generator loop, to ensure all the consumers get the messages in time, and then have a way to handle the duplicated messages on the receiving side; but that seems less than ideal.
Thanks!