celery shared_task ignoring flow-control statements?

Hello, I’m running into flow-control issues in a celery shared_task. My continue statement is seemingly being ignored, so I added a state variable and an “if not skipping:” check, but that condition also seems to be ignored.

I’m reading a log file line-by-line, getting a json dict from each line, and processing it to create new objects in my database.
In testing, I created a bad json line, so it should hit my except clause, log the exception, and then hit my continue statement - but it seems to gloss over that continue statement:

for i, line in enumerate(input_data):
    try:
        line_data = json.loads(line)
    except Exception as e:
        logger.error(e)
        continue
    ts = line_data["timestamp"]

For my testing, I have a bad JSON line for line 1, then a perfectly good line for line 2 of my input_data. I get the JSON error, but then I get a ForkPoolWorker - UnboundLocalError saying it can’t access line_data where it’s not associated with a value. So I created a state variable to control flow, but that seems to be ignored as well:

for i, line in enumerate(input_data):
    skipping = False
    try:
        line_data = json.loads(line)
    except Exception as e:
        logger.error(e)
        skipping = True
    if not skipping:
        ts = line_data["timestamp"]

I still get the same thing - the log of the JSON error, followed by the ForkPoolWorker - UnboundLocalError saying line_data is undefined.

Is this because of flow control issues, or could this be a symptom of Celery pre-assigning workers to blocks of code regardless of the flow control?

If I want to abandon an iteration which raises errors (during JSON import or during Pydantic validation), what’s the best way to do that?

Welcome @scottk4096 !

Please post the complete error with the traceback. (And when you do, post it between lines of ``` just like you would post a block of code.)

Probably.

It might also be helpful if we saw the complete function.

The break statement can be used to terminate a for loop at any point in time.

I’m not wanting to break the loop, but just continue to the next line after the Exception is caught. The weird thing is that it sort-of looks like the Except block is garbage-collecting line_data, since I’d already set line_data = None.
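On the garbage-collection theory: an except block does not unbind other local variables. The only name Python removes when an `except ... as e:` clause ends is `e` itself. A minimal sketch in plain Python, with no Celery involved (`parse_or_none` is a made-up helper for illustration, not part of the posted task):

```python
import json


def parse_or_none(line):
    """Hypothetical helper: return (parsed dict or None, whether `e` is still bound)."""
    line_data = None
    try:
        line_data = json.loads(line)
    except Exception as e:
        # The failed json.loads() never rebinds line_data, so it is still None here.
        logged = str(e)
    # Only `e` is unbound once the except clause ends; line_data is untouched.
    e_still_bound = "e" in locals()
    return line_data, e_still_bound


print(parse_or_none('{"timestamp": "bad'))
print(parse_or_none('{"timestamp": "2025-09-15T22:15:28"}'))
```

So with `line_data = None` assigned before the `try`, a failed `json.loads` on its own should leave `line_data` bound to `None` rather than unbound.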

What versions of the key modules are you using? (What versions of Django, Python, and Celery?)

Are you using any Celery “add-on” modules?

How are you running Celery? (What is the command line you are using?)

Thanks Ken, nice to meet you. Sorry for the delay, I had to get my boss’ permission before posting the code. I’m also struggling with the formatting a little, so I really hope this comes through readable. I’m not wanting to terminate the loop on an exception, but just continue to the next loop iteration. However, I’m wondering if some shared_task worker assignment is making that a little more tricky than I’m used to. Okay, it looks like the format codes don’t like Python comments too much:

# send_notification executes:
#     async_to_sync(channel_layer.send)(
#         channel_name, {"type": "send_notification", "message": message})

@shared_task()
def process_logs(req, proj_id, api_version=1):
    logger = logging.getLogger("django")
    results = []
    time.sleep(3)
    send_notification(req, "We are processing your request")

    filename = req["filename"]
    try:
        with open(filename, "r") as logs:
            _import_data = logs.readlines()
    except Exception as e:
        logger.error(e)
        send_notification(
            req,
            "We faced some issues while importing the log file.",
        )
        time.sleep(3)
        return

    # Process log lines
    for i, line in enumerate(_import_data):
        # JSON parsing
        line_data = None
        try:
            line_data = json.loads(line)
        except Exception:
            ts_label, ts_text = extract_timestamp(line)
            linestr = f"JSON error encountered on line {i} with a {ts_label} of {ts_text}.  Line {i} skipped."
            logger.warning(linestr)
            send_notification(
                req,
                linestr)
            continue

        # Check for pre-existing record
        ts = line_data["timestamp"]

The output includes the logger messages, followed by the stack trace. What’s interesting to me is that I had assigned line_data = None before trying the json.loads. Could that exception block be causing line_data to be garbage-collected? Here’s the shell output (‘fraid the triple-quote formatter isn’t working for me a second time in the same post):

abc-app-1  | [2025-09-17 18:56:15,878] [WARNING] [N/A] [django] [/abc/abc/tasks.py:136] JSON error encountered on line 0 with a timestamp of 2025-09-15T22:15:28.904035".  Line 0 skipped.
abc-app-1  | [2025-09-17 18:56:15,878: WARNING/ForkPoolWorker-6] JSON error encountered on line 0 with a timestamp of 2025-09-15T22:15:28.904035".  Line 0 skipped.
abc-app-1  | [2025-09-17 18:56:15,892: ERROR/ForkPoolWorker-6] Task abc.tasks.process_logs[1a2a7a82-88e2-4d17-9fb0-9baf608b7292] raised unexpected: UnboundLocalError("cannot access local variable 'line_data' where it is not associated with a value")
abc-app-1  | Traceback (most recent call last):
abc-app-1  |   File "/usr/local/lib/python3.11/site-packages/celery/app/trace.py", line 453, in trace_task
abc-app-1  |     R = retval = fun(*args, **kwargs)
abc-app-1  |                  ^^^^^^^^^^^^^^^^^^^^
abc-app-1  |   File "/usr/local/lib/python3.11/site-packages/celery/app/trace.py", line 736, in __protected_call__
abc-app-1  |     return self.run(*args, **kwargs)
abc-app-1  |            ^^^^^^^^^^^^^^^^^^^^^^^^^
abc-app-1  |   File "/abc/abc/tasks.py", line 143, in process_logs
abc-app-1  |     ts = line_data["timestamp"]
abc-app-1  |          ^^^^^^^^^
abc-app-1  | UnboundLocalError: cannot access local variable 'line_data' where it is not associated with a value

Side note: The lines of ``` must be lines by themselves - not part of any other line.

This is a project which was already set up, and which I joined the team to help maintain, so I can’t speak to the choice of version numbers. I’m quite sure our libraries are way out of date, so I’m just trying to figure out the best way to work around (it seems) not having a working continue statement.

python 3.11.6,

django 4.2.13,

from the requirements file, it just lists django-celery-beat and celery.

The whole thing is being run as a Docker image.

In the (dunder)init for our app directory is this:

from .celery import app as celery_app

__all__ = ['celery_app']

and then (trying to hide any company info) that .celery file is:

import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "abc_dashboard.settings")

app = Celery("abc_dashboard")

app.config_from_object("django.conf:settings", namespace="CELERY")

app.autodiscover_tasks()

Then, in that module which runs the process_logs function, we import shared_task from celery.

oh - those are back-ticks? oops.

My first reaction is that all the docs I see show @shared_task being used without parens (i.e. not @shared_task()). I’m not in a position at the moment to determine whether this is relevant in this case.

My initial gut hunch for this is that there’s something going wrong in send_notification. I know strange things can happen when you encounter an exception within an exception handler. In this case, the nested exception would cause the termination of the except block.
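To illustrate the general mechanism, here is a minimal sketch in plain Python, with no Celery or channels involved. `flaky_notify` is a hypothetical stand-in for send_notification that always fails, and `process` is a made-up function. It does not reproduce the exact UnboundLocalError from the traceback above; it only shows that when something raises inside the except block, the `continue` on the next line never runs and the loop is abandoned.

```python
import json


def flaky_notify(message):
    """Hypothetical stand-in for send_notification that always fails."""
    raise RuntimeError("notification channel unavailable")


def process(lines, notify):
    timestamps = []
    for i, line in enumerate(lines):
        try:
            line_data = json.loads(line)
        except Exception:
            notify(f"Line {i} skipped.")  # if this raises, `continue` is never reached
            continue
        timestamps.append(line_data["timestamp"])
    return timestamps


lines = ['{"timestamp": "bad', '{"timestamp": "2025-09-15"}']

try:
    process(lines, flaky_notify)
except RuntimeError as exc:
    # The handler's own failure escapes the loop; the second line is never processed.
    print("loop aborted:", exc)

# With a notifier that does not raise, the bad line is skipped as intended.
print(process(lines, print))
```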

Other than that, I don’t see anything fundamentally wrong with the code you have posted.
I’d be looking deeper at all logs and other functions/processes looking for some other clue.

Side note: The first part of these Celery logs should show the Celery version.

Your Dockerfiles / compose file(s) should show how you’re running Celery.

Ah, that makes a whole lot of sense. I’ve had that send_notification work sometimes and other times fail, so that could be running into exceptions. Thanks for the tip about exceptions within the exception handler. I’ll try removing that send_notification and see if that helps.

also, FWIW I found this in the logs - kind-of tough to find next to the gigantic C that Celery prints there… celery@c2b6d078a21f v5.5.3 (immunity)

Many thanks for your help!

By the way, Ken, you were absolutely right about that send_notification function. What I did was move those calls outside of the except blocks, and then everything worked fine. I didn’t even have to add any state variables, due to the types of failures causing the exception, so I didn’t have to add any “code smell.” Many thanks for your help.
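For anyone landing here later, one possible shape for "notify outside the except block" is sketched below. This is not the poster's actual code: `process_lines` and the `notify` parameter are hypothetical names, and unlike the fix described above it does use a small state variable (`error_message`) so the handler itself stays minimal. It also guards the notification so that a failed send cannot abort the import.

```python
import json
import logging

logger = logging.getLogger(__name__)


def process_lines(lines, notify):
    """Sketch: parse each line, skip bad JSON, and notify outside the except block."""
    timestamps = []
    for i, line in enumerate(lines):
        error_message = None
        try:
            line_data = json.loads(line)
        except json.JSONDecodeError:
            # Keep the handler minimal: only record what went wrong.
            error_message = f"JSON error encountered on line {i}. Line {i} skipped."

        if error_message is not None:
            logger.warning(error_message)
            try:
                notify(error_message)
            except Exception:
                # A failed notification is logged but does not stop the loop.
                logger.exception("Notification failed")
            continue

        timestamps.append(line_data["timestamp"])
    return timestamps
```

Called as `process_lines(lines, some_callable)`, this returns the timestamps of the lines that parsed, whether or not the notifier raises.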