When I start the Celery worker with the option --pool=solo, I receive the messages through the WebSocket connection as expected.
The disadvantage of --pool=solo is that only one task can run at a time; no parallel task execution is possible. If I remove the option --pool=solo, I no longer receive the messages through the WebSocket. Unfortunately, I can't tell what is wrong here, since I wasn't able to get any log output from the Celery task.
Since I do receive the messages when using the option --pool=solo, I assume the configuration is correct, or maybe I'm missing something very obvious that prevents the task from working without --pool=solo.
Here is my project structure:
tree -L 2 ams -I '__pycache__|migrations' -L 4 core
ams
├── __init__.py
├── admin.py
├── apps.py
├── consumers.py
├── models.py
├── render.py
├── routing.py
├── serializers.py
├── signals.py
├── tasks.py
├── tests.py
├── urls.py
└── views.py
core
├── __init__.py
├── ams_admin.py
├── asgi.py
├── forms.py
├── settings
│ ├── __init__.py
│ ├── auth_backend.py
│ ├── base.py
│ ├── celery.py
│ ├── dev.py
│ ├── dev.template.py
│ ├── prod.py
│ └── routers.py
├── static
├── urls.py
└── wsgi.py
The content of core/settings/celery.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# Select the Django settings module before the Celery app is created.
# ENVIRONMENT (e.g. "dev" or "prod") maps to core.settings.<ENVIRONMENT>.
# When ENVIRONMENT is unset or empty, fall back to an already exported
# DJANGO_SETTINGS_MODULE instead of building the bogus "core.settings.None",
# and fail early with a clear message if neither variable is available.
envrole = os.getenv('ENVIRONMENT')
if envrole:
    settings_module = f"core.settings.{envrole}"
else:
    settings_module = os.environ.get('DJANGO_SETTINGS_MODULE')
    if not settings_module:
        raise RuntimeError(
            "Cannot configure Celery: set ENVIRONMENT (e.g. 'dev' or 'prod') "
            "or DJANGO_SETTINGS_MODULE before starting the worker."
        )
# setdefault never overrides a DJANGO_SETTINGS_MODULE that is already exported.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', settings_module)

