RabbitMQ + Celery: Task Keeps Restarting

Hey all, thanks for making this such a great forum. I’ve come here for many answers; this is the first time I’ve had to post a question.

I’ve got a problem with Django+RabbitMQ+Celery on a Windows machine. When I start my worker using:

```
celery -A string_project worker -l info --pool=solo
```

It immediately receives a task, indicating that a task is already in the queue. When I let it run, it completes the task, but then automatically starts another one. I suspect an acknowledgement isn’t happening and my tasks are not getting deleted from the queue, but I haven’t been able to figure out how to troubleshoot this. It’s a long-running task, about 12 minutes. I had this working perfectly last week, updated my plots to use Plotly, and now this seemingly unrelated issue cropped up. Here’s my setup:

settings.py:

```
# Celery Config
CELERY_BROKER_URL = 'amqp://localhost'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_AMQP_TASK_RESULT_EXPIRES = 1000
CELERY_ACKS_LATE = False
CELERY_TASK_TRACK_STARTED = True
CELERY_SEND_TASK_EVENTS = True
```

celery.py:

```
from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'string_project.settings')

app = Celery('string_project')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
```

__init__.py:

```
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']
```

models.py:

```
from django.db import models
from django.contrib.auth import get_user_model
from django.conf import settings
from datetime import date
from django.urls import reverse

# Create your models here.

class Application(models.Model):
    CATEGORY_CHOICES = (
        ('S', 'Should Cost'),
        ('D', 'Demand Prediction'),
        ('C', 'Quote Optimizer'),
    )
    company = models.CharField(max_length=100, default="")
    category = models.CharField(max_length=1, choices=CATEGORY_CHOICES, default="")
    title = models.CharField(max_length=100)
    description = models.TextField(max_length=250)
    filepath = models.TextField(default="")
    date_created = models.DateTimeField(auto_now_add=True, null=True, blank=True)
    creator = models.ForeignKey(
        get_user_model(),
        on_delete=models.CASCADE,
    )

    # Additional information I need to add to the model so I can quickly view a completed analysis

    raw_data_table = models.TextField(default="", blank=True)
    result_table = models.TextField(default="", blank=True)
    feat_imp_fig = models.TextField(default="", blank=True)
    predictions_fig = models.TextField(default="", blank=True)
    top_ten = models.TextField(default="", blank=True)
    run_date = models.DateTimeField(auto_now=True, null=True, blank=True)

    def __str__(self):
        return self.description

    def get_absolute_url(self):
        return reverse('applications')
```

tasks.py:

```
@shared_task
def run_should_cost(filepath, logged_in_user):
    # ...this is a really long machine learning algorithm that ends with saving data to my db here:
    p = Application(
        id=logged_in_user,
        raw_data_table=raw_data_table,
        result_table=result_table,
        feat_imp_fig=feat_imp_fig,
        predictions_fig=predictions_fig,
        top_ten=top_ten,
        run_date=current_time
    )
    p.save(update_fields=['raw_data_table', 'result_table', 'feat_imp_fig', 'predictions_fig', 'top_ten', 'run_date'])
```

views.py:

```
class ShouldCostRunView(CompanyCheckMixin, DetailView):
    model = Application
    template_name = 'submit_success.html'

    def get_context_data(self, **kwargs):
        context = super(ShouldCostRunView, self).get_context_data(**kwargs)
        filepath = self.object.filepath
        logged_in_user = self.object.pk
        run_should_cost.delay(filepath, logged_in_user)
        return context
```

Any help would be appreciated!

First, when you’re posting code here, please enclose it between lines of three backtick (`) characters. This forces the forum software to preserve the formatting of the code - very important with Python.

You’re creating a new Application object, but then saving it with the update_fields parameter when this new object hasn’t been saved yet - that combination of events makes no sense.
(Creating the object with p=Application(... does not insert it into the database. You have only created a Python object in memory.)
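A toy stand-in class (plain Python and sqlite3, with made-up fields, not the actual Django model) shows the same thing: constructing the object touches nothing; only save() reaches the database.

```
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE application (id INTEGER PRIMARY KEY, result_table TEXT)")

class Application:
    """Toy stand-in for a Django model: __init__ only builds an in-memory object."""
    def __init__(self, id=None, result_table=""):
        self.id = id
        self.result_table = result_table

    def save(self):
        # Only here does anything reach the database.
        conn.execute(
            "INSERT OR REPLACE INTO application (id, result_table) VALUES (?, ?)",
            (self.id, self.result_table),
        )

p = Application(id=1, result_table="<table>...</table>")
print(conn.execute("SELECT COUNT(*) FROM application").fetchone()[0])  # 0 - nothing written yet

p.save()
print(conn.execute("SELECT COUNT(*) FROM application").fetchone()[0])  # 1
```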

Ken, thanks for the reply. I’ve updated so hopefully it’s easier to read. I also added my models.py file. The Application object has to be created before the analysis can be run so I am trying to replicate this from the docs:

https://docs.djangoproject.com/en/3.2/ref/models/instances/#saving-objects

Does that make more sense or is it still incorrect? When I had this working previously, it seemed like it was working correctly.

Steve

Is the reference to the Application object in run_should_cost actually supposed to be a reference to an existing instance of the Application model?

If so, then you want to retrieve that instance from the database and not create a new one. You can then update the relevant fields.

If not, if you’re really intending to create a new instance of Application in run_should_cost, then you need to remove the update_fields parameter.

Yes, it is. I thought I was retrieving the object from the database in the get_context_data function:

The logged_in_user variable is actually the primary key (confusing because it got changed halfway through, apologies)

The filepath is also specific to each Application object.

Filepath and logged_in_user (pk) get passed to the run_should_cost so it knows which file to run the analysis on and which object to update when it’s complete.

Not relevant here. I am only talking about what’s happening within the run_should_cost method. No code anywhere else matters. That’s different code running at a different point in time in a different process. There’s no direct sharing of data between the two other than what’s passed as parameters.

If I’m understanding, you’re saying that my last statement is also not correct - the filepath and logged_in_user are actually not getting passed to the run_should_cost function? Is your thought that this is causing an error and retry loop with the worker?

I was under the impression that the analysis was completing successfully based on the worker log:

```
[2021-08-08 11:48:28,838: INFO/MainProcess] Task applications.tasks.run_should_cost[b0486967-c495-4708-b553-c73a7982d94f] succeeded in 736.8439999999973s: None
[2021-08-08 11:48:28,838: INFO/MainProcess] Task applications.tasks.run_should_cost[03a5e266-8606-40e6-9334-3a5c00c02c60] received
[2021-08-08 12:00:33,150: INFO/MainProcess] Task applications.tasks.run_should_cost[03a5e266-8606-40e6-9334-3a5c00c02c60] succeeded in 724.3279999999941s: None
[2021-08-08 12:00:33,150: INFO/MainProcess] Task applications.tasks.run_should_cost[e335c3e3-d4ed-4ece-ab1f-61e24d801b68] received
[2021-08-08 12:12:51,493: INFO/MainProcess] Task applications.tasks.run_should_cost[e335c3e3-d4ed-4ece-ab1f-61e24d801b68] succeeded in 738.3280000000086s: None
[2021-08-08 12:12:51,493: INFO/MainProcess] Task applications.tasks.run_should_cost[2a3c5809-23aa-49b1-9a7b-2bded838d549] received
```

I also have this ListView that shows the objects, the last run date/time, and it matches the worker log completion date/time.

```
class ApplicationListView(ListView):
    model = Application
    template_name = 'applications.html'

    def get_queryset(self):
        return super(ApplicationListView, self).get_queryset().filter(company=self.request.user.company)
```

This all led me to believe it was something with the worker queue not getting deleted, but you think I need to re-reference the model object in the run_should_cost function?

Steve

Please stay focused on your run_should_cost function and ignore everything else at the moment. We can come back to other issues later if questions still remain.

I’m guessing from your subsequent responses that the answer is “Yes”.

This means that you need to replace the `p = Application(...)` statement with something more like:

```
p = Application.objects.get(...)
```

to retrieve the existing object. You can then save it with the updated fields.

Here’s the update I made:

tasks.py:

```
@shared_task
def run_should_cost(filepath, logged_in_user):
    # ...this is a really long machine learning algorithm that ends with saving data to my db here:

    p = Application.objects.get(id=logged_in_user)

    p.raw_data_table = raw_data_table,
    p.result_table = result_table,
    p.feat_imp_fig = feat_imp_fig,
    p.predictions_fig = predictions_fig,
    p.top_ten = top_ten,
    p.run_date = current_time

    p.save(update_fields=['raw_data_table', 'result_table', 'feat_imp_fig', 'predictions_fig', 'top_ten', 'run_date'])
```

I just re-read the docs, and I think I found where I had misinterpreted it.

Is this what you were thinking?

Yes.

You want to remove the commas at the end of each assignment, and since these are the only fields being changed within that model, you can also remove the update_fields parameter in your save.
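For reference, here is what those trailing commas do in plain Python (the value is made up): the comma turns the right-hand side into a 1-tuple, so the field would end up holding a tuple rather than the string itself.

```
result_table = "<table>...</table>"

# A trailing comma builds a 1-tuple:
with_comma = result_table,       # ('<table>...</table>',)
without_comma = result_table     # '<table>...</table>'

print(type(with_comma).__name__)     # tuple
print(type(without_comma).__name__)  # str
```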

Ok, made the updates and restarted the worker. It looks like it’s completing the analysis, but I’m still getting the auto-restart. Any thoughts?

```
(string_v3) PS C:\Users\steve\Desktop\Coding\string_v3> celery -A string_project worker -l info --pool=solo -E

 -------------- celery@DESKTOP-IDPTUF3 v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Windows-10-10.0.19043-SP0 2021-08-08 18:02:17
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         string_project:0x1ae9985bbe0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 12 (solo)
-- ******* ---- .> task events: ON
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . applications.tasks.run_should_cost
  . string_project.celery.debug_task

[2021-08-08 18:02:17,638: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2021-08-08 18:02:17,654: INFO/MainProcess] mingle: searching for neighbors
[2021-08-08 18:02:18,685: INFO/MainProcess] mingle: all alone
[2021-08-08 18:02:18,700: WARNING/MainProcess] c:\users\steve\desktop\coding\string_v3\lib\site-packages\celery\fixups\django.py:203: UserWarning:

Using settings.DEBUG leads to a memory
            leak, never use this setting in production environments!


[2021-08-08 18:02:18,700: INFO/MainProcess] celery@DESKTOP-IDPTUF3 ready.
[2021-08-08 18:02:18,700: INFO/MainProcess] Task applications.tasks.run_should_cost[b0486967-c495-4708-b553-c73a7982d94f] received
[2021-08-08 18:15:56,755: INFO/MainProcess] Task applications.tasks.run_should_cost[b0486967-c495-4708-b553-c73a7982d94f] succeeded in 818.0470000000059s: None
[2021-08-08 18:15:56,770: INFO/MainProcess] Task applications.tasks.run_should_cost[03a5e266-8606-40e6-9334-3a5c00c02c60] received
[2021-08-08 18:47:56,595: INFO/MainProcess] Task applications.tasks.run_should_cost[03a5e266-8606-40e6-9334-3a5c00c02c60] succeeded in 1919.8280000000086s: None
[2021-08-08 18:47:56,595: INFO/MainProcess] Task applications.tasks.run_should_cost[e335c3e3-d4ed-4ece-ab1f-61e24d801b68] received
```

Are you seeing the updated data in the database? (Is the save saving the results you’re expecting to see?)

Yeah, it’s updating. I changed a few parameters in the file before restarting to verify and the results are different.

Check the length of your current task queue. If it’s not empty, empty it.

Then start up your worker and try it again.

While it’s running, check the task queue to see if there is another instance waiting to be run.

(I’m wondering if there’s some large number of tasks waiting that got generated for some reason. Your log strongly implies that there was a task waiting for it when the worker started.)
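If you have the RabbitMQ CLI available, a couple of commands help with this (a sketch; `celery` is the default queue name Celery uses, and both commands assume the default vhost):

```
# Show each queue and how many messages are sitting in it
rabbitmqctl list_queues name messages

# Drain all waiting tasks from the app's queues (destructive - messages are gone for good)
celery -A string_project purge
```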

Ken, I just ran a few more tries. It looks like everything is working. Thanks a lot for the help!
