Using background tasks in Django, but how exactly?

Whenever a frontend client starts a session with my Django backend, I need to create a specific object for an IoT network. That object handles the data and the connection, and provides some functionality for working with said network. Right now I use WebSockets. On connect, a new object (Node) is created like this:

from channels.generic.websocket import WebsocketConsumer

class NodeConsumer(WebsocketConsumer):
    def connect(self):
        # One Node per websocket connection
        self.node = Node()
        self.accept()

    def receive(self, text_data=None, bytes_data=None):
        data = self.node.request_some_data()
        self.send(text_data=data)

This is a rough example of how the current workflow is built.
The problem is that for every websocket connection a new Node is created, which is not really necessary.
E.g. when every client wants to connect to the same IoT network, a single node that handles the connection would be sufficient.
Also, to differentiate which service the client requests, I have to parse text_data for the requested service ID or the like. It would be useful to be able to request some services via a REST-like API.

I read about Celery and background workers but cannot get them to work. I would need to spawn new workers on demand and check which workers run which Node. And I need to access methods on these Nodes and return their data within an HTTP GET request.

Any idea how I can achieve that? Ask if anything is unclear.

If you’re building this using Channels, then I suggest using a Channels worker for this.

The worker itself could be a “node manager” tracking what tasks are running, starting and stopping them as necessary. That manager is what would act on channel messages sent to it from a consumer.

The individual worker tasks would not respond directly to HTTP GET requests. They would write their updates to the database, where the regular Django views can read and format that data for presentation.
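As a rough sketch of what I mean (names, the NodeReading model, and the message shapes are all made up, not an existing API; the worker would be registered under a channel name like "node-manager" and started with runworker):

# worker.py -- a sketch only
from channels.consumer import SyncConsumer

from myapp.models import NodeReading  # hypothetical model
from myapp.nodes import Node          # your existing Node class


class NodeManagerWorker(SyncConsumer):
    nodes = {}  # network address -> running Node

    def node_start(self, message):
        # Dispatched for channel messages with {"type": "node.start", ...}
        address = message["address"]
        if address not in self.nodes:
            self.nodes[address] = Node()

    def node_stop(self, message):
        self.nodes.pop(message["address"], None)

    def node_poll(self, message):
        # Query the device and persist the result for the Django views to read.
        node = self.nodes[message["address"]]
        NodeReading.objects.update_or_create(
            address=message["address"],
            defaults={"data": node.request_some_data()},
        )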

Another possibility, depending upon how frequently updated the data presentation needs to be, would be to have the worker task update the data in redis. The Django view can read the redis data to retrieve current values.

(This is the architecture we use for a very similar situation. We chose redis as the intermediary data store because the data is transient and there’s no need to persist it. In case you’re wondering about “overhead”, in this particular project, we’re processing incoming messages at a 10hz rate with no issues.)
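For the redis variant, the worker-side write can be as simple as this (a sketch using the plain redis-py client; connection details and key names are made up):

import json

import redis

# Assumed connection details -- adjust to your environment.
r = redis.Redis(host="localhost", port=6379, db=0)


def store_latest(node_id: str, payload: dict) -> None:
    # Keep only the most recent value; the data is transient,
    # so there's no need to persist it anywhere else.
    r.set(f"node:{node_id}:latest", json.dumps(payload))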

If Channels weren't already involved, I would still use the Channels infrastructure - using redis as the means of communication between Django and the Channels worker.

Celery really isn’t the best tool for this.

Regardless of what mechanism you use to communicate between Django and an external process manager, every reasonable suggestion I can think of still comes down to that basic design. Run an external persistent process that exchanges messages with the Django environment. That external process is what manages the tasks (threads, subprocesses, etc.) in an on-demand basis. I’ve just found that using the Channels library for this has been the easiest path toward implementation.
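For example, on the Django side, "exchanging messages" with a Channels worker is just a channel-layer send; here is a generic sketch (the channel name and message shape are made up):

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer


def send_to_worker(command: str) -> None:
    channel_layer = get_channel_layer()
    # Note the parentheses: async_to_sync wraps the send *function*,
    # and the wrapper is then called with the actual arguments.
    async_to_sync(channel_layer.send)(
        "node-manager",  # the channel your worker listens on
        {"type": "node.command", "command": command},
    )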


So the idea would be to have a channels worker that spawns and closes the Nodes on demand. When requested, the worker sends the corresponding data through channels to the client; or, in another scenario, stores the updates in redis?

Since we are using a separate React frontend, I would like to retrieve the data via GET requests.
For model values:
Frontend: GET request → Django: GET endpoint gets cached values from redis or requests new ones (see the sketch below)
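Something like this is what I have in mind for the endpoint (a sketch; the redis key scheme and the URL parameter are made up):

import json

import redis
from django.http import JsonResponse

r = redis.Redis(host="localhost", port=6379, db=0)


def node_values(request, node_id):
    # Hypothetical GET endpoint: return the cached value for a node, if any.
    cached = r.get(f"node:{node_id}:latest")
    if cached is None:
        return JsonResponse({"detail": "no data yet"}, status=404)
    return JsonResponse(json.loads(cached))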

Another use case is sending commands to other devices. Can I call methods on the Nodes via the channels worker, or do I have to define a channels message format that includes information like: that a command should be sent, which command, to which remote address, and so on?

On demand or periodically - that’s up to you. (In part, it may depend upon whether the data feed you’re getting from the controller is a “push” or “pull” situation.)

I’ve had to deal with both. In one project, the device sends out updates at a 10hz rate. The worker receives the incoming packet and writes it out to redis. In another case, the consumer would send a message at 1hz to the worker node, which would then query the device and return the response.

Yes, this works.

That’s up to you and what you define your communication paths to be.

You could make your worker nodes “consumers” in their own right. Or, you could have the communication paths work through something like a Python queue. Or, if you manage them as external processes you could communicate through TCP or Unix domain sockets. That’s all entirely up to you.
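As one possible (made-up) shape for the command case: settle on a message format carrying everything the worker needs, and let the worker method make the actual call on the Node (a sketch; self.nodes and Node.send_command are assumptions):

# One possible message format -- entirely up to you:
command_message = {
    "type": "node.command",            # dispatches to node_command() below
    "command": "set_output",           # which command to send
    "address": "192.168.1.50:5123",    # which remote device
    "args": {"channel": 2, "value": 1},
}

# Worker side:
from channels.consumer import SyncConsumer


class NodeManagerWorker(SyncConsumer):
    nodes = {}

    def node_command(self, message):
        node = self.nodes[message["address"]]
        node.send_command(message["command"], **message["args"])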

So I tried to set up a simple PoC, but can't get it to work. Somehow my worker does not receive any messages via the channel layer.

# asgi.py
application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": URLRouter(websocket_urlpatterns),
    "channel": ChannelNameRouter({
        "node-manager": NodeManagerConsumer.as_asgi(),
    })
})
# consumer.py
class NodeConsumer(WebsocketConsumer):
    def connect(self):
        print("attempting connection") 
        async_to_sync(self.channel_layer.send(
            "node-manager",
            {
                "type": "request",
                "ip": "host.docker.internal",
                "port": "5123",
            }
        ))
        self.accept()
# consumer.py
class NodeManagerConsumer(SyncConsumer):
    def request(self, message):
        print("requested node")

This is my setup, and I started the worker with python manage.py runworker node-manager, but nothing happens. I don't know what I am missing here. Redis is running and connected, and when I checked the logs it does get some messages when I connect to the WebSocket consumer.

Please show the console logs from both your runserver and runworker instances, where you’re starting them up and connecting to the websocket consumer from a browser.

Worker:

> python .\manage.py runworker node-manager
Could not find platform independent libraries <prefix>
Running worker for channels ['node-manager']

Websocket

> python manage.py runserver
Could not find platform independent libraries <prefix>
Could not find platform independent libraries <prefix>
Watching for file changes with StatReloader
Performing system checks...

System check identified no issues (0 silenced).
February 14, 2024 - 14:24:09
Django version 5.0.2, using settings 'consumer_node_test.settings'
Starting ASGI/Daphne version 4.0.0 development server at http://127.0.0.1:8000/
Quit the server with CTRL-BREAK.
WebSocket HANDSHAKING /compass/node [127.0.0.1:53327]
attempting connection
WebSocket CONNECT /compass/node [127.0.0.1:53327]

And that's it. Nothing other than that happens.

There are two things I would try:

I don’t have any firm justification for either of these suggestions as being better than what you have, other than they both fit into the category of “it works for me.”

Tried that, still nothing. I am really clueless about what might be the problem here. I checked the channel names, the method names, everything. But the worker does nothing…
I also moved the self.channel_layer.send() call to the receive() method. Still silence on the worker side.

OK, let's try separating the sending of the message from the websocket consumer.

You can send messages directly from the Django shell.

Try issuing the following commands:

from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

channel_layer = get_channel_layer()
async_to_sync(channel_layer.send)('node-manager',{'type': 'request', 'message': 'A message'})

If this works, you should see the message printed on your runworker console.

If it doesn’t, then there’s something not right with your connection between the worker and the channel layer.

Note: I'm extremely concerned about this message:

Could not find platform independent libraries <prefix>

I believe this is an indication that your virtual environment might be corrupted. I suggest you rebuild your venv.

I somehow got it to work; however, I do not know what the solution was. But it works. Regarding Could not find platform independent libraries <prefix>: I looked it up when it first appeared, and the answer was not to worry about it. I have rebuilt my venv multiple times now and still get the same message.
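In hindsight, comparing my original connect() with the shell snippet above, one plausible culprit is the parenthesis placement: async_to_sync(self.channel_layer.send(...)) wraps the coroutine returned by send() and never actually invokes the wrapper, so no message ever reaches the worker. The working form wraps the function itself and then calls the wrapper:

# Wrap the send function, then call the wrapper with the arguments:
async_to_sync(self.channel_layer.send)(
    "node-manager",
    {
        "type": "request",
        "ip": "host.docker.internal",
        "port": "5123",
    },
)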