On Delete cascade on OneToOne field with celery task

Hello everyone,

I am working on two models that are related by a one-to-many relationship (one Strategy has many TradingSystems), and each of them is linked to a Celery task that runs in the background.

Here are the simplified models:

from django.db import models
from django_celery_results.models import TaskResult

class Strategy(models.Model):
    optimization_celery_task = models.OneToOneField(TaskResult, on_delete=models.CASCADE, to_field="task_id", null=True, blank=True, related_name="optimization_taskID")

class TradingSystem(models.Model):
    strategy_id = models.ForeignKey(Strategy, on_delete=models.CASCADE, related_name="trading_systems")
    task_id = models.OneToOneField(TaskResult, on_delete=models.CASCADE, to_field="task_id", null=True, blank=True, related_name="strategy_taskID")

The issue is that the on_delete functionality does not work with Celery tasks because the relationship needs to be defined on the Celery task model, but I haven’t been able to figure out how to do that.

As a workaround, I modified the delete method of Strategy:

def delete(self, *args, **kwargs):
    # Deletes the Celery task associated with the strategy
    if self.optimization_celery_task:
        self.optimization_celery_task.delete()

    trading_systems = TradingSystem.objects.filter(strategy_id=self)

    for trading_system in trading_systems:
        if trading_system.task_id:
            trading_system.task_id.delete()

    super().delete(*args, **kwargs)

The problem is that trading_systems = TradingSystem.objects.filter(strategy_id=self) returns an empty queryset. Shouldn’t the related TradingSystem instances be deleted when calling super().delete()?

I'd like to know whether there is a way to store the ID of the corresponding trading system and strategy on the Celery task record, or whether I am making a mistake in Strategy's delete method.

Thank you.

Welcome @federicovisioli!

I don’t understand what you mean when you say that on_delete doesn’t work with Celery tasks.

Aren’t your Celery tasks using the same models as your Django project?

You are right, though, that overriding the delete method should not be necessary. I don’t see why any of this would be needed.

How are your celery tasks deleting the Strategy instance?

Keep in mind the following from the docs for .delete() when it is called on a queryset:

Keep in mind that this will, whenever possible, be executed purely in SQL, and so the delete() methods of individual object instances will not necessarily be called during the process. If you’ve provided a custom delete() method on a model class and want to ensure that it is called, you will need to “manually” delete instances of that model (e.g., by iterating over a QuerySet and calling delete() on each object individually) rather than using the bulk delete() method of a QuerySet.

Hi @KenWhitesell, thank you for your reply.

> The issue is that the on_delete functionality does not work with Celery tasks because the relationship needs to be defined on the Celery task model

Currently, if I delete a strategy, the associated Celery task isn’t deleted because the one-to-one relationship is defined in the strategy model, not in the Celery model. As a result, the “on_delete” CASCADE doesn’t apply to the Celery task record.

In my application, I’m not deleting Celery tasks directly, but rather deleting the strategy itself. When I do this, I want all related Celery tasks to be deleted as well—both the one associated with the strategy model and those associated with the trading systems related to that strategy.

Here’s a reference topic discussing this behavior that I found on the forum: Does on_delete=models.CASCADE applies bidirectionally?

To achieve the desired behavior—deleting the Celery task when a strategy is deleted, as well as the Celery tasks related to the trading systems of that strategy—I understand that the one-to-one relationship would need to be defined on the TaskResult model. Unfortunately, I can’t modify the TaskResult model because it’s part of Celery and not under my control.

To work around this limitation, I attempted to create a custom delete method, but it isn’t functioning as expected.

def delete(self, *args, **kwargs):
    # Deletes the Celery task associated with the strategy
    if self.optimization_celery_task:
        self.optimization_celery_task.delete()

    trading_systems = TradingSystem.objects.filter(strategy_id=self)

    for trading_system in trading_systems:
        if trading_system.task_id:
            trading_system.task_id.delete()

    super().delete(*args, **kwargs)

This code successfully deletes the Celery task associated with self.optimization_celery_task, but it doesn’t delete the Celery tasks of the trading systems. This is because trading_systems = TradingSystem.objects.filter(strategy_id=self) returns an empty queryset.

I hope this provides a clearer description of the problem!

Unfortunately, I’m not following what you’re describing as the structure of your Celery tasks.

I’m getting confused because you keep referring to something called “Celery task”, but I don’t see any model by that name. Are you referring to other tasks in the Celery task queue? Or are you talking about other tasks that may be active in Celery? Or are you talking about a model, and if so, what model are you talking about here?

I’m sorry it wasn’t clear; I’ll try to explain the context better.

I’m using a library called django-celery-results (GitHub repository: celery/django-celery-results, "Celery result back end with django").

In simple terms, this library allows you to store Celery task results using the Django ORM, with a model called TaskResult (as you can see, in my TradingSystem model, the task_id refers to this TaskResult model, imported from django_celery_results.models).

Here is the TaskResult model:

class TaskResult(models.Model):
    """Task result/status."""

    task_id = models.CharField(
        max_length=getattr(
            settings,
            'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH',
            255
        ),
        unique=True,
        verbose_name=_('Task ID'),
        help_text=_('Celery ID for the Task that was run'))
    periodic_task_name = models.CharField(
        null=True, max_length=255,
        verbose_name=_('Periodic Task Name'),
        help_text=_('Name of the Periodic Task which was run'))
    task_name = models.CharField(
        null=True, max_length=getattr(
            settings,
            'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH',
            255
        ),
        verbose_name=_('Task Name'),
        help_text=_('Name of the Task which was run'))
    task_args = models.TextField(
        null=True,
        verbose_name=_('Task Positional Arguments'),
        help_text=_('JSON representation of the positional arguments '
                    'used with the task'))
    task_kwargs = models.TextField(
        null=True,
        verbose_name=_('Task Named Arguments'),
        help_text=_('JSON representation of the named arguments '
                    'used with the task'))
    status = models.CharField(
        max_length=50, default=states.PENDING,
        choices=TASK_STATE_CHOICES,
        verbose_name=_('Task State'),
        help_text=_('Current state of the task being run'))
    worker = models.CharField(
        max_length=100, default=None, null=True,
        verbose_name=_('Worker'), help_text=_('Worker that executes the task')
    )
    content_type = models.CharField(
        max_length=128,
        verbose_name=_('Result Content Type'),
        help_text=_('Content type of the result data'))
    content_encoding = models.CharField(
        max_length=64,
        verbose_name=_('Result Encoding'),
        help_text=_('The encoding used to save the task result data'))
    result = models.TextField(
        null=True, default=None, editable=False,
        verbose_name=_('Result Data'),
        help_text=_('The data returned by the task.  '
                    'Use content_encoding and content_type fields to read.'))
    date_created = models.DateTimeField(
        auto_now_add=True,
        verbose_name=_('Created DateTime'),
        help_text=_('Datetime field when the task result was created in UTC'))
    date_done = models.DateTimeField(
        auto_now=True,
        verbose_name=_('Completed DateTime'),
        help_text=_('Datetime field when the task was completed in UTC'))
    traceback = models.TextField(
        blank=True, null=True,
        verbose_name=_('Traceback'),
        help_text=_('Text of the traceback if the task generated one'))
    meta = models.TextField(
        null=True, default=None, editable=False,
        verbose_name=_('Task Meta Information'),
        help_text=_('JSON meta information about the task, '
                    'such as information on child tasks'))

    objects = managers.TaskResultManager()

    class Meta:
        """Table information."""

        ordering = ['-date_done']

        verbose_name = _('task result')
        verbose_name_plural = _('task results')

        # Explicit names to solve https://code.djangoproject.com/ticket/33483
        indexes = [
            models.Index(fields=['task_name'],
                         name='django_cele_task_na_08aec9_idx'),
            models.Index(fields=['status'],
                         name='django_cele_status_9b6201_idx'),
            models.Index(fields=['worker'],
                         name='django_cele_worker_d54dd8_idx'),
            models.Index(fields=['date_created'],
                         name='django_cele_date_cr_f04a50_idx'),
            models.Index(fields=['date_done'],
                         name='django_cele_date_do_f59aad_idx'),
        ]

    def as_dict(self):
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
            'worker': self.worker
        }

    def __str__(self):
        return '<Task: {0.task_id} ({0.status})>'.format(self)

To show you what’s going on, here is part of my Django REST Framework endpoint, where I create the strategy and the Celery task:

user_task = strategyExecutor.delay(data_serializer.validated_data)
[...]
celery_task = TaskResult.objects.get(task_id=user_task.id)
[...]
serializer.save(task_id=celery_task)

So, when I create a strategy, a Celery task starts running, and when it finishes, I take the ID stored in the TaskResult model, and I save the serializer for the strategy with that Celery task (same with the TradingSystem model).

I hope this helps clarify the problem further.

Are you referring to other tasks in the Celery task queue?
Answer: No, I’m referring to the Celery task record in the TaskResult model that is associated with the strategy.

Or are you talking about a model, and if so, what model are you talking about here?
Answer: Yes, I’m talking about the TaskResult model from the django-celery-results library I’m using.

If there’s a better way to handle this, can you suggest some guidelines or documentation? I was stuck with Celery tasks, and this library helped me a lot.

Hi everyone, I found a workaround for this (not clean, but it works).

In the delete request, I’m not just deleting the strategy; I’m also filtering and deleting all the associated trading systems. I know that on_delete=CASCADE already handles this, but on_delete=CASCADE doesn’t call the custom delete method, so:

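from django.shortcuts import get_object_or_404
from rest_framework import status
from rest_framework.response import Response

def destroy(self, request, pk=None):
    # Return 404 instead of an unhandled DoesNotExist if the strategy is missing
    strategy = get_object_or_404(Strategy, pk=pk)

    # Delete each trading system individually so its custom delete() runs
    for trading_system in TradingSystem.objects.filter(strategy_id=strategy):
        trading_system.delete()

    strategy.delete()

    return Response(status=status.HTTP_204_NO_CONTENT)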

Then I customized the delete method of each model like this:

class Strategy(models.Model):
    [...]
    optimization_celery_task = models.OneToOneField(TaskResult, on_delete=models.CASCADE, to_field="task_id", null=True, blank=True, related_name="optimization_taskID")

    def delete(self, *args, **kwargs):
        # Because of on_delete=CASCADE, deleting the TaskResult also deletes this Strategy row
        if self.optimization_celery_task:
            self.optimization_celery_task.delete()

        return super().delete(*args, **kwargs)

class TradingSystem(models.Model):
    [...]
    strategy_id = models.ForeignKey(Strategy, on_delete=models.CASCADE, related_name="trading_systems_array")
    task_id = models.OneToOneField(TaskResult, on_delete=models.CASCADE, to_field="task_id", null=True, blank=True, related_name="strategy_taskID")

    def delete(self, *args, **kwargs):
        # Because of on_delete=CASCADE, deleting the TaskResult also deletes this TradingSystem row
        if self.task_id:
            self.task_id.delete()

        return super().delete(*args, **kwargs)

This way, I’m cleaning up all the Celery task results (rows of the django-celery-results TaskResult model) associated with my models.

If anyone has a better way to handle this, I’m open to suggestions.

Thanks!