I’m trying to execute a @database_sync_to_async function using asyncio.gather for two user operations but it fails for one particular user and succeeds for the other.
The command async function that’s affected is the ‘send’ command → send_room.
The await send_room returns this message-- ‘Exception occurred: You cannot call this from an async context - use a thread or sync_to_async.’
Here’s my consumer file:
class ChatConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
"""
Called when the websocket is handshaking as part of initial connection.
"""
print("ChatConsumer: connect: " + str(self.scope["user"].username))
# let everyone connect. But limit read/write to authenticated users
await self.accept()
# the room_id will define what it means to be "connected". If it is not None, then the user is connected.
self.room_id = None
async def receive_json(self, content):
"""
Called when we get a text frame. Channels will JSON-decode the payload
for us and pass it as the first argument.
"""
# Messages will have a "command" key we can switch on
print("ChatConsumer: receive_json")
command = content.get("command", None)
try:
if command == "join":
await self.join_room(content['room'])
elif command == "leave":
# Leave the room
await self.leave_room(content['room'])
elif command == "send":
if len(content['message'].lstrip()) != 0:
await self.send_room(content['room'], content['message'])
elif command == "get_room_chat_messages":
await self.display_progress_bar(True)
room = await get_room_or_error2(content['room_id'], self.scope['user'])
payload = await get_room_chat_messages2(room, content['page_number'])
if payload != None:
payload = json.loads(payload)
await self.send_messages_payload(payload['messages'], payload['new_page_number'])
else:
raise Exception("Something went wrong retrieving the chatroom messages.")
await self.display_progress_bar(False)
elif command == "get_user_info":
await self.display_progress_bar(True)
room = await get_room_or_error2(content['room_id'], self.scope["user"])
payload = await get_user_info(room, self.scope["user"])
if payload != None:
payload = json.loads(payload)
wanted_keys = ['pk', 'id', 'username', 'email', 'profile_image']
# Extract 'fields' and retain only wanted keys
fields = payload['user_info']['fields']
filtered_fields = {key: value for key, value in fields.items() if key in wanted_keys}
# Update 'fields' with filtered data
payload['user_info']['fields'] = filtered_fields
await self.send_user_info_payload(payload)
else:
raise Exception("Something went wrong retrieving the other users account details.")
except Exception as e:
pass
async def disconnect(self, code):
"""
Called when the WebSocket closes for any reason.
"""
# Leave the room
print("ChatConsumer: disconnect")
try:
if self.room_id != None:
await self.leave_room(self.room_id)
except Exception as e:
pass
async def join_room(self, room_id):
"""
Called by receive_json when someone sent a join command.
"""
# The logged-in user is in our scope thanks to the authentication ASGI middleware (AuthMiddlewareStack)
print("ChatConsumer: join_room: " + str(room_id))
try:
room = await get_room_or_error2(room_id, self.scope['user'])
except Exception as e:
return
# Add user to "users" list for room
await connect_user(room, self.scope["user"])
#store that we're in the room
self.room_id = room.id
await on_user_connected(room, self.scope["user"])
# Add them to the group, so that they get room messages
await self.channel_layer.group_add(
room.group_name,
self.channel_name
)
await self.send_json({
"join": str(room.id)
})
# Add them to the group so they get room messages
# Instruct their client to finish opening the room
async def leave_room(self, room_id):
"""
Called by receive_json when someone sent a leave command.
"""
# The logged-in user is in our scope thanks to the authentication ASGI middleware
print("ChatConsumer: leave_room")
room = await get_room_or_error2(room_id, self.scope['user'])
#Remove user from room 'connect_users' list
await disconnect_user(room, self.scope["user"])
# Notify the group that someone left
await self.channel_layer.group_send(
room.group_name,
{
"type": "chat.leave", # chat_leave function
"room_id": room_id,
"profile_image": self.scope['user'],
"username": self.scope['user'].username,
"user_id": self.scope['user'].pk,
}
)
# Remove that we're in the room
self.room_id = None
# Remove them from the group so they no longer get room messages
await self.channel_layer.group_discard(
room.group_name,
self.channel_name
)
# Instruct their client to finish closing the room
async def send_room(self, room_id, message):
"""
Called by receive_json when someone sends a message to a room.
"""
print("ChatConsumer: send_room")
# Check they are in this room
if self.room_id != None:
if str(room_id) != str(self.room_id):
print("CLIENT ERRROR 1")
raise ClientError("ROOM_ACCESS_DENIED", "Room access denied")
else:
print("CLIENT ERRROR 2")
raise ClientError("ROOM_ACCESS_DENIED", "Room access denied")
room = await get_room_or_error2(room_id, self.scope['user'])
# get list of connected_users
connected_users = room.connected_users.all()
# Create the message in the database
await create_room_chat_message(room, self.scope["user"], message)
# Sending a message to the room group
await self.channel_layer.group_send(
room.group_name,
{
"type": "chat_message",
"username": self.scope['user'].username,
"user_id": self.scope['user'].email,
"message": message,
}
)
# Functions to execute asynchronously
async def handle_async_tasks():
await asyncio.gather(
append_unread_msg_if_not_connected(room, room.user1, connected_users, message),
append_unread_msg_if_not_connected(room, room.user2, connected_users, message)
)
try:
await handle_async_tasks()
except Exception as e:
print(f"Exception occurred: {e}")
# These helper methods are named by the types we send - so chat.join becomes chat_join
async def chat_join(self, event):
"""
Called when someone has joined our chat.
"""
# Send a message down to the client
print("ChatConsumer: chat_join: " + str(self.scope["user"]))
if event["username"]:
await self.send_json(
{
"msg_type": MSG_TYPE_ENTER2,
"room": event["room_id"],
"profile_image": event["profile_image"],
"username": event["username"],
"user_id": event["user_id"],
"message": event["username"] + " connected.",
},
)
async def chat_leave(self, event):
"""
Called when someone has left our chat.
"""
# Send a message down to the client
pass
async def chat_message(self, event):
"""
Called when someone has messaged our chat.
"""
# Send a message down to the client
timestamp = calculate_timestamp(timezone.now())
await self.send_json({
"msg_type": MSG_TYPE_MESSAGE,
"username": event['username'],
"user_id": event['user_id'],
#"profile_image": event['profile_image'],
"message": event['message'],
"natural_timestamp": timestamp,
})
async def send_messages_payload(self, messages, new_page_number):
"""
Send a payload of messages to the client sockets"""
print("ChatConsumer: send_messages_payload. ")
await self.send_json({
"messages_payload": "messages_payload",
"messages": messages,
"new_page_number": new_page_number,
})
async def send_user_info_payload(self, user_info):
"""
Send a payload of user information to the ui
"""
# print("ChatConsumer: send_user_info_payload. ")
await self.send_json(
{
"user_info": user_info,
},
)
async def display_progress_bar(self, is_displayed):
"""
1. is_displayed = True
- Display the progress bar on UI
2. is_displayed = False
- Hide the progress bar on UI
"""
await self.send_json(
{
"display_progress_bar": is_displayed
}
)
async def handle_client_error(self, e):
"""
Called when a ClientError is raised.
Sends error data to UI.
"""
errorData = {}
errorData['error'] = e.code
if e.message:
errorData['message'] = e.message
await self.send_json(errorData)
return
@database_sync_to_async
def get_room_or_error2(room_id, user):
"""
Tries to fetch a room for the user, checking permissions along the way
"""
try:
room = PrivateChatRoom.objects.get(pk=room_id)
except PrivateChatRoom.DoesNotExist:
print("Get room or error exception!")
raise Exception("Invalid Room.")
# Is this user allowed into this room?
if user != room.user1 and user != room.user2:
print("Not your chat bro!")
raise Exception("You do not have permission to join this room.")
return room
@sync_to_async
def get_user_info(room, user):
"""
Retrieve the user info for the user you're chatting with.
"""
try:
# Determine who is who
other_user = room.user1 if room.user1 != user else room.user2
s = LazyAccountEncoder()
final = s.serialize([other_user])[0]
payload = {'user_info': final}
# Using dumps() instead of dump() to return JSON as a string
return json.dumps(payload, indent=4, sort_keys=True, default=str)
except Exception as e:
print("EXCEPTION: " + str(e))
return None
@database_sync_to_async
def create_room_chat_message(room, user, message):
return RoomChatMessage.objects.create(user=user, room=room, content=message)
@database_sync_to_async
def get_room_chat_messages2(room, page_number):
try:
qs = RoomChatMessage.objects.by_room(room)
p = Paginator(qs, DEFAULT_ROOM_CHAT_MESSAGE_PAGE_SIZE1)
payload = {}
new_page_number = int(page_number)
if new_page_number <= p.num_pages:
new_page_number = new_page_number + 1
s = LaxyRoomChatMessageEncoder2()
payload['messages'] = s.serialize(p.page(page_number).object_list)
else:
payload['messages'] = None
payload['new_page_number'] = new_page_number
return json.dumps(payload)
except Exception as e:
print("EXCEPTION last: " + str(e))
return None
@database_sync_to_async
def connect_user(room, user):
# add user to connected_users list
account = Account.objects.get(pk=user.pk)
return room.connect_user(account)
@database_sync_to_async
def disconnect_user(room, user):
# remove from connected_users list
account = Account.objects.get(pk=user.pk)
return room.disconnect_user(account)
# If the user is not connected to the chat, increment "unread messages" count
@database_sync_to_async
def append_unread_msg_if_not_connected(room, user, connected_users, message):
if not user in connected_users:
try:
print("We tried")
unread_msgs = UnreadChatRoomMessages.objects.get(room=room, user=user)
print("We couldn't get")
unread_msgs.most_recent_message = message
unread_msgs.count += 1
unread_msgs.save()
except UnreadChatRoomMessages.DoesNotExist:
print("Doesnot Exist")
UnreadChatRoomMessages(room=room, user=user, count=1).save()
print("we pass am")
pass
return
# When a user connects, reset their unread message count
@database_sync_to_async
def on_user_connected(room, user):
# confirm they are in the connected users list
connected_users = room.connected_users.all()
if user in connected_users:
try:
# reset count
unread_msgs = UnreadChatRoomMessages.objects.get(room=room, user=user)
unread_msgs.count = 0
unread_msgs.save()
except UnreadChatRoomMessages.DoesNotExist:
UnreadChatRoomMessages(room=room, user=user).save()
pass
return
The send command executes normally if I remove the ‘append_unread_msg_if_not_connected(room, room.user2, connected_users, message)’ async function as it only fails to execute for user2.
here’s the async function ‘append_unread_msg_if_not_connected(room, room.user2, connected_users, message)’ :
@database_sync_to_async
def append_unread_msg_if_not_connected(room, user, connected_users, message):
if not user in connected_users:
try:
print("We tried")
unread_msgs = UnreadChatRoomMessages.objects.get(room=room, user=user)
print("We couldn't get")
unread_msgs.most_recent_message = message
unread_msgs.count += 1
unread_msgs.save()
except UnreadChatRoomMessages.DoesNotExist:
print("Doesnot Exist")
UnreadChatRoomMessages(room=room, user=user, count=1).save()
print("we pass am")
pass
return
I also tried reviewing the ‘UnreacChatRoomMessage’ model but found nothing wrong with it.
Here:
class PrivateChatRoom(models.Model):
"""
Private chat between two users (friends)"""
user1 = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name="user1")
user2 = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name="user2")
is_active = models.BooleanField(default=True)
connected_users = models.ManyToManyField(settings.AUTH_USER_MODEL, blank=True, related_name="connected_users")
def __str__(self):
return "A chat between {} and {}".format(self.user1, self.user2)
def connect_user(self, user):
"""
return true if user is added to the connected_users list
"""
is_user_added = False
if not user in self.connected_users.all():
self.connected_users.add(user)
is_user_added = True
return is_user_added
def disconnect_user(self, user):
"""
return true if user is removed from connected_users list
"""
is_user_removed = False
if user in self.connected_users.all():
self.connected_users.remove(user)
is_user_removed = True
return is_user_removed
@property
def group_name(self):
""" Return the channels group name that sockets should subscribe to receive private chat messages (pair-wise) """
return f"PrivateChatRoom-{self.id}"
class RoomChatMessageManager(models.Manager):
def by_room(self, room):
qs = RoomChatMessage.objects.filter(room=room).order_by("-timestamp")
return qs
class RoomChatMessage(models.Model):
""" Chat message created by a user within a room (pair-wise) """
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
room = models.ForeignKey(PrivateChatRoom, on_delete=models.CASCADE)
timestamp = models.DateTimeField(auto_now_add=True)
content = models.TextField(unique=False, blank=False)
objects = RoomChatMessageManager()
def __st__(self):
return self.content
class UnreadChatRoomMessages(models.Model):
"""
Keep track of the number of unread messages by a specific user in a specific private chat.
When the user connects the chat room, the messages will be considered "read" and 'count' will be set to 0.
"""
room = models.ForeignKey(PrivateChatRoom, on_delete=models.CASCADE, related_name="room")
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
count = models.IntegerField(default=0)
most_recent_message = models.CharField(max_length=500, blank=True, null=True)
# last time msgs were read by the user
reset_timestamp = models.DateTimeField()
notifications = GenericRelation(Notification)
def __str__(self):
return f"Messages that {str(self.user.username)} has not read yet."
def save(self, *args, **kwargs):
if not self.id: # if just created, add a timestamp. Otherwise do not automatically change it ever.
self.reset_timestamp = timezone.now()
return super(UnreadChatRoomMessages, self).save(*args, **kwargs)
@property
def get_cname(self):
"""
For determining what kind of object is associated with a Notification
"""
return "UnreadChatRoomMessages"
@property
def get_other_user(self):
"""
Get the other user in the chat room
"""
if self.user == self.room.user1:
return self.room.user2
else:
return self.room.user1
@receiver(post_save, sender=PrivateChatRoom)
def create_unread_chatroom_messages_obj(sender, instance, created, **kwargs):
if created:
unread_msgs1 = UnreadChatRoomMessages(room=instance, user=instance.user1)
unread_msgs1.save()
unread_msgs2 = UnreadChatRoomMessages(room=instance, user=instance.user2)
unread_msgs2.save()
@receiver(pre_save, sender=UnreadChatRoomMessages)
def increment_unread_msg_count(sender, instance, **kwargs):
"""
When the unread message count increases, update the notification.
If one does not exist, create one. (This should never happen)
"""
if instance.id is None: # new object will be created
pass # create_unread_chatroom_messages_obj will handle this scenario
else:
previous = UnreadChatRoomMessages.objects.get(id=instance.id)
if previous.count < instance.count: # if count is incremented
content_type = ContentType.objects.get_for_model(instance)
if instance.user == instance.room.user1:
other_user = instance.room.user2
else:
other_user = instance.room.user1
try:
notification = Notification.objects.get(target=instance.user, content_type=content_type, object_id=instance.id)
notification.verb = instance.most_recent_message
notification.timestamp = timezone.now()
notification.save()
except Notification.DoesNotExist:
instance.notifications.create(
target=instance.user,
from_user=other_user,
redirect_url=f"Room_id={instance.room.id}", # we want to go to the chatroom
verb=instance.most_recent_message,
content_type=content_type,
)
@receiver(pre_save, sender=UnreadChatRoomMessages)
def remove_unread_msg_count_notification(sender, instance, **kwargs):
"""
If the unread messge count decreases, it means the user joined the chat. So delete the notification.
"""
if instance.id is None: # new object will be created
pass # create_unread_chatroom_messages_obj will handle this scenario
else:
previous = UnreadChatRoomMessages.objects.get(id=instance.id)
if previous.count > instance.count: # if count is decremented
content_type = ContentType.objects.get_for_model(instance)
try:
notification = Notification.objects.get(target=instance.user, content_type=content_type, object_id=instance.id)
notification.delete()
except Notification.DoesNotExist:
pass