- I am using uvicorn to run the server.
- I am not sure what "channel layer" means here, but I believe I am using Redis.
- There are no errors; even the try/except block reports “message has sent”, but in reality nothing was sent (my consumers.py does not log anything for some reason, and I have not been able to get logging to work either).
On a Mac M1 (Sonoma 14.6.1) it works as expected. I don't use any specific OS within the Docker image.
When using the same codebase on a VPS with Ubuntu 22.04, IN THE EXCEPTION BLOCK (where some video is unavailable) THE SOCKET MESSAGE IS NOT SENT, NOR DOES IT SHOW ANY ERROR.
Please see this video (SORRY FOR THIS) to understand the issue better:
Here’s my project structure (to identify any misconfiguration) with some files related to the issue:
.
├── Dockerfile
├── README.md
├── db.sqlite3
├── docker-compose.yml
├── downloader
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── consumers.py
│ ├── migrations
│ │ ├── 0001_initial.py
│ │ ├── 0002_alter_downloadtask_resolution.py
│ │ ├── 0003_downloadtask_callback_url.py
│ │ ├── 0004_downloadtask_file_size_downloadtask_stage_and_more.py
│ │ └── __init__.py
│ ├── models.py
│ ├── routing.py
│ ├── tasks.py
│ ├── templates
│ │ └── downloader
│ │ └── index.html
│ ├── tests.py
│ ├── urls.py
│ └── views.py
├── manage.py
├── requirements.txt
├── tokens.json
└── youtube_downloader
├── __init__.py
├── asgi.py
├── celery.py
├── settings.py
├── urls.py
└── wsgi.py
asgi.py:
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

# The settings module must be configured before Django is initialised below.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'youtube_downloader.settings')

# Initialise Django early so the app registry is populated before any project
# code (routing -> consumers -> possibly models) is imported.
django_asgi_app = get_asgi_application()

# Deliberately imported after Django setup; see the comment above.
from downloader.routing import websocket_urlpatterns  # noqa: E402

# Top-level ASGI entry point: plain HTTP goes to Django, WebSocket connections
# go through the auth middleware to the downloader app's URL routes.
application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AuthMiddlewareStack(
        URLRouter(
            websocket_urlpatterns
        )
    ),
})
consumers.py:
import json
from channels.generic.websocket import AsyncWebsocketConsumer
import logging
# Install a default root handler if none is configured yet (basicConfig is a
# no-op when the root logger already has handlers).
logging.basicConfig()
logger = logging.getLogger(__name__)
# Let INFO records from this module through regardless of the root level.
logger.setLevel(logging.INFO)
# Emit via the module logger: the root logger keeps its default WARNING level,
# so the same call on the `logging` module itself would be silently dropped.
logger.info("Logger initialized for consumers.py")
class DownloadProgressConsumer(AsyncWebsocketConsumer):
    """Relay progress events for a single download task to a WebSocket client.

    On connect the consumer joins the channel-layer group ``task_<task_id>``,
    where ``task_id`` comes from the URL route. Group messages of type
    ``progress.update`` are forwarded to the client as JSON; messages of type
    ``websocket.close`` close the connection.
    """

    async def connect(self):
        """Join the task-specific group and accept the connection."""
        self.task_id = self.scope["url_route"]["kwargs"]["task_id"]
        self.task_group_name = f"task_{self.task_id}"
        # Join the group for task-specific updates
        await self.channel_layer.group_add(self.task_group_name, self.channel_name)
        await self.accept()
        logger.info(f"WebSocket connection established for task {self.task_id}")

    async def disconnect(self, close_code):
        """Leave the task group when the WebSocket disconnects."""
        await self.channel_layer.group_discard(self.task_group_name, self.channel_name)
        logger.info(f"WebSocket connection closed for task {self.task_id}")

    async def progress_update(self, event):
        """Forward one progress event to the client; close on a terminal event.

        The socket is closed after sending when ``event["status"]`` is
        ``"Completed"`` or ``"Failed"``, or ``event["stage"]`` is ``"error"``.
        Events lacking either key are forwarded without closing.
        """
        try:
            logger.info(f"Received event in WebSocket for task {self.task_id}: {event}")
            await self.send(text_data=json.dumps(event))

            # Use .get(): an event without "status"/"stage" must not raise
            # KeyError, which would land in the handler below and drop the
            # connection after the first such message.
            # NOTE(review): tasks.py stores lowercase "completed"/"failed" on
            # the model; confirm the casing actually placed in the event.
            status = event.get("status")
            stage = event.get("stage")

            # Close socket on completion/failure
            if status in ["Completed", "Failed"] or stage in ["error"]:
                logger.info(
                    f"Closing WebSocket for task {self.task_id} due to status: {status}"
                )
                await self.close()
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(
                f"Error handling progress update for task {self.task_id}: {e}"
            )
            await self.close()

    async def websocket_close(self, event):
        """Close the connection when a ``websocket.close`` message is received."""
        try:
            logger.info(f"WebSocket close requested for task {self.task_id}")
            await self.close()
        except Exception as e:
            logger.exception(f"Error closing WebSocket for task {self.task_id}: {e}")
