channel_layers.group_send does not work multiple times

I am learning django-channels and wanted to build a basic timer that sends a message to the React frontend every second, saying how many seconds have passed since a certain button was clicked. However, the channel layer's group_send method seems to work only the first time. The timer update lives in the consumer class, which I cannot reach if group_send does not deliver the event. Here is the consumer.py file:

import json
import asyncio
from channels.generic.websocket import AsyncWebsocketConsumer
from django.utils import timezone

class HelloConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        # Join the hello_group when a WebSocket connection is established
        await self.channel_layer.group_add('hello_group', self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        # Leave the hello_group when a WebSocket connection is closed
        await self.channel_layer.group_discard('hello_group', self.channel_name)

    async def receive(self, text_data):
        # Handle the received message
        if text_data == 'hello':
            # Update the last_hit timestamp
            self.scope['last_hit'] = timezone.now()
        else:
            await self.send(text_data=json.dumps({'error': 'Invalid message'}))

    async def send_hello_message(self, event):

        # Send hello message to the client
        while True:
            print('hehe2')
            last_hit = event.get('last_hit', None)
            if last_hit:
                seconds_ago = (timezone.now() - last_hit).seconds
                await self.send(text_data=f'Hello World hit {seconds_ago} seconds ago')
            else:
                await self.send(text_data='Hello World endpoint never hit lol')
            
            await asyncio.sleep(1)

    async def hello_event(self, event):
        print("reached here")
        # Handle the hello event from DRF view
        if event.get('hit', False):
            print('hehe')
            # await self.send_hello_message(event)
            print("last hit:", event.get('last_hit', None))
            await asyncio.ensure_future(self.send_hello_message(event))

and there is a button in the frontend that calls this view

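from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.utils import timezone
from rest_framework.response import Response
from rest_framework.views import APIView

class HelloView(APIView):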
    def get(self, request):
        # Trigger the WebSocket event when the DRF endpoint is hit
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            'hello_group',
            {'type': 'hello.event', 'hit': True, 'last_hit': timezone.now()}
        )
        print("inside view")
        return Response({'message': 'Hello World!'})

The biggest problem is that the first click works properly: the client receives the elapsed seconds every second. The second time the button is pressed, nothing happens. The original count keeps increasing and the last_hit time is not updated.

For more reference here’s routing.py

from channels.routing import URLRouter
# from channels.auth import AuthMiddlewareStack
from django.urls import path
from .consumer import HelloConsumer

url_router = URLRouter(
            [
                path('ws/hello/', HelloConsumer.as_asgi()),
            ]
        )

And here’s the asgi.py

"""
ASGI config for channels_test project.

It exposes the ASGI callable as a module-level variable named ``application``.

For more information on this file, see
https://docs.djangoproject.com/en/4.2/howto/deployment/asgi/
"""

import os

from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter
from channels.auth import AuthMiddlewareStack

from channel_app.routing import url_router

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'channels_test.settings')

# application = get_asgi_application()

application = ProtocolTypeRouter({
    'http': get_asgi_application(),
    'websocket': AuthMiddlewareStack(
        url_router
    ),
})

And here’s the react frontend

import logo from './logo.svg';
import './App.css';
import React, { useState, useEffect } from 'react';

function App() {
  const [message, setMessage] = useState('Hello World endpoint never hit');
  
  const hitHelloWorldEndpoint = async () => {
    try {
        // Trigger a request to the Hello World DRF endpoint
        const response = await fetch('http://localhost:8000/api/hello/');
        // const data = await response.json();
        const data = await response.json();
        console.log('inside button', data.message)
        // setMessage(data.message);
    } catch (error) {
        console.error('Error hitting the Hello World endpoint:', error);
    }
  };
  useEffect(() => {
      // Establish a WebSocket connection
      const socket = new WebSocket('ws://localhost:8000/ws/hello/');

      // Handle WebSocket events
      socket.onopen = () => {
          console.log('WebSocket connection opened');
          socket.send('hello');
      };

      socket.onmessage = (event) => {
        console.log('onmessage: ', event.data);
        try {
            const data = JSON.parse(event.data);
            setMessage(data);
        } catch (error) {
            console.error('Error parsing JSON:', error);
            if (typeof(event.data) === 'string'){
              setMessage(event.data)
            }
        }
      };

      socket.onclose = () => {
          console.log('WebSocket connection closed');
      };

      // Cleanup WebSocket connection on component unmount
      return () => {
          socket.close();
      };
  }, []); // Empty dependency array ensures useEffect runs only once on component mount

  return (
    <div className="App">
      <header className="App-header">
        <img src={logo} className="App-logo" alt="logo" />
        <p>
          {message}
        </p>
        <button onClick={hitHelloWorldEndpoint}>Hit Hello World Endpoint</button>
      </header>
    </div>
  );
}

export default App;

And this is the output I get

WebSocket CONNECT /ws/hello/ [127.0.0.1:52606] # this is the websocket connection successful message
reached hereinside view #this is when the view is called and reached here means that it is inside the consumer class

hehe 
last hit: 2023-12-14 19:04:24.685034+00:00 # this is called when the last_hit is updated
hehe2
HTTP GET /api/hello/ 200 [0.00, 127.0.0.1:52607] #called when the api is hit
hehe2
hehe2
hehe2
hehe2
hehe2
inside view
HTTP GET /api/hello/ 200 [0.00, 127.0.0.1:52607]
hehe2
hehe2
hehe2
hehe2
hehe2
inside view
HTTP GET /api/hello/ 200 [0.00, 127.0.0.1:52607]
hehe2
hehe2
hehe2
hehe2

Can anyone explain why the API call goes through but last_hit does not get updated? Moreover, I believe the group_send lines from the view

channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
    'hello_group',
    {'type': 'hello.event', 'hit': True, 'last_hit': timezone.now()}
)

don’t even work after the button is clicked for the first time.

Thanks in advance

There are a couple things I notice right off-hand (there may be more).

  • Your while loop in send_hello_message will never exit. This gets into an area of Consumers that I’m not familiar with. It seems to me that if you call this method a second time, you’re going to have this loop running twice - the original loop with no “last_hit” and the second instance with.

  • You’re calling send_hello_message inside the asyncio.ensure_future method, which isn’t necessary. And, it seems to me that because you’re calling send_hello_message as a function (and not passing the function to ensure_future), it’s calling that function, and waiting for the results to be returned before the call to ensure_future can complete.

I’m a bit out of my depth here, and I’m not sure what the precise semantics would be in this case. I’m drawing this as an analogy to a simpler situation.

If I have:

def x(a):
    while True:
        pass
print(x(1))

nothing will ever get printed because the call to x never returns and so print doesn’t complete.
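The same effect can be reproduced with plain asyncio, outside Channels. Strictly speaking, calling a coroutine function only creates a coroutine object, and it is the await in front of ensure_future that keeps the handler waiting. The sketch below is only an illustration under that reading: ticker, blocking_handler, and non_blocking_handler are made-up names, and the loop is bounded so the demo can finish.

```python
import asyncio


async def ticker(log, ticks):
    # Stand-in for send_hello_message; bounded so the demo terminates
    for i in range(ticks):
        log.append(f"tick {i}")
        await asyncio.sleep(0.01)


async def blocking_handler(log):
    # Mirrors `await asyncio.ensure_future(...)`: the handler stays
    # suspended until ticker has finished
    await asyncio.ensure_future(ticker(log, 3))
    log.append("handler returned")


async def non_blocking_handler(log):
    # Schedules ticker and returns right away; the caller keeps the task
    task = asyncio.create_task(ticker(log, 3))
    log.append("handler returned")
    return task


async def main():
    blocking_log, non_blocking_log = [], []
    await blocking_handler(blocking_log)
    task = await non_blocking_handler(non_blocking_log)
    await task  # let the background ticker finish before comparing
    return blocking_log, non_blocking_log


blocking_log, non_blocking_log = asyncio.run(main())
print(blocking_log)      # "handler returned" comes last
print(non_blocking_log)  # "handler returned" comes first
```

With a while True loop in place of the bounded one, blocking_handler would never return, which would match a consumer that stops handling later events.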

When I need to do something like this, I use a sentinel value on the consumer that might get updated elsewhere. For example:

import asyncio

async def looping_function(self, a=None):
    self.a = a
    while self.a:
        # Do stuff, then yield to the event loop so other methods can run
        await asyncio.sleep(1)

Then, when I want to stop that loop, I set self.a = None from another method within the consumer.
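A runnable version of that idea, as a rough sketch only: Looper is a hypothetical stand-in for a consumer (it is not a Channels class), and stop and iterations are names invented for the demo. The loop has to await something on each pass, otherwise no other method would get a chance to clear the sentinel.

```python
import asyncio


class Looper:
    """Hypothetical stand-in for a consumer; not a Channels class."""

    def __init__(self):
        self.a = None
        self.iterations = 0

    async def looping_function(self, a=None):
        self.a = a
        while self.a:
            self.iterations += 1
            await asyncio.sleep(0.01)  # yield so other methods can run

    def stop(self):
        # Clearing the sentinel ends the loop on its next check
        self.a = None


async def main():
    looper = Looper()
    task = asyncio.create_task(looper.looping_function(a=True))
    await asyncio.sleep(0.05)
    looper.stop()
    await task  # completes once the loop sees the cleared sentinel
    return looper


looper = asyncio.run(main())
print(looper.iterations)
```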

(Also, I don’t think I’ve ever needed to directly use a method from the asyncio library in my Channels code other than sleep. There’s always been a simpler way to do things.)

Yep, just to agree with what @KenWhitesell said, you should hold a reference to the future here, and make sure to cancel it at some point, in disconnect, say. (You should be seeing error logging at shutdown about unfinished tasks.)

Also using create_task is probably the way forward.
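One way this could look, sketched with plain asyncio so it runs without Channels: TimerOwner is a hypothetical stand-in for the consumer, the sent list replaces self.send(), and _loop and _cancel_task are helper names made up here. The point is only the pattern of keeping the task on the instance and cancelling it both when a new event arrives and in disconnect.

```python
import asyncio
import contextlib


class TimerOwner:
    """Hypothetical stand-in for the consumer; `sent` replaces self.send()."""

    def __init__(self):
        self.task = None
        self.sent = []

    async def _loop(self, label):
        while True:
            self.sent.append(label)
            await asyncio.sleep(0.01)

    async def _cancel_task(self):
        # Cancel the running loop, if any, and wait until it has stopped
        if self.task is not None:
            self.task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self.task
            self.task = None

    async def hello_event(self, label):
        await self._cancel_task()
        # Keep the reference so the task can be cancelled later
        self.task = asyncio.create_task(self._loop(label))

    async def disconnect(self):
        await self._cancel_task()


async def main():
    owner = TimerOwner()
    await owner.hello_event("first")
    await asyncio.sleep(0.03)
    await owner.hello_event("second")
    await asyncio.sleep(0.03)
    await owner.disconnect()
    count_at_disconnect = len(owner.sent)
    await asyncio.sleep(0.03)  # nothing should be appended after disconnect
    return owner, count_at_disconnect


owner, count_at_disconnect = asyncio.run(main())
print(owner.sent)
```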

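In receive, self.scope['last_hit'] = timezone.now() updates the value in the scope, but in send_hello_message, last_hit = event.get('last_hit', None) takes a local copy from the event, which is then never updated.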
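To illustrate the difference, here is a small sketch in which the loop re-reads an attribute on every pass instead of copying a value once. HitTracker, record_hit, and report are hypothetical names, a plain counter stands in for the timestamp, and the loop is bounded so the demo ends.

```python
import asyncio


class HitTracker:
    """Hypothetical stand-in for the consumer; not a Channels class."""

    def __init__(self):
        self.hits = 0
        self.messages = []

    def record_hit(self):
        # Plays the role of updating last_hit somewhere shared
        self.hits += 1

    async def report(self, ticks):
        for _ in range(ticks):
            # Re-read the shared attribute each time, so later hits show up
            if self.hits == 0:
                self.messages.append("never hit")
            else:
                self.messages.append(f"hit {self.hits} time(s)")
            await asyncio.sleep(0.02)


async def main():
    tracker = HitTracker()
    task = asyncio.create_task(tracker.report(ticks=6))
    await asyncio.sleep(0.05)
    tracker.record_hit()  # visible to the loop that is already running
    await task
    return tracker


tracker = asyncio.run(main())
print(tracker.messages)
```

Had report copied self.hits into a local variable before the loop, every message would have stayed at "never hit".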

@KenWhitesell and @carltongibson, thank you so much. It works with asyncio.create_task. I don’t know why that did not cross my mind. I had read somewhere that ensure_future was the go-to function for running loops asynchronously in Channels, and I did not think about the loop never finishing, so the line await asyncio.ensure_future(self.send_hello_message(event)) kept waiting for the function to finish. All this while I thought group_send was not running a second time because it treated the call as the same instance and did not pick up the second one.

This is the updated code for future reference. I’ve only changed the hello_event function in the consumer, and that did the job:

    async def hello_event(self, event):
        print("reached here")
        # Cancel the previous timer task, if any, so only one loop runs at a time.
        # getattr avoids an AttributeError on the first event, before self.task exists.
        task = getattr(self, 'task', None)
        if task:
            task.cancel()
        # Start a new task with the new event; it is not awaited, so the handler returns immediately
        self.task = asyncio.create_task(self.send_hello_message(event))