Django as a client with websocket connection

Hello everyone,

I am working on a Django application where I need to establish a WebSocket connection to receive real-time data from a third-party server. I am already using Django Channels for WebSocket connections between the server and browser clients, but this situation is a bit different. Here Django needs to act as a WebSocket client, rather than the typical scenario where Django is the WebSocket server waiting for client connections. It connects to an external server and receives one message per second per stream (so with 10 open streams, I receive 10 messages per second).

This WebSocket connection needs to be persistent, starting as soon as the Django server is launched.

I’m unsure whether Django Channels is the best choice for handling this, given that I’ve only used it when Django acts as the server for browser clients. Would it be better to manage the connection using a custom management command and then broadcast the data received via Django Channels using group_send?

Additionally, I’m using Uvicorn, with Redis as the channel layer, in this application.

Lastly, I don’t need to store all the data in a database, as the previously received data can be discarded every second, once the new data arrives. Only specific data with “subscribe: true” needs to be stored in the DB.

What architecture would be most suitable for setting up this kind of WebSocket connection where Django acts as the client to a third-party server, starting from the server’s launch?

This would be a solution.

Personally, I’d be looking in a slightly different direction. My inclination would be to create a Channels worker. This would be a class that creates its own websocket connections to the external server(s), but runs in an async context within Daphne. (Because it is a consumer class, it can receive channels messages in addition to sending them.)

There may or may not be other “administrative” or “management” operations you would like to perform on this process. (For example, you might want to track some kind of status, provide for an in-process restart, counting packets, whatever.) If that’s true, then setting up a worker process like this helps facilitate that. (I did this once a long time ago - back in the Channels v1 days. It worked really well.)
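As a rough illustration of the status tracking and packet counting mentioned above, here is a minimal sketch of a listener loop that reconnects after a dropped connection and keeps a few counters. Everything named here is an assumption for illustration: `ListenerStatus`, `run_listener`, and the `open_stream`, `broadcast`, and `store` callables are hypothetical. In a real worker, `open_stream` would presumably wrap the external websocket connection and `broadcast` would wrap `channel_layer.group_send`. The "subscribe" check follows the requirement stated in the original question.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class ListenerStatus:
    """Counters a worker could expose for monitoring (hypothetical)."""
    connects: int = 0
    messages: int = 0
    stored: int = 0
    last_error: Optional[str] = None


async def run_listener(open_stream, broadcast, store, status,
                       max_retries=3, base_delay=0.5):
    """Forward every message; persist only those with subscribe == True.

    open_stream: zero-argument callable returning an async iterator of
                 raw JSON strings (stand-in for the external connection).
    broadcast, store: async callables taking the decoded message.
    """
    failures = 0
    while failures <= max_retries:
        try:
            status.connects += 1
            async for raw in open_stream():
                data = json.loads(raw)
                status.messages += 1
                failures = 0  # a successful read resets the backoff
                await broadcast(data)
                if data.get("subscribe") is True:
                    await store(data)
                    status.stored += 1
            return  # the stream ended cleanly
        except OSError as exc:
            # Network-level failure: record it, wait, then reconnect.
            # A websocket library may raise its own exception types here.
            failures += 1
            status.last_error = str(exc)
            await asyncio.sleep(base_delay * 2 ** (failures - 1))
    # Falls through here after max_retries consecutive failures.
```

Keeping the counters in one object makes it straightforward to answer a "status" channel message from the same consumer, if that kind of management operation turns out to be useful.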


Nice, I’ll give that a try and get back to you with updates.

Thank you for your expertise!

Hi @KenWhitesell, I hope everything is going well.

I tried to implement the channel worker, and after a couple of days of study and research, I’m back here on the forum to ask for some advice on the code I’ve developed, though I’m not sure I’ve used the correct approach (as it’s not working properly).

Let me explain the logic I’ve adopted, as I couldn’t find a clear and complete guide on the topic: on the configuration side, I implemented the “ws-listener” channel worker, which is launched regularly with the command python manage.py runworker ws-listener.

In this worker, I put the logic for connecting to the websocket (assuming that is the right place for it).

The task of this worker, besides running an infinite background task, is also to send and distribute the received message via group_send.

To start the worker, however, I still had to use a management command, which simply sends the start message to the channel worker. At that point I receive the error “Failed to connect: You must implement application_send().”

Is the architecture I set up the correct way to implement the channel worker? If you could give me some additional guidelines, I’d be very grateful.

Below is the code I wrote:

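# asgi.py
from channels.routing import ChannelNameRouter, ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

# urls and WSListenerConsumer come from the project's own modules (paths not shown)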
application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": URLRouter(urls.websocket_urlpatterns),
    "channel": ChannelNameRouter({
        "ws-listener": WSListenerConsumer.as_asgi(),
    })
})

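# urls.py
from django.urls import path

# WsClientConsumer comes from the project's own consumers module (path not shown)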
websocket_urlpatterns = [
    path("ws/client/", WsClientConsumer.as_asgi()),
]
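
# channel worker consumer
import json

import websockets
from channels.consumer import AsyncConsumer
from channels.layers import get_channel_layer
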
class WSListenerConsumer(AsyncConsumer):
    async def ws_listener(self, event):
        ws_url = "wss://stream....."

        channel_layer = get_channel_layer()

        try:
            async with websockets.connect(ws_url) as ws:
                while True:
                    try:
                        response = await ws.recv()
                        data = json.loads(response)

                        await channel_layer.group_send(
                            "ws_data",
                            {
                                "type": "send_ws_data",
                                "data": data
                            },
                        )

                    except websockets.ConnectionClosedError as e:
                        print(f"Connection closed unexpectedly: {e}")
                        break
                    except websockets.ConnectionClosedOK as e:
                        print(f"Connection closed normally: {e}")
                        break
                    except websockets.WebSocketProtocolError as e:
                        print(f"Protocol error: {e}")
                        break
                    except websockets.InvalidStatusCode as e:
                        print(f"Invalid status code: {e}")
                        break
        except Exception as e:
            print(f"Failed to connect: {e}")
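
# client browser consumer
import json

from asgiref.sync import async_to_sync
from channels.generic.websocket import JsonWebsocketConsumer
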
class WsClientConsumer(JsonWebsocketConsumer):
    def connect(self):
        self.group_name = "ws_data"

        self.accept()

        async_to_sync(self.channel_layer.group_add)(
            self.group_name,
            self.channel_name
        )

    def disconnect(self, close_code):
        async_to_sync(self.channel_layer.group_discard)(
            self.group_name,
            self.channel_name
        )

    def send_ws_data(self, event):
        data = event['data']

        # Sends the data to the WebSocket
        self.send(text_data=json.dumps({
            'type': 'ws_data',
            'data': data
        }))
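
# management command
import asyncio

from channels.layers import get_channel_layer
from django.core.management.base import BaseCommand
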
class Command(BaseCommand):
    help = 'Listen to third-party websocket'

    def handle(self, *args, **kwargs):
        asyncio.run(self.start_ws_listener())

    async def start_ws_listener(self):
        channel_layer = get_channel_layer()
        await channel_layer.send(
            "ws-listener",
            {
                "type": "ws_listener",
                "id": 123456789,
            },
        )

Am I missing something? Is this the right way to do it? Or maybe the management command isn’t necessary?

Thank you

I’m not seeing anything fundamentally wrong with what you have, and the test project I created to try the worker works fine for me.

Ahh… you’re using a JsonWebsocketConsumer, which means you want to use self.send_json instead of self.send. See the docs for JsonWebsocketConsumer.

Also, in the future, please post the complete error message with traceback if an error occurs. It really does help track down the cause of an issue.

Edit: When you use send_json, you pass it the actual dict - do not use json.dumps to serialize it first.
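To show why the dict should be passed directly, here is a small standalone sketch that uses only the standard library. It assumes that send_json encodes its argument with json.dumps, which as far as I know is the default behaviour in Channels. The "price" field is made up for the example.

```python
import json

# Hypothetical payload; the "price" field is only for illustration.
payload = {"type": "ws_data", "data": {"price": 1.5}}

# Equivalent of send_json(payload): the dict is serialized once.
encoded_once = json.dumps(payload)

# Equivalent of send_json(json.dumps(payload)): the text is serialized again.
encoded_twice = json.dumps(json.dumps(payload))

# What the receiving side gets after parsing each frame.
print(type(json.loads(encoded_once)).__name__)   # dict
print(type(json.loads(encoded_twice)).__name__)  # str
```

In the consumer from the question, that would mean the handler body becomes something like `self.send_json({"type": "ws_data", "data": event["data"]})`.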
