Concerns about Consumer using sync_to_async

Hello everybody,

I stumbled over a default behavior of sync_to_async in an AsyncWebsocketConsumer (and in websockets in general) that I find odd, and I’m not totally sure how to go about it.
When a sync_to_async-wrapped function is awaited, this method (SyncToAsync.__call__ in asgiref) is invoked:

async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
    __traceback_hide__ = True  # noqa: F841
    loop = asyncio.get_running_loop()

    # Work out what thread to run the code in
    if self._thread_sensitive:
        if hasattr(AsyncToSync.executors, "current"):
            # If we have a parent sync thread above somewhere, use that
            executor = AsyncToSync.executors.current
        elif self.thread_sensitive_context.get(None):
            # If we have a way of retrieving the current context, attempt
            # to use a per-context thread pool executor
            thread_sensitive_context = self.thread_sensitive_context.get()

            if thread_sensitive_context in self.context_to_thread_executor:
                # Re-use thread executor in current context
                executor = self.context_to_thread_executor[thread_sensitive_context]
            else:
                # Create new thread executor in current context
                executor = ThreadPoolExecutor(max_workers=1)
                self.context_to_thread_executor[thread_sensitive_context] = executor
        elif loop in AsyncToSync.loop_thread_executors:
            # Re-use thread executor for running loop
            executor = AsyncToSync.loop_thread_executors[loop]
        elif self.deadlock_context.get(False):
            raise RuntimeError(
                "Single thread executor already being used, would deadlock"
            )
        else:
            # Otherwise, we run it in a fixed single thread
            executor = self.single_thread_executor
            self.deadlock_context.set(True)
    else:
        # Use the passed in executor, or the loop's default if it is None
        executor = self._executor

    context = contextvars.copy_context()
    child = functools.partial(self.func, *args, **kwargs)
    func = context.run

    try:
        # Run the code in the right thread
        ret: _R = await loop.run_in_executor(
            executor,
            functools.partial(
                self.thread_handler,
                loop,
                self.get_current_task(),
                sys.exc_info(),
                func,
                child,
            ),
        )

    finally:
        _restore_context(context)
        self.deadlock_context.set(False)

    return ret

The important part is the longer if-clause: since _thread_sensitive is True by default, this ends up here:

else:
    # Otherwise, we run it in a fixed single thread
    executor = self.single_thread_executor
    self.deadlock_context.set(True)

self.single_thread_executor is a class attribute shared by all SyncToAsync instances, which means that only ONE thread is used for all sync_to_async calls in websockets. In my understanding this can become a big bottleneck: if, for example, 1000 clients are connected to the socket and an event is broadcast to all of them, and each consumer has to run sync_to_async for a database call, then all 1000 calls have to use this one thread one after another.
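For reference, the attribute is declared (roughly) like this at class level in asgiref/sync.py, which is why every SyncToAsync instance ends up sharing the same one-thread executor:

from concurrent.futures import ThreadPoolExecutor

class SyncToAsync:
    ...
    # Single-thread executor for thread-sensitive code
    single_thread_executor = ThreadPoolExecutor(max_workers=1)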
If I got something wrong, please correct me :slight_smile:

Now how can I solve this?

One idea would be to always call sync_to_async inside of
async with ThreadSensitiveContext():
But doing this for all 1000 clients would lead to a lot of new database connections, which might exceed the number the database can handle. Also, creating 1000 threads seems like a lot.
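A rough sketch of that idea (ThreadSensitiveContext comes from asgiref.sync; the consumer and method names are just placeholders):

from asgiref.sync import ThreadSensitiveContext, sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer

class MyConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data=None, bytes_data=None):
        async with ThreadSensitiveContext():
            # every thread_sensitive sync_to_async call inside this block gets
            # this context's own single-thread executor (and its own DB connection)
            result = await sync_to_async(self.load_from_db)()
        await self.send(text_data=result)

    def load_from_db(self):
        # placeholder for a synchronous ORM query
        return "some data"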

Another idea would be to set thread_sensitive=False; then the executor is either the one passed in, or the event loop’s default executor is used. Always passing in a new executor does not seem practical. But what about the default executor: does it maybe have multiple threads, which would solve the problem? And when are those threads created? I don’t see this in the code.
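A rough sketch of what I mean, assuming sync_to_async really does accept an explicit executor argument (the self._executor in the code above suggests it does); the names are my own:

from concurrent.futures import ThreadPoolExecutor
from asgiref.sync import sync_to_async

# one shared pool instead of a new executor per call
shared_executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="ws-db")

def fetch_something(pk):
    # placeholder for a synchronous DB query
    return pk

fetch_something_async = sync_to_async(
    fetch_something, thread_sensitive=False, executor=shared_executor
)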
I use uvicorn workers in gunicorn.

Thank you for any insights.

I think for large numbers of connected clients you’d need to be investigating some kind of fan-out strategy, so a single worker connects to the database, and so on, and then sends a message via the channel layer that is then picked up by all clients.
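Roughly, the fan-out could look like this: one worker runs the query once and broadcasts via the channel layer, and the consumers only relay the message (group name, message type and query are placeholders):

from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.layers import get_channel_layer

def run_the_query():
    # a single synchronous DB query, done once instead of once per client
    return {"payload": "placeholder"}

async def broadcast_update():
    data = await sync_to_async(run_the_query, thread_sensitive=True)()
    channel_layer = get_channel_layer()
    await channel_layer.group_send("updates", {"type": "update.message", "data": data})

class UpdateConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        await self.channel_layer.group_add("updates", self.channel_name)
        await self.accept()

    async def update_message(self, event):
        # no DB access here, just relay what the worker broadcast
        await self.send_json(event["data"])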

DB connections are pretty much always the bottleneck. There’s no entirely straightforward answer to “How do I connect 1000 clients to the DB at once?” — The answer would be some variant on “Don’t”.

Thank you for your reply.

Making a single worker connect to the database makes sense, since the response is mostly (though not entirely) the same for every client.

Do you know more about thread_sensitive=False in this case? Where is the default executor created, and with how many threads?

DB connections are not thread safe, so you should always use thread_sensitive=True for these connections.

What do you mean by thread safe in this case?

I’m talking just about the use in websockets; in normal HTTP requests I of course always have thread_sensitive=True.

But I don’t see the point in a websocket: since consumers are not running in a ThreadSensitiveContext, the calls would run in any thread anyway. Right?

As far as I understand, DB connections are thread safe, because they are stored in thread locals. Or did I get this wrong?

DB connections must always be used on the thread they’re opened on. (They’re not “thread safe”) Django enforces this. Use sync_to_async with thread_sensitive=True for this. If you use the “single worker” idea this would apply there.

For the websocket consumers, sure it’s all async, but then why use sync_to_async? — if it’s to make a DB call then you’re back where you were.

It’s not really possible to say more here, I think.

Yes it’s to make database calls.

With thread_sensitive=True in a websocket, everything executes in one thread, one call after another, as mentioned in my first post, and this is not optimal. The function has to be called by each consumer and is executed serially, so the last consumer gets its result way later than the first.

With thread_sensitive=False, they would be executed in different threads, all taken from the event loop’s default ThreadPoolExecutor, which by default has min(32, (os.cpu_count() or 1) + 4) threads.
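A quick way to see which threads the loop’s default executor uses (passing executor=None to run_in_executor, which is what the else branch above does when no executor was given); on Python 3.8+ its max_workers defaults to min(32, (os.cpu_count() or 1) + 4):

import asyncio
import threading

def which_thread():
    return threading.current_thread().name

async def main():
    loop = asyncio.get_running_loop()
    names = await asyncio.gather(
        *[loop.run_in_executor(None, which_thread) for _ in range(5)]
    )
    print(names)  # e.g. ['asyncio_0', 'asyncio_1', ...]

asyncio.run(main())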

What difference does it make to use thread_sensitive=True/False in a websocket? I think it does not make a difference, since it is not running in a ThreadSensitiveContext, and therefore not every consumer has its own thread. So the only difference is whether one thread or several threads run the functions, one after another or at the same time. I don’t see the point of running them one after another with thread_sensitive=True.

Or is my understanding wrong?
Database connections are always opened in the worker thread anyway, and not in the main thread with the event loop, right?

Am I missing something?

I think you’re actually referring to a Consumer here? Or are you talking about using an Async Django view?

In either case, you’re not running the Consumer or the view with thread_sensitive=True - it is used when you’re calling a synchronous function.

I think you might be conflating what is happening in the Consumer with what is happening in the sync_to_async calls.

In an async consumer, everything is running in one event loop (one thread). This is optimal and the very purpose of running async. (Being able to adequately handle a large number of consumers concurrently.)

Then the question becomes, “which thread is used when calling a synchronous function”. When thread_sensitive is True, it’s defined in the docs as:

the sync function will run in the same thread as all other thread_sensitive functions.

which is required when using the database. Using it anywhere else depends upon whether those other functions are capable of working in a different (new) thread.
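For example, a synchronous ORM call inside a consumer would typically be wrapped like this (model and names are placeholders; thread_sensitive=True is the default and is only spelled out for clarity):

from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer

class ExampleConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data=None, bytes_data=None):
        count = await sync_to_async(self.count_rows, thread_sensitive=True)()
        await self.send(text_data=str(count))

    def count_rows(self):
        # synchronous ORM access stays inside the wrapped function
        from myapp.models import Thing  # placeholder model
        return Thing.objects.count()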


You are right, I should be more clear.

I’m only talking about Consumers here. In views it is clear that thread_sensitive should always be True when calling a function with sync_to_async, because a request is handled inside a ThreadSensitiveContext. This means that all sync_to_async calls of that request run in this one thread.

In a Consumer, on the other hand, there is nothing like a ThreadSensitiveContext. So when I want to call the database I have to do it in a function wrapped in sync_to_async. And here is where thread_sensitive=True actually becomes a problem:

Everything running in the same thread is bad for performance, isn’t it? Suppose I have 1000 consumers triggered and they all have to make a DB call before sending something to the client. With thread_sensitive=True they all use this one thread for the function calling the DB, one after another. When I call this function with thread_sensitive=False, there would be min(32, (os.cpu_count() or 1) + 4) threads running this function at the same time, which would make it way faster. This is because then the default executor of the loop is used (see the else block in the code I posted).
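A hypothetical micro-benchmark of what I mean, with time.sleep standing in for a blocking DB query (the numbers are only illustrative):

import asyncio
import time

from asgiref.sync import sync_to_async

def blocking_query():
    time.sleep(0.1)

async def main():
    for sensitive in (True, False):
        wrapped = sync_to_async(blocking_query, thread_sensitive=sensitive)
        start = time.monotonic()
        await asyncio.gather(*[wrapped() for _ in range(20)])
        print(f"thread_sensitive={sensitive}: {time.monotonic() - start:.2f}s")
        # True  -> roughly 2s, all 20 calls serialized on the single executor thread
        # False -> a fraction of that, the calls are spread over the default executor

asyncio.run(main())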

In Theory, perhaps.

In reality, it depends upon what needs to be done.

In practice, the theoretical issues are irrelevant because Django requires it be done this way, as explained above and in the docs.

If necessary, it can be addressed by running multiple processes, each one an independent instance and therefore running independent event loops. But how you address this really does depend upon your usage and requirements - and may require taking a different look at what you’re doing and how these events are triggered.