Multiple daemon threads are being spawned under the ASGI protocol with the Daphne server

Hi,

I’m attempting to run an async view over the ASGI protocol with Daphne as the server. However, I’ve noticed that it creates a new ThreadPoolExecutor instance for some requests, and the daemon threads are still running in the background after benchmarking. My understanding is that, since everything runs on a single event loop, a single ThreadPoolExecutor should be reused. Could someone clarify this for me?

Middleware:

# Django middleware stack (appears to be the stock default set — no custom
# middleware added). NOTE: ordering is significant; entries run top-down on
# the request and bottom-up on the response.
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

And my view is:

import threading

import asyncio
import aiohttp
import logging

import psutil as psutil
from rest_framework.response import Response
from adrf.decorators import api_view
import json

logger = logging.getLogger(__name__)


@api_view(['GET'])
async def index(request):
    """Async GET endpoint: forward a completion request upstream and
    return whatever body it produced with HTTP 200."""
    payload = await make_api_request("http://{{host}}/v1/completions")
    return Response(payload, status=200)


async def make_api_request(url, method="POST", headers=None, params=None, json_data=None, timeout=None):
    """Issue an async HTTP request and return the decoded response body.

    Args:
        url: Target URL.
        method: HTTP method to use (default ``"POST"``).
        headers: Optional request headers dict.
        params: Optional query-string params dict.
        json_data: JSON body to send. When ``None``, a sample completion
            payload is used as a fallback. (Previously the parameter was
            unconditionally overwritten, so callers could never supply
            their own body — that was a bug.)
        timeout: Optional aiohttp timeout value.

    Returns:
        Parsed JSON (via ``json.loads``) when the response Content-Type
        mentions "json", otherwise the raw response bytes.

    Raises:
        TimeoutError: if the request exceeded ``timeout``.
        ConnectionError: on any aiohttp client error.
    """
    if json_data is None:
        # Fall back to the default demo payload only when the caller did
        # not provide a body of their own.
        json_data = {
            'prompt': 'Hi, How are you?',
            'max_new_tokens': 700, 'temperature': 0, 'top_p': 1, 'max_tokens': 700,
            'model': 'meta-llama/Llama-2-7b-chat-hf',
        }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, headers=headers, params=params,
                                       json=json_data, timeout=timeout, ssl=False) as response:
                content = await response.read()
                if 'json' in response.headers.get('Content-Type', ''):
                    content = json.loads(content)
                return content
    except asyncio.TimeoutError as e:
        # Chain the cause so the original traceback is preserved.
        raise TimeoutError("Request timed out. The server did not respond within the specified timeout period.") from e
    except aiohttp.ClientError as e:
        raise ConnectionError(f"Request error: {str(e)}") from e
    # No blanket `except Exception: raise Exception(...)` — re-wrapping in a
    # bare Exception destroyed the original type and traceback; unexpected
    # errors now propagate unchanged.

The code I’m referring to is in asgiref’s `sync.py` module:

    async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """Run the wrapped synchronous callable on an executor and await it.

        Picks an executor based on ``thread_sensitive``: thread-sensitive
        code is funnelled onto a single-worker executor (inherited from a
        parent sync thread, per-context, per-loop, or the shared fallback)
        so that such code always runs on the same thread; otherwise the
        explicitly supplied executor (or the loop's default when ``None``)
        is used. The call runs under a copy of the current contextvars
        context, which is restored into the caller afterwards.
        """
        __traceback_hide__ = True  # noqa: F841
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        if self._thread_sensitive:
            if hasattr(AsyncToSync.executors, "current"):
                # If we have a parent sync thread above somewhere, use that
                executor = AsyncToSync.executors.current
            elif self.thread_sensitive_context.get(None):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()

                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context.
                    # NOTE: this is where a fresh single-worker pool (and its
                    # thread) is created for each previously-unseen context.
                    executor = ThreadPoolExecutor(max_workers=1)
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif loop in AsyncToSync.loop_thread_executors:
                # Re-use thread executor for running loop
                executor = AsyncToSync.loop_thread_executors[loop]
            elif self.deadlock_context.get(False):
                # Already inside the single-thread executor: running another
                # thread-sensitive call on it would never complete.
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor

        # Snapshot the current contextvars so the sync code sees (and may
        # mutate) a copy, which is merged back in the finally block below.
        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)
        func = context.run

        try:
            # Run the code in the right thread
            ret: _R = await loop.run_in_executor(
                executor,
                functools.partial(
                    self.thread_handler,
                    loop,
                    self.get_current_task(),
                    sys.exc_info(),
                    func,
                    child,
                ),
            )

        finally:
            # Propagate contextvar changes back to the caller and clear the
            # deadlock flag even if the executor call raised.
            _restore_context(context)
            self.deadlock_context.set(False)

        return ret

I checked the threads with the command **top -o threads -pid &lt;pid&gt;**.
Dependencies:

adrf==0.1.4
aiohttp==3.9.3
aiosignal==1.3.1
asgi-debug==0.1
asgiref==3.7.2
async-property==0.2.2
async-timeout==4.0.3
asyncio==3.4.3
attrs==23.2.0
autobahn==23.1.2
Automat==22.10.0
backports.zoneinfo==0.2.1
cffi==1.16.0
click==8.1.7
constantly==23.10.4
cryptography==42.0.4
daphne==4.1.0
Django==4.2.10
djangorestframework==3.14.0
frozenlist==1.4.1
h11==0.14.0
hyperlink==21.0.0
idna==3.6
incremental==22.10.0
multidict==6.0.5
packaging==24.0
psutil==5.9.8
pyasn1==0.5.1
pyasn1-modules==0.3.0
pycparser==2.21
pyOpenSSL==24.0.0
pytz==2024.1
service-identity==24.1.0
six==1.16.0
sqlparse==0.4.4
structlog==24.1.0
Twisted==23.10.0
txaio==23.1.1
typing_extensions==4.9.0
uvicorn==0.29.0
yarl==1.9.4
zope.interface==6.2