I have been working on possible solutions to ticket #33738.
Accessing the queue that receive reads from is only possible because the tests in tests/asgi/tests.py use the ApplicationCommunicator provided by asgiref.testing. This communicator uses an asyncio.Queue.
Using
input_queue = receive.__self__._queue
to get the queue and using
# Quit out if that's the end.
if not input_queue and not message.get("more_body", None):
    break
to check for when it’s empty and exit the while loop in read_body seems to pass all the tests, including those covering the disconnect issue.
This isn’t a good implementation, though: most backend web applications send one request at a time and don’t expose a queue that can be reached through receive.
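For illustration, here is a minimal, self-contained sketch of the queue-peeking idea and of why it does not generalise. It is not Django's actual read_body: the loop is simplified, the message dicts are made up, and plain_receive is a hypothetical stand-in for a server whose receive is an ordinary coroutine function rather than a bound asyncio.Queue.get.

```python
import asyncio


async def read_body(receive):
    """Simplified stand-in for read_body that peeks at the queue behind receive."""
    # Assumption: receive is the bound method asyncio.Queue.get, so __self__
    # is the Queue and _queue is its private deque of pending messages.
    input_queue = receive.__self__._queue
    body = b""
    while True:
        message = await receive()
        body += message.get("body", b"")
        # Quit out if that's the end.
        if not input_queue and not message.get("more_body", None):
            break
    return body


async def demo():
    queue = asyncio.Queue()
    await queue.put({"type": "http.request", "body": b"ab", "more_body": True})
    await queue.put({"type": "http.request", "body": b"cd", "more_body": False})
    with_queue = await read_body(queue.get)

    # Hypothetical receive that is a plain coroutine function: no __self__.
    async def plain_receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    try:
        await read_body(plain_receive)
        plain_ok = True
    except AttributeError:
        plain_ok = False
    return with_queue, plain_ok


queue_result = asyncio.run(demo())
```

With a queue-backed receive the body is read in full, but a receive that is not a bound method of a queue fails on the __self__ lookup before any message is read.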
Trying to catch the errors thrown by message = await receive()
has not been fruitful.
I feel that catching an error once the request has been fully consumed, in order to exit the loop, is the right approach, but I am a little confused about how to go about catching the various async cancel and timeout errors, and the context var errors raised by the SyncToAsync.thread_sensitive_context context manager.
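To make the "catch an error to leave the loop" idea concrete, here is a rough sketch under stated assumptions, not a proposed patch. It wraps each receive() call in asyncio.wait_for and treats asyncio.TimeoutError as "nothing left to read". The helper name read_body_until_idle, the 0.05 second timeout, and the local RequestAborted class (a stand-in for django.core.exceptions.RequestAborted) are all hypothetical, and the thread_sensitive_context errors are not covered.

```python
import asyncio


class RequestAborted(Exception):
    """Local stand-in for django.core.exceptions.RequestAborted."""


async def read_body_until_idle(receive, timeout=0.05):
    """Keep reading until receive() goes quiet or the client disconnects."""
    body = b""
    while True:
        try:
            # Bound every receive() so that an idle channel ends the loop.
            message = await asyncio.wait_for(receive(), timeout)
        except asyncio.TimeoutError:
            # No further message arrived in time: treat the request as consumed.
            break
        if message["type"] == "http.disconnect":
            raise RequestAborted()
        body += message.get("body", b"")
    return body


async def demo():
    queue = asyncio.Queue()
    await queue.put({"type": "http.request", "body": b"abcd", "more_body": False})
    complete = await read_body_until_idle(queue.get)

    # A disconnect queued right after the body is now seen by the loop.
    await queue.put({"type": "http.request", "body": b"ab", "more_body": False})
    await queue.put({"type": "http.disconnect"})
    try:
        await read_body_until_idle(queue.get)
        aborted = False
    except RequestAborted:
        aborted = True
    return complete, aborted


idle_result = asyncio.run(demo())
```

The obvious cost of this sketch is that every request waits one full timeout after its last chunk, so it only illustrates the control flow.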
I would like some guidance on how to approach this.