I am working on a Django project that uses Celery to execute a group of tasks asynchronously. I noticed that when one of the tasks in the group fails, the callback function is not executed.
Here is an overview of my setup:
- Task group definition: I use group from celery to run multiple tasks together.
- Callback function: A function should execute after all tasks in the group are completed, regardless of their success or failure.
- Problem: If any task in the group fails, the callback function is not triggered.
from celery import chord, shared_task


@shared_task
def fun1():
    return "Task 1 completed successfully"


@shared_task
def fun22():
    # This task always fails, to reproduce the problem.
    print('Task22')
    raise ValueError('Error')


@shared_task
def fun3():
    return "Task 3 completed successfully"


@shared_task(bind=True)
def callback_task(self, results):
    # With bind=True the task instance is passed first, so `self` is required.
    print("Callback@@@@@@@@@@@@@@")
    processed_results = []
    errors = []
    for result in results:
        # Results shaped like {"status": "error", ...} are treated as failures.
        if isinstance(result, dict) and result.get("status") == "error":
            errors.append(result)
        else:
            processed_results.append(result)
    if errors:
        print(f"Errors encountered: {errors}")
    else:
        print(f"All tasks succeeded. Results: {processed_results}")
    return {"processed_results": processed_results, "errors": errors}


@shared_task
def error_handler():
    print('Handler')
    return "error handler"


@shared_task
def fun2():
    # Header tasks of the chord; fun22 has an error callback attached.
    task_group = [
        fun1.s(),
        fun22.s().on_error(error_handler.s()),
        fun3.s(),
    ]
    # callback_task should run once all header tasks have finished.
    result = chord(task_group, callback_task.s()).apply_async()
    print(f"Group Id: {result.id}")
    return result.id
Expected Behavior:
Even if one task fails, the callback function should execute and handle the results or failures.
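To make that expectation concrete, here is a minimal pure-Python sketch (no Celery involved) of the result shape my callback is written to handle. The capture_errors decorator and the summarize function are hypothetical names I made up for illustration; they are not part of Celery, and I am only assuming that a wrapper like this could sit underneath @shared_task so a failing task returns an error dict instead of raising.

```python
import functools


def capture_errors(func):
    """Hypothetical helper: return an error dict instead of raising."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # Same shape that callback_task checks for: status == "error".
            return {"status": "error", "task": func.__name__, "error": repr(exc)}
    return wrapper


@capture_errors
def fun1():
    return "Task 1 completed successfully"


@capture_errors
def fun22():
    raise ValueError("Error")


def summarize(results):
    """Plain-function version of the loop inside callback_task."""
    processed_results, errors = [], []
    for result in results:
        if isinstance(result, dict) and result.get("status") == "error":
            errors.append(result)
        else:
            processed_results.append(result)
    return {"processed_results": processed_results, "errors": errors}


# Stand-in for the list of results the callback would receive.
summary = summarize([fun1(), fun22()])
print(summary)
```

This only shows the data flow I am hoping for: the failure from fun22 ends up in errors while the result of fun1 ends up in processed_results. It does not say anything about how Celery itself schedules the callback.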
Actual Behavior:
When one task in the group fails (e.g., fun22), the callback function is not executed.