Hi @KenWhitesell, I hope everything is going well.
I tried to implement the channel worker, and after a couple of days of study and research, I’m back here on the forum to ask for some advice on the code I’ve developed, though I’m not sure I’ve used the correct approach (as it’s not working properly).
Let me explain the logic I’ve adopted, as I couldn’t find a clear and complete guide on the topic. On the configuration side, I implemented the “ws-listener” channel worker, which is started in the usual way with the command python manage.py runworker ws-listener.
In this worker, I put the logic for connecting to the third-party websocket (assuming that is the right place for it).
Besides running as an infinite background task, this worker is also responsible for distributing each received message via group_send.
To start the listener, however, I still had to use a management command, which only sends the start message to the channel worker. At this point, I receive the error “Failed to connect: You must implement application_send().”
I was wondering whether the architecture I set up is the correct way to implement the channel worker. If you could give me some additional guidelines, I’d be very grateful.
Below is the code I wrote:
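# asgi.py
from channels.routing import ChannelNameRouter, ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": URLRouter(urls.websocket_urlpatterns),
    "channel": ChannelNameRouter({
        "ws-listener": WSListenerConsumer.as_asgi(),
    }),
})
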
# urls.py
from django.urls import path

websocket_urlpatterns = [
    path("ws/client/", WsClientConsumer.as_asgi()),
]

# channel worker consumer
import json

import websockets
from channels.consumer import AsyncConsumer
from channels.layers import get_channel_layer


class WSListenerConsumer(AsyncConsumer):
    # Handler for messages of type "ws_listener" sent to the "ws-listener" channel
    async def ws_listener(self, event):
        ws_url = "wss://stream....."
        channel_layer = get_channel_layer()
        try:
            async with websockets.connect(ws_url) as ws:
                while True:
                    try:
                        response = await ws.recv()
                        data = json.loads(response)
                        # Forward each received message to every client in the group
                        await channel_layer.group_send(
                            "ws_data",
                            {
                                "type": "send_ws_data",
                                "data": data
                            },
                        )
                    except websockets.ConnectionClosedError as e:
                        print(f"Connection closed unexpectedly: {e}")
                        break
                    except websockets.ConnectionClosedOK as e:
                        print(f"Connection closed normally: {e}")
                        break
                    except websockets.WebSocketProtocolError as e:
                        print(f"Protocol error: {e}")
                        break
                    except websockets.InvalidStatusCode as e:
                        print(f"Invalid status code: {e}")
                        break
        except Exception as e:
            print(f"Failed to connect: {e}")

# client browser consumer
import json

from asgiref.sync import async_to_sync
from channels.generic.websocket import JsonWebsocketConsumer


class WsClientConsumer(JsonWebsocketConsumer):
    def connect(self):
        self.group_name = "ws_data"
        self.accept()
        async_to_sync(self.channel_layer.group_add)(
            self.group_name,
            self.channel_name
        )

    def disconnect(self, close_code):
        async_to_sync(self.channel_layer.group_discard)(
            self.group_name,
            self.channel_name
        )

    # Handler for group messages of type "send_ws_data"
    def send_ws_data(self, event):
        data = event['data']
        # Sends the data to the WebSocket
        self.send(text_data=json.dumps({
            'type': 'ws_data',
            'data': data
        }))

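# management command
import asyncio

from channels.layers import get_channel_layer
from django.core.management.base import BaseCommand


class Command(BaseCommand):
    help = 'Listen to third-party websocket'

    def handle(self, *args, **kwargs):
        asyncio.run(self.start_ws_listener())

    async def start_ws_listener(self):
        channel_layer = get_channel_layer()
        # Send the start message to the "ws-listener" channel worker
        await channel_layer.send(
            "ws-listener",
            {
                "type": "ws_listener",
                "id": 123456789,
            },
        )
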
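
To make the flow I have in mind concrete, here is a minimal, dependency-free sketch of how I understand the dispatch step: as far as I can tell, Channels turns the "type" of a message into a handler method name by replacing dots with underscores, so {"type": "ws_listener"} sent to the "ws-listener" channel should end up in WSListenerConsumer.ws_listener. The handler_name function and the FakeListener class below are my own illustrative stand-ins (assumptions, not Channels APIs), and no channel layer or websocket is involved.

```python
import asyncio


def handler_name(message: dict) -> str:
    """Hypothetical stand-in: derive a handler method name from the message "type"."""
    if "type" not in message:
        raise ValueError("message has no 'type' key")
    name = message["type"].replace(".", "_")
    if name.startswith("_"):
        raise ValueError("malformed type (leading underscore)")
    return name


class FakeListener:
    """Illustrative consumer that only mimics dispatch; not a real Channels consumer."""

    def __init__(self):
        self.received = []

    async def dispatch(self, message: dict):
        # Look up the method named after the message type and call it
        handler = getattr(self, handler_name(message), None)
        if handler is None:
            raise ValueError(f"no handler for message type {message['type']}")
        await handler(message)

    async def ws_listener(self, event: dict):
        # Stand-in for the real handler, which would open the websocket and loop
        self.received.append(event["id"])


listener = FakeListener()
# Same payload the management command sends to the "ws-listener" channel
asyncio.run(listener.dispatch({"type": "ws_listener", "id": 123456789}))
```

If this understanding is right, the message from the management command is routed correctly, and the problem would be somewhere inside the handler itself rather than in the routing.
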
Am I missing something? Is this the right way to do it? Or maybe the management command isn’t necessary?
Thank you