Hi,
I’m attempting to run an async view using the ASGI protocol with Daphne as the server. However, I’ve noticed that it’s creating new ThreadPoolExecutor instances for some requests, with daemon threads still running in the background post-benchmarking. My understanding is that since it’s based on an event loop, it should use a single ThreadPoolExecutor. Could someone clarify this for me?
Middlewares:
# Middleware classes for the project, given as dotted import paths.
# The list order is kept exactly as posted.
MIDDLEWARE = [
    "django.middleware.security.SecurityMiddleware",
    "django.contrib.sessions.middleware.SessionMiddleware",
    "django.middleware.common.CommonMiddleware",
    "django.middleware.csrf.CsrfViewMiddleware",
    "django.contrib.auth.middleware.AuthenticationMiddleware",
    "django.contrib.messages.middleware.MessageMiddleware",
    "django.middleware.clickjacking.XFrameOptionsMiddleware",
]
And my view is:
import threading
import asyncio
import aiohttp
import logging
import psutil as psutil
from rest_framework.response import Response
from adrf.decorators import api_view
import json
logger = logging.getLogger(__name__)
@api_view(['GET'])
async def index(request):
    """Call the completions endpoint and relay its reply with HTTP 200.

    The URL is passed through exactly as written; the ``{{host}}`` part
    looks like a template placeholder (TODO confirm it is substituted
    before this code runs).
    """
    completions_url = "http://{{host}}/v1/completions"
    payload = await make_api_request(completions_url)
    return Response(payload, status=200)
async def make_api_request(url, method="POST", headers=None, params=None, json_data=None, timeout=None):
    """Send one HTTP request with aiohttp and return the response body.

    Args:
        url: Target URL.
        method: HTTP method name handed to aiohttp (default ``"POST"``).
        headers: Optional request headers.
        params: Optional query-string parameters.
        json_data: JSON-serialisable request payload. When ``None`` the
            sample completion prompt below is sent instead.
        timeout: Passed through unchanged as aiohttp's ``timeout`` argument.

    Returns:
        The parsed JSON value when the response ``Content-Type`` header
        contains ``"json"``; otherwise the raw body as ``bytes``.

    Raises:
        TimeoutError: The request raised ``asyncio.TimeoutError``.
        ConnectionError: aiohttp raised a ``ClientError``.
        Exception: Any other failure, re-wrapped with its message.
    """
    # Only fall back to the sample payload when the caller supplied none;
    # previously the argument was overwritten unconditionally.
    if json_data is None:
        json_data = {
            'prompt': 'Hi, How are you?',
            'max_new_tokens': 700, 'temperature': 0, 'top_p': 1, 'max_tokens': 700,
            'model': 'meta-llama/Llama-2-7b-chat-hf'}
    try:
        # NOTE: a fresh ClientSession (and connection pool) is opened per call.
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, headers=headers, params=params, json=json_data,
                                       timeout=timeout, ssl=False) as response:
                content = await response.read()
                if 'json' in response.headers.get('Content-Type', ''):
                    content = json.loads(content)
                return content
    # Chain with ``from`` so the original traceback stays attached.
    except asyncio.TimeoutError as e:
        raise TimeoutError("Request timed out. The server did not respond within the specified timeout period.") from e
    except aiohttp.ClientError as e:
        raise ConnectionError(f"Request error: {str(e)}") from e
    except Exception as e:
        raise Exception(f"Exception error: {str(e)}") from e
The code I’m referring to is in asgiref’s sync.py module:
# Runs self.func(*args, **kwargs) via loop.run_in_executor on a chosen
# executor and returns the result to the awaiting coroutine.
#
# Executor choice when self._thread_sensitive is true (first match wins):
#   1. AsyncToSync.executors.current, if that attribute exists
#   2. the executor cached for the current thread_sensitive_context value
#      (created with max_workers=1 the first time that value is seen)
#   3. the executor registered for the running loop in
#      AsyncToSync.loop_thread_executors
#   4. self.single_thread_executor -- unless deadlock_context is already
#      set, in which case RuntimeError is raised
# When self._thread_sensitive is false, self._executor is used.
async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
__traceback_hide__ = True # noqa: F841
loop = asyncio.get_running_loop()
# Work out what thread to run the code in
if self._thread_sensitive:
if hasattr(AsyncToSync.executors, "current"):
# If we have a parent sync thread above somewhere, use that
executor = AsyncToSync.executors.current
elif self.thread_sensitive_context.get(None):
# If we have a way of retrieving the current context, attempt
# to use a per-context thread pool executor
thread_sensitive_context = self.thread_sensitive_context.get()
if thread_sensitive_context in self.context_to_thread_executor:
# Re-use thread executor in current context
executor = self.context_to_thread_executor[thread_sensitive_context]
else:
# Create new thread executor in current context
# Each distinct context value therefore gets its own one-thread pool.
# NOTE(review): presumably the ASGI handler sets a fresh context per
# request, which would match the per-request executors observed --
# confirm against the code that sets thread_sensitive_context.
executor = ThreadPoolExecutor(max_workers=1)
# print("================== created new thread ================")
self.context_to_thread_executor[thread_sensitive_context] = executor
elif loop in AsyncToSync.loop_thread_executors:
# Re-use thread executor for running loop
executor = AsyncToSync.loop_thread_executors[loop]
elif self.deadlock_context.get(False):
raise RuntimeError(
"Single thread executor already being used, would deadlock"
)
else:
# Otherwise, we run it in a fixed single thread
executor = self.single_thread_executor
# Mark the shared executor as in use; the elif branch above raises if
# this flag is already set when that branch is reached.
self.deadlock_context.set(True)
else:
# Use the passed in executor, or the loop's default if it is None
executor = self._executor
# Copy the current contextvars context; its run method and the bound call
# to self.func are both handed to self.thread_handler below.
context = contextvars.copy_context()
child = functools.partial(self.func, *args, **kwargs)
func = context.run
try:
# Run the code in the right thread
ret: _R = await loop.run_in_executor(
executor,
functools.partial(
self.thread_handler,
loop,
self.get_current_task(),
sys.exc_info(),
func,
child,
),
)
finally:
# _restore_context is defined elsewhere -- presumably it propagates
# context changes made in the worker thread back here (TODO confirm).
# The deadlock flag is cleared on every exit path, whether or not this
# call was the one that set it.
_restore_context(context)
self.deadlock_context.set(False)
return ret
I checked the threads with the command **top -o threads -pid <pid>**
Dependencies:
adrf==0.1.4
aiohttp==3.9.3
aiosignal==1.3.1
asgi-debug==0.1
asgiref==3.7.2
async-property==0.2.2
async-timeout==4.0.3
asyncio==3.4.3
attrs==23.2.0
autobahn==23.1.2
Automat==22.10.0
backports.zoneinfo==0.2.1
cffi==1.16.0
click==8.1.7
constantly==23.10.4
cryptography==42.0.4
daphne==4.1.0
Django==4.2.10
djangorestframework==3.14.0
frozenlist==1.4.1
h11==0.14.0
hyperlink==21.0.0
idna==3.6
incremental==22.10.0
multidict==6.0.5
packaging==24.0
psutil==5.9.8
pyasn1==0.5.1
pyasn1-modules==0.3.0
pycparser==2.21
pyOpenSSL==24.0.0
pytz==2024.1
service-identity==24.1.0
six==1.16.0
sqlparse==0.4.4
structlog==24.1.0
Twisted==23.10.0
txaio==23.1.1
typing_extensions==4.9.0
uvicorn==0.29.0
yarl==1.9.4
zope.interface==6.2