celery signals pytest celery_session_worker

Hello everyone, I have an application that uses Celery for background tasks. I want automated tests for these tasks, including tests of the task_failure signal receivers. A few years ago, when I initially built this system, I was using Django’s built-in unittest test runner and copied some hacky code to run the tests. I have since switched to pytest. I am now adding some new Celery tasks with task_failure receivers, and I wanted to see if there is a better way to build the tests than the ugly scab I built previously. The Celery documentation describes its built-in pytest fixtures, which looked promising.

Here is a simplified version of what I have built.

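# tasks.py
from celery import shared_task
from celery.signals import task_failure

# Decorator assumed: my_task takes self and is called with .delay(),
# so it has to be registered as a bound task.
@shared_task(bind=True)
def my_task(self):
    print("my_task was called")
    raise NotImplementedError

# Assumed: the receiver is connected to task_failure for my_task.
@task_failure.connect(sender=my_task)
def handle_my_task_failure(sender=None, **kwargs):
    print("handle_my_task_failure was called")

# tests.py
import pytest

from tasks import my_task

def test_my_task_failure(celery_session_worker):
    with pytest.raises(NotImplementedError):
        # delay() only enqueues the task; get() waits for the result and
        # re-raises the task's exception (this needs a result backend).
        my_task.delay().get(timeout=10)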

When I run the test using pytest -s tests.py, pytest prints

my_task was called

The message from handle_my_task_failure never appears. No matter what I do, I just can’t get the Celery signals to fire.

Has anyone ever tried to test celery signals using the celery_session_worker fixture? My ddg/google searching is failing me. Any clues anyone could provide would be greatly appreciated.

Thanks in advance.

I’m looking for something similar. I have a task_postrun handler that I want to cover in my unit test, but it never runs.

I believe it’s because the signal is not getting registered in the worker that pytest starts to do the processing.

I’ve seen other discussions (celery/celery issue #1873 on GitHub, "task_postrun signal handler not being called when sender specified") that suggest using @signals.worker_init.connect to explicitly connect the postrun handler to the Celery task, but I’m not sure how to do that under pytest.