Unable to group_send() to other Consumers except for the Consumer that calls group_send()... No solution? Bug?

Here is my situation, I’m creating a RaspPi4 controlled solar heating system that is quite complicated in options / scope. I’m essentially having to rewrite their entire app in Python. Not only that, there are two apps, a MainApp which is just a usual Python script, and a DjangoApp which is also run on the same Pi. Things don’t have to work remotely. All that I need is the ability for the user to log into the site from their phone and make setting changes in the DjangoApp’s pages. I figured even though it’s a slow-running heat system (heat transfer is naturally a slow process), I wanted all comm channels to be fast. The previous version of the app was cludgy because it relied on the Database to communicate between MainApp and their web app (PHP! :flushed:). I tried to get the PHP going, but had many issues getting the database migrating from one of several remote live systems. The two coders that worked on it have vanished so can’t really help get it setup for development again. I suggested Django since I wrote a BSS (Bootstrap Studio) to Django template export script, Django is modern, Python based, so everything is under one systems language, and I know some Django. Not to mention database model management is also done in Python and is very nice. Anyhow, long story short here is the communication topology I came up with:

I’ve spent a week getting both MainApp ← {WebSocket} → DjangoApp and DjangoApp ← {WebSocket} → Browser JS both working error-free and coded in a robust way with classes and even and ErrorReporting class. Learned a lot of new JS features that were previously black magic.

One thing to note is that I have a full-stack Debug environment setup: I am able to debug in WingWare (Python) and VSCode (JS) on the same running instance. Even if the project is on the RaspPi4. However, currently because I don’t have time to write a Copy-over-to-Pi build step over SSH (though I know how from previous work), I am simply working on my Windows 10 workstation for now. So there is no MainApp yet, just a simple loop that keeps sending a text string or a number over the WS to Django.

Here are the relevant sections of code:

# sensor_test/consumers.py
import json
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
import json
from channels.layers import get_channel_layer
from channels.generic.websocket import AsyncWebsocketConsumer

class SensorTestConsumer(AsyncWebsocketConsumer):
    groups = ["sensor_test_group"]
    
    async def connect(self):
        await self.accept();
        
        await get_channel_layer().group_add(
            'sensor_update_group',
            self.channel_name
        )        
        
    async def disconnect(self, close_code):
        pass
                
    async def receive(self, text_data=None, bytes_data=None):
        print(f'{self.__class__.__name__} received: {text_data}')
        
        # asynchronously wait for string to send
        await self.send("Browser JS -> MainApp 😎😎😎😎😎😎")
        
    async def forward_to_client_browser(self, event):
        print(event['message'])

#main_app_comms/consumers.py
import json
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from channels.layers import get_channel_layer
from channels.generic.websocket import AsyncWebsocketConsumer

class ServerConsumer(AsyncWebsocketConsumer):

    async def connect(self):
        await self.accept()
        
        self.send("Django->MainApp")
                
    async def disconnect(self, close_code):
        pass
                
    async def receive(self, text_data=None, bytes_data=None):
        print(f'{self.__class__.__name__} received: {text_data}.  Forwarding to the SensorTestConsumer')
        
        await get_channel_layer().group_send(
            "sensor_update_group",
            {
                "type": "forward_to_client_browser",
                "message" : json.dumps({ "type": "SensorTestUpdate", "sensor_vals" : {"HT3": text_data}})
            }
        )
        
        ### y wait for string to send
        await self.send("Django -> MainApp 😎😎😎😎😎😎")
        
    async def forward_to_client_browser(self, event):
        print("TEST TEST TEST")      # This is the only place the event gets forwarded to
        

#website/asgi.py
# This file completely rewritten per Django Channels install instructions.

import os
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
import main_app_comms.routing
import sensor_test.routing
from channels.auth import AuthMiddlewareStack 

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'website.settings')

django_asgi_app = get_asgi_application()

websocket_urlpatterns = sensor_test.routing.websocket_urlpatterns + main_app_comms.routing.websocket_urlpatterns 

