WebSocket Message Not Received in Browser on Ubuntu Server (Docker)

When an error occurs during a Celery task (e.g., video unavailable), the WebSocket message is logged as “sent” by the Celery worker, but the browser WebSocket client never receives it, and the WebSocket connection remains open. This issue occurs on my Ubuntu 22.04 server (Docker), but everything works fine on Mac. Additionally, logging output is inconsistent on the Ubuntu server, making it harder to debug the issue.

# consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
import logging

logger = logging.getLogger(__name__)

class DownloadProgressConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.task_id = self.scope["url_route"]["kwargs"]["task_id"]
        self.task_group_name = f"task_{self.task_id}"

        # Join the group for task-specific updates
        await self.channel_layer.group_add(self.task_group_name, self.channel_name)
        await self.accept()
        logger.info(f"WebSocket connection established for task {self.task_id}")

    async def disconnect(self, close_code):
        # Leave the group when WebSocket disconnects
        await self.channel_layer.group_discard(self.task_group_name, self.channel_name)
        logger.info(f"WebSocket connection closed for task {self.task_id}")

    async def progress_update(self, event):
        try:
            logger.info(f"Received event in WebSocket for task {self.task_id}: {event}")
            await self.send(text_data=json.dumps(event))

            # Close socket on completion/failure
            if event["status"] in ["Completed", "Failed"] or event["stage"] in ["error"]:
                logger.info(f"Closing WebSocket for task {self.task_id} due to status: {event['status']}")
                await self.close()
        except Exception as e:
            logger.error(f"Error handling progress update for task {self.task_id}: {e}")
            await self.close()

    async def websocket_close(self, event):
        try:
            logger.info(f"WebSocket close requested for task {self.task_id}")
            await self.close()
        except Exception as e:
            logger.error(f"Error closing WebSocket for task {self.task_id}: {e}")

# tasks.py
from celery import shared_task
from .models import DownloadTask
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
import logging

logger = logging.getLogger(__name__)

@shared_task(bind=True)
def download_video(self, task_id, original_payload):
    task = DownloadTask.objects.get(id=task_id)
    channel_layer = get_channel_layer()

    try:
        # Simulate fetching video metadata and processing
        task.status = "in_progress"
        task.save()

        # Simulate an error (e.g., VideoUnavailable)
        raise Exception("Video unavailable")

    except Exception as e:
        logger.info(f"Error downloading video msg: {str(e)}")
        task.status = "failed"
        task.save()
        notify_progress_update("error", task_id, channel_layer, error_message=str(e))

def notify_progress_update(stage, task_id, channel_layer, error_message=None):
    payload = {
        "type": "progress_update",  # This matches the consumer method name
        "stage": stage,
        "status": DownloadTask.objects.get(id=task_id).status,
        "task_id": str(task_id),
        "error_message": error_message,
    }

    logger.info(f"Attempting to send payload: {payload}")
    try:
        async_to_sync(channel_layer.group_send)(
            f"task_{task_id}",  # Group name the WebSocket consumer subscribes to
            payload
        )
        logger.info(f"Successfully sent payload to group task_{task_id}")
    except Exception as e:
        logger.error(f"Error sending payload to group: {e}")

Kindly help me fix this. Logging is also proving hard to configure so that it works everywhere.

Welcome @iRajatDas !

We’re going to need a lot more details and clarification about the symptoms you are seeing.

First, please start by identifying what all you’re running on the server for this. Are you running this in Daphne? (Something else?)

What are you using for the channel layer?
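For context, the channel layer is the backend Channels uses to pass messages between processes (here, from the Celery worker to the consumer), and it is selected by the `CHANNEL_LAYERS` setting. The settings file has not been posted, so the following is only a sketch of what a Redis-backed configuration typically looks like, assuming the `channels_redis` package and a Redis host reachable as `redis` on port 6379 (the host name and port are assumptions).

```python
# settings.py (sketch; host name and port are assumptions, not taken from the posted project)
CHANNEL_LAYERS = {
    "default": {
        # Redis-backed layer provided by the channels_redis package
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            # Must point at the same Redis instance for both the web and worker processes
            "hosts": [("redis", 6379)],
        },
    },
}
```

If `channels.layers.InMemoryChannelLayer` is configured instead, a message sent from a separate Celery process will not reach the consumer, because that layer does not cross process boundaries.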

Have you identified any similar issues when there’s not an error involved?

What have you done to try and trace the progress of the message from the celery task through to the consumer, and then out to the browser?

You show some logging statements - please post the logs that you are getting, or identify the logs you’re expecting to get but aren’t seeing.
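As a point of comparison for the logging side, here is a minimal sketch of a `LOGGING` dictionary that sends INFO-level messages from one app package to the console. The package name `downloader` is an assumption, as is the format string; in a Django project the dictionary would go in settings.py, where Django applies it itself, so the explicit `dictConfig` call below is only there to make the sketch runnable on its own.

```python
import logging
import logging.config

# Sketch of a LOGGING dict; "downloader" is an assumed app package name.
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "plain": {"format": "%(asctime)s %(levelname)s %(name)s: %(message)s"},
    },
    "handlers": {
        # StreamHandler writes to stderr by default
        "console": {"class": "logging.StreamHandler", "formatter": "plain"},
    },
    "loggers": {
        # Child loggers such as downloader.consumers and downloader.tasks inherit this level
        "downloader": {"handlers": ["console"], "level": "INFO", "propagate": False},
    },
}

# Only needed outside Django; Django applies settings.LOGGING during setup.
logging.config.dictConfig(LOGGING)

logger = logging.getLogger("downloader.consumers")
logger.info("Logger configured for %s", logger.name)
```

With something like this in place, modules only need `logger = logging.getLogger(__name__)`; calling `logging.basicConfig()` or `setLevel()` in each module should not be necessary.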

Other than the operating system, what other difference exists between the two environments? (You mention using docker containers on Ubuntu, are you using these same containers on the Mac?)

From your use of the scope to identify the task ID, does this mean you’re opening multiple websockets in the browser, one for each task? (We may need to see your websocket management code for the browser, but maybe not yet.)

Please clarify what you mean by this.

  1. I am using uvicorn to run the server.
  2. I am not sure what channel layer means here, but I believe I am using Redis.
  3. There are no errors; even the try/except logs that the message was sent, but in reality nothing was sent (my consumers.py does not log anything for some reason, and I have not been able to get logging to work either).

Running on a Mac M1 (Sonoma 14.6.1), it works as expected. I don’t use any OS within the Docker image.

When using the same codebase on a VPS with Ubuntu 22.04, in the exception block (where a video is unavailable), the socket message is not sent and no error is shown.

Please see this video (SORRY FOR THIS) to understand the issue better:

Here’s my project structure (to identify any misconfiguration) with some files related to the issue:

.
├── Dockerfile
├── README.md
├── db.sqlite3
├── docker-compose.yml
├── downloader
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── consumers.py
│   ├── migrations
│   │   ├── 0001_initial.py
│   │   ├── 0002_alter_downloadtask_resolution.py
│   │   ├── 0003_downloadtask_callback_url.py
│   │   ├── 0004_downloadtask_file_size_downloadtask_stage_and_more.py
│   │   └── __init__.py
│   ├── models.py
│   ├── routing.py
│   ├── tasks.py
│   ├── templates
│   │   └── downloader
│   │       └── index.html
│   ├── tests.py
│   ├── urls.py
│   └── views.py
├── manage.py
├── requirements.txt
├── tokens.json
└── youtube_downloader
    ├── __init__.py
    ├── asgi.py
    ├── celery.py
    ├── settings.py
    ├── urls.py
    └── wsgi.py

asgi.py:

import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from downloader.routing import websocket_urlpatterns

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'youtube_downloader.settings')

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            websocket_urlpatterns
        )
    ),
})

consumers.py:

import json
from channels.generic.websocket import AsyncWebsocketConsumer
import logging

logging.basicConfig()

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logging.info("Logger initialized for consumers.py")


class DownloadProgressConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.task_id = self.scope["url_route"]["kwargs"]["task_id"]
        self.task_group_name = f"task_{self.task_id}"

        # Join the group for task-specific updates
        await self.channel_layer.group_add(self.task_group_name, self.channel_name)
        await self.accept()
        logger.info(f"WebSocket connection established for task {self.task_id}")

    async def disconnect(self, close_code):
        # Leave the group when WebSocket disconnects
        await self.channel_layer.group_discard(self.task_group_name, self.channel_name)
        logger.info(f"WebSocket connection closed for task {self.task_id}")

    async def progress_update(self, event):
        try:
            logger.info(f"Received event in WebSocket for task {self.task_id}: {event}")
            await self.send(text_data=json.dumps(event))

            # Close socket on completion/failure
            if event["status"] in ["Completed", "Failed"] or event["stage"] in [
                "error"
            ]:
                logger.info(
                    f"Closing WebSocket for task {self.task_id} due to status: {event['status']}"
                )
                await self.close()
        except Exception as e:
            logger.error(f"Error handling progress update for task {self.task_id}: {e}")
            await self.close()

    async def websocket_close(self, event):
        try:
            logger.info(f"WebSocket close requested for task {self.task_id}")
            await self.close()
        except Exception as e:
            logger.error(f"Error closing WebSocket for task {self.task_id}: {e}")

tasks.py:

def upload_file_with_progress(
    file_path, bucket_name, key_name, storage_options, task_id, channel_layer, metadata
):
    """
    Upload a file to S3 with progress tracking.
    """
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=storage_options["access_key"],
        aws_secret_access_key=storage_options["secret_key"],
        endpoint_url=storage_options["endpoint_url"],
    )

    # Transfer configuration for progress tracking
    config = TransferConfig(
        multipart_threshold=1024 * 25, max_concurrency=10, multipart_chunksize=1024 * 25
    )

    # Initialize the progress tracker
    progress = ProgressPercentage(file_path, task_id, channel_layer, metadata)

    # Upload the file with the progress tracker
    s3_client.upload_file(
        file_path, bucket_name, key_name, Config=config, Callback=progress
    )


@shared_task(bind=True)
def download_video(self, task_id, original_payload):
    """
    Celery task for downloading video and audio from YouTube, merging, and uploading.
    """
    logger.info(f"Starting to process video with ID: {task_id}")
    task = DownloadTask.objects.get(id=task_id)
    channel_layer = get_channel_layer()

    # Initialize variables for cleanup
    video_filename = None
    audio_filename = None
    output_filename = None

    try:
        # --- Fetch video metadata ---
        task.status = "in_progress"
        task.stage = "fetching_metadata"
        task.save()

        yt = YouTube(
            task.url,
            use_oauth=True,
            allow_oauth_cache=True,
            token_file=os.path.join(
                os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
                "tokens.json",
            ),
        )
        video_metadata = {
            "title": yt.title,
            "views": yt.views,
            "channel_name": yt.author,
            "thumbnail": yt.thumbnail_url,
            "duration": yt.length,
            "original_payload": original_payload,
        }

        # --- Select video quality based on resolution ---
        # [CODE REMOVED]

        upload_file_with_progress(
            output_filename,
            bucket_name,
            key_name,
            storage_options,
            task_id,
            channel_layer,
            video_metadata,
        )

        download_url = generate_s3_signed_url(key_name)
        file_size = os.path.getsize(output_filename)

        # --- Complete the process ---
        task.status = "completed"
        task.stage = "completed"
        task.progress = 100.0
        task.file_size = file_size
        task.save()

        video_metadata.update(
            {
                "download_url": download_url,
                "download_size": file_size,
            }
        )

        notify_progress_update(
            "completed",
            task_id,
            channel_layer,
            video_metadata,
            progress=100,
            download_url=download_url,
        )

    except (
        VideoUnavailable,
        AgeRestrictedError,
        VideoPrivate,
        LiveStreamError,
        MembersOnly,
        VideoRegionBlocked,
        UnknownVideoError,
        RecordingUnavailable,
    ) as e:
        error_messages = {
            VideoUnavailable: "Sorry, but this video is simply not available.",
            AgeRestrictedError: "Oops! Looks like you're too young to watch this video.",
            VideoPrivate: "Sorry, you're not invited to watch this private video.",
            LiveStreamError: "Unfortunately, you can't download a live stream.",
            MembersOnly: "This video is exclusively for members only.",
            VideoRegionBlocked: "Sorry, this video is blocked in your region.",
            UnknownVideoError: "Oops! An unknown error occurred while processing the video.",
            RecordingUnavailable: "Sorry, the recording of this live stream is not available.",
        }

        error_message = error_messages.get(type(e), "An error occurred.")
        task.status = "failed"
        task.stage = "error"
        task.save()
        logger.info(f"Error downloading video msg: {error_message}")

        notify_progress_update(
            "error",
            task_id,
            channel_layer,
            metadata=None,
            error_message=error_message,
        )

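    except Exception as e:
        logger.info(f"Error downloading video: {str(e)}")
        # Save the failed state first, so that any status lookup made while
        # building the notification sees "failed" rather than "in_progress"
        task.status = "failed"
        task.stage = "error"
        task.save()

        notify_progress_update(
            "error", task_id, channel_layer, metadata=None, error_message=str(e)
        )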
    finally:
        # Cleanup files if they were created
        try:
            if video_filename and os.path.exists(video_filename):
                os.remove(video_filename)
            if audio_filename and os.path.exists(audio_filename):
                os.remove(audio_filename)
            if output_filename and os.path.exists(output_filename):
                os.remove(output_filename)
        except Exception as cleanup_error:
            logger.info(f"Cleanup failed: {str(cleanup_error)}")

celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "youtube_downloader.settings")

app = Celery("youtube_downloader")

# Using a string here means the worker doesn’t have to serialize
# the configuration object to child processes.
app.config_from_object("django.conf:settings", namespace="CELERY")


# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

docker-compose.yml:

services:
  web:
    build: .
    command: uvicorn youtube_downloader.asgi:application --host 0.0.0.0 --port 8000
    volumes:
      - .:/app
    ports:
      - "8000:8000"
    depends_on:
      - db
      - redis
    env_file:
      - .env

  worker:
    build: .
    command: celery -A youtube_downloader worker --loglevel=info --concurrency=4
    volumes:
      - .:/app
    depends_on:
      - db
      - redis
    env_file:
      - .env

  db:
    image: postgres:16
    volumes:
      - postgres_data:/var/lib/postgresql/data/
    env_file:
      - .env

  redis:
    restart: always
    image: redis:latest

volumes:
  postgres_data:

requirements.txt:
asgiref==3.8.1
boto3==1.35.5
botocore==1.35.5
channels==4.1.0
Django==5.1
python-decouple==3.8
pytubefix==6.15.4
redis==5.0.8
redis_lock==0.2.0
celery==5.4.0
django-celery-beat==2.7.0
django-celery-results==2.5.1
psycopg2==2.9.9
django-storages==1.14.4
channels_redis==4.2.0
uvicorn[standard]==0.30.6
wsproto==1.2.0
websockets==13.0.0
django-cors-headers==4.4.0
eventlet


@KenWhitesell I am still looking for a solution. In the meantime, I tried deploying the same setup on Ubuntu 24, but the issue remains the same.

Please help me fix or debug the issue.

Thanks

You’ve got two separate processes running on the server, but your description still isn’t clear where the problems are happening.

So you’re saying that in monitoring your redis instance, you have verified that the group_send does not issue the message to redis?

You’re saying that this message is not getting logged?

So what needs to be determined / clarified:

Is the Celery task issuing the group_send call? (If your logging isn’t working, how have you verified this?)

Have you verified that your consumer is joined to the groups for each task? (If so, how?)

If the messages have been sent by the Celery task, have you verified that the consumer is receiving them? (If so, how?)

It’s a little more awkward, but if you’re having problems getting the logging to work (you haven’t identified what the issues are there), I would suggest adding some print statements to your Celery task and running it directly from a terminal session to get a better idea of what exactly is happening within it.

It might also help to do the same with the Consumer as well.

Depending upon what your research shows, it may be helpful to try it with Daphne instead of uvicorn.

Hi, the application works as expected when accessed via the IP address, but issues arise when accessing it through the domain with SSL enabled. I’m using Cloudflare to manage my domains and SSL, and I’ve already verified that the WebSocket option in Cloudflare is enabled.

Cloudflare SSL mode is set to “Flexible”
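In “Flexible” mode the browser talks HTTPS to Cloudflare while Cloudflare talks plain HTTP to the origin, so a page loaded over `https://` has to open its socket with `wss://`; browsers block `ws://` connections from secure pages. Whether this is what is going wrong here is not established. The sketch below only shows the scheme selection. The `/ws/progress/<task_id>/` path and the IP address are placeholders, since routing.py and the server address were not posted.

```python
from urllib.parse import urlsplit, urlunsplit


def websocket_url(page_url, task_id):
    """Build the WebSocket URL for a task from the URL of the page that opens it."""
    parts = urlsplit(page_url)
    # Secure pages must use wss://; plain HTTP pages use ws://
    scheme = "wss" if parts.scheme == "https" else "ws"
    # Placeholder path; the real route lives in routing.py, which was not posted
    return urlunsplit((scheme, parts.netloc, f"/ws/progress/{task_id}/", "", ""))


print(websocket_url("https://api.rapidyt.com/", "abc123"))
print(websocket_url("http://203.0.113.10:8000/", "abc123"))
```

The same rule applies in the browser code: deriving the scheme from the page's own protocol avoids hard-coding `ws://` for one environment and `wss://` for another.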

I recently switched from uvicorn to daphne as you suggested.

Here’s my nginx.conf:

events {
    worker_connections 1024;
}

http {
    server {
        listen 80;
        server_name api.rapidyt.com;

        # Proxy HTTP traffic to the web container (no HTTPS redirect is configured here)
        location / {
            proxy_pass http://web:8000;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        # WebSocket configuration for Django Channels
        location /ws/ {
            proxy_pass http://web:8000;
            proxy_http_version 1.1;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "Upgrade";
            # Additional headers for WebSocket stability
            proxy_read_timeout 86400;
            proxy_send_timeout 86400;
        }
    }
}

docker-compose.yml

services:
  web:
    build: .
    command: daphne -b 0.0.0.0 -p 8000 youtube_downloader.asgi:application
    volumes:
      - .:/app
    expose:
      - "8000"
    depends_on:
      - db
      - redis
    env_file:
      - .env

  worker:
    build: .
    command: celery -A youtube_downloader worker --loglevel=INFO -E --concurrency=4
    volumes:
      - .:/app
    depends_on:
      - db
      - redis
    env_file:
      - .env

  flower:
    build: .
    command: celery -A youtube_downloader flower --port=5555
    ports:
      - "5555:5555"
    depends_on:
      - redis
    env_file:
      - .env

  db:
    image: postgres:16
    volumes:
      - postgres_data:/var/lib/postgresql/data/
    env_file:
      - .env

  redis:
    restart: always
    image: redis:latest

  nginx:
    image: nginx:latest
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    ports:
      - "80:80"
    depends_on:
      - web

volumes:
  postgres_data:

Kindly help me understand what might be causing this issue.
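One way to narrow this down is to run the same WebSocket handshake against the IP address and against the domain and compare the results. The following is a rough sketch using only the standard library; the function names are made up for illustration, the check reads just the first chunk of the response, and the path passed to `check` has to be replaced with the real route from routing.py.

```python
import base64
import hashlib
import os
import socket
import ssl

# Fixed GUID defined by RFC 6455 for computing Sec-WebSocket-Accept
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def expected_accept(key):
    """Return the Sec-WebSocket-Accept value a compliant server sends for this key."""
    digest = hashlib.sha1((key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


def handshake_request(host, path, key):
    """Build the raw HTTP/1.1 upgrade request that opens a WebSocket."""
    lines = [
        f"GET {path} HTTP/1.1",
        f"Host: {host}",
        "Upgrade: websocket",
        "Connection: Upgrade",
        f"Sec-WebSocket-Key: {key}",
        "Sec-WebSocket-Version: 13",
        "",
        "",
    ]
    return "\r\n".join(lines).encode("ascii")


def handshake_ok(response_head, key):
    """Check a raw response head for status 101 and the matching accept header."""
    status_line, _, rest = response_head.partition("\r\n")
    headers = {}
    for line in rest.split("\r\n"):
        name, sep, value = line.partition(":")
        if sep:
            headers[name.strip().lower()] = value.strip()
    is_101 = status_line.split()[1:2] == ["101"]
    return is_101 and headers.get("sec-websocket-accept") == expected_accept(key)


def check(host, port, path, use_tls=False, timeout=10):
    """Send one handshake to host:port and report whether the upgrade succeeded."""
    key = base64.b64encode(os.urandom(16)).decode("ascii")
    sock = socket.create_connection((host, port), timeout=timeout)
    try:
        if use_tls:
            sock = ssl.create_default_context().wrap_socket(sock, server_hostname=host)
        sock.sendall(handshake_request(host, path, key))
        head = sock.recv(4096).decode("latin-1")
    finally:
        sock.close()
    return handshake_ok(head.split("\r\n\r\n", 1)[0], key)
```

For example, `check("api.rapidyt.com", 443, "/ws/your-route/", use_tls=True)` exercises the Cloudflare path, while the same call with the server's IP address, port 80 and `use_tls=False` exercises nginx directly. If only the first returns `False`, the upgrade is being lost before the request reaches nginx.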