Hello everyone, I have an application that uses Celery for background tasks. I want automated tests for these tasks, including tests of the task_failure signal receivers. A few years ago, when I initially built this system, I was using Django's built-in unittest test runner and copied some hacky code to run the tests. I have since switched to pytest. I am now adding some new Celery tasks with task_failure receivers, and I wanted to see if there is a better way to build the tests than the ugly hack I put together previously. The Celery documentation describes built-in pytest fixtures and marks, which looked promising.
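For reference, the plugin setup the docs describe boils down to roughly this in conftest.py; the broker/backend values below are placeholders, not necessarily what my real project uses:
# conftest.py -- rough sketch of enabling Celery's bundled pytest plugin
import pytest

# The plugin ships with Celery but is not enabled by default.
pytest_plugins = ("celery.contrib.pytest",)


@pytest.fixture(scope="session")
def celery_config():
    # Session-wide settings picked up by the celery_session_app fixture.
    # Placeholder in-memory broker/backend; swap in your real ones as needed.
    return {
        "broker_url": "memory://",
        "result_backend": "cache+memory://",
    }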
Here is a simplified version of what I have built.
# tasks.py
from celery import shared_task
from celery.signals import task_failure


@shared_task(bind=True)
def my_task(self):
    print("my_task was called")
    raise NotImplementedError


@task_failure.connect(sender=my_task)
def handle_my_task_failure(sender=None, **kwargs):
    print("handle_my_task_failure was called")
# tests.py
import pytest

from tasks import my_task


@pytest.mark.celery(
    task_always_eager=False,
    task_eager_propagates=False,
    result_serializer="json",
    result_backend="file://results",
    pool="thread",
)
@pytest.mark.django_db(transaction=True)
def test_my_task_failure(celery_session_worker):
    with pytest.raises(NotImplementedError):
        result = my_task.delay()
        result.get()
When I run the test with pytest -s tests.py, pytest prints "my_task was called", but "handle_my_task_failure was called" never appears. No matter what I do, I just can't get the Celery signals to work.
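To be concrete, what I'd ultimately like to be able to assert is something along these lines; the throwaway record receiver here is only for illustration, not part of my real code:
# Sketch of the assertion I'm after: register a temporary receiver and
# check that the task_failure signal actually reached it.
import pytest
from celery.signals import task_failure

from tasks import my_task


def test_failure_signal_fires(celery_session_worker):
    seen = []

    def record(sender=None, **kwargs):
        # Record which sender triggered the failure signal.
        seen.append(sender)

    task_failure.connect(record)
    try:
        with pytest.raises(NotImplementedError):
            my_task.delay().get()
    finally:
        task_failure.disconnect(record)

    assert seen, "expected the task_failure signal to reach the receiver"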
Has anyone ever tried to test Celery signals using the celery_session_worker fixture? My DuckDuckGo/Google searching is failing me. Any clues anyone could provide would be greatly appreciated.
Thanks in advance.