Websocket connection failed(WSREJECT) in production. (Nginx, Gunicorn, Daphne)

I’m having issues deploying my Django app using nginx, gunicorn and daphne in production. I’m hosting it on an Ubuntu server. I’ve been able to deploy the WSGI part of the app, but the ASGI sockets are not connecting.
Note: The entire app works fine on my local machine, but the ASGI app fails in production.

Here’s my asgi.py file:

my consumers.py:

import json
import asyncio
from django.conf import settings
from asgiref.sync import sync_to_async
from django.core.paginator import Paginator
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.utils import timezone
#from django.contrib.auth.models import AnonymousUser
from account.utils import LazyAccountEncoder
from account.models import Account
from .utils import *
from notification.utils import *
from notification.constants import *
from .constants import *
from .exceptions import ClientError
from .models import PrivateChatRoom, RoomChatMessage, UnreadChatRoomMessages
#from public_chat.constants import *



# Private Chat Room consumer

class ChatConsumer(AsyncJsonWebsocketConsumer):

	async def connect(self):
		"""
		Called when the websocket is handshaking as part of initial connection.
		"""
		print("ChatConsumer: connect: " + str(self.scope["user"].username))

		# let everyone connect. But limit read/write to authenticated users
		await self.accept()

		# the room_id will define what it means to be "connected". If it is not None, then the user is connected.
		self.room_id = None

	async def receive_json(self, content):
		"""
		Called when we get a text frame. Channels will JSON-decode the payload
		for us and pass it as the first argument.
		"""
		# Messages will have a "command" key we can switch on
		print("ChatConsumer: receive_json")
		command = content.get("command", None)
		try:
			if command == "join":
				await self.join_room(content['room'])
			elif command == "leave":
				# Leave the room
				await self.leave_room(content['room'])
			elif command == "send":
				if len(content['message'].lstrip()) != 0:
					await self.send_room(content['room'], content['message'])
			elif command == "get_room_chat_messages":
				await self.display_progress_bar(True)
				room = await get_room_or_error2(content['room_id'], self.scope['user'])
				payload = await get_room_chat_messages2(room, content['page_number'])
				if payload != None:
					payload = json.loads(payload)
					await self.send_messages_payload(payload['messages'], payload['new_page_number'])
				else:
					raise Exception("Something went wrong retrieving the chatroom messages.")
				await self.display_progress_bar(False)
			elif command == "get_user_info":
				await self.display_progress_bar(True)
				room = await get_room_or_error2(content['room_id'], self.scope["user"])
				payload = await get_user_info(room, self.scope["user"])
				if payload != None:
					payload = json.loads(payload)
					wanted_keys = ['pk', 'id', 'username', 'email', 'profile_image']
					# Extract 'fields' and retain only wanted keys
					fields = payload['user_info']['fields']
					filtered_fields = {key: value for key, value in fields.items() if key in wanted_keys}
					# Update 'fields' with filtered data
					payload['user_info']['fields'] = filtered_fields
					await self.send_user_info_payload(payload)
				else:
					raise Exception("Something went wrong retrieving the other users account details.")
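		except ClientError as e:
			# Report client errors to the UI instead of silently dropping them
			await self.handle_client_error(e)
		except Exception as e:
			print("ChatConsumer: receive_json exception: " + str(e))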


	async def disconnect(self, code):
		"""
		Called when the WebSocket closes for any reason.
		"""
		# Leave the room
		print("ChatConsumer: disconnect")
		try:
			if self.room_id != None:
				await self.leave_room(self.room_id)
		except Exception as e:
			pass

	async def join_room(self, room_id):
		"""
		Called by receive_json when someone sent a join command.
		"""
		# The logged-in user is in our scope thanks to the authentication ASGI middleware (AuthMiddlewareStack)
		print("ChatConsumer: join_room: " + str(room_id))
		try:
			room = await get_room_or_error2(room_id, self.scope['user'])
		except Exception as e:
			return
        
		# Add user to "users" list for room
		await connect_user(room, self.scope["user"])

        #store that we're in the room
		self.room_id = room.id

		await on_user_connected(room, self.scope["user"])

        # Add them to the group, so that they get room messages
		await self.channel_layer.group_add(
            room.group_name,
            self.channel_name
        )

		await self.send_json({
            "join": str(room.id)
        })
		

		# Add them to the group so they get room messages

		# Instruct their client to finish opening the room

	async def leave_room(self, room_id):
		"""
		Called by receive_json when someone sent a leave command.
		"""
		# The logged-in user is in our scope thanks to the authentication ASGI middleware
		print("ChatConsumer: leave_room")
		
		room = await get_room_or_error2(room_id, self.scope['user'])

		#Remove user from room 'connect_users' list
		await disconnect_user(room, self.scope["user"])
		
		# Notify the group that someone left
		await self.channel_layer.group_send(
            room.group_name,
            {
                "type": "chat.leave", # chat_leave function
                "room_id": room_id,
                "profile_image": self.scope['user'],
                "username": self.scope['user'].username,
                "user_id": self.scope['user'].pk,
            }
        )

		# Remove that we're in the room
		self.room_id = None

		# Remove them from the group so they no longer get room messages
		await self.channel_layer.group_discard(
            room.group_name,
            self.channel_name
        )
        
		# Instruct their client to finish closing the room

	async def send_room(self, room_id, message):
		"""
		Called by receive_json when someone sends a message to a room.
		"""
		print("ChatConsumer: send_room")
		# Check they are in this room
		if self.room_id != None:
			if str(room_id) != str(self.room_id):
				print("CLIENT ERROR 1")
				raise ClientError("ROOM_ACCESS_DENIED", "Room access denied")
		else:
			print("CLIENT ERROR 2")
			raise ClientError("ROOM_ACCESS_DENIED", "Room access denied")
		room = await get_room_or_error2(room_id, self.scope['user'])

		# get list of connected_users
		connected_users = room.connected_users.all()

		# Execute these functions asynchronously
		await asyncio.gather(*[
			append_unread_msg_if_not_connected(room, room.user1, connected_users, message), 
			append_unread_msg_if_not_connected(room, room.user2, connected_users, message),
			create_room_chat_message(room, self.scope["user"], message)
		])
		try:
			await self.channel_layer.group_send(
            room.group_name,
            {
                "type": "chat_message", # chat_message async function
                #"profile_image": str(self.scope['user'].profile_image.url),
                "username": self.scope['user'].username,
                "user_id": self.scope['user'].email,
                "message": message,
            }
        )
		except Exception as e:
			print(f"Here is the reason: {e}")




	# These helper methods are named by the types we send - so chat.join becomes chat_join
	async def chat_join(self, event):
		"""
		Called when someone has joined our chat.
		"""
		# Send a message down to the client
		print("ChatConsumer: chat_join: " + str(self.scope["user"]))
		if event["username"]:
			await self.send_json(
				{
					"msg_type": MSG_TYPE_ENTER2,
					"room": event["room_id"],
					"profile_image": event["profile_image"],
					"username": event["username"],
					"user_id": event["user_id"],
					"message": event["username"] + " connected.",
				},
			)

	async def chat_leave(self, event):
		"""
		Called when someone has left our chat.
		"""
		# Send a message down to the client
		pass


	async def chat_message(self, event):
		"""
		Called when someone has messaged our chat.
		"""
		# Send a message down to the client
		timestamp = calculate_timestamp(timezone.now())
		
		await self.send_json({
            "msg_type": MSG_TYPE_MESSAGE,
            "username": event['username'],
            "user_id": event['user_id'],
            #"profile_image": event['profile_image'],
            "message": event['message'],
            "natural_timestamp": timestamp,

        })
    
	async def send_messages_payload(self, messages, new_page_number):
		"""
		Send a payload of messages to the client socket.
		"""
		print("ChatConsumer: send_messages_payload. ")
		await self.send_json({
			"messages_payload": "messages_payload",
			"messages": messages,
            "new_page_number": new_page_number,
		})
		

	async def send_user_info_payload(self, user_info):
		"""
		Send a payload of user information to the ui
		"""
		# print("ChatConsumer: send_user_info_payload. ")
		await self.send_json(
			{
				"user_info": user_info,
			},
		)

	async def display_progress_bar(self, is_displayed):
		"""
		1. is_displayed = True
		- Display the progress bar on UI
		2. is_displayed = False
		- Hide the progress bar on UI
		"""
		await self.send_json(
			{
				"display_progress_bar": is_displayed
			}
		)


	async def handle_client_error(self, e):
		"""
		Called when a ClientError is raised.
		Sends error data to UI.
		"""
		errorData = {}
		errorData['error'] = e.code
		if e.message:
			errorData['message'] = e.message
			await self.send_json(errorData)
		return

@database_sync_to_async
def get_room_or_error2(room_id, user):
    """
     Tries to fetch a room for the user, checking permissions along the way
    """
    try:
        room = PrivateChatRoom.objects.get(pk=room_id)
    except PrivateChatRoom.DoesNotExist:
        print("Get room or error exception!")
        raise Exception("Invalid Room.")
    # Is this user allowed into this room?
    if user != room.user1 and user != room.user2:
        print("Not your chat bro!")
        raise Exception("You do not have permission to join this room.")
    return room



@sync_to_async
def get_user_info(room, user):
    """
    Retrieve the user info for the user you're chatting with.
    """
    try:
        # Determine who is who
        other_user = room.user1 if room.user1 != user else room.user2
        s = LazyAccountEncoder()
        final = s.serialize([other_user])[0]
        payload = {'user_info': final}
        # Using dumps() instead of dump() to return JSON as a string
        return json.dumps(payload, indent=4, sort_keys=True, default=str)

    except Exception as e:
        print("EXCEPTION: " + str(e))
        return None

    
@database_sync_to_async
def create_room_chat_message(room, user, message):
	return RoomChatMessage.objects.create(user=user, room=room, content=message)

@database_sync_to_async
def get_room_chat_messages2(room, page_number):
    try:
        qs = RoomChatMessage.objects.by_room(room)
        p = Paginator(qs, DEFAULT_ROOM_CHAT_MESSAGE_PAGE_SIZE1)

        payload = {}
        new_page_number = int(page_number)
        if new_page_number <= p.num_pages:
            new_page_number = new_page_number + 1
            s = LaxyRoomChatMessageEncoder2()
            payload['messages'] = s.serialize(p.page(page_number).object_list)
        else:
            payload['messages'] = None
        payload['new_page_number'] = new_page_number
        return json.dumps(payload)
    except Exception as e:
        print("EXCEPTION last: " + str(e))
    return None

@database_sync_to_async
def connect_user(room, user):
	# add user to connected_users list
	account = Account.objects.get(pk=user.pk)
	return room.connect_user(account)


@database_sync_to_async
def disconnect_user(room, user):
	# remove from connected_users list
	account = Account.objects.get(pk=user.pk)
	return room.disconnect_user(account)


# If the user is not connected to the chat, increment "unread messages" count
@database_sync_to_async
def append_unread_msg_if_not_connected(room, user, connected_users, message):
	if user not in connected_users:
		try:
			unread_msgs = UnreadChatRoomMessages.objects.get(room=room, user=user)
			unread_msgs.most_recent_message = message
			unread_msgs.count += 1
			unread_msgs.save()
		except UnreadChatRoomMessages.DoesNotExist:
			UnreadChatRoomMessages(room=room, user=user, count=1).save()
			pass
	return

# When a user connects, reset their unread message count
@database_sync_to_async
def on_user_connected(room, user):
	# confirm they are in the connected users list
	connected_users = room.connected_users.all()
	if user in connected_users:
		try:
			# reset count
			unread_msgs = UnreadChatRoomMessages.objects.get(room=room, user=user)
			unread_msgs.count = 0
			unread_msgs.save()
		except UnreadChatRoomMessages.DoesNotExist:
			UnreadChatRoomMessages(room=room, user=user).save()
			pass
	return

Here’s my ‘daphne.service’ file:

[Unit]
Description=WebSocket Daphne Service
After=network.target

[Service]
Type=simple
User=root
WorkingDirectory=/home/taskitly/taskily/src
ExecStart=/home/taskitly/taskily/venv/bin/python /home/taskitly/taskily/venv/bin/daphne -b 0.0.0.0 -p 8001 taskitly.asgi:application
Restart=on-failure

[Install]
WantedBy=multi-user.target


Here’s my gunicorn.service file:

[Unit]
Description=gunicorn daemon
Requires=gunicorn.socket
After=network.target

[Service]
User=taskitly
Group=www-data
WorkingDirectory=/home/taskitly/taskily/src
ExecStart=/home/taskitly/taskily/venv/bin/gunicorn \
        --access-logfile - \
        --workers 3 \
        --bind unix:/run/gunicorn.sock \
        taskitly.wsgi:application

[Install]
WantedBy=multi-user.target


Here’s my nginx conf file:

server {
    server_name 127.0.0.1 mydomain;

    location = /favicon.ico { access_log off; log_not_found off; }

    location /static {
        autoindex on;
        alias /var/www/taskitly/static/;
    }

    location / {
        include proxy_params;
        proxy_pass http://unix:/run/gunicorn.sock;
    }

    location /ws/ {
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_redirect off;
        proxy_pass http://127.0.0.1:8001;
    }

    listen 443 ssl; # managed by Certbot
    ssl_certificate /etc/letsencrypt/live/mydomain/fullchain.pem; # man>
    ssl_certificate_key /etc/letsencrypt/live/mydomain/privkey.pem; # m>
    include /etc/letsencrypt/options-ssl-nginx.conf; # managed by Certbot
    ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem; # managed by Certbot
}

I tried testing the socket connection from Postman:
request : ‘ws://mydomain:8001/chat/’
response : Could not connect to ws://taskitly.com:8001/chat/

It connects after I tried removing ‘AllowedHostsOriginValidator’ from my asgi file:

import os
from channels.auth import AuthMiddlewareStack
from channels.routing import URLRouter, ProtocolTypeRouter
from channels.security.websocket import AllowedHostsOriginValidator  # new
from django.core.asgi import get_asgi_application
from django.urls import path

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskitly.settings')
django_asgi_app = get_asgi_application()

from notification.consumers import NotificationConsumer
from .tokenauth_middleware import TokenAuthMiddleware  # new
from chat.consumers import ChatConsumer

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": (  
        TokenAuthMiddleware(
            URLRouter([
                    path("chat/", ChatConsumer.as_asgi()),
                    path("notification/", NotificationConsumer.as_asgi()),
            ])
        )
    )
})

The socket connects after I removed the ‘AllowedHostsOriginValidator’.

So please, how can I retain ‘AllowedHostsOriginValidator’ and still get the sockets to connect?

Actually, what we’ll need to see here is your nginx configuration for these URLs, the commands being used to run the gunicorn and daphne processes, and the JavaScript that is opening the websocket.

Also, is there a particular reason why you’re running an asgi app under gunicorn? (As opposed to, say, uvicorn.)

Okay, I’ve edited the post.
I meant running asgi under daphne.
I haven’t tested with JavaScript yet, just with Postman.

First, nginx is serving as your proxy. It uses the url path to determine which backend service receives the request.
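
To make the path-based routing concrete, here is a rough sketch in Python of how the prefix locations in the nginx config posted above would pick a backend. It is a simplification under stated assumptions: it only models plain prefix locations (longest match wins) and ignores the exact-match `= /favicon.ico` block, and `LOCATIONS` and `backend_for` are hypothetical names used only for illustration.

```python
# Prefix locations copied from the posted nginx config, mapped to
# a human-readable description of the backend they hand off to.
LOCATIONS = {
    "/": "gunicorn (unix:/run/gunicorn.sock)",
    "/static": "static files (/var/www/taskitly/static/)",
    "/ws/": "daphne (127.0.0.1:8001)",
}


def backend_for(path):
    """Return the backend for a request path.

    Among plain prefix locations, nginx selects the longest prefix
    that matches the start of the request path.
    """
    matches = [prefix for prefix in LOCATIONS if path.startswith(prefix)]
    return LOCATIONS[max(matches, key=len)]


print(backend_for("/ws/chat/"))  # handled by the /ws/ location
print(backend_for("/chat/"))     # falls through to the / location
```

With this config, only paths under `/ws/` reach Daphne; a WebSocket request to `/chat/` sent through nginx would go to gunicorn instead. Also, since that `proxy_pass` has no URI part, the path is forwarded unchanged, so Daphne would see `/ws/chat/`. It is worth checking that this matches the routes declared in asgi.py.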

You don’t connect directly to daphne. This means that you shouldn’t specify the port with your connection. You’re still wanting to send your request to nginx. (Doing it this way for testing is fine, but make sure your JavaScript is connecting to the proper port.)

Also, what is your ALLOWED_HOSTS setting in your settings file?

Yes, I added the port during testing with postman.

ALLOWED_HOSTS = [“mydomainIP”, “mydomain”]

Then I think you’ll need to check the postman docs to see if there’s a way to have it properly set the Host header. I don’t see that header in the snippet you posted above.

Here’s the header:

Ok, but are you seeing it being sent in the request?

If so, what is the value you’re seeing?

You mean server log or postman response?

From whatever you can use to see the actual request being sent.

If you’re logging them on the server side, fine. If postman will show it to you, great. If you need to use wireshark or tcpdump, that’s ok too.
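
If server-side logging is the easiest option, one possibility is a tiny ASGI middleware that prints the handshake headers Daphne actually receives. This is only a sketch: `HeaderLoggingMiddleware`, `handshake_headers` and `INTERESTING` are hypothetical names, not part of Channels or Daphne. It relies only on the ASGI convention that `scope["headers"]` is a list of `(name, value)` byte pairs with lower-cased names.

```python
# Header names (lower-cased bytes, as ASGI delivers them) worth inspecting.
INTERESTING = (b"host", b"origin", b"x-forwarded-host")


def handshake_headers(scope, names=INTERESTING):
    """Extract the selected headers from an ASGI scope as a plain dict."""
    return {
        name.decode("latin1"): value.decode("latin1")
        for name, value in scope.get("headers", [])
        if name in names
    }


class HeaderLoggingMiddleware:
    """Hypothetical ASGI middleware that logs WebSocket handshake headers."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] == "websocket":
            print("WS handshake headers:", handshake_headers(scope))
        # Always pass the connection on to the wrapped application.
        return await self.app(scope, receive, send)
```

To see requests before any validator rejects them, it would need to be the outermost wrapper of the "websocket" entry in asgi.py; the output would then show up in the Daphne service log.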

Your server is verifying the Host header (or X-Forwarded-Host if you’ve got USE_X_FORWARDED_HOST set), so you need to see what you’re sending in the request.

From Postman, I can’t find “X-Forwarded-Host” in the request sent.

Are you saying then that you have the USE_X_FORWARDED_HOST setting in your settings.py file?

I don’t have it in settings.py

Then it’s not going to look for X-Forwarded-Host.
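
Roughly, the selection works like the sketch below. This is a simplified illustration of the behaviour described above, not Django's actual code; `raw_host` is a hypothetical helper and the header values are made up.

```python
def raw_host(headers, use_x_forwarded_host=False):
    """Pick the header value that host validation would look at.

    `headers` is a dict of lower-cased header names to values.
    X-Forwarded-Host is only consulted when the setting is enabled.
    """
    if use_x_forwarded_host and "x-forwarded-host" in headers:
        return headers["x-forwarded-host"]
    return headers.get("host", "")


request_headers = {"host": "127.0.0.1:8001", "x-forwarded-host": "mydomain"}

# With the setting off (the default), only Host is considered.
print(raw_host(request_headers))
# With the setting on, X-Forwarded-Host takes precedence when present.
print(raw_host(request_headers, use_x_forwarded_host=True))
```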

Should I then remove it from nginx conf?

Two separate thoughts here:

  • Remove what from nginx.conf?

  • When you’re connecting directly to Daphne as you’re doing here, then whatever you have in nginx is irrelevant for the purposes of this test.

Okay, I thought X-Forwarded-Host was in nginx conf but it’s not.

Since it’s irrelevant on the nginx conf, shouldn’t the connection succeed without removing ‘AllowedHostsOriginValidator’?

Your request needs to include the appropriate Host header - that’s what Daphne is going to check.

So you need to show what the Host header is that is being sent, along with the actual content of your ALLOWED_HOSTS setting from your settings.py.

Nginx plays no part in this test.
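
One caveat worth checking, since removing ‘AllowedHostsOriginValidator’ made the connection succeed: as far as I can tell from the Channels source, that validator compares the hostname in the WebSocket `Origin` header (rather than `Host`) against ALLOWED_HOSTS, and denies the handshake when no `Origin` header is present. The sketch below is a simplified, stdlib-only approximation of that check, not the library's code; `origin_allowed` and `host_matches` are hypothetical names and the values in `ALLOWED_HOSTS` are placeholders.

```python
from urllib.parse import urlparse

ALLOWED_HOSTS = ["mydomain", "203.0.113.10"]  # placeholder values


def host_matches(host, pattern):
    """Approximate ALLOWED_HOSTS matching: '*', exact, or '.suffix'."""
    pattern = pattern.lower()
    host = host.lower()
    if pattern == "*":
        return True
    if pattern.startswith("."):
        return host == pattern[1:] or host.endswith(pattern)
    return host == pattern


def origin_allowed(headers, allowed_hosts):
    """Decide whether a handshake passes an Origin-based check.

    `headers` is a list of (name, value) byte pairs with lower-cased
    names, in the shape ASGI uses for scope["headers"].
    """
    origin = None
    for name, value in headers:
        if name == b"origin":
            origin = urlparse(value.decode("latin1"))
    # A missing or unparsable Origin only passes if everything is allowed.
    if origin is None or origin.hostname is None:
        return "*" in allowed_hosts
    return any(host_matches(origin.hostname, p) for p in allowed_hosts)


# A request with only a Host header, as a bare API client might send:
print(origin_allowed([(b"host", b"mydomain:8001")], ALLOWED_HOSTS))
# The same request with an Origin header added:
print(origin_allowed([(b"origin", b"https://mydomain")], ALLOWED_HOSTS))
```

If that reading is right, a Postman request without an `Origin` header would be rejected regardless of its `Host` value, so manually adding an `Origin` header that names an allowed host would be a quick way to test it.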

Okay.
From postman :
Host: “domain:8001”

From settings file
ALLOWED_HOSTS = [“domainIP”, “domain”]

If I understand, this should resolve it?
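
For what it's worth, a port in the `Host` header should not cause a mismatch by itself: as I understand it, Django splits off the port before comparing the host against ALLOWED_HOSTS. Here is a rough, stdlib-only sketch of that step. `split_host_port` and `host_allowed` are hypothetical names, and the comparison is exact-match only (no wildcard or leading-dot patterns) to keep it short.

```python
import re

# Hostname or bracketed IPv6 literal, optionally followed by ":port".
HOST_RE = re.compile(r"^([a-z0-9.-]+|\[[a-f0-9:.]+\])(?::([0-9]+))?$")


def split_host_port(host):
    """Return (domain, port) from a Host header value, or ("", "") if invalid."""
    match = HOST_RE.match(host.lower())
    if not match:
        return "", ""
    domain, port = match.group(1), match.group(2) or ""
    return domain.rstrip("."), port


def host_allowed(host, allowed_hosts):
    """Exact-match check of the port-less domain against the allowed list."""
    domain, _port = split_host_port(host)
    return bool(domain) and domain in [a.lower() for a in allowed_hosts]


print(split_host_port("mydomain:8001"))
print(host_allowed("mydomain:8001", ["mydomainIP", "mydomain"]))
```

So a `Host` of the form domain:8001 would be expected to pass a plain host check once the port is stripped, which suggests looking at what else differs in the request.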