application = ProtocolTypeRouter({
    "http" : django_asgi_app,
    "websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
})


#website/settings.py
# .....
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
        },
    },
}

Expected Behavior: For a message received by MainAppCommsConsumer to be forwarded correctly to the “sensor_test_group” of connections of which the Consumer called SensorTestConsumer is subscribed.

Actual Behavior: The best actual behavior I’ve seen is that the group_send() message is being sent but only to the MainAppCommsConsumer which is a useless feature, because MainAppCommsConsumer was the one who received the original message.

What I need to do is in general “forward the MainApp data” to the client browser WS connection. Any other way of occomplishing this will be fine, but remember in Django there could be 1-N users connected and viewing the page, so Django Channels group sending is exactly appropriate.

I have considered but do not want to code the entire MainApp as Django worker/background processes.

I would be willing to pay you even if you have a freelancer.com account and/or Zelle on your banking app. This is critical for me. They want it done ASAP and this is probably typical of any project, however this is my second embedded project and they’re giving me the bum’s rush a second time.

Would also be willing to post my code on a public GitHub repository for others to try to debug.

Currently what I’m doing is debugging the Channels library code starting from group_send() with my WingWare debugger. Sometimes this helps, but it’s up-in-the-air when I’ll discover the bug or my coding error. Hence this post.

What does your websocket_urlpatterns look like? What does the code on the client side(s) look like that is connecting here? (Are you seeing that your clients are connecting to the appropriate consumers?) I’d start by adding print statements in the connect, printing the channel. Then after the connections have been made, examine the group in redis to see if those channels are connected.

1 Like

Ken, both sides are working fine, I’ve sent data on the MainApp <-{WS}->Django as wll as the Django<-{WS}->Browser side. I have tested that. The problem lies in the Django code or my usage of Channels. Here are my websocket_urlpatterns:

websocket_urlpatterns = [
    re_path("ws/sensor-test/", consumers.SensorTestConsumer.as_asgi()),
]

websocket_urlpatterns = [
    re_path("ws/update/", consumers.ServerConsumer.as_asgi()),
]

Thanks! :slight_smile:

Just to be clear I want to forward data received by ServerConsumer (from separate process MainApp) to the SensorTestConsumer (which in turn sends it to browser’s JS over a WS).
Btw, I am not getting any errors! I can set a breakpoint in the SensorTestConsumer.forward_to_browser_client(event) routine, and it’s never hit, but the exact same routine in ServerConsumer will get called as long as on the same group.

Yes, I’m pretty sure I understand what you’re trying to do here.

What I don’t understand is how you’re trying to do it from the clients.

What have you done to verify that each client is connecting to the correct consumer?

1 Like

Ken, what I did is run two tests. One for either side. But you made me think now to check that I didn’t reverse the URLs or something, brb. :smiley:

Ken, here is my MainApp stub (a simple loop that keeps sending data every 2 seconds):

import asyncio
import websockets

HOST = 'localhost'
PORT = 8000
STRING_TO_SEND = '30.156'

class Base:
    """Basic class, containing common functionality among all versions for Raspberry Pi."""
    def __init__(self, string):
        self.string = string
        self.server_url = f'ws://{HOST}:{PORT}/ws/update/'
        self.publish = True
   
    async def publish_state(self, interval: int = 2):
        """Coro: Periodically publishes instance state to websocket URL."""
        async with websockets.connect(self.server_url) as ws:
            ws.open_timeout = 10000000
            print(f'connected to {self.server_url}')
            while self.publish:
                await ws.send(self.string)                
                await asyncio.sleep(0.1)
                response = await ws.recv()
                print(f'response: {response}')
                await asyncio.sleep(2)

async def main():
    base = Base(string=STRING_TO_SEND)
    await base.publish_state()
    
if __name__ == "__main__":
    print(f'running {__file__}')
    asyncio.run(main())

JS side (client browser):

// Base class:
import { ErrorReporter } from "./error_reporter.js";

