Channels SyncConsumer base_send issue?

Channels 3.0.4 / Django 4.0.2

I’m using runworker to run a combination of sync and async background worker processes in one process. (I’m 99+% sure this doesn’t matter.)

Summary: I don’t know if this is a bug or if I’m just not understanding how this works.

I get the following error when an instance of a SyncConsumer is initialized:

.../channels/consumer.py:52: UserWarning: async_to_sync was passed a non-async-marked callable
self.base_send = async_to_sync(send)

This is the link to the code where the error is being thrown: channels/consumer.py at ae60a7d86f3655a1cc35cd9198e61fe5dcc5d1a1 · django/channels · GitHub

I have resolved the error message by replacing the above line with:

self.base_send = send

It works, or at least I no longer receive that error and my system is functioning exactly as expected with this change.

(Side bar: My websocket consumer is async. If this change adversely affects a synchronous websocket listener, I wouldn’t see it.
I know that in previous iterations - before making this change to consumer.py, I would not get the error message in a WebsocketConsumer.)

I’ve followed the logic all the way back to asgiref.server.StatelessServer, and this is where I get lost. I don’t understand where the self.application_send is coming from in get_or_create_application_instance so I’m not sure where to look beyond this.

What am I not seeing here?

Hey @KenWhitesell.

The full block there:

        # Store send function
        if self._sync:
            self.base_send = async_to_sync(send)
        else:
            self.base_send = send

So looking at self._sync, that’s False on AsyncConsumer and True on SyncConsumerso if your consumer is sync, then you should already be going down the else branch.

… which is the same as the adjustment you’ve made manually.

:thinking:

So the question is where/how/why is self._sync False here? — I guess I’d throw a breakpoint in at that line and and double check everything is as expected.

There’s not really enough to see the problem here.

You’re really lost me here:

  • Yes, I’m talking about a SyncConsumer

    • self._sync is True (verified)
  • The if condition in the code is if self._sync

Why would I be taking the else branch?

Follow-up note on the original with a little more information and to try to improve clarity:

I am talking about a SyncConsumer being used as a “background worker” process.

As a background worker process, it only communicates through the channel layer

  • It will never communicate directly with a websocket
  • Therefore, it doesn’t use self.send or self.base_send

All this means that the error message occurs when the instance of the SyncConsumer is created by the runworker process. It does not affect the operation of that consumer.

So, in thinking about it some more, I could just live with the warning message.

(It just “bugs” me - pun intended - but I guess that’s on me.)

I’ve lost myself there: I read the Boolean logic backwards, as always. :smiley:

OK, yes, I see you…

…I could just live with the warning message.

Mmmm. send is an asyncio.Queue.put right? That should be detectable as a coroutine object, I’d think. So, there’s an interesting question as to what we’d do to get that out of AsyncToSync.__init__() without the warning. :thinking:

I get this testing locally with PY310:

>>> import asyncio
>>> q = asyncio.Queue()
>>> q.put
<bound method Queue.put of <Queue at 0x10460c9a0 maxsize=0>>
>>> asyncio.iscoroutinefunction(q.put)
True

So, what is send in this case, and what Python are you on? etc.

You’ll probably tell me send is None or something:

>>> asyncio.iscoroutinefunction(None)
False

Likely we could guard against that… but it’d be worth a ticket to investigate properly if that’s the case.

send is printed as a lambda expression.

Actually, this is where I get lost in the code.

I’m not overriding send in my worker, which means it gets the definition of send from the parent class - SyncConsumer:

    def send(self, message):
        """
        Overrideable/callable-by-subclasses send method.
        """
        self.base_send(message)

The method base_send is being set in the block of code in the if condition we’ve been talking about. This is in the __call__ method of AsyncConsumer. (SyncConsumer inherits from AsyncConsumer).

To the extent of my understanding, that __call__ method is called in asgiref.server.StatelessServer in the get_or_create_application_instance at line 91ish:

        future = asyncio.ensure_future(
            application_instance(
                scope=scope,
                receive=input_queue.get,
                send=lambda message: self.application_send(scope, message),
            ),
        )

The self.application_send appears to be coming from line 69 because I don’t see any other method named application_send defined in the class hierarchy being used here.
But that method is flagged as async:

    async def application_send(self, scope, message):
        """
        Receives outbound sends from applications and handles them.
        """
        raise NotImplementedError("You must implement application_send()")

(The NotImplementedError being raised doesn’t concern me, because this is a background worker and does not call send.)

And this is where I’m getting lost and not really understanding what’s going on here.

Right. Yes. — We’re gonna struggle to mark that lambda as a coroutine. :slightly_smiling_face: (Or maybe we could just assign it to a variable and set _is_coroutine = asyncio.coroutines._is_coroutine on it… :thinking:)

Happy to investigate a cleanup, but, yes, this is the reason it’s a warning, not an error.

1 Like

To wrap this up - at least from my perspective.

I did some research and experimenting last night. Tried a number of different things. Did not find a solution that wasn’t significantly “uglier” and “more painful” (my opinion) than what exists now. (I also learned a bit more, so it was a bit of a win regardless.)

Anyway, if I were to make any recommendation at this point, I’d suggest tossing a note into the Channels docs in the “Worker and Background Task” section documenting that this is going to happen, it’s not a “problem” needing to be addressed, and the warning message can safely be ignored.

OK, thanks @KenWhitesell. I’ll be passing over this code this summer, so I will have a look at this then. :+1: