Context: I’m making a messaging application on the web, in which I’m implementing Django for the backend. For concurrency I am using websockets (channels). Well, for the user group management (channel layer) I’m using the following logic. There will be two types of groups, ‘chats’ and ‘notifications’. The notifications groups are groups of 1 user and 1 channel_name, they are used to track the user’s connection, they have the following form:
{user_id} : [‘{channel_name}’]
Example: ‘30’: [‘specific.363469477d6745289c5bc679b7947790!d813479fc71448d49592711a7f823e06’]
On the other hand, chat groups will be used to track the presence of users in chats, and report events such as ‘writing’, ‘connecting or disconnecting’, ‘updating’, ‘new messages’, etc … At group level they have the following form: {id_minor}-{id_major} : [‘{channel_of_id_1}, {channel_of_id_2}’]
Example: ‘1-30’: [‘specific.363469477d6745289c5bc679b7947790!d813479fc71448d49592711a7f823e06’]
Note : in group chats there can be a maximum of 2 members, i.e. if user 1 joins the chat with 2, the structure would be like this… 1-2‘ : [’{channel_name_of_1}"].
If 2 enters the chat with 1, the structure would be like this … ‘1-2’ : [‘{channel_name_of_2}’]
And if both are in the chat … ‘1-2’ : [‘{channel_name_of_2}, {channel_name_of_1}’]
Problem: in the django application I have the following function …
logger = logging.getLogger('django.channels')
async def broadcast_connection_inform(user_id, connected):
broadcast_data = {
'type' : 'broadcast_connection_inform_handler',
'value' : {
"user_id" : user_id,
"connected" : connected
}
}
opened_chat_groups = get_opened_chat_groups_with_id(str(user_id))
if (opened_chat_groups):
for group in opened_chat_groups:
logger.info(f"Haciendo broadcast a {group}:{get_redis_groups('chats')[group]} por connection inform")
await get_channel_layer().group_send(group, broadcast_data)
return
The idea of this function is that, when a user with an id X is disconnected, all the groups of chats opened with that id are searched to warn that this id is disconnected.
Well, here is the problem …
Application instance \<Task pending name='Task-35' coro=\<StaticFilesWrapper.__call__() running at /home/santiago/Escritorio/Projects/FriendNet/dependencies/lib/python3.8/site-packages/channels/staticfiles.py:44\> wait_for=\<Task pending name='Task-37' coro=\<RedisChannelLayer.receive() running at /home/santiago/Escritorio/Projects/FriendNet/dependencies/lib/python3.8/site-packages/channels_redis/core.py:353\> wait_for=\<Future pending cb=\[\<TaskWakeupMethWrapper object at 0x7f99542952b0\>()\]\> cb=\[\<TaskWakeupMethWrapper object at 0x7f9954e912b0\>()\]\>\> for connection \<WebSocketProtocol client=\['127.0.0.1', 43494\] path=b'/ws/notifications/1/'\> took too long to shut down and was killed.
I get this error when a user is disconnected and another user is in a chat with him, i.e. the broadcast_connection_inform function is executed …
Clear example:
{'30': ['specific.37ae0c3238cb486f960ee392cb64326a!49d141b1ed0f4e2f94f5b0cc360244ef'], '1': ['specific.37ae0c3238cb486f960ee392cb64326a!36dd5b94be0641b7a196534cc22b56dc']}
{'1-30': ['specific.37ae0c3238cb486f960ee392cb64326a!49d141b1ed0f4e2f94f5b0cc360244ef']}
WebSocket DISCONNECT /ws/notifications/1/ [127.0.0.1:43494]
Broadcast to 1-30:['specific.37ae0c3238cb486f960ee392cb64326a!49d141b1ed0f4e2f94f5b0cc360244ef'] by connection inform
{'30': ['specific.37ae0c3238cb486f960ee392cb64326a!49d141b1ed0f4e2f94f5b0cc360244ef']}
{'1-30': ['specific.37ae0c3238cb486f960ee392cb64326a!49d141b1ed0f4e2f94f5b0cc360244ef']}
Broadcast connection inform success !!
**( 10 seconds later )**
Application instance <Task pending name='Task-35' coro=<StaticFilesWrapper.__call__() running at /home/santiago/Escritorio/Projects/FriendNet/dependencies/lib/python3.8/site-packages/channels/staticfiles.py:44> wait_for=<Task pending name='Task-37' coro=<RedisChannelLayer.receive() running at /home/santiago/Escritorio/Projects/FriendNet/dependencies/lib/python3.8/site-packages/channels_redis/core.py:353> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f99542952b0>()]> cb=[<TaskWakeupMethWrapper object at 0x7f9954e912b0>()]>> for connection <WebSocketProtocol client=['127.0.0.1', 43494] path=b'/ws/notifications/1/'> took too long to shut down and was killed.
Note that this same function (broadcast_connection_inform) is also executed when the user connects, and in that case it works perfectly. Consumer code:
`
class NotificationsWSConsumer(AsyncWebsocketConsumer):
async def send_ping(self):
while True:
last_pong_timediff = (datetime.now() - self.last_pong).total_seconds()
user_id = str(self.scope['url_route']['kwargs']['user_id'])
logger_ping_pong.info(f"{user_id}, Enviando ping. Pong diff : {last_pong_timediff}")
if (last_pong_timediff > (self.ping_timing*2 + self.ping_timing/3)):
await self.disconnect(10)
break
else:
await self.send(text_data=json.dumps({
'type': 'ping',
}))
await asyncio.sleep(self.ping_timing)
async def _discard_channel_from_groups(self):
if (("group_name" in self.scope) and self.scope["group_name"]):
logger_channels.info(f"Eliminando websocket de chat, {self.scope['group_name']}:{self.channel_name}")
await self.channel_layer.group_discard(self.scope["group_name"], self.channel_name)
self.scope["group_name"] = None
async def _cancel_ping_task(self, user_id):
self.ping_task.cancel()
try:
await self.ping_task
except asyncio.CancelledError:
logger_ping_pong.info(f'Tarea de ping cancelada para el usuario {user_id}')
else:
logger_ping_pong.info(f'TAREA DE PING NO FUE CANCELADA para el usuario {user_id}')
async def connect(self):
self.scope["group_name"] = None
await self.accept()
user_id = str(self.scope['url_route']['kwargs']['user_id'])
groups = get_redis_groups("notifications")
logger_channels.info(f'-> Conectando websocket de notificacion, {user_id}')
if ((user_id not in groups) or ((user_id in groups) and (self.channel_name not in groups[user_id]))):
if (user_id in groups) and len(groups[user_id]) > 0:
logger_channels.info("Channel existente detectado, aplicando substitucion de channels")
await self.channel_layer.group_send(user_id,{"type" : "broadcast_connection_error_handler"})
await self.channel_layer.group_add(user_id,self.channel_name)
await broadcast_connection_inform(user_id=user_id, connected=True)
print_pretty_groups()
self.ping_timing = 30
self.ping_task = asyncio.create_task(self.send_ping())
self.last_pong = datetime.now()
async def disconnect(self, close_code):
user_id = str(self.scope['url_route']['kwargs']['user_id'])
logger_channels.info(f'-> Desconectando websocket de notificacion, {user_id}:{self.channel_name}')
await self._cancel_ping_task(user_id)
await self._discard_channel_from_groups()
if (user_id in get_redis_groups("notifications")):
logger_channels.info("-> La desconexion si se dara")
await self.channel_layer.group_discard(user_id, self.channel_name)
notifications_groups = get_redis_groups("notifications")
if ((user_id not in notifications_groups) or (len(notifications_groups[user_id]) == 0)):
handle_initial_notification_ids('delete', user_id )
cache.delete(f"message_pagination_ref_{user_id}")
await broadcast_connection_inform(user_id=user_id, connected=False)
print_pretty_groups()
return await super().disconnect(close_code)
async def receive(self, text_data):
data = json.loads(text_data)
user_id = int(self.scope['url_route']['kwargs']['user_id'])
# not
if (data["type"] == "typing_inform"):
value = data["value"]
if notification_websocket_is_opened(value["clicked_user_id"]):
await broadcast_typing_inform(user_id, **value)
# chat
if data['type'] == "group_creation":
await self._discard_channel_from_groups()
self.scope["group_name"] = messages_group_name(user_id, data['value']['clicked_user_id'])
groups = get_redis_groups("chats")
group_name = self.scope["group_name"]
if (((group_name in groups) and (self.channel_name not in groups[group_name])) or (group_name not in groups)):
logger_channels.info(f"Agregando websocket de chat, {self.scope['group_name']}:{self.channel_name}")
await self.channel_layer.group_add(self.scope["group_name"],self.channel_name)
if data['type'] == "group_delete":
await self._discard_channel_from_groups()
if data['type'] == "pong":
self.last_pong = datetime.now()
logger_ping_pong.info(f"{user_id}, Recibiendo pong : {self.last_pong}")
print_pretty_groups()
async def broadcast_typing_inform_handler(self, event):
await self.send(text_data=json.dumps(broadcast_dict(broadcast_type="typing_inform", broadcast_value=event["value"])))
async def broadcast_notification_handler(self, event):
await self.send(text_data=json.dumps(broadcast_dict(broadcast_type="new_notification", broadcast_value=event["value"])))
async def broadcast_connection_error_handler(self, event):
logger_channels.info(f"----------------------------------------- Alcance de broadcast_connection_error : {self.channel_name}")
try:
await self.send(text_data=json.dumps(broadcast_dict(broadcast_type="connection_error")))
except:
logger_channels.info(f"----------------------------------------- Fallo al enviar 'broadcast_connection_error'")
await self.disconnect(10)
async def broadcast_updated_user_handler(self, event):
await self.send(text_data=json.dumps(broadcast_dict(broadcast_type="updated_user", broadcast_value=event["value"])))
async def broadcast_message_handler(self, event):
await self.send(text_data=json.dumps(broadcast_dict(broadcast_type="message_broadcast", broadcast_value=event["value"])))
async def broadcast_connection_inform_handler(self, event):
logger_channels.info("Broadcast connection inform exitoso !!")
await self.send(text_data=json.dumps(broadcast_dict(broadcast_type="connection_inform", broadcast_value=event["value"])))`
Any help would be greatly appreciated.
I have tried everything