So I have followed the these celery docs to prepare my django project: First steps with Django — Celery 5.2.7 documentation
I have added these files to my project:
curated/landscape/celery.py
:
from __future__ import absolute_import, unicode_literals
from celery import Celery
import os
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'landscape.settings')
app = Celery('landscape')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
app.task(bind=True, ignore_result=True)
def debug_task(self):
print(f'Request: {self.request!r}')
I have edited the curated/landscape/__init__.py
:
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
and added a task to my app here curated/landscape/self_hosted/tasks.py
from celery import shared_task
# app = Celery('tasks', broker='pyamqp://guest@localhost:9001//')
@shared_task
def add(x, y):
return x + y
I have a rabbitmq
running which also works since I have the following output from my celery worker:
celery -A landscape worker -l INFO
[2022-06-30 14:48:36,670: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
-------------- celery@devbox v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.15.0-40-generic-x86_64-with-glibc2.35 2022-06-30 14:48:36
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: landscape:0x7f0b3861f160
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
[2022-06-30 14:48:36,889: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2022-06-30 14:48:36,890: INFO/MainProcess] mingle: searching for neighbors
[2022-06-30 14:48:36,893: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2022-06-30 14:48:37,900: INFO/MainProcess] mingle: all alone
[2022-06-30 14:48:37,912: INFO/MainProcess] celery@devbox ready.
You can see that I have no tasks registered in celery which is weird.
I try to add the task manually:
python manage.py shell
from self_hosted.tasks import add
add.delay(4,4)
No hostname was supplied. Reverting to default 'localhost'
<AsyncResult: 610baac9-1c72-42d9-8ae0-8762684e325f>
but then I get the following output from the celery worker:
[2022-06-30 14:49:45,964: ERROR/MainProcess] Received unregistered task of type 'self_hosted.tasks.add'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
'[[4, 4], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (81b)
Thw full contents of the message headers:
{'lang': 'py', 'task': 'self_hosted.tasks.add', 'id': '610baac9-1c72-42d9-8ae0-8762684e325f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '610baac9-1c72-42d9-8ae0-8762684e325f', 'parent_id': None, 'argsrepr': '(4, 4)', 'kwargsrepr': '{}', 'origin': 'gen1013307@devbox', 'ignore_result': False}
The delivery info for this task is:
{'consumer_tag': 'None4', 'delivery_tag': 1, 'redelivered': False, 'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
File "/home/andrej/.pyenv/versions/3.10.5/envs/curated/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
strategy = strategies[type_]
KeyError: 'self_hosted.tasks.add'
I think I followed through with the docs but I am missing something.
If it is any help here is the repo with the current commit: https://github.com/ajfriesen/curated/tree/4a0f33233fdf1d4e493dacbe66569fad1275cbba
Maybe it is because of my settings module but I am kind of lost now and wasted enough time and hope someone can push me in the right direction