Hello, I am new to django channels. I tried using it for the ride hailing app’s backend in order to have realtime connection between the rider and the driver.
I am using these version of the libs:
channels==4.0.0
channels-rabbitmq==4.0.1
channels-redis==4.1.0
redis==4.6.0
this is my channel layer settings
CHANNEL_LAYERS = {
"default": {
"BACKEND":"channels_redis.pubsub.RedisPubSubChannelLayer",
"CONFIG": {
"hosts": [env.str('REDIS_CONNECTION_URL')],
"capacity": 50000
},
},
}
this is my consumers.py code
class WebsocketConsumer(WebsocketConsumer):
def connect(self):
self.room_id = self.scope['url_route']['kwargs']['room_id']
if self.room_id in [None, '']:
self.accept()
event_data = {
"type": "connection_failed",
"message": "Socket connection failed due to invalid room_id",
"data": {}
}
self.response_handler(event_data)
self.close()
return
async_to_sync(self.channel_layer.group_add)(
self.room_id,
self.channel_name
)
if self.room_id not in [admin_websocket_room_id, admin_sos_websocket_room_id]:
try:
socket_room = WebsocketRoom.objects.get(room_id=self.room_id)
except WebsocketRoom.DoesNotExist:
# Handle the case where the room_id does not exist
self.accept()
event_data = {
"type": "connection_failed",
"message": "Socket connection failed due to invalid room_id",
"data": {}
}
self.response_handler(event_data)
self.close()
return
self.room_access_token = socket_room.access_token
self.accept()
event_data = {
"type": "connection_pass",
"message": "Socket connected successfully",
"data": {}
}
self.response_handler(event_data)
try:
#Check for saved latest message, if exists then send
redis_client = redis_connect()
redis_key = self.room_id
is_valid_redis_key = redis_key.split('_')[0] == 'ride'
print("redis key >> ", redis_key)
print("is valid redis key >> ", is_valid_redis_key)
print("redis key >> ", redis_key.split('_')[0])
if redis_key is not None and is_valid_redis_key:
redis_value = redis_client.get(redis_key)
print("Redis value >> ", redis_value)
if redis_value is not None:
json_redis_value = json.loads(redis_value)
self.response_handler(json_redis_value)
except Exception as err:
print("Redis data error >> ", str(err))
else:
self.room_access_token = admin_websocket_room_token if self.room_id == admin_websocket_room_id else admin_sos_websocket_room_token
self.accept()
event_data = {
"type": "connection_pass",
"message": "Socket connected successfully",
"data": {}
}
self.response_handler(event_data)
def disconnect(self, error_code):
self.channel_layer.group_discard(
self.room_id,
self.channel_name
)
def receive(self, text_data):
json_data = json.loads(text_data)
access_token = json_data["token"]
if self.room_access_token and access_token == self.room_access_token:
try:
#Save latest message data in redis
redis_key = self.room_id
is_valid_redis_key = redis_key.split('_')[0] == 'ride'
print("receive_data: redis key >> ", redis_key)
print("receive_data: is valid redis key >> ", is_valid_redis_key)
print("receive_data: redis key >> ", redis_key.split('_')[0])
if redis_key is not None and is_valid_redis_key:
redis_value = json.dumps({
"type": json_data.get('type', 'response_handler'),
"message": json_data.get('message', 'Data fetched successfully'),
"data": json_data.get('data', {})
})
redis_client = redis_connect()
redis_client.setex(redis_key, 3600*4, redis_value) #As of now it's expiry time is 4hr
disconnect(redis_client)
except Exception as err:
print("Redis data error >> ", str(err))
async_to_sync(self.channel_layer.group_send) (
self.room_id,
{
"type": "response_handler",
"data": json_data
}
)
else:
event_data = {
"type": "access_denied",
"message": "Provided access token is invalid",
"data": {}
}
self.response_handler(event_data)
def response_handler(self, event_data):
event = event_data.get('data', {}) if event_data.get('type', 'response_handler') == 'response_handler' else event_data
json_dump_message = json.dumps({
"type": event.get('type', 'response_handler'),
"message": event.get('message', 'Data fetched successfully'),
"data": event.get('data', {})
})
self.send(text_data=json_dump_message)
I deployed this using nginx, daphne and supervisor on a t3.xlarge ec2 instance which offers 4vcpu and 15gb memory. The redis server is running inside the same ec2 instance.
this is my nginx file
upstream channels-backend {
server localhost:8000;
}
server {
server_name dev.websocket.app www.dev.websocket.app;
location /ws/ {
proxy_pass http://channels-backend;
include proxy_params;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_redirect off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Host $server_name;
}
listen 443 ssl; # managed by Certbot
ssl_certificate /etc/letsencrypt/live/dev.websocket.app/fullchain.pem; # managed by Certbot
ssl_certificate_key /etc/letsencrypt/live/dev.websocket.app/privkey.pem; # managed by Certbot
include /etc/letsencrypt/options-ssl-nginx.conf; # managed by Certbot
ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem; # managed by Certbot
}
server {
if ($host = dev.websocket.app) {
return 301 https://$host$request_uri;
} # managed by Certbot
listen 80;
server_name dev.websocket.app www.dev.websocket.app 13.121.16.266 #this urls are changed to fake ones
location /ws/ {
proxy_pass http://channels-backend;
include proxy_params;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_redirect off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Host $server_name;
}
#return 404; #managed by Certbot
}
this is supervisor config
[fcgi-program:daphne]
# TCP socket used by Nginx backend upstream
socket=tcp://localhost:8000
# Directory where your site's project files are located
directory=/home/ubuntu/my-backend/src/
# Each process needs to have a separate socket file, so we use process_num
command=/home/ubuntu/my-backend/venv/bin/daphne -u /run/daphne/daphne%(process_num)d.sock --fd 0 --access-log - --proxy-headers config.asgi:application
# Number of processes to startup, roughly the number of CPUs you have
numprocs=4
# Give each process a unique name so they can be told apart
process_name=asgi%(process_num)d
# Automatically start and recover processes
autostart=true
autorestart=true
# Choose where you want your log to go
stdout_logfile=/var/log/socket.log
redirect_stderr=true
still this server giving this errors:
- [Errno 111] ECONNREFUSED
- server rejected WebSocket connection: HTTP 502
- Sometimes it gives 1011 error
getting this error: [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1007) when I hit the django channels socket server’s url using this code:
try:
# Create a WebSocket connection
with sync_connect(websocket_url, ssl_context=ssl.SSLContext(ssl.PROTOCOL_TLS)) as websocket:
if sleep is not None: time_sleep(sleep)
websocket.send(json.dumps(data))
websocket.close()
except (websockets.WebSocketException, ConnectionError) as e:
print(f"An error occurred: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
Also this server cpu touched 90% in all cores when I loadtest that connection using k6 with 500vus. Please let me know what I am doing wrong.