tasks.py:
def upload_file_with_progress(
    file_path, bucket_name, key_name, storage_options, task_id, channel_layer, metadata
):
    """
    Push a local file to an S3 bucket while reporting upload progress.

    ``storage_options`` must provide ``access_key``, ``secret_key`` and
    ``endpoint_url``. ``task_id``, ``channel_layer`` and ``metadata`` are handed
    to ``ProgressPercentage``, which is passed to boto3 as the upload callback.
    """
    # Credentials and endpoint are taken from the caller-supplied options.
    client_kwargs = {
        "aws_access_key_id": storage_options["access_key"],
        "aws_secret_access_key": storage_options["secret_key"],
        "endpoint_url": storage_options["endpoint_url"],
    }
    client = boto3.client("s3", **client_kwargs)

    # The same size is used for the multipart threshold and the chunk size.
    part_size = 1024 * 25
    transfer_settings = TransferConfig(
        multipart_threshold=part_size,
        max_concurrency=10,
        multipart_chunksize=part_size,
    )

    # Callback object invoked by boto3 during the transfer.
    tracker = ProgressPercentage(file_path, task_id, channel_layer, metadata)

    client.upload_file(
        file_path,
        bucket_name,
        key_name,
        Config=transfer_settings,
        Callback=tracker,
    )
