Celery task does not send messages through Websocket when running Celery without --pool=solo

When I start the celery worker using the option --pool=solo I receive the messages through WebSocket connection as expected:

But the disadvantage of --pool=solo is that I can run only one task, no parallel tasks execution is possible. If I remove the option --pool=solo I don’t receive the messages through WebSocket. Unfortunately I can’t tell what is wrong here since I wasn’t able to log the process of the celery task.

Based on that I receive the messages when using the option --pool=solo I assume the configuration should be correct, or maybe I’m missing some very obvious thing which causes the task to not work without --pool=solo.

Here is my project structure:

tree -L 2 ams -I '__pycache__|migrations' -L 4 core
ams
├── __init__.py
├── admin.py
├── apps.py
├── consumers.py
├── models.py
├── render.py
├── routing.py
├── serializers.py
├── signals.py
├── tasks.py
├── tests.py
├── urls.py
└── views.py
core
├── __init__.py
├── ams_admin.py
├── asgi.py
├── forms.py
├── settings
│   ├── __init__.py
│   ├── auth_backend.py
│   ├── base.py
│   ├── celery.py
│   ├── dev.py
│   ├── dev.template.py
│   ├── prod.py
│   └── routers.py
├── static
├── urls.py
└── wsgi.py

The celery.py content:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

envrole = os.getenv('ENVIRONMENT')
settings_module = f"core.settings.{envrole}"
os.environ.setdefault('DJANGO_SETTINGS_MODULE', settings_module)

app = Celery('ams')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

The content of settings.base.py

INSTALLED_APPS = [
    'daphne',
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django.contrib.gis',

	...
]

...

WSGI_APPLICATION = 'core.wsgi.application'
ASGI_APPLICATION = 'core.asgi.application'

...

CELERY_BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True

The content of settings.dev.py

from .base import *

...


CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://localhost:6379/1",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "MAX_ENTRIES": 1000
        }
    }
}


CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": ["redis://localhost:6379/1"],
        },
    },
}

The content of ams.task.py


...
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from celery import shared_task

...

logger = logging.getLogger(__name__)

...

@shared_task
def process_file_matching(file_id):
    logger.info(f"Processing file {file_id}")
    channel_layer = get_channel_layer()
    try:
        file = File.objects.get(id=file_id)
        file_content = file.file  # Binary data

        ...

        df = read_file(file, file_content)

        address_matches_to_create = []
        for index, (row_index, row) in enumerate(df.iterrows(), start=1):
            
			...

            # Send the progress to the user over WebSocket
            progress = (index / df.shape[0]) * 100
            message = {
                'type': 'file_matching_progress',
                'progress': progress,
                'file_id': file_id,
                'message': f"Processed {index} of {df.shape[0]} rows"
            }
            async_to_sync(channel_layer.group_send)(f"user_{user.id}", message)

		...

        file.status = FileStatus.PROCESSED
        file.save()

        logger.info(f"Processing of file {file_id} finished in {duration:.2f} seconds")

    except File.DoesNotExist:
        logger.error(f"File with ID {file_id} does not exist.")
	...

The content of ams.consumer.py

from channels.generic.websocket import AsyncWebsocketConsumer
import json


class NotificationConsumer(AsyncWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.user_id = None

    async def connect(self):
        user = self.scope["user"]
        if "ws_error" in self.scope:
            await self.send(text_data=self.scope["ws_error"])
            await self.close(code=4000)
            return

        if user.is_authenticated:
            self.user_id = user.id  # Get user ID from the scope
            await self.channel_layer.group_add(
                f"user_{self.user_id}",  # Create a group for this user
                self.channel_name
            )
            await self.accept()
        else:
            await self.close(code=4001)

    async def disconnect(self, close_code):
        # This method is called when the WebSocket closes for any reason
        pass

    async def receive(self, text_data=None, bytes_data=None):
        # Log or print that a message was received but not processed.
        print(f"Ignored message from client: {text_data}")

    async def file_matching_progress(self, message):
        await self.send(text_data=json.dumps(message))

    async def matching_process_completed(self, message):
        await self.send(text_data=json.dumps(message))

In case I missed something which would be useful to figure out the cause, I will update the text.

In my Redis database I also see some content:

Here is the task message which was successfully completed when I was running the celery with --pool=solo option:

Whatever the issue is, it’s not directly (or solely) related to running Celery without -P solo. I created a test project that demonstrates that a celery worker with -P process will send messages.

I think if you want to debug this, you’re going to need to do things like add some print statements at key points to see what is being done when, along with possibly using something like wireshark or tcpdump to see the messages being passed through redis - and see what else you might do to gather more information, like creating a worker process that connects to that group to get messages.

You might also want to simplify it to see if you can narrow down what is causing the problem.

Note: Also be aware that channels have a capacity limit. If you’re sending too many messages to a group, they will be dropped - see Capacity

The easiest way would be of course to use the print statements, but they are not printed out in celery logs. I tried to use both plain python print and the logger, both does not print out anything in celery logs.

Python print statements in the celery task show up on the console where you’re running the celery worker. Where logger statements get written depend upon your logging configuration and what logger you’re using in the task.

The solution was not obvious. I found it in one of the responses in StackOverflow. I had to change the pool ot eventlet packege and now it works. I lost 3 days by trying different things and even tried to switch to RabbitMQ broker.

Install the eventlet:

pip install eventlet

And change the run command to:

celery -A myapp.celeryapp worker --loglevel=info --pool=eventlet

Why this works?

@alias51, @Hussain: It worked also in my case. After some research I found this article. Long story short: default concurrency pool prefork doesn’t work on Windows.

Wow. It’s been ages since I’ve even thought about trying to deploy anything Django-related on Windows. (That also would have been helpful to know up front.)

Last I checked (and it’s still there), the docs at Install Redis on Windows | Redis say this:

Redis is not officially supported on Windows. However, you can install Redis on Windows for development by following the instructions below.

Anyway, glad you found an explanation and a solution for your issue.

Thank you for your time anyway. The problem is that there was nothing obvious to figure out that the problem was the Windows operating system. The process started as normal and there were no warnings or information in the console, even the task receiving messages were there. It would be helpful if celery had mentioned this in the “Getting Started” docs, this is the place where people start integrating it into their projects First steps with Django — Celery 5.3.6 documentation

Agreed. Or, if they were to add some code to detect the operating system being used, and adjust the worker startup accordingly.

I went looking at the project on github to see if there was a ticket or some other information about this, and I found this on the front page:

Celery is a project with minimal funding, so we don’t support Microsoft Windows but it should be working. Please don’t open any issues related to that platform.

(In which case I think it would be nice if they were to state this in the docs as well.)

1 Like