app = Celery('ams')
# Read every Django setting prefixed with CELERY_ (CELERY_BROKER_URL, ...).
app.config_from_object('django.conf:settings', namespace='CELERY')
# Look for a tasks module in each installed Django app.
app.autodiscover_tasks()
The content of core/settings/base.py:
# Django applications enabled for the project (excerpt; the trailing "..."
# stands for entries elided from this listing).
# NOTE(review): 'daphne' is listed first; per the Channels installation docs it
# has to precede 'django.contrib.staticfiles' -- confirm before reordering.
INSTALLED_APPS = [
'daphne',
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django.contrib.gis',
...
]
...
# Dotted paths to the WSGI and ASGI application objects in the core package.
WSGI_APPLICATION = 'core.wsgi.application'
ASGI_APPLICATION = 'core.asgi.application'
...
# Celery settings. The CELERY_ prefix matches namespace='CELERY' used by
# app.config_from_object() in celery.py.
# Broker and result backend both point at Redis database 1 on localhost.
CELERY_BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
The content of core/settings/dev.py:
from .base import *
...
# Default Django cache, backed by django-redis on Redis database 1 of
# localhost (the same URL as CELERY_BROKER_URL in base.py).
# NOTE(review): MAX_ENTRIES is placed inside OPTIONS here -- confirm that the
# django-redis backend actually honors it.
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://localhost:6379/1",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"MAX_ENTRIES": 1000
}
}
}
# Channel layer (channels_redis) shared by the WebSocket consumers and by the
# Celery task that calls get_channel_layer().
# NOTE(review): this uses the same Redis database (1) as the cache and the
# Celery broker/result backend -- presumably intentional for dev; confirm.
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": ["redis://localhost:6379/1"],
},
},
}
The content of ams/tasks.py:
...
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from celery import shared_task
...
logger = logging.getLogger(__name__)
...
# Celery task: load the File with the given id, walk its rows and publish a
# progress event to a per-user channel-layer group after every row.
# file_id: primary key used for File.objects.get().
# NOTE(review): `user` and `duration` are used below but are assigned in code
# that is elided ("...") from this excerpt.
@shared_task
def process_file_matching(file_id):
logger.info(f"Processing file {file_id}")
# Channel layer used to publish the progress events.
channel_layer = get_channel_layer()
try:
file = File.objects.get(id=file_id)
file_content = file.file # Binary data
...
df = read_file(file, file_content)
address_matches_to_create = []
# index counts rows starting at 1, so the last row reports 100 percent.
for index, (row_index, row) in enumerate(df.iterrows(), start=1):
...
# Send the progress to the user over WebSocket
progress = (index / df.shape[0]) * 100
# 'type' names the consumer handler that receives this event
# (NotificationConsumer.file_matching_progress).
message = {
'type': 'file_matching_progress',
'progress': progress,
'file_id': file_id,
'message': f"Processed {index} of {df.shape[0]} rows"
}
# group_send is called through async_to_sync because this task is a
# synchronous function; the group name has the same f"user_{id}" form
# that NotificationConsumer.connect() joins.
async_to_sync(channel_layer.group_send)(f"user_{user.id}", message)
...
# Mark the file as processed once the loop has finished.
file.status = FileStatus.PROCESSED
file.save()
logger.info(f"Processing of file {file_id} finished in {duration:.2f} seconds")
except File.DoesNotExist:
# An unknown id is logged and the task ends without raising.
logger.error(f"File with ID {file_id} does not exist.")
...
The content of ams/consumers.py:
from channels.generic.websocket import AsyncWebsocketConsumer
import json
class NotificationConsumer(AsyncWebsocketConsumer):
    """Relay per-user channel-layer events to the client over a WebSocket.

    On connect, an authenticated user's socket joins the group
    ``user_<id>``. Events sent to that group with type
    ``file_matching_progress`` or ``matching_process_completed`` are
    forwarded to the client as JSON text frames.

    Close codes used by this consumer:
        4000 -- the scope carries a ``ws_error`` entry; its text is sent to
                the client before closing.
        4001 -- the user is missing or not authenticated.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Stays None until connect() has authenticated the user; disconnect()
        # uses it to tell whether the socket ever joined a group.
        self.user_id = None

    def _group_name(self):
        """Return the channel-layer group name for the connected user."""
        return f"user_{self.user_id}"

    async def connect(self):
        """Accept the socket for authenticated users, otherwise close it."""
        if "ws_error" in self.scope:
            # A frame can only be sent on an accepted socket, so accept first,
            # report the error, then close with the application code.
            await self.accept()
            await self.send(text_data=self.scope["ws_error"])
            await self.close(code=4000)
            return

        # Looked up only after the ws_error check; a missing "user" entry is
        # treated like an unauthenticated user instead of raising KeyError.
        user = self.scope.get("user")
        if user is None or not user.is_authenticated:
            await self.close(code=4001)
            return

        self.user_id = user.id  # Get user ID from the scope
        # Create/join the group for this user before accepting.
        await self.channel_layer.group_add(self._group_name(), self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the user's group when the WebSocket closes for any reason."""
        if self.user_id is None:
            # The socket was rejected in connect() and never joined a group.
            return
        # Without this, the closed channel would stay in the group until the
        # channel layer expires it.
        await self.channel_layer.group_discard(self._group_name(), self.channel_name)

    async def receive(self, text_data=None, bytes_data=None):
        """Ignore client-to-server messages; this socket is push-only."""
        # Log or print that a message was received but not processed.
        print(f"Ignored message from client: {text_data}")

    async def file_matching_progress(self, message):
        """Forward a ``file_matching_progress`` group event to the client."""
        await self.send(text_data=json.dumps(message))

    async def matching_process_completed(self, message):
        """Forward a ``matching_process_completed`` group event to the client."""
        await self.send(text_data=json.dumps(message))
If I have left out anything that would help to identify the cause, I will update the question.