I Created AN Notification App By Using The Django Channels, Websockt and Successfully Sending Notification To The
Webscoketking.com
, Now I Want To Create The API of This App For Sending The Notifications Of the Websitemodels.py
from django.db import models
from django.contrib.auth.models import AbstractUser
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
import json
# Create your models here.
class CustomUser(AbstractUser):
"""Model class for users"""
USER_CHOICES = [
('expert', 'Expert'),
('business', 'Business'),
('admin', 'Admin'),
]
username = models.CharField(max_length=40, null=True, blank=True)
full_name = models.CharField(max_length=40)
email = models.EmailField(unique=True)
password = models.CharField(max_length=40, null=False, blank=False)
confirm_password = models.CharField(max_length=40, null=False, blank=False)
user_choices = models.CharField(max_length=30, choices=USER_CHOICES, default='expert')
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['username']
def __str__(self) -> str:
return f"{self.email}"
class Notification(models.Model):
account_user = models.ForeignKey(CustomUser, on_delete=models.CASCADE)
message = models.TextField()
is_read = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
def save(self, *args, **kwargs):
print('saving notification')
channel_layer = get_channel_layer()
notification_objs = Notification.objects.filter(is_read=False).count()
data = {'count': notification_objs, 'current_notification': self.message}
async_to_sync(channel_layer.group_send)(
"test_group", {
"type": "send_notification",
"value": json.dumps(data)
}
)
super(Notification, self).save(*args, **kwargs)
class AccountApproval(models.Model):
account_user = models.ForeignKey(CustomUser, on_delete=models.CASCADE)
approved = models.BooleanField(default=False)
approval_message = models.TextField(blank=True, null=True)
COnsumers.py
from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync
import json
class TestConsumer(WebsocketConsumer):
def connect(self):
self.room_name = "test"
self.room_group_name = "test_group"
async_to_sync(self.channel_layer.group_add)(
self.room_group_name,
self.channel_name,
)
self.accept()
print("connected")
self.send(text_data=json.dumps({'status': 'connected Through Django Channels'}))
def receive(self, text_data):
print(text_data)
self.send(text_data=json.dumps({'status': 'We Got You'}))
def disconneted(self, *args, **kwargs):
print("disconnected")
def send_notification(self, event):
print('Send Notification: ')
data = json.loads(event.get('value'))
self.send(text_data=json.dumps({'status': data}))
print('Send Notification: ')
asgi.py
"""
ASGI config for hamza project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/4.2/howto/deployment/asgi/
"""
from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import path
from bilal.consumers import TestConsumer
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hamza.settings')
asgiapplication = get_asgi_application()
ws_pattern = [
path('ws/test/', TestConsumer.as_asgi())
]
application = ProtocolTypeRouter({
'http': get_asgi_application(),
'websocket': URLRouter(ws_pattern)
})