Django channels function group_send causing problems

Context: I’m building a web messaging application with Django on the backend. For real-time communication I’m using WebSockets (Django Channels). For user group management (the channel layer) I use the following logic. There are two types of groups, ‘chats’ and ‘notifications’. A notifications group contains 1 user and 1 channel_name; it is used to track the user’s connection and has the following form:

{user_id} : [‘{channel_name}’]

Example: ‘30’: [‘specific.363469477d6745289c5bc679b7947790!d813479fc71448d49592711a7f823e06’]

Chat groups, on the other hand, are used to track the presence of users in chats and to report events such as ‘typing’, ‘connecting or disconnecting’, ‘updating’, ‘new messages’, etc. At group level they have the following form: {id_minor}-{id_major} : [‘{channel_of_id_1}, {channel_of_id_2}’]

Example: ‘1-30’: [‘specific.363469477d6745289c5bc679b7947790!d813479fc71448d49592711a7f823e06’]

Note: a chat group can have at most 2 members. If user 1 opens the chat with user 2, the structure looks like this: ‘1-2’ : [‘{channel_name_of_1}’]

If user 2 opens the chat with user 1, it looks like this: ‘1-2’ : [‘{channel_name_of_2}’]

And if both are in the chat: ‘1-2’ : [‘{channel_name_of_2}, {channel_name_of_1}’]
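
The naming rule above can be sketched as a small helper. This is only an illustration: chat_group_name is a hypothetical name (the consumer further down calls its own messages_group_name, whose body is not shown), and it assumes user ids are integers or numeric strings.

```python
def chat_group_name(user_a, user_b):
    """Build the '{id_minor}-{id_major}' group name for a two-user chat.

    Hypothetical helper: sorting the ids numerically means both users
    compute the same name no matter who opened the chat.
    """
    low, high = sorted((int(user_a), int(user_b)))
    return f"{low}-{high}"


print(chat_group_name(30, 1))    # 1-30
print(chat_group_name("1", 30))  # 1-30
```

Sorting numerically rather than as strings matters here: compared as text, "30" would sort before "4".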

Problem: in the django application I have the following function …

import logging

from channels.layers import get_channel_layer

logger = logging.getLogger('django.channels')

async def broadcast_connection_inform(user_id, connected):
    # Event dispatched to broadcast_connection_inform_handler on every consumer in the group
    broadcast_data = {
        'type': 'broadcast_connection_inform_handler',
        'value': {
            "user_id": user_id,
            "connected": connected
        }
    }
    # Names of all chat groups currently open with this user (project helper)
    opened_chat_groups = get_opened_chat_groups_with_id(str(user_id))
    if opened_chat_groups:
        for group in opened_chat_groups:
            logger.info(f"Broadcast to {group}:{get_redis_groups('chats')[group]} by connection inform")
            await get_channel_layer().group_send(group, broadcast_data)

The idea of this function is that, when a user with id X disconnects, all chat groups opened with that id are looked up and notified that this user has disconnected.

Well, here is the problem …

Application instance <Task pending name='Task-35' coro=<StaticFilesWrapper.__call__() running at /home/santiago/Escritorio/Projects/FriendNet/dependencies/lib/python3.8/site-packages/channels/staticfiles.py:44> wait_for=<Task pending name='Task-37' coro=<RedisChannelLayer.receive() running at /home/santiago/Escritorio/Projects/FriendNet/dependencies/lib/python3.8/site-packages/channels_redis/core.py:353> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f99542952b0>()]> cb=[<TaskWakeupMethWrapper object at 0x7f9954e912b0>()]>> for connection <WebSocketProtocol client=['127.0.0.1', 43494] path=b'/ws/notifications/1/'> took too long to shut down and was killed.

I get this error when a user disconnects while another user is in a chat with them, i.e. when the broadcast_connection_inform function is executed.

Clear example:

{'30': ['specific.37ae0c3238cb486f960ee392cb64326a!49d141b1ed0f4e2f94f5b0cc360244ef'], '1': ['specific.37ae0c3238cb486f960ee392cb64326a!36dd5b94be0641b7a196534cc22b56dc']}
{'1-30': ['specific.37ae0c3238cb486f960ee392cb64326a!49d141b1ed0f4e2f94f5b0cc360244ef']}


WebSocket DISCONNECT /ws/notifications/1/ [127.0.0.1:43494]

Broadcast to 1-30:['specific.37ae0c3238cb486f960ee392cb64326a!49d141b1ed0f4e2f94f5b0cc360244ef'] by connection inform


{'30': ['specific.37ae0c3238cb486f960ee392cb64326a!49d141b1ed0f4e2f94f5b0cc360244ef']}
{'1-30': ['specific.37ae0c3238cb486f960ee392cb64326a!49d141b1ed0f4e2f94f5b0cc360244ef']}

Broadcast connection inform success !!


**( 10 seconds later )**


Application instance <Task pending name='Task-35' coro=<StaticFilesWrapper.__call__() running at /home/santiago/Escritorio/Projects/FriendNet/dependencies/lib/python3.8/site-packages/channels/staticfiles.py:44> wait_for=<Task pending name='Task-37' coro=<RedisChannelLayer.receive() running at /home/santiago/Escritorio/Projects/FriendNet/dependencies/lib/python3.8/site-packages/channels_redis/core.py:353> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f99542952b0>()]> cb=[<TaskWakeupMethWrapper object at 0x7f9954e912b0>()]>> for connection <WebSocketProtocol client=['127.0.0.1', 43494] path=b'/ws/notifications/1/'> took too long to shut down and was killed.

Note that this same function (broadcast_connection_inform) is also executed when the user connects, and in that case it works perfectly. Consumer code:

import asyncio
import json
from datetime import datetime

from channels.generic.websocket import AsyncWebsocketConsumer

# The loggers, cache and helper functions used below are defined elsewhere in the project.


class NotificationsWSConsumer(AsyncWebsocketConsumer):

    async def send_ping(self):
        while True:
            last_pong_timediff = (datetime.now() - self.last_pong).total_seconds()
            user_id = str(self.scope['url_route']['kwargs']['user_id'])
            logger_ping_pong.info(f"{user_id}, Sending ping. Pong diff : {last_pong_timediff}")
            if (last_pong_timediff > (self.ping_timing*2 + self.ping_timing/3)):
                await self.disconnect(10)
                break
            else:
                await self.send(text_data=json.dumps({
                    'type': 'ping',
                }))
                await asyncio.sleep(self.ping_timing)

    async def _discard_channel_from_groups(self):
        if (("group_name" in self.scope) and self.scope["group_name"]):
            logger_channels.info(f"Removing chat websocket, {self.scope['group_name']}:{self.channel_name}")
            await self.channel_layer.group_discard(self.scope["group_name"], self.channel_name)
            self.scope["group_name"] = None

    async def _cancel_ping_task(self, user_id):
        self.ping_task.cancel()
        try:
            await self.ping_task
        except asyncio.CancelledError:
            logger_ping_pong.info(f'Ping task cancelled for user {user_id}')
        else:
            logger_ping_pong.info(f'PING TASK WAS NOT CANCELLED for user {user_id}')

    async def connect(self):
        self.scope["group_name"] = None
        await self.accept()
        user_id = str(self.scope['url_route']['kwargs']['user_id'])
        groups = get_redis_groups("notifications")
        logger_channels.info(f'-> Connecting notification websocket, {user_id}')
        if ((user_id not in groups) or ((user_id in groups) and (self.channel_name not in groups[user_id]))):

            if (user_id in groups) and len(groups[user_id]) > 0:
                logger_channels.info("Existing channel detected, replacing channels")
                await self.channel_layer.group_send(user_id,{"type" : "broadcast_connection_error_handler"})

            await self.channel_layer.group_add(user_id,self.channel_name)

            await broadcast_connection_inform(user_id=user_id, connected=True)
            print_pretty_groups()
            self.ping_timing = 30
            self.ping_task = asyncio.create_task(self.send_ping())
            self.last_pong = datetime.now()

    async def disconnect(self, close_code):
        user_id = str(self.scope['url_route']['kwargs']['user_id'])
        logger_channels.info(f'-> Disconnecting notification websocket, {user_id}:{self.channel_name}')
        await self._cancel_ping_task(user_id)
        await self._discard_channel_from_groups()
        if (user_id in get_redis_groups("notifications")):
            logger_channels.info("-> The disconnection will proceed")
            await self.channel_layer.group_discard(user_id, self.channel_name)
            notifications_groups = get_redis_groups("notifications")
            if ((user_id not in notifications_groups) or (len(notifications_groups[user_id]) == 0)):
                handle_initial_notification_ids('delete', user_id )
                cache.delete(f"message_pagination_ref_{user_id}")
                await broadcast_connection_inform(user_id=user_id, connected=False)
        print_pretty_groups()
        return await super().disconnect(close_code)

    async def receive(self, text_data):
        data = json.loads(text_data)
        user_id = int(self.scope['url_route']['kwargs']['user_id'])
        # not
        if (data["type"] == "typing_inform"):
            value = data["value"]
            if notification_websocket_is_opened(value["clicked_user_id"]):
                await broadcast_typing_inform(user_id, **value)

        # chat
        if data['type'] == "group_creation":
            await self._discard_channel_from_groups()
            self.scope["group_name"] = messages_group_name(user_id, data['value']['clicked_user_id'])

            groups = get_redis_groups("chats")
            group_name = self.scope["group_name"]
            if (((group_name in groups) and (self.channel_name not in groups[group_name])) or (group_name not in groups)):
                logger_channels.info(f"Adding chat websocket, {self.scope['group_name']}:{self.channel_name}")
                await self.channel_layer.group_add(self.scope["group_name"],self.channel_name)

        if data['type'] == "group_delete":
            await self._discard_channel_from_groups()

        if data['type'] == "pong":
            self.last_pong = datetime.now()
            logger_ping_pong.info(f"{user_id}, Receiving pong : {self.last_pong}")
        print_pretty_groups()

    async def broadcast_typing_inform_handler(self, event):
        await self.send(text_data=json.dumps(broadcast_dict(broadcast_type="typing_inform", broadcast_value=event["value"])))

    async def broadcast_notification_handler(self, event):
        await self.send(text_data=json.dumps(broadcast_dict(broadcast_type="new_notification", broadcast_value=event["value"])))

    async def broadcast_connection_error_handler(self, event):
        logger_channels.info(f"----------------------------------------- Reached broadcast_connection_error : {self.channel_name}")
        try:
            await self.send(text_data=json.dumps(broadcast_dict(broadcast_type="connection_error")))
        except:
            logger_channels.info(f"----------------------------------------- Failed to send 'broadcast_connection_error'")
        await self.disconnect(10)

    async def broadcast_updated_user_handler(self, event):
        await self.send(text_data=json.dumps(broadcast_dict(broadcast_type="updated_user", broadcast_value=event["value"])))

    async def broadcast_message_handler(self, event):
        await self.send(text_data=json.dumps(broadcast_dict(broadcast_type="message_broadcast", broadcast_value=event["value"])))

    async def broadcast_connection_inform_handler(self, event):
        logger_channels.info("Broadcast connection inform success !!")
        await self.send(text_data=json.dumps(broadcast_dict(broadcast_type="connection_inform", broadcast_value=event["value"])))

Any help would be greatly appreciated.

I have tried everything

I suggest using the AsyncWebsocketConsumer.close() method to force a disconnection from the server side, because AsyncWebsocketConsumer.disconnect() is just a hook that gets called when the connection closes; calling it yourself does not close the socket.

Also note that .close() does not call .disconnect(), so the ping/pong cleanup should be awaited before closing the connection.

Another tip is to use wait_for with a timeout to ensure that the cancellation ends in a reasonable amount of time:

async def _cancel_ping_task(self, user_id):
    self.ping_task.cancel()
    try:
        await asyncio.wait_for(self.ping_task, timeout=5)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        logger_ping_pong.info(f'Ping task cancelled for user {user_id}')
    else:
        logger_ping_pong.info(f'PING TASK WAS NOT CANCELLED for user {user_id}')
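
To see how this pattern behaves outside Channels, here is a minimal, framework-free sketch. cancel_and_wait and fake_ping_loop are hypothetical names used only for this illustration; the helper reports which outcome occurred instead of logging the same message for a cancellation and a timeout.

```python
import asyncio


async def cancel_and_wait(task, timeout=5.0):
    """Cancel a task and report how it ended (hypothetical helper)."""
    task.cancel()
    try:
        await asyncio.wait_for(task, timeout=timeout)
    except asyncio.CancelledError:
        return "cancelled"   # the task accepted the cancellation
    except asyncio.TimeoutError:
        return "timed out"   # the task did not stop within `timeout` seconds
    return "finished"        # the task had already completed normally


async def fake_ping_loop():
    # Stand-in for send_ping: sleeps between pings forever
    while True:
        await asyncio.sleep(30)


async def demo():
    task = asyncio.create_task(fake_ping_loop())
    await asyncio.sleep(0)  # let the task start before cancelling it
    return await cancel_and_wait(task)


print(asyncio.run(demo()))  # cancelled
```

One caveat: the CancelledError branch also runs if the caller itself is cancelled while waiting, so the return value only describes the inner task when the caller is not being cancelled.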

Something like this?

async def disconnect(self, close_code):
    user_id = str(self.scope['url_route']['kwargs']['user_id'])
    logger_channels.info(f'-> Disconnecting notification websocket, {user_id}:{self.channel_name}')
    await self._cancel_ping_task(user_id)
    await self._discard_channel_from_groups()
    if (user_id in get_redis_groups("notifications")):
        logger_channels.info("-> The disconnection will proceed")
        await self.channel_layer.group_discard(user_id, self.channel_name)
        notifications_groups = get_redis_groups("notifications")
        if ((user_id not in notifications_groups) or (len(notifications_groups[user_id]) == 0)):
            handle_initial_notification_ids('delete', user_id )
            cache.delete(f"message_pagination_ref_{user_id}")
            await broadcast_connection_inform(user_id=user_id, connected=False)
    print_pretty_groups()
    AsyncWebsocketConsumer.close(self, close_code)
    return await super().disconnect(close_code)

No. What I mean is to use self.close instead of self.disconnect in broadcast_connection_error_handler and send_ping.
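
A minimal sketch of that change for send_ping, written as a mixin so it can be read on its own. It assumes the host class provides the send(text_data=...) and close() coroutines that AsyncWebsocketConsumer has. The name PingTimeoutMixin and the use of time.monotonic() instead of datetime.now() are choices made for this illustration, not part of the original consumer.

```python
import asyncio
import json
import time


class PingTimeoutMixin:
    """Ping loop that closes the socket on timeout instead of calling disconnect().

    Hypothetical mixin: the host class must define `async send(text_data=...)`
    and `async close()`, as AsyncWebsocketConsumer does.
    """

    ping_timing = 30  # seconds between pings, same value as in the question

    async def send_ping(self):
        limit = self.ping_timing * 2 + self.ping_timing / 3
        while True:
            if time.monotonic() - self.last_pong > limit:
                # Close the connection from the server side rather than
                # invoking the disconnect() hook by hand.
                await self.close()
                break
            await self.send(text_data=json.dumps({"type": "ping"}))
            await asyncio.sleep(self.ping_timing)
```

With the real consumer this could be used as class NotificationsWSConsumer(PingTimeoutMixin, AsyncWebsocketConsumer), with self.last_pong = time.monotonic() set in connect() and again on every pong.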

Also, for debugging purposes it is useful to pass a name to asyncio.create_task, which helps a lot when interpreting the error.

PS: close is a method defined on the class, so there is no reason to call it like AsyncWebsocketConsumer.close(self, ...).

Well, I modified the code with your suggestions, but the problem is still happening. Is there any way to log the list of tasks and try to find the pending task?

Thanks for the help btw
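
One way to do that with the standard library alone: asyncio.all_tasks() returns the tasks that have not finished yet, and Task.get_name() returns the name given to asyncio.create_task (both available from Python 3.8). The sketch below is only an assumption about how it could be wired up; describe_pending_tasks, fake_ping_loop and the task name ping-user-30 are hypothetical.

```python
import asyncio


def describe_pending_tasks():
    """Return (task name, coroutine name) for every unfinished task.

    Must be called from inside a running event loop. The task that is
    calling this function is left out of the result.
    """
    current = asyncio.current_task()
    rows = []
    for task in asyncio.all_tasks():
        if task is current:
            continue
        coro = task.get_coro()
        rows.append((task.get_name(), getattr(coro, "__qualname__", repr(coro))))
    return sorted(rows)


async def fake_ping_loop():
    # Stand-in for send_ping
    await asyncio.sleep(3600)


async def demo():
    # Naming the task makes it easy to recognise in logs and error messages
    task = asyncio.create_task(fake_ping_loop(), name="ping-user-30")
    await asyncio.sleep(0)  # let the task start
    rows = describe_pending_tasks()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return rows


print(asyncio.run(demo()))  # [('ping-user-30', 'fake_ping_loop')]
```

In the consumer, the result of describe_pending_tasks() could be logged at the end of disconnect() to see which tasks are still alive at that point.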

Well, I haven’t tried to implement a solution yet, but I noticed a bug that is occurring in my consumer.

When the broadcast_connection_inform function is executed, the self.disconnect method runs before the event reaches the handler (broadcast_connection_inform_handler). It is very likely that this is the cause of the bug. I will post the solution once I have it.