rawdataset/ytdlps3/download_and_upload.py (60 lines of code) (raw):

"""Download YouTube videos with yt-dlp and upload the results to S3.

For each video id given on the command line the script downloads the video,
its English subtitles (when listed) and the yt-dlp info JSON, uploads whatever
files exist to the given S3 bucket/path, and records failures as text files
under the ``failed/`` prefix of the bucket.
"""

import os
import posixpath
import sys
import tempfile

import boto3
from yt_dlp import YoutubeDL


def download_youtube_video(video_id, output_path):
    """Download a video, its English subtitles and its info JSON.

    Args:
        video_id: Identifier handed to ``YoutubeDL.extract_info``; it is also
            used as the base name of every file written.
        output_path: Directory the files are written into.

    Returns:
        A ``(info_dict, subtitle_file_path)`` tuple. ``subtitle_file_path`` is
        ``None`` when ``info_dict`` lists no English subtitles; otherwise it is
        the path where the ``.en.vtt`` file is expected (the caller should
        still check that it exists).
    """
    ydl_opts = {
        'format': 'best',
        'writesubtitles': True,
        'subtitleslangs': ['en'],
        'subtitlesformat': 'vtt',
        'writeinfojson': True,
        'skip_download': False,
        'outtmpl': os.path.join(output_path, f'{video_id}.%(ext)s'),
    }

    with YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(video_id, download=True)

        subtitle_file_path = None
        subtitles = info_dict.get('subtitles')
        if subtitles and 'en' in subtitles:
            # Swap whatever extension the video got for '.en.vtt'. A literal
            # replace of '.mp4' would leave the path pointing at the video
            # itself whenever 'best' resolves to another container.
            base_path, _ = os.path.splitext(ydl.prepare_filename(info_dict))
            subtitle_file_path = f'{base_path}.en.vtt'

    return info_dict, subtitle_file_path


def upload_to_s3(local_file_path, s3_bucket, s3_key):
    """Upload one local file to ``s3_bucket`` under ``s3_key``."""
    s3_client = boto3.client('s3')
    s3_client.upload_file(local_file_path, s3_bucket, s3_key)


def log_failure(video_id, error_message, s3_bucket, s3_path):
    """Record a failed video as ``failed/<video_id>.txt`` in the bucket.

    Args:
        video_id: Id of the video that failed; used in the S3 key.
        error_message: Text written into the uploaded file.
        s3_bucket: Destination bucket.
        s3_path: Accepted for signature compatibility; not used, the key is
            always ``failed/<video_id>.txt``.
    """
    # Stage the message in a throwaway directory so no error file is left
    # behind on local disk after the upload.
    with tempfile.TemporaryDirectory() as staging_dir:
        error_file_path = os.path.join(staging_dir, 'error.txt')
        with open(error_file_path, 'w', encoding='utf-8') as f:
            f.write(error_message)

        # Upload the error file to S3 in the failed/ subfolder
        s3_client = boto3.client('s3')
        s3_client.upload_file(error_file_path, s3_bucket, f"failed/{video_id}.txt")


def process_video(video_id, s3_bucket, s3_path):
    """Download one video and upload its files to ``s3_path`` in the bucket.

    Uploads the video, the info JSON (as ``<video_id>.json``) and the English
    subtitles (as ``<video_id>.en.vtt``), each only if the file exists on
    disk. Any exception is converted to its string form and passed to
    ``log_failure`` instead of propagating.

    Args:
        video_id: Id of the video to process.
        s3_bucket: Destination bucket.
        s3_path: Key prefix the files are uploaded under.
    """
    try:
        # A per-video temporary directory is removed on both success and
        # failure, and does not collide with other runs.
        with tempfile.TemporaryDirectory(prefix='youtube_downloads_') as download_path:
            # Download the video, subtitles (if available), and metadata
            info_dict, subtitle_file_path = download_youtube_video(video_id, download_path)

            # 'best' is not guaranteed to be mp4, so take the extension from
            # the info dict and fall back to mp4 only when it is missing.
            video_ext = info_dict.get('ext') or 'mp4'
            video_name = f'{video_id}.{video_ext}'

            uploads = [
                (os.path.join(download_path, video_name), video_name),
                (os.path.join(download_path, f'{video_id}.info.json'), f'{video_id}.json'),
            ]
            if subtitle_file_path:
                uploads.append((subtitle_file_path, f'{video_id}.en.vtt'))

            # Upload each file to the specified S3 path if it exists
            for local_path, key_name in uploads:
                if os.path.exists(local_path):
                    # S3 keys always use '/', independent of the host OS.
                    upload_to_s3(local_path, s3_bucket, posixpath.join(s3_path, key_name))
    except Exception as e:
        # Per-video boundary: one bad video must not stop the batch.
        error_message = str(e)
        log_failure(video_id, error_message, s3_bucket, s3_path)


def main(video_ids, s3_bucket, s3_path):
    """Process every id in ``video_ids`` in order."""
    for video_id in video_ids:
        process_video(video_id, s3_bucket, s3_path)


if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Usage: python download_and_upload.py <s3_bucket_name> <s3_path> <youtube_video_id_1> [<youtube_video_id_2> ...]")
        sys.exit(1)

    s3_bucket = sys.argv[1]
    s3_path = sys.argv[2]
    video_ids = sys.argv[3:]

    main(video_ids, s3_bucket, s3_path)