export class WebSocketClient
{
    constructor(url_path, err_element=null) {
        const protocol = (window.location.protocol === 'https:' ? 'wss' : 'ws') + '://';
        this._connect_url = `${protocol}${window.location.host}/ws/${url_path}`;        
        this._websocket = null;
        this._err_element = err_element;
        this._error_reporter = null;
        this.log_status = false;
    }

    connect() 
    {    
        if (this._websocket != null && this._websocket.OPEN)
            return;
        this._websocket = new WebSocket(this._connect_url);
        if (this._error_reporter == null)
            this._error_reporter = new ErrorReporter(this, this._err_element);
        this._setup_event_handlers();
    }

    disconnect()
    {
        if (this._websocket != null)
        {
            if (this._websocket.OPEN)
                this._websocket.close();
            this._websocket = null;
        }
    }

    _setup_event_handlers()
    {
        this._websocket.onopen = () => { this._onopen(); };
        this._websocket.onmessage = (msg) => { this._onmessage(msg); };
        this._websocket.onclose = () => { this._onclose(); };
        this._websocket.onerror = (err) => { this._onerror(err); };
    }

    _onopen() {
        console.log(`WebSocketClient connected to ${this._connect_url}`);
    }

    _onmessage(message) {
        if (this.log_status)
            this.report_status(`WebSocketClient received message: \n + ${message.data}`);
    }

    _onclose() {
        if (this.log_status)
            this.report_status("WebSocketClient connection closed.");
    }

    _onerror(err) {
        console.log(`WebSocketClient error occured: \n + ${err}`);
    }

    send(message) {
        if (this.log_status)
            this.report_status(`WebSocketClient sending message: \n ${message}`);

        if (this._websocket.CLOSED)
            this.connect();

        this._websocket.send(message);
    }

    report_error(err_msg, severity="CRITICAL")
    {
        this._error_reporter.report(err_msg, severity);
    }

    _parse_json(json_str)
    {
        var json = null;

        try {
            var json = JSON.parse(json_str);
        }
        catch (err)
        {
            this.report_error(`WebSocketClient unable to parse JSON: \n ${json_str}. \n More: ${err}`);
        }

        return json;
    }

    report_status(status_msg)
    {
        this.report_error(status_msg, "INFO");
    }
};

From which the following app-specific subclass is derived:

import { WebSocketClient } from "../core/websocket_client.js";
import { is_dictionary, is_float, is_string } from "../core/js_utils.js";

export class SensorTestClient extends WebSocketClient
{
    constructor(backend_path, err_element=null)
    {
        super(backend_path, err_element);
        this.update_sensor_value = null;   //  Callback for GUI to latch onto
        this._self = this;
    }

    _onopen()
    {
        console.debug("_onopen() overridend 😁");
    }

    _onclose()
    {
        super._onclose();
    }

    _onmessage(message)
    {
        const json = this._parse_json(message.data);
        
        if (json == null)
        {
            this.report_error(
                `SensorTestClient can't parse JSON from message: \n "${message}"`);
            return;
        }

        if (this._marshall_json(json))
        {
            const sensor_vals = json['sensor_vals'];
            
            for(var sensor_var in sensor_vals)
            {
                const sensor_value = sensor_vals[sensor_var];

                if (this.update_sensor_value != null)
                    this.update_sensor_value(sensor, sensor_value);
            }
        }        
    }

    _marshall_json(json)
    {
        if (!('type' in json)) {
            this.report_error("SensorTestClient received JSON message with no 'type' key.")
            return false;
        }

        if (json['type'] != 'SensorTestUpdate') {
            this.report_error("SensorTestClient expects JSON message type SensorTestUpdate.");
            return false;
        }

        if (!('sensor_vals' in json))
        {
            this.report_error("SensorTestClient received SensorTestUpdate message without 'sensor_vals' key.");
            return false;
        }

        const sensor_vals = json['sensor_vals'];

        if (!(is_dictionary(sensor_vals)))
        {
            this.report_error("SensorTestClient expects a Dictionary of sensors under the 'sensor_vals' key.");
            return false;
        }

        for(var sensor_var in sensor_vals)
        {
            const sensor_value = sensor_vals[sensor_var];

            if (!is_float(sensor_value)) {
                this.report_error(`SensorTestClient expects floats for sensor values.\n Got: ${sensor_value}. Putting NaN.`, "WARNING");
                sensor_vals[sensor_var] = NaN;
            }
        }

        return true;
    }

