Race condition? ASGI, asyncio, & asgiref.sync_to_async... oh my!!!

I’m facing a mysterious issue, perhaps a race condition, perhaps clashing event loops, perhaps something different altogether. I’m relatively new to async and unsure how best to debug. This scenario also doesn’t seem to be documented online, so I figured I’d share my findings with the community.

I’m working on an application with the following specs:

  • Django 5.1.4
  • ASGI (running on uvicorn)
  • SQLite as the database
  • PROD environment is a Debian server where settings.py DEBUG = False
  • DEV environment is a MacBook Air where settings.py DEBUG = True

TL;DR:
Database accesses wrapped with asgiref.sync_to_async are NOT executed when awaited inside an async function run via asyncio.create_task(), causing the application to hang while processing requests…

My investigation so far has yielded the following workarounds:

  • Django settings.py contains DEBUG = True (so this works in DEV environment)
  • asyncio.get_event_loop().set_debug(True) is run (see docs, debug mode does change how asyncio behaves)
  • asyncio.get_running_loop().set_debug(True) is run, seems to be logically equivalent to the above
  • await asyncio.sleep(0.1) is run inside the task BEFORE the database call
    • Note: asyncio.sleep(0) does NOT resolve the issue

My app responds to chat/ endpoint POST requests by streaming a response using StreamingHttpResponse. The generator that StreamingHttpResponse consumes runs some API calls that take around 5-10 seconds. There is a risk that the generator will NOT be iterated all the way through if the client disconnects halfway through iteration.

To mitigate this, an executor is created that does the work, putting the results into an asyncio.Queue for a responder async generator to consume. The executor is triggered using asyncio.create_task and the responder is passed to StreamingHttpResponse. If the connection is severed and responder doesn’t iterate to completion? No problem. executor will continue to be served by the event loop in the background, ensuring the POST request is fully processed even if the response is only partially received by the client.

… or that’s the hope anyway …

In views.py, I have the following code:

TASK_SET = set() # Holds references to asyncio.Task, see docs for asyncio.create_task()
                 # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task

async def chat(request):
    # ------LOCATION 1------
    ...
    responder = create_executor_task_and_get_responder(request, data)
    return StreamingHttpResponse(responder, status=200, content_type='text/event-stream')

def create_executor_task_and_get_responder(request, data):
    async_queue = asyncio.Queue()
    executor_coroutine = chatExecutor(async_queue, request, data)
    executor_task = asyncio.create_task(executor_coroutine)
    TASK_SET.add(executor_task)
    executor_task.add_done_callback(remove_done_task) # Remove executor_task from TASK_SET upon completion
    responder = chatResponder(async_queue)
    return responder

async def chatExecutor(async_queue, request, data):
    try:
        # ------LOCATION 2------
        await write_to_db_lock() # Sets a lock, to prevent duplicate requests while this request is processing
        #-------LOCATION 3------
        await random_db_operation()
        await async_queue.put(some_value)
        await random_api_call()
        await async_queue.put(some_other_value)
        some_other_work()
        await async_queue.put(some_third_value)
    except Exception as e:
        logging.exception("ERROR in chatExecutor:")
    finally:
        await write_to_db_unlock()
        await async_queue.put(None)

async def chatResponder(async_queue):
    while True:
        item = await async_queue.get()
        if item is None:
            return
        yield item

write_to_db_lock, write_to_db_unlock, and the several random_db_operation functions are all decorated with asgiref’s @sync_to_async. Perhaps this is interfering with the event loop? sync_to_async docs

In DEV, this just works! Client gets beautiful streamed response. Everything executes as intended, even if client disconnects.

In PROD, this does NOT work. Using print statements inside chatExecutor and write_to_db_lock, I find that LOCATION 2 is the last place where a print statement succeeds. The function write_to_db_lock is NEVER executed: its prints never appear, and neither does the print statement at LOCATION 3. This leads me to believe that chatExecutor is stuck awaiting write_to_db_lock() to resolve, yet for some reason that work is never started/picked up by the event loop.

This results in responder hanging, waiting for something to be added to async_queue, and eventually the StreamingHttpResponse times out.

EACH of the following changes ALONE to PROD make it work:

  • I set settings.py DEBUG=True
  • I run asyncio.get_event_loop().set_debug(True) at LOCATION 1
  • I run asyncio.get_running_loop().set_debug(True) at LOCATION 1
  • I run await asyncio.sleep(0.1) at LOCATION 2
    • NOTE: await asyncio.sleep(0) at LOCATION 2 does NOT work. I thought maybe asgiref.sync_to_async needs one successful asyncio await to occur before it can succeed, perhaps to set up the right context in the event loop? But no: if set to 0 seconds, the await asyncio.sleep(0) resolves and the program hangs on the write_to_db_lock line like always.

Again, something about asyncio debug mode resolves the issue. BUT, a workaround is possible, even if not in asyncio debug mode, if you await asyncio.sleep(0.1).

Why? I have no idea.
Is this reliable? I have no idea.
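For reference, here is a minimal stdlib-only sketch of what sleep(0) actually does: it yields control to the event loop for exactly one pass, enough for already-scheduled callbacks to run, but it waits no wall-clock time the way sleep(0.1) does:

```python
import asyncio

# Demo: asyncio.sleep(0) yields to the event loop for exactly one pass,
# letting already-scheduled tasks run, but waits no wall-clock time.
order = []

async def background():
    order.append("background ran")

async def main():
    asyncio.create_task(background())
    await asyncio.sleep(0)        # one trip through the loop
    order.append("after sleep(0)")

asyncio.run(main())
print(order)  # ['background ran', 'after sleep(0)']
```

That sleep(0) resolves while the hang persists suggests the blocked work is not a pending callback on this loop at all, which would be consistent with it being stuck in (or waiting on) another thread that only makes progress while the loop genuinely sleeps.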

I’m done for the night, but will be digging in more tomorrow to try to debug async. Even if you are unsure about what is happening here, tips on debugging async and exploring Tasks and event loops would be appreciated. Any previous experience on asgiref.sync_to_async’s interplay with asyncio would also be helpful.

Thanks for reading!

Hi @khalide — so this looks like a deadlock. By adding the sleep() here you return control to the event loop, which I guess allows the other task to release the lock you’re going to wait for in the next step.

These things are notoriously difficult to debug. Without looking much more closely I can’t say exactly where the deadlock is occurring. (Essentially you’re running competing tasks on the same thread, each requiring the other to complete before it can progress. You need to diagnose and remove that contention. It’s hard.)
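For illustration, here is a contrived sketch of the shape of deadlock I mean: two tasks on one loop, each waiting for an event that only the other will set (all names here are mine):

```python
import asyncio

# Contrived single-thread async deadlock: each task awaits an event
# that only the *other* task will set, so neither ever progresses.
async def task_a(ev_a, ev_b):
    await ev_b.wait()   # waits for task_b...
    ev_a.set()

async def task_b(ev_a, ev_b):
    await ev_a.wait()   # ...which waits for task_a
    ev_b.set()

async def main():
    ev_a, ev_b = asyncio.Event(), asyncio.Event()
    try:
        await asyncio.wait_for(
            asyncio.gather(task_a(ev_a, ev_b), task_b(ev_a, ev_b)),
            timeout=0.2,
        )
    except asyncio.TimeoutError:
        print("deadlocked: neither task made progress")

asyncio.run(main())
```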

Sorry I can’t say more.

Welcome @khalide !

In addition to the above, keep in mind that SQLite is an in-process, file-oriented library, not a client API to an external database engine. As such, it has its own threading rules and file-contention issues that it needs to address.
See Using SQLite In Multi-Threaded Applications

Doing some more digging I found this thread: Is sqlite3.threadsafety the same thing as sqlite3_threadsafe() from the C library? - Core Development - Discussions on Python.org

One thing I took from that thread: you’ve noted that your development environment is a Mac but your deployment environment is Debian. It’s possible the default thread-safety settings differ between the two builds, causing the difference in behavior.
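A quick sanity check you could run on both machines is to compare the thread-safety mode Python reports (note: before Python 3.11, sqlite3.threadsafety was hardcoded to 1; from 3.11 onward it reflects how the linked SQLite library was actually compiled):

```python
import sqlite3

# Run this on both the Mac and the Debian box and compare.
# DB-API 2.0 values: 0 = single-thread, 1 = multi-thread, 3 = serialized
print("sqlite3 module threadsafety:", sqlite3.threadsafety)
print("SQLite library version:     ", sqlite3.sqlite_version)
```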

Thanks for the clue! I’ll look into deadlock… I do wonder if there’s a good way to view the state of the event loop(s?) and threads for debugging, so I could step through and watch resource conflicts arise… though I also wonder whether it would even repro when stepped through; perhaps the slowed timing would make things just work?
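Jotting down one thing I plan to try: asyncio can snapshot all live tasks and their suspension points from inside the running loop, which shouldn’t perturb the timing the way a stepping debugger would. A sketch (the “stuck” task here is just a stand-in):

```python
import asyncio
import traceback

# Snapshot every live task on the running loop and where each is suspended.
# Calling this from inside the loop avoids pausing it like a debugger would.
def dump_tasks():
    for task in asyncio.all_tasks():
        print(f"--- {task.get_name()} (done={task.done()}) ---")
        for frame in task.get_stack(limit=5):
            traceback.print_stack(frame, limit=1)

async def stuck():
    await asyncio.sleep(60)  # stand-in for a task that appears hung

async def main():
    asyncio.create_task(stuck(), name="stuck-task")
    await asyncio.sleep(0.05)  # let the task start and suspend
    dump_tasks()

asyncio.run(main())
```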

Thanks @KenWhitesell good to be here

This is a good hunch, I need to investigate more to understand what it means for something to be “thread safe” - it seems that asgiref.sync_to_async has some concept of “thread sensitivity” (docs) and there’s a general idea of “async safety” that I haven’t grappled with yet (docs).

All that said… I am dubious about it being SQLite for the following reason: this is what my write_to_db_lock looks like:

@sync_to_async
def write_to_db_lock(user, lock_bool):
    logging.info("DEBUG - write_to_db_lock - Start")
    user.is_chat_in_flight = lock_bool
    logging.info("DEBUG - write_to_db_lock - User lock set")
    user.save()
    logging.info("DEBUG - write_to_db_lock - User saved, End")

NONE of these logging statements are logged! If it were a SQLite issue, I would at least anticipate seeing the “Start” message, and maybe the “User lock set” message. This is why I think it’s more of an issue with whatever sync_to_async does with threads, or perhaps its own event loop under the hood. Something there may be causing the work scheduled by await write_to_db_lock() in chatExecutor to never be selected for execution by the event loop.

To test this, I refactored to use async ORM instead of sync_to_async:

async def write_to_db_lock(user, lock_bool):
    logging.info("DEBUG - write_to_db_lock - Start")
    user.is_chat_in_flight = lock_bool
    logging.info("DEBUG - write_to_db_lock - User lock set")
    await user.asave()
    logging.info("DEBUG - write_to_db_lock - User saved, End")

With this, the first and second logging statements ARE logged, but not the third. This leads me NOT to discount your suggestion… perhaps something SQLite is doing, or how threads interact with SQLite is the issue.

NOTE: in PROD, without ANY workarounds, the issue repro’d approximately 95% of the time. Sometimes it just worked, but then would hang on the next request. In DEV, when changing the config to match PROD, the issue repro’d approximately 50% of the time, unreliably.

Will be doing more investigation and posting findings.

EDIT:
ORM_object.asave() uses asgiref.sync_to_async under the hood… so this refactor didn’t really “do away” with sync_to_async after all.

Hitting diminishing returns, I need to move on, so I’m accepting asyncio.sleep() as my workaround.

Here’s my attempt at making sure this workaround is reliable:

I know sleep(0.1) succeeds and sleep(0) fails, so try to find the threshold…

  • 0.1 - Success
  • 0.01 - Success
  • 0.001 - Success
  • 0.0001 - Success (tested 5 or 6 times, all worked)
  • 0.00001 - Failure

I’m choosing 0.01 as my value, it’s two orders of magnitude removed from the threshold and still imperceptible from a UX perspective.

Hopefully this will be sufficient in PROD under load.

Would love to know the root of the issue, but I can’t give this any more time right now.