Hello everybody,
I stumbled over a default behavior of sync_to_async in an AsyncWebsocketConsumer (or in WebSocket consumers in general) that I find odd, and I'm not sure how to deal with it.
When a function wrapped with sync_to_async is called, this method (SyncToAsync.__call__ in asgiref) is invoked:
```python
async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
    __traceback_hide__ = True  # noqa: F841
    loop = asyncio.get_running_loop()
    # Work out what thread to run the code in
    if self._thread_sensitive:
        if hasattr(AsyncToSync.executors, "current"):
            # If we have a parent sync thread above somewhere, use that
            executor = AsyncToSync.executors.current
        elif self.thread_sensitive_context.get(None):
            # If we have a way of retrieving the current context, attempt
            # to use a per-context thread pool executor
            thread_sensitive_context = self.thread_sensitive_context.get()
            if thread_sensitive_context in self.context_to_thread_executor:
                # Re-use thread executor in current context
                executor = self.context_to_thread_executor[thread_sensitive_context]
            else:
                # Create new thread executor in current context
                executor = ThreadPoolExecutor(max_workers=1)
                self.context_to_thread_executor[thread_sensitive_context] = executor
        elif loop in AsyncToSync.loop_thread_executors:
            # Re-use thread executor for running loop
            executor = AsyncToSync.loop_thread_executors[loop]
        elif self.deadlock_context.get(False):
            raise RuntimeError(
                "Single thread executor already being used, would deadlock"
            )
        else:
            # Otherwise, we run it in a fixed single thread
            executor = self.single_thread_executor
            self.deadlock_context.set(True)
    else:
        # Use the passed in executor, or the loop's default if it is None
        executor = self._executor
    context = contextvars.copy_context()
    child = functools.partial(self.func, *args, **kwargs)
    func = context.run
    try:
        # Run the code in the right thread
        ret: _R = await loop.run_in_executor(
            executor,
            functools.partial(
                self.thread_handler,
                loop,
                self.get_current_task(),
                sys.exc_info(),
                func,
                child,
            ),
        )
    finally:
        _restore_context(context)
        self.deadlock_context.set(False)
    return ret
```
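The important part is the long if/elif chain. Since _thread_sensitive is True by default, and in my consumer none of the earlier branches match (no parent sync thread, no ThreadSensitiveContext, no executor registered for the running loop), the call ends up in the last branch: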
```python
else:
    # Otherwise, we run it in a fixed single thread
    executor = self.single_thread_executor
    self.deadlock_context.set(True)
```
self.single_thread_executor is a class attribute of SyncToAsync (a ThreadPoolExecutor with max_workers=1), so it is shared by every sync_to_async wrapper in the process. This means that only ONE thread is used for all sync_to_async calls in my WebSocket consumers that reach this branch. As far as I understand, this can become a big bottleneck: if, for example, 1000 clients are connected and an event is broadcast to all of them, and each one needs a database call via sync_to_async, all 1000 calls have to go through this one thread, one after another.
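To check the bottleneck claim without Django, here is a stdlib-only sketch (not asgiref code). A hypothetical blocking_query that just sleeps stands in for an ORM call, and the same 20 calls are fanned out once through a single-worker ThreadPoolExecutor (the same kind of executor as single_thread_executor) and once through a 10-worker pool. The call count, delay and pool size are arbitrary values for illustration.

```python
from __future__ import annotations

import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_query(delay: float) -> int:
    """Hypothetical stand-in for a blocking ORM call: sleeps, then reports its thread."""
    time.sleep(delay)
    return threading.get_ident()


async def fan_out(executor: ThreadPoolExecutor, n: int, delay: float) -> tuple[float, set[int]]:
    """Run n blocking calls concurrently through `executor`; return elapsed time and thread idents."""
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    idents = await asyncio.gather(
        *(loop.run_in_executor(executor, blocking_query, delay) for _ in range(n))
    )
    return time.perf_counter() - start, set(idents)


async def main(n: int = 20, delay: float = 0.05):
    single = ThreadPoolExecutor(max_workers=1)  # like SyncToAsync.single_thread_executor
    pool = ThreadPoolExecutor(max_workers=10)  # an ordinary multi-worker pool
    try:
        single_result = await fan_out(single, n, delay)
        pool_result = await fan_out(pool, n, delay)
    finally:
        single.shutdown()
        pool.shutdown()
    return single_result, pool_result


(single_time, single_threads), (pool_time, pool_threads) = asyncio.run(main())
print(f"1 worker:   {single_time:.2f}s on {len(single_threads)} thread(s)")
print(f"10 workers: {pool_time:.2f}s on {len(pool_threads)} thread(s)")
```

With one worker every call reports the same thread and the total time is roughly n × delay, because the calls run strictly one after another; with the pool they overlap.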
If I got something wrong, please correct me.
Now how can I solve this?
One idea would be to always make the sync_to_async calls inside a ThreadSensitiveContext:
```python
async with ThreadSensitiveContext():
```
But doing this for all 1000 clients would open a lot of new database connections (Django keeps one connection per thread), which might exceed what the database can handle. Creating 1000 threads also seems like a lot.
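To see why this scales with the number of clients, here is a rough stdlib model of the ThreadSensitiveContext branch in __call__. It is not asgiref's code: current_context, context_to_executor, executor_for, run_thread_sensitive and client are hypothetical names. Each context simply gets its own ThreadPoolExecutor(max_workers=1), and a shared single-worker executor is the fallback when no context is set.

```python
from __future__ import annotations

import asyncio
import contextvars
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical model, not asgiref's code: this ContextVar plays the role of
# SyncToAsync.thread_sensitive_context.
current_context: contextvars.ContextVar[object | None] = contextvars.ContextVar(
    "current_context", default=None
)
context_to_executor: dict[object, ThreadPoolExecutor] = {}
shared_single_executor = ThreadPoolExecutor(max_workers=1)  # like single_thread_executor


def executor_for(ctx: object | None) -> ThreadPoolExecutor:
    """One single-worker pool per context, or the shared fallback if there is none."""
    if ctx is None:
        return shared_single_executor
    if ctx not in context_to_executor:
        context_to_executor[ctx] = ThreadPoolExecutor(max_workers=1)
    return context_to_executor[ctx]


async def run_thread_sensitive(func, *args):
    """Run a blocking callable on the executor chosen for the current context."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor_for(current_context.get()), func, *args)


async def client(with_context: bool) -> set[int]:
    """One simulated connection making three blocking calls; returns the thread idents used."""
    if with_context:
        current_context.set(object())  # stands in for entering ThreadSensitiveContext
    idents = set()
    for _ in range(3):
        idents.add(await run_thread_sensitive(threading.get_ident))
    return idents


async def main(n_clients: int = 5):
    # gather() runs each client in its own Task, i.e. in its own copy of the context.
    shared = await asyncio.gather(*(client(False) for _ in range(n_clients)))
    per_ctx = await asyncio.gather(*(client(True) for _ in range(n_clients)))
    return set().union(*shared), set().union(*per_ctx), [len(s) for s in per_ctx]


shared_threads, per_context_threads, threads_per_client = asyncio.run(main())
for executor in [shared_single_executor, *context_to_executor.values()]:
    executor.shutdown()
print("no context:", len(shared_threads), "thread(s)")
print("one context per client:", len(per_context_threads), "thread(s)")
```

Without a context, all five simulated clients share one thread; with one context per client, each client stays on its own thread, so the thread count grows with the number of connected clients.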
Another idea would be to set thread_sensitive=False. If I read the code correctly, the executor is then either the one passed in or, if none is given, the event loop's default executor. Always passing in a new executor does not seem practical. What about the default executor: does it have multiple threads, and would that solve the problem? And when are those threads created? I don't see that in the code.
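On the default-executor question: in CPython, loop.run_in_executor(None, ...) lazily creates the loop's default ThreadPoolExecutor on first use, and since Python 3.8 a ThreadPoolExecutor's max_workers defaults to min(32, CPU count + 4). The worker threads themselves are started on demand inside submit(), up to max_workers, which is why they don't appear in asgiref's code. The stdlib-only sketch below installs an explicitly sized default executor with loop.set_default_executor so the bound is a deliberate choice; POOL_SIZE = 4 and the "db" thread-name prefix are arbitrary, and blocking_query is a hypothetical stand-in for an ORM call.

```python
from __future__ import annotations

import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

POOL_SIZE = 4  # arbitrary bound chosen for illustration


def blocking_query(delay: float) -> str:
    """Hypothetical stand-in for a blocking ORM call; reports the worker thread's name."""
    time.sleep(delay)
    return threading.current_thread().name


def db_threads() -> list[str]:
    """Names of the default executor's worker threads that currently exist."""
    return [t.name for t in threading.enumerate() if t.name.startswith("db_")]


async def main():
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) will use this pool instead of creating its own.
    loop.set_default_executor(
        ThreadPoolExecutor(max_workers=POOL_SIZE, thread_name_prefix="db")
    )
    before = db_threads()  # no workers yet: submit() starts them on demand
    names = await asyncio.gather(
        *(loop.run_in_executor(None, blocking_query, 0.05) for _ in range(20))
    )
    return before, set(names)


before, used = asyncio.run(main())
print("worker threads before first call:", before)
print("worker threads used:", sorted(used))
```

The pool never grows past POOL_SIZE threads, so it also caps how many per-thread database connections these calls can open. Keep in mind that with thread_sensitive=False consecutive calls from the same connection may run on different threads.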
I run uvicorn workers under gunicorn.
Thank you for any insights.