@shared_task(bind=True)
def download_video(self, task_id, original_payload):
    """
    Celery task for downloading video and audio from YouTube, merging, and uploading.

    Loads the ``DownloadTask`` row for ``task_id``, fetches the video metadata,
    uploads the resulting file and publishes progress through
    ``notify_progress_update``. On failure the task row is marked
    ``failed``/``error`` and an ``"error"`` notification is sent. Temporary
    files are removed in all cases.

    ``original_payload`` is echoed back inside the metadata sent with
    notifications.
    """
    logger.info(f"Starting to process video with ID: {task_id}")
    task = DownloadTask.objects.get(id=task_id)
    channel_layer = get_channel_layer()

    # Initialize variables for cleanup
    video_filename = None
    audio_filename = None
    output_filename = None

    try:
        # --- Fetch video metadata ---
        task.status = "in_progress"
        task.stage = "fetching_metadata"
        task.save()

        yt = YouTube(
            task.url,
            use_oauth=True,
            allow_oauth_cache=True,
            token_file=os.path.join(
                os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
                "tokens.json",
            ),
        )
        video_metadata = {
            "title": yt.title,
            "views": yt.views,
            "channel_name": yt.author,
            "thumbnail": yt.thumbnail_url,
            "duration": yt.length,
            "original_payload": original_payload,
        }

        # --- Select video quality based on resolution ---
        # [CODE REMOVED]
        upload_file_with_progress(
            output_filename,
            bucket_name,
            key_name,
            storage_options,
            task_id,
            channel_layer,
            video_metadata,
        )
        download_url = generate_s3_signed_url(key_name)
        file_size = os.path.getsize(output_filename)

        # --- Complete the process ---
        task.status = "completed"
        task.stage = "completed"
        task.progress = 100.0
        task.file_size = file_size
        task.save()

        video_metadata.update(
            {
                "download_url": download_url,
                "download_size": file_size,
            }
        )
        notify_progress_update(
            "completed",
            task_id,
            channel_layer,
            video_metadata,
            progress=100,
            download_url=download_url,
        )
    except (
        VideoUnavailable,
        AgeRestrictedError,
        VideoPrivate,
        LiveStreamError,
        MembersOnly,
        VideoRegionBlocked,
        UnknownVideoError,
        RecordingUnavailable,
    ) as e:
        error_messages = {
            VideoUnavailable: "Sorry, but this video is simply not available.",
            AgeRestrictedError: "Oops! Looks like you're too young to watch this video.",
            VideoPrivate: "Sorry, you're not invited to watch this private video.",
            LiveStreamError: "Unfortunately, you can't download a live stream.",
            MembersOnly: "This video is exclusively for members only.",
            VideoRegionBlocked: "Sorry, this video is blocked in your region.",
            UnknownVideoError: "Oops! An unknown error occurred while processing the video.",
            RecordingUnavailable: "Sorry, the recording of this live stream is not available.",
        }
        # Walk the MRO so that a subclass of a listed exception (which the
        # except clause above also catches) gets its nearest ancestor's
        # message instead of the generic fallback.
        error_message = next(
            (
                error_messages[cls]
                for cls in type(e).__mro__
                if cls in error_messages
            ),
            "An error occurred.",
        )
        task.status = "failed"
        task.stage = "error"
        task.save()
        logger.info(f"Error downloading video msg: {error_message}")
        notify_progress_update(
            "error",
            task_id,
            channel_layer,
            metadata=None,
            error_message=error_message,
        )
    except Exception as e:
        # Unexpected failure: keep the traceback in the log.
        logger.exception(f"Error downloading video: {str(e)}")
        # Persist the failed state first (same order as the handler above) so
        # the row is updated even if sending the notification fails.
        task.status = "failed"
        task.stage = "error"
        task.save()
        notify_progress_update(
            "error", task_id, channel_layer, metadata=None, error_message=str(e)
        )
    finally:
        # Cleanup files if they were created. Each file is handled on its own
        # so one failed removal does not leave the others behind.
        for temp_path in (video_filename, audio_filename, output_filename):
            try:
                if temp_path and os.path.exists(temp_path):
                    os.remove(temp_path)
            except Exception as cleanup_error:
                logger.info(f"Cleanup failed: {str(cleanup_error)}")
celery.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
# setdefault leaves an already-exported DJANGO_SETTINGS_MODULE untouched.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "youtube_downloader.settings")
# Celery application object for this project, exposed as `app`.
app = Celery("youtube_downloader")
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# NOTE(review): namespace="CELERY" presumably restricts Celery to settings
# prefixed with CELERY_ - confirm against settings.py.
app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
docker-compose.yml:
# Compose stack: ASGI web server, Celery worker, PostgreSQL and Redis.
services:
  # Runs the ASGI application with uvicorn, published on host port 8000.
  web:
    build: .
    command: uvicorn youtube_downloader.asgi:application --host 0.0.0.0 --port 8000
    volumes:
      # Project directory is bind-mounted into the container at /app.
      - .:/app
    ports:
      - "8000:8000"
    depends_on:
      - db
      - redis
    env_file:
      - .env
  # Celery worker built from the same image, with 4 concurrent worker processes.
  worker:
    build: .
    command: celery -A youtube_downloader worker --loglevel=info --concurrency=4
    volumes:
      - .:/app
    depends_on:
      - db
      - redis
    env_file:
      - .env
  # PostgreSQL 16; data persisted in the named volume declared below.
  db:
    image: postgres:16
    volumes:
      - postgres_data:/var/lib/postgresql/data/
    env_file:
      - .env
  # Redis, restarted automatically; image tag is not pinned.
  redis:
    restart: always
    image: redis:latest
volumes:
  postgres_data:
requirements.txt:
asgiref==3.8.1
boto3==1.35.5
botocore==1.35.5
channels==4.1.0
Django==5.1
python-decouple==3.8
pytubefix==6.15.4
redis==5.0.8
redis_lock==0.2.0
celery==5.4.0
django-celery-beat==2.7.0
django-celery-results==2.5.1
psycopg2==2.9.9
django-storages==1.14.4
channels_redis==4.2.0
uvicorn[standard]==0.30.6
wsproto==1.2.0
websockets==13.0.0
django-cors-headers==4.4.0
eventlet