queryset.all() does not clear the cache in Celery

I have a Django (4.0.4) web application running on Python 3.8. It has a GUI part and a backend task part implemented with Celery (5.2.6).

I use lists backed by database tables through stored querysets. Before every new access to them, I clear their cache by calling queryset.all():

self._query_set = self._query_set.all()
self._df = pd.DataFrame.from_records(self._query_set)

This works perfectly in the web part, but SOMETIMES it does not work in the Celery backend. Calling .all() simply does not clear the queryset’s result cache, so changes made in the web part are not reflected. Am I missing something substantial?
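For reference, here is a deliberately simplified stand-in that models the contract being relied on. `ToyQuerySet` and the list used as the "table" are hypothetical, and this is not Django's implementation. It only mimics the documented behaviour: a queryset caches its results on first evaluation, and all() returns a copy with an empty cache while the original object keeps its old cache.

```python
class ToyQuerySet:
    """Hypothetical, simplified stand-in for a Django QuerySet (not the real class)."""

    def __init__(self, table):
        self._table = table          # shared "database table" (a plain list here)
        self._result_cache = None    # filled on first evaluation, like a QuerySet

    def _fetch_all(self):
        # Run the "query" only if this particular object has not been evaluated yet.
        if self._result_cache is None:
            self._result_cache = list(self._table)

    def all(self):
        # Return a NEW object for the same query, with an empty cache.
        return ToyQuerySet(self._table)

    def __len__(self):
        self._fetch_all()
        return len(self._result_cache)

    def __iter__(self):
        self._fetch_all()
        return iter(self._result_cache)


table = ["user1", "user2", "user3"]
qs = ToyQuerySet(table)
print(len(qs))          # evaluates and caches: 3

table.append("user4")   # a "web request" adds a row
print(len(qs))          # the old object still answers from its cache: 3

qs = qs.all()           # rebind the name to a fresh copy
print(len(qs))          # re-evaluated: 4
```

Only the object that the name is rebound to after qs = qs.all() is re-read. Any other reference still pointing at the old object keeps serving the old rows.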

The code:

LOG.debug("Reloading content of the database list [{}].".format(self.get_list_names()))
LOG.debug("Querysets count before reload: {}".format(len(self._query_set)))
self._query_set = self._query_set.all()
self._df = pd.DataFrame.from_records(self._query_set)
LOG.debug("Querysets count after reload: {}".format(len(self._query_set)))

Celery backend running since 21.7.2022

Starting CELERY WORKER
 -------------- celery@8fcde8905395 v5.2.6 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-5.4.0-122-generic-x86_64-with-glibc2.2.5 2022-07-21 15:37:46

New records were added in the web app on 10.8.2022 (the last one at 11:32):

[2022-08-10 11:27:49,138]|INFO|vavrova|middleware:log:81|POST|POST /vichr/admin/uzivatele/uzivatel/add/|...ADD USER
[2022-08-10 11:28:57,123]|INFO|vavrova|middleware:log:81|POST|POST /vichr/admin/uzivatele/uzivatel/add/|...ADD USER
[2022-08-10 11:31:01,172]|INFO|vavrova|middleware:log:81|POST|POST /vichr/admin/uzivatele/uzivatel/add/|...ADD USER
[2022-08-10 11:32:08,113]|INFO|vavrova|middleware:log:81|POST|POST /vichr/admin/uzivatele/uzivatel/add/|...ADD USER

Backend task run in Celery on 10.8.2022 at 19:00. The list was NOT correctly updated:

[2022-08-10 19:00:27,924]|DEBUG|[Resources.ListsResource:__enter__:36]|Reloading lists ['seznam_uzivatele_osobni_asistence'] from database   (...LIST OF USERS)
[2022-08-10 19:00:27,924]|DEBUG|[Resources.DatabaseListsResource:_load_list_from_database:40]|Reloading content of the database list [['seznam_uzivatele_osobni_asistence']].
[2022-08-10 19:00:27,924]|DEBUG|[Resources.DatabaseListsResource:_load_list_from_database:41]|Querysets count before reload: 73
[2022-08-10 19:00:27,941]|DEBUG|[Resources.DatabaseListsResource:_load_list_from_database:44]|Querysets count after reload: 73  (... number of users BEFORE adding new ones - WRONG ...)

Manual restart of the Celery backend at 10.8.2022 22:29:

Starting CELERY WORKER
 -------------- celery@8fcde8905395 v5.2.6 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-5.4.0-124-generic-x86_64-with-glibc2.2.5 2022-08-10 22:29:05

The next task, at 22:31, read the contents of the list CORRECTLY:

[2022-08-10 22:31:38,976]|INFO|[UI.tasks:task_update_forms:85]|TASK started.
[2022-08-10 22:31:39,108]|DEBUG|[Resources.ListsResource:__enter__:36]|Reloading lists ['seznam_uzivatele_osobni_asistence'] from database    (...LIST OF USERS)
[2022-08-10 22:31:39,109]|DEBUG|[Resources.DatabaseListsResource:_load_list_from_database:40]|Reloading content of the database list [['seznam_uzivatele_osobni_asistence']].
[2022-08-10 22:31:39,116]|DEBUG|[Resources.DatabaseListsResource:_load_list_from_database:41]|Querysets count before reload: 77   (... number of users AFTER adding new ones - CORRECT ...)
[2022-08-10 22:31:39,127]|DEBUG|[Resources.DatabaseListsResource:_load_list_from_database:44]|Querysets count after reload: 77

I’ve run out of ideas about what to try next. What am I doing wrong?

Thanks for any hint.

It would certainly help if you could provide more detail on the implementation of the tasks and how celery is being started.

Hi,

The whole app is wrapped in a Docker container, and Celery is started by the following scripts:
start_celery_worker.docker.sh:

#!/bin/bash

echo "Starting CELERY WORKER"
cd /app
. ./common_daemon_setup.sh
celery -A UI worker --pool=prefork -l INFO -E
echo "DONE"

start_celery_beat.docker.sh:

#!/bin/bash

echo "Starting CELERY BEAT"
cd /app
. ./common_daemon_setup.sh
celery -A UI beat -l INFO

common_daemon_setup.sh:

#!/bin/bash

export PYTHONPATH=$(pwd):$(pwd)/py_code:$(pwd)/py_code/Bussiness:$(pwd)/py_code/UI
export DJANGO_SETTINGS_MODULE=${DJANGO_SETTINGS_MODULE}_tasks

tasks.py:

from vichr_business import VichrBusiness
from UI import celery_app
from celery.schedules import crontab
from celery import Task
from datetime import datetime
import json

import logging
LOG = logging.getLogger(__name__)

backend = VichrBusiness()

class BaseTaskWithRetry(Task):
    autoretry_for = (Exception, KeyError)
    retry_kwargs = {'max_retries': 5}
    retry_backoff = 5

...

@celery_app.task(base=BaseTaskWithRetry)
def task_refresh_all_data():
    LOG.info("TASK started.")

    try:
        backend.refresh_all_data()
    except Exception as e:
        LOG.exception('TASK failed.')
        raise

    LOG.info("TASK finished.")


@celery_app.task(base=BaseTaskWithRetry)
def task_periodic_test():
    LOG.info("PERIODIC TASK started.")
    LOG.info("PERIODIC TASK finished.")


@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # sender.add_periodic_task(
    #     crontab(minute='*'),
    #     task_periodic_test.s(),
    # )

    # sender.add_periodic_task(
    #     crontab(minute='*'),
    #     task_get_datasource_new_data.s('kniha-jizd'),
    # )

    sender.add_periodic_task(
        crontab(hour=4, minute=5),
        task_refresh_all_data.s(),
    )


def _deserialize_time_range(time_range):
    time_range = json.loads(time_range)
    if isinstance(time_range, str):
        return time_range
    else:
        dt_from = datetime.strptime(time_range[0], '%Y-%m-%d')
        dt_to = datetime.strptime(time_range[1], '%Y-%m-%d')
        return (dt_from.date(), dt_to.date())

The app is quite complex. The logic in question lives somewhere inside the

backend = VichrBusiness()

object, which is based on Django models. The crucial spot (also mentioned above) is here:

LOG.debug("Reloading content of the database list [{}].".format(self.get_list_names()))
LOG.debug("Querysets count before reload: {}".format(len(self._query_set)))
self._query_set = self._query_set.all()
self._df = pd.DataFrame.from_records(self._query_set)
LOG.debug("Querysets count after reload: {}".format(len(self._query_set)))

Config:

CELERY_SETTING = {
    'broker_url': 'redis://NONEXISTING',
    'result_backend': 'django-db',
    'task_default_queue': 'RR_vichr_tasks',
    'worker_concurrency': 1,
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json'],
    'timezone': TIME_ZONE,
    'enable_utc': True,
    'task_track_started': True,
    'task_time_limit': 30 * 60,
    'beat_max_loop_interval': 10,
    'worker_hijack_root_logger': False,
    'worker_max_tasks_per_child': 1,

    'broker_transport_options': {
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 3,
    },
    # 'broker_connection_timeout': 1,
    # 'broker_connection_max_retries': 1,
    # 'redis_socket_timeout': 2,
    # 'redis_socket_connect_timeout': 1,
    # 'broker_connection_timeout': 1,
    # 'broker_connection_retry': False,
    # 'broker_connection_max_retries': 1,
}

What else may help?

Thanks

Is there any reason you define backend at the module level?
Maybe the results are being cached on the backend instance. If there’s no specific reason to keep state between tasks, I suggest that you create an instance of VichrBusiness inside each task.
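Here is a minimal sketch of that suggestion. It assumes VichrBusiness builds at least part of its state when it is constructed. `BusinessStandIn`, `DATABASE` and the two task functions are hypothetical stand-ins, and plain function calls replace Celery tasks so the snippet runs without a broker.

```python
DATABASE = ["user1", "user2"]  # hypothetical shared data source


class BusinessStandIn:
    """Hypothetical stand-in for VichrBusiness: snapshots data when constructed."""

    def __init__(self):
        self.users = list(DATABASE)


module_level_backend = BusinessStandIn()  # created once, at import time


def task_with_module_level_backend():
    # Reuses whatever state the shared instance built earlier.
    return len(module_level_backend.users)


def task_with_fresh_backend():
    backend = BusinessStandIn()  # new instance for every task run
    return len(backend.users)


DATABASE.append("user3")
print(task_with_module_level_backend())  # 2: state from import time
print(task_with_fresh_backend())         # 3: state built during this run
```

In the real tasks.py, this would mean moving backend = VichrBusiness() into the body of task_refresh_all_data. The cost is that you lose any caching across tasks.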

Well, there are other sources that are intended to be cached, so I did it this way to allow caching across tasks. I call .all() on the respective querysets only for the crucial parts where I need fresh data. According to the documentation, queryset.all() should return a copy of the original queryset with an empty cache. Isn’t that true?
Thanks

Note that a queryset is not evaluated when it’s defined. Querysets are lazy.

This statement (self._query_set = self._query_set.all()) does not cause the queryset to be evaluated, and as such does not necessarily clear the cache. Most likely, the queryset is not evaluated until the next statement, if then.
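Here is a small illustration of that laziness. Again it uses a hypothetical stand-in rather than Django itself, and it counts how often the "database" is hit. Defining or copying the queryset runs nothing. The first len() runs the query, and a second len() on the same object is answered from the cache.

```python
class CountingQuerySet:
    """Hypothetical stand-in that counts how often the 'database' is queried."""

    queries = 0  # class-level counter shared by all copies

    def __init__(self, table):
        self._table = table
        self._result_cache = None

    def all(self):
        return CountingQuerySet(self._table)  # no query here: still lazy

    def __len__(self):
        if self._result_cache is None:
            CountingQuerySet.queries += 1      # the only place a "query" runs
            self._result_cache = list(self._table)
        return len(self._result_cache)


table = [1, 2, 3]
qs = CountingQuerySet(table).all()   # defined and copied, nothing executed
print(CountingQuerySet.queries)      # 0
print(len(qs))                       # 3: first evaluation
print(CountingQuerySet.queries)      # 1
print(len(qs))                       # 3: served from the cache
print(CountingQuerySet.queries)      # still 1
```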

Yes, I know that this statement by itself does not evaluate the queryset, but I’d guess the next one should (or not?):

self._df = pd.DataFrame.from_records(self._query_set)
LOG.debug("Querysets count after reload: {}".format(len(self._query_set)))

I don’t know. You’re passing a queryset to a function whose behaviour is unknown (to me). It may or may not cause the queryset to be evaluated.

OK and what reliably causes the queryset to evaluate?

See the "When QuerySets are evaluated" section of the docs for the situations in which that occurs.
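One way to take the guesswork out of this is to force evaluation explicitly with list(), which is one of the documented triggers. You then use that single list for both the DataFrame and the log message, so the logged count describes exactly the rows that were loaded. This is a sketch under those assumptions. reload_rows is a hypothetical helper, and _StubQuerySet only stands in for a real QuerySet so the snippet runs without Django.

```python
import logging

LOG = logging.getLogger(__name__)


def reload_rows(query_set):
    """Hypothetical helper: re-run the query once and return (queryset, rows).

    query_set is expected to be a Django QuerySet; for this sketch anything
    whose .all() returns an iterable works.
    """
    fresh = query_set.all()   # new copy with an empty result cache
    rows = list(fresh)        # list() forces evaluation at this exact point
    LOG.debug("Rows loaded: %d", len(rows))  # counts the rows actually read
    return fresh, rows


class _StubQuerySet:
    """Stand-in for a QuerySet so the sketch runs without Django."""

    def __init__(self, table):
        self._table = table

    def all(self):
        return _StubQuerySet(self._table)

    def __iter__(self):
        return iter(self._table)


table = ["a", "b"]
qs, rows = reload_rows(_StubQuerySet(table))
table.append("c")
qs, rows = reload_rows(qs)
print(rows)  # ['a', 'b', 'c']
```

In the question's code, this would become something like self._query_set, rows = reload_rows(self._query_set), followed by self._df = pd.DataFrame.from_records(rows).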

What’s strange is that len() should cause the queryset to be evaluated:

len(). A QuerySet is evaluated when you call len() on it. This, as you might expect, returns the length of the result list.

But it seems not to do so (or the all() call does not work as expected):

LOG.debug("Querysets count after reload: {}".format(len(self._query_set)))

Again, there’s the question of what exactly is happening in this VichrBusiness object.

You’re creating an instance of it at the module level as mentioned above.

It’s going to be impossible to answer your questions about this without seeing the complete class.

Otherwise, my only suggestion is to create a new instance of the object within the task.

OK, the easiest way seems to be to sacrifice any inter-task caching and create the VichrBusiness object anew for each task.

The VichrBusiness object covers quite complex functionality, part of which is a set of lists prefetched from the database. Some of them change and need a fresh copy for each run, so all() is called on them to force their caches to be flushed. But it does not seem to work as expected :frowning:

Thanks

Again, the issue here is this object being created at the module level. Doing so has side effects that can only be fully understood in the context of the entire class, especially as it relates to names being bound to objects.
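Here is a generic Python illustration of the kind of side effect meant here. It is not taken from the VichrBusiness code, which hasn't been posted. Rebinding an attribute to a new object does not update other names that are still bound to the old one. If some part of the object kept a reference to the old rows, DataFrame or queryset before the reload, it keeps seeing stale data even though the query was re-run. ListResource is hypothetical.

```python
class ListResource:
    """Hypothetical resource that reloads its rows by rebinding an attribute."""

    def __init__(self, source):
        self._source = source
        self.rows = list(source)

    def reload(self):
        # Creates a NEW list and rebinds self.rows to it;
        # the old list object is left untouched.
        self.rows = list(self._source)


source = [1, 2, 3]
resource = ListResource(source)
consumer_view = resource.rows   # another component keeps a reference

source.append(4)
resource.reload()

print(len(resource.rows))   # 4: the resource sees the new data
print(len(consumer_view))   # 3: the old reference is still stale
```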

Edit / Addendum: My point here is that I believe the query is being executed, but something in the way that that object is managing its references isn’t reflecting that. You can verify that the query is being executed a number of different ways - for example by adding per-query logging either to your database or to your Django application, or by using something like tcpdump / wireshark to watch the network traffic between your app and the database.
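One way to do the Django-side logging is through the django.db.backends logger, which logs every SQL statement at DEBUG level. Django only emits these records when settings.DEBUG is True, so this is a diagnostic setting, not one for production. This sketch assumes it goes into the settings module the worker actually loads (the ..._tasks module selected by DJANGO_SETTINGS_MODULE in common_daemon_setup.sh). It also assumes console output ends up in the worker log.

```python
# Diagnostic logging fragment for the worker's Django settings module.
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "console": {"class": "logging.StreamHandler"},
    },
    "loggers": {
        # Every SQL statement Django executes is logged here at DEBUG level,
        # but only while settings.DEBUG is True.
        "django.db.backends": {
            "handlers": ["console"],
            "level": "DEBUG",
            "propagate": False,
        },
    },
}
```

If a SELECT for the list's table appears in the worker log at 19:00 and the count is still 73, then the query really ran and returned those rows. If no SELECT appears, the data is coming from some cached object.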

Hi, I moved the creation of the VichrBusiness() object from the module level to the method level, but without any effect :frowning: