Celery duplicating tasks

Hi,

I have an issue with Celery and Celery Beat where tasks are being duplicated.

I’d been running Celery for about two months without any problems, then all of a sudden I’m getting duplicated entries in the database for the Celery tasks.

I’m using Redis as my backend/broker.

I’m finding it hard to debug the issue; I’ve checked how many workers are running on the backend and it’s showing only one.
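
For reference, I’m checking with the standard Celery inspection commands, roughly:

celery -A app_backend status          # lists the worker nodes that respond
celery -A app_backend inspect active  # shows what each worker is currently executing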

import os

from celery import Celery
from celery.schedules import crontab

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app_backend.settings')

app = Celery('app_backend',
             broker='redis://:#######ows.net:6379/__redisinstance__',
             include=['backend.tasks'])

app.conf.timezone = 'UTC'

app.conf.beat_schedule = {
    'CalculateMetrics-everyday-at-3-am': {
        'task': 'backend.tasks.CalculateMetrics',
        'schedule': crontab(hour=3, minute=0, day_of_week='*')
    },
    'CalculateMetrics-everyday-at-4-am': {
        'task': 'backend.tasks.CalculateWeeklyMetrics',
        'schedule': crontab(hour=4, minute=0, day_of_week='*')
    },
    'CalculateDailyProjectMetrics-everyday-at-2-am': {
        'task': 'backend.tasks.CalculateDailyProjectMetrics',
        'schedule': crontab(hour=1, minute=0, day_of_week='*')
    },
    'CalculateWeeklyProjectMetrics-everyday-at-2-am': {
        'task': 'backend.tasks.CalculateWeeklyProjectMetrics',
        'schedule': crontab(hour=2, minute=0, day_of_week='*')
    }
}


# With the Redis transport, tasks that aren't acknowledged within
# visibility_timeout (seconds) are redelivered to another worker, so a
# long-running task can end up executed twice if this window is too short.
app.conf.broker_transport_options = {'visibility_timeout': 3600}
#app.conf.result_backend =  'redis://:#############.net:6379'
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

Nothing has changed in these settings, so I’m unsure why this has just started.

I’ve also written the tasks, which use:

projectkeymetrics, created = DailyProjectKeyMetric.objects.get_or_create(project=project, capture_date=yesterday)

But I’m getting two records created, which I didn’t think was possible with get_or_create.
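
(Side note on that point: get_or_create isn’t atomic on its own; two workers can both see no existing row and both insert one. It’s only duplicate-proof when the lookup fields are backed by a database unique constraint. A minimal sketch of what that could look like, assuming the field names above; the Project FK target is a guess:)

from django.db import models

class DailyProjectKeyMetric(models.Model):
    project = models.ForeignKey('backend.Project', on_delete=models.CASCADE)  # assumed model path
    capture_date = models.DateField()

    class Meta:
        constraints = [
            # With this constraint, a second concurrent insert raises
            # IntegrityError instead of silently creating a duplicate row.
            models.UniqueConstraint(
                fields=['project', 'capture_date'],
                name='unique_daily_metric_per_project_per_day',
            ),
        ]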

Check your database to see if you’ve got duplicate entries in it for these tasks. (I think the table is django_celery_beat_periodictask.)

If you do have duplicate entries, then something is writing them.

The database is empty, Ken. Does it write to the db by default?

We use django-celery-beat to manage periodic tasks. See the “Periodic Tasks” page in the Celery 5.3.6 documentation.

When you’re using that, then the periodic tasks are stored in the database. That’s what I was thinking of. (I’m not familiar with how Celery manages scheduled tasks aside from that, sorry.)

And without seeing the complete tasks, it’s going to be tough to identify why you’re getting duplicated data in the database.

OK, maybe I’m not doing this correctly.

If I use celery -A proj beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler, does that mean I don’t need to do this?

...

    'CalculateDailyProjectMetrics-everyday-at-2-am': {
        'task': 'backend.tasks.CalculateDailyProjectMetrics',
        'schedule': crontab(hour=1, minute=0, day_of_week='*')
    },
    'CalculateWeeklyProjectMetrics-everyday-at-2-am': {
        'task': 'backend.tasks.CalculateWeeklyProjectMetrics',
        'schedule': crontab(hour=2, minute=0, day_of_week='*')
    }
...

And I just manage the tasks in the admin?

I’ve already got the options available to me in the admin, as I have run the migrations.

It’s not a question of one way being “proper” or “correct” and the other being “incorrect”.

You can do it either way.

I have always done it with django-celery-beat and managing the tasks through the admin, but this is not a requirement.

But if you do, then no - those definitions would not be needed in the code.
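
(And if you’d ever rather create them in code than click through the admin, the django-celery-beat models can do the same thing. A rough sketch using one of your entries from above:)

from django_celery_beat.models import CrontabSchedule, PeriodicTask

# Database-backed mirror of the 'CalculateMetrics-everyday-at-3-am' entry.
schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='0',
    hour='3',
    day_of_week='*',
    day_of_month='*',
    month_of_year='*',
)
PeriodicTask.objects.get_or_create(
    name='CalculateMetrics-everyday-at-3-am',
    task='backend.tasks.CalculateMetrics',
    crontab=schedule,
)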

OK, it would be nicer in the admin. I will look at this. At least I will get some output to use for debugging.

Thanks, Ken.

Hi, Ken.

I changed the scheduling to the database, but the tasks are not executing. The admin shows the tasks and their last-run times as expected, but they didn’t run.

Do I need to add any config to my settings.py?

I only have this in my celery.py file:

import os

from celery import Celery
from celery.schedules import crontab

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app_backend.settings')

app = Celery('app_backend',
             broker='redis://:#######ows.net:6379/__redisinstance__',
             include=['backend.tasks'])

app.conf.timezone = 'UTC'


app.conf.broker_transport_options = {'visibility_timeout': 3600}
#app.conf.result_backend =  'redis://:#############.net:6379'
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

But I’m thinking I need some settings in settings.py, although I can’t see anything in the documentation telling me where to define any extra settings.

Thanks

When you’re using django-celery-beat as the scheduler, you need to run the Beat process in addition to running your Celery worker.

See the “Example running periodic tasks” section in the docs.
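
Roughly, with your project name, that means two separate processes:

celery -A app_backend worker -l INFO
celery -A app_backend beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler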

I’m starting my app with:

gunicorn -w 2 -b 0.0.0.0:8000 app_backend.wsgi:application &
celery -A app_backend worker -l INFO -B &
celery -A app_backend beat --loglevel=INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
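
(One thing worth flagging in that line: the -B flag embeds a beat scheduler inside the worker, so this command starts two schedulers, the embedded one plus the standalone beat process, and two schedulers dispatching the same schedule can fire every task twice. A sketch with the embedded beat dropped; everything else is unchanged:)

gunicorn -w 2 -b 0.0.0.0:8000 app_backend.wsgi:application &
celery -A app_backend worker -l INFO &
celery -A app_backend beat --loglevel=INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler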

Do you have django_celery_beat in your INSTALLED_APPS?

Yep

INSTALLED_APPS = [
...

    'storages',
    'celery',
    'django_celery_beat',

]

I’d try setting the log level to DEBUG to see if there’s any usable / worthwhile information being logged.

(To confirm - I don’t have anything materially different in the configuration of a project that I’m looking at right now that uses Celery / Celery Beat.)

OK, thanks, Ken.

I will set the log level to DEBUG to see if it highlights any issues.

Thanks

OK, I’m getting this error in the logs:

[2023-12-04 16:53:39,278: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.
Trying again in 2.00 seconds... (1/100)

which is odd, as I’m not using a local RabbitMQ/AMQP broker.

And I’m also getting this:

LocalTime -> 2023-12-04 16:55:43
Configuration ->
    . broker -> amqp://guest:**@localhost:5672//
    . loader -> celery.loaders.default.Loader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)
[2023-12-04 16:55:43,522: WARNING/MainProcess] Reset: Timezone changed from 'UTC' to None

when running celery beat from the terminal.

In my celeryconfig.py I have this:

app = Celery('app_backend',
             broker='redis://:#######ows.net:6379/__redisinstance__',
             include=['backend.tasks'])

app.conf.timezone = 'UTC'

Is this where you are setting the broker details, Ken?

That’s a good catch.

No, it’s not.

Instead of passing the broker to the Celery class there, we use the setting:

CELERY_BROKER_URL = 'redis://localhost:6379/0'

in our settings.py.

I’d make the guess that setting it in the Celery class is working fine for Celery, but not for Beat.

(If you use the new “lower-case” configuration settings, it would be broker_url. See the “Configuration and defaults” page in the Celery docs.)

I’m getting all sorts of errors now.

In your tasks.py, do you add:

app = Celery('tasks',
             broker='redis://:#####E=@#####ws.net:6379/__redisinstance__',
             backend='redis://:######E=@r####ws.net:6379/__redisbackendinstance__')

I then have:

@app.task
def CalculateDailyProjectMetrics():
...

If I remove:

app = Celery('tasks',
             broker='redis://:#####E=@#####ws.net:6379/__redisinstance__',
             backend='redis://:######E=@r####ws.net:6379/__redisbackendinstance__')

the app breaks when starting up, because it throws an error:
TypeError: 'module' object is not callable

No. Our Celery definition in the celery.py file is just:

app = Celery('pmcs')

(“pmcs” is the name of the primary app in the project.)

The complete celery.py file looks like this:

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pmcs.settings')

app = Celery('pmcs')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

Everything else associated with the Celery configuration is in the settings.py file.
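
(In tasks.py, the usual Django pattern is then @shared_task, so the task module doesn’t need its own Celery instance. A sketch using your task name; the body is elided:)

from celery import shared_task

@shared_task
def CalculateDailyProjectMetrics():
    ...  # your existing task body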

This seems like a stupid question, but if I am using app.config_from_object('django.conf:settings', namespace='CELERY') and using the lower-case configuration values within my settings.py:

Do I need to prefix broker_url with CELERY?

i.e. CELERY_broker_url, celery_broker_url, or just broker_url?

Because for some reason it’s still throwing the error: cannot connect to amqp://guest:**@127.0.0.1:5672//.

Thanks

Nope, that’s a good question. (We’re still using the caps-version, so I don’t have a good example there to look at.)

Actually, from what I’m reading, it still should be CELERY_BROKER_URL in your settings.py. It appears to me that you would use the lower-case version only if you were pulling the configuration from something other than the settings.py file. (If you had a separate file for the Celery settings.)

See the docs at “Using Celery with Django”, especially about half-way down where it talks about the namespace parameter.
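
(Concretely: with namespace='CELERY', each lower-case Celery setting maps to an upper-case, CELERY_-prefixed name in settings.py. A minimal sketch, with a placeholder Redis URL:)

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'      # lower-case equivalent: broker_url
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # lower-case equivalent: result_backend
CELERY_TIMEZONE = 'UTC'                             # lower-case equivalent: timezone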