    _parse_json(string)
    {
        return super._parse_json(string);
    }
}

Side note:

You don’t need to call get_channel_layer here (or with the group_send).

The consumer is instantiated with a reference to the channel_layer and can be referenced as self.channel_layer, making this statement:
await self.channel_layer.group_add(...
(and correspondingly the same with the await self.channel_layer.group_send()

This is not the cause of the symptoms you are seeing, but just a step of taking advantage of what Channels provides for the consumers.

1 Like

Ken,

Thank you! I will edit now and put that into my codebase. Shorter codes always welcome :slight_smile:

Do you think I could find the bug (in my code) by debugging / understanding the internals of Channels (where it actually does the sending?) or does that typically take a week or more because of the library’s complexity?

No, I’m not at the step where I think that would be necessary.

(The symptoms you are describing are consistent with the clients connecting to the wrong consumer.)

There’s a lot of information you can gather before needing to dig deeper into the code.

Going back to something I said earlier:

Side note: I’ve looked an the JS you’ve provided here, and I don’t see where you’re defining the url path being submitted, so I still can’t tell if your JS is connecting to the right consumer. (I don’t consider myself proficient with JS - that’s primarily why I rely on HTMX for my websocket work - so I may simply have missed it in the code.)

1 Like

Ken, below it says sensor-test, but maybe it should be sensor-test/ ? I will test that out, I think the ws/ I coded my WS base class to do for me.

import {Gui, Switch} from "../core/gui-widgets.js";
import { pure_js_on_load } from "../core/js_utils.js";
import { SensorTestClient } from "./ws-client.js";

const BACKEND_PATH = 'sensor-test';

class SensorTestGui extends Gui
{
    // Pass in the html element id (without the #) where you
    // would like error/status text to display.  This is done
    // simply by $().text(...) jquery method to set the element's
    // textual content.
    constructor(backend_path, err_element=null)
    {
        super(null); // No html id needed here
        this._update_auto_sw = new Switch("update-auto");
        this._update_auto_sw.on_checked = () => { this.enable_auto_update(); };
        this._update_auto_sw.on_unchecked = () => { this.disable_auto_update(); };
        this._client = new SensorTestClient(backend_path, err_element);
        this._client.update_sensor_value = (sens, val) => { this.upodate_sensor_value(sens, val); };
        this._sensors = {
            HT3 : null
        };
    }

    enable_auto_update()
    {
        this._client.connect();
    }

    disable_auto_update()
    {
        this._client.disconnect();
    }

    update_sensor_value(sensor_var, float_value)
    {
        $(`#${sensor_var}`).text(float_value);
    }
}

// Global instances of our classes:
var sensor_test_gui = null;

pure_js_on_load((event) =>
{
    sensor_test_gui = new SensorTestGui(BACKEND_PATH, "error-status");
});

I’ll also do some googling on how to gather that information: channel name and group members in Redis.

1 Like

If you’re not familiar with redis-cli, you should be if you’re going to be working with channels. (Not strictly required, but I’ve found it extremely helpful while trying to understand what’s going on.)

the keys * command will show you all keys known to redis.

You should find a key with a name like asgi:group:sensor_update_group.

You can then get a list of connected channels by using the zscan command: zscan asgi:group:sensor_update_group 0

These should match up with the channel names of the consumers created by the client connections.

2 Likes

Ken, you’ve guided me to find the coder’s error. It was the missing / slash on sensor-test/ I guess! I was messing around with that. But perhaps had other issues, so those got solved but when I went back to this issue, I had forgotten to propagate my new URL convention throughout all the code.

I will bookmark your latest post under WebSocket/ for future troubleshooting reference!

You’ve saved me/the project, once again!

Mahalo :smiley:.

2 Likes