# finealignment/video_alignment.py

import argparse
import json
import os
import re
from datetime import datetime
from typing import List

import boto3
from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector
from scenedetect.frame_timecode import FrameTimecode

#
# Given an input list of videos, this script downloads them from S3 and aligns the
# metadata from those videos generated with video2annotation.py with the videos itself.
#
# The code is prepared to run as a standalone application:
#   size_chunk:    divides the list of videos into sublists of length size_chunk
#   worker_number: decides on which sublist of size size_chunk the current execution works
#   --video_list:  JSON file that contains a list of video ids as a JSON list.
#                  Defaults to video_alignment_to_process.json when not provided.
#

### CONFIG ###
bucket_name = '<bucket_name>'
video_folder_path = 'videos_minioracle/'
json_folder_path = 'videos_minioracle_results/'
output_folder_path = 'results_minioracle_aligned/'
###

# AWS S3 Configuration - specify your personal profile
session = boto3.Session()
s3_client = session.client('s3')


def download_video_from_s3(video_key, local_path):
    """Download `video_key` from the configured bucket to `local_path`.

    Returns True on success, False on any failure (the error is logged, not raised).
    """
    try:
        s3_client.download_file(bucket_name, video_key, local_path)
        print(f"Downloaded {video_key} to {local_path}")
        return True
    except Exception as e:
        print(f"Failed to download {video_key} from S3: {e}")
        return False


def handle_error(video_id: str, error_message: str, output_folder_path: str, worker_number: str):
    """Handle errors by creating an error file and updating the status report."""
    error_data = {
        "error": error_message,
        "video_id": video_id,
        "worker_number": worker_number,
    }
    error_file_path = os.path.join(output_folder_path, f"errors_{video_id}.json")
    with open(error_file_path, "w") as f:
        json.dump(error_data, f, indent=4)

    # Update status report for failure
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    status_report = f"{timestamp} - {video_id} - failed - {error_message}\n"
    print(status_report)
    with open(f"status/status_alignment_{worker_number}.txt", "a") as f:
        f.write(status_report)


def time_to_frametimecode(time_str: str, fps: float, scene_end_time: FrameTimecode = None,
                          filename: str = "unknown_file", worker_number: str = None) -> str:
    """Convert mm:ss or ss time format to FrameTimecode, or handle special cases like 'end'.

    Returns a FrameTimecode string, or None for vague annotations ("n/a",
    "varies", "throughout scene", ...). "end" maps to `scene_end_time` and
    raises ValueError when no scene end is provided. Raises ValueError for any
    string that cannot be parsed as a timestamp.
    """
    # "end" maps to the end of the enclosing scene (exact, case-sensitive match;
    # other casings fall through to the special-cases list below).
    if time_str == "end":
        if scene_end_time is not None:
            return scene_end_time.get_timecode()
        raise ValueError("time_str is end and no replacement for scene_end_time provided")

    special_cases = ["", "n/a", "varies", "throughout scene", "throughout the scene",
                     "end", "throughout", "not present", "not applicable"]
    if time_str.lower() in special_cases or re.match(r"scene\s\d+", time_str.lower()):
        return None

    # "30s" -> "30"
    match = re.match(r"(\d+)s$", time_str.lower())
    if match:
        time_str = match.group(1)

    # Strip fuzzy markers. Fixed: the original took split(...)[0], which yields
    # "" for inputs such as "around 1:20" or "~1:20" and then failed to parse;
    # taking the first non-empty fragment recovers the actual timestamp.
    for marker in ('around ', '~'):
        if marker in time_str:
            time_str = next(part for part in time_str.split(marker) if part.strip())
    # For open/closed ranges ("1:20+", "1:20-1:30") keep the first endpoint.
    if '+' in time_str:
        time_str = time_str.split('+')[0]
    if '-' in time_str:
        time_str = time_str.split('-')[0]
    # "1:20 (intro)" -> "1:20"
    if ' ' in time_str and ':' in time_str:
        time_str = time_str.split(' ')[0]

    if ':' in time_str:
        parts = time_str.split(':')
        if len(parts) == 3:
            hours, minutes, seconds = parts
        elif len(parts) == 2:
            hours = 0
            minutes, seconds = parts
        elif len(parts) == 1:
            hours = 0
            minutes = 0
            seconds = parts[0]
        else:
            raise ValueError(f"Invalid timestamp format: {time_str}")
        # Drop fractional seconds, then any trailing junk after the digits.
        if '.' in seconds:
            seconds = seconds.split('.')[0]
        match = re.match(r"^\d+", seconds)
        if match:
            seconds = int(match.group())
        else:
            raise ValueError(f"Invalid timestamp format: {time_str}")
        total_seconds = float(hours) * 3600 + float(minutes) * 60 + float(seconds)
    else:
        try:
            total_seconds = float(time_str)
        except ValueError:
            raise ValueError(f"Invalid timestamp format: {time_str}")

    return FrameTimecode(timecode=total_seconds, fps=fps).get_timecode()


def adjust_scene_boundaries(video_path, initial_scenes, video_id, worker_number):
    """Adjust scene boundaries based on scene detection.

    For every annotated scene, run content-based cut detection in a +/- 2 second
    window around the annotated start/end and snap the boundary to the closest
    detected cut. Returns (fps, adjusted_scenes). Raises ValueError when a
    boundary would have to move too far (likely a bad annotation).
    """
    # Initialize video manager and scene manager
    video_manager = VideoManager([video_path])
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=15.0))  # Adjust threshold for sensitivity

    # Fixed: release the video manager even when a ValueError aborts the loop
    # below (previously the handle leaked on any raise).
    try:
        # Start the video manager and obtain FPS
        video_manager.start()
        fps = video_manager.get_framerate()

        # Get total frames using duration in seconds and fps
        duration_seconds = video_manager.get_duration()[0].get_seconds()
        total_frames = int(duration_seconds * fps)
        # Build an "MM:SS" string for the very last frame. NOTE(review): dropping
        # the HH component assumes videos shorter than one hour - confirm.
        last_frame_timecode = FrameTimecode(timecode=total_frames, fps=fps).get_timecode().split(".")[0].split(":")
        last_frame_timecode = last_frame_timecode[1] + ":" + last_frame_timecode[2]

        adjusted_scenes = []
        for idx, initial_scene in enumerate(initial_scenes):
            if idx == len(initial_scenes) - 1:
                # Hack to avoid issues with answers that signal the last timestamp as 'end'
                initial_scene['timestamps']['end_timestamp'] = last_frame_timecode

            start_timecode = time_to_frametimecode(initial_scene['timestamps']['start_timestamp'], fps,
                                                   filename=video_id, worker_number=worker_number)
            end_timecode = time_to_frametimecode(initial_scene['timestamps']['end_timestamp'], fps,
                                                 filename=video_id, worker_number=worker_number)

            # Search window: +/- 2 seconds around the annotated boundaries,
            # clamped to the video. All FrameTimecode objects share the same fps.
            start_frames = FrameTimecode(timecode=start_timecode, fps=fps).get_frames()
            end_frames = FrameTimecode(timecode=end_timecode, fps=fps).get_frames()
            start_frame_number = int(max(0, start_frames - 2 * fps))
            end_frame_number = int(min(total_frames, end_frames + 2 * fps))
            search_start = FrameTimecode(timecode=start_frame_number, fps=fps)
            search_end = FrameTimecode(timecode=end_frame_number, fps=fps)

            # Seek to the start frame for detection using FrameTimecode
            video_manager.seek(search_start)
            scene_manager.detect_scenes(frame_source=video_manager, end_time=search_end.get_seconds())
            # NOTE(review): SceneManager accumulates results across detect_scenes()
            # calls, so this list may also contain cuts found for earlier scenes -
            # confirm whether scene_manager should be cleared per iteration.
            detected_scenes = scene_manager.get_scene_list()

            # Find closest detected boundaries, default to original timecodes if no match found
            adjusted_start_timecode = start_timecode
            adjusted_end_timecode = end_timecode
            if detected_scenes:
                closest_start = min(detected_scenes,
                                    key=lambda x: abs(x[0].get_frames() - start_frames),
                                    default=None)
                closest_end = min(detected_scenes,
                                  key=lambda x: abs(x[1].get_frames() - end_frames),
                                  default=None)

                # Snap only when the detected cut is within 2 seconds of the annotation.
                if closest_start and abs(closest_start[0].get_frames() - start_frames) < 2 * fps:
                    adjusted_start_timecode = closest_start[0].get_timecode()
                    distance = abs(closest_start[0].get_seconds()
                                   - FrameTimecode(timecode=start_timecode, fps=fps).get_seconds())
                    if distance > 2:
                        print(f"\t adjusting start timestamp by {distance:.2f} seconds")
                        print(f"\t\tFrom: {start_timecode} to {adjusted_start_timecode}")
                        if distance >= 5:
                            raise ValueError(f"Large start timestamp adjustment ({distance:.2f} seconds) required for scene {idx+1}")

                if closest_end and abs(closest_end[1].get_frames() - end_frames) < 2 * fps:
                    distance = abs(closest_end[1].get_seconds()
                                   - FrameTimecode(timecode=end_timecode, fps=fps).get_seconds())
                    adjusted_end_timecode = closest_end[1].get_timecode()
                    if distance > 2:
                        print(f"\t adjusting end timestamp by {distance:.2f} seconds")
                        print(f"\t\tFrom: {end_timecode} to {adjusted_end_timecode}")
                        if distance >= 5:
                            # Fixed: this message previously said "start" for an
                            # end-boundary adjustment.
                            raise ValueError(f"Large end timestamp adjustment ({distance:.2f} seconds) required for scene {idx+1}")

            # Update the JSON with FrameTimecode formatted as HH:MM:SS:FF
            initial_scene['timestamps']['start_timestamp'] = adjusted_start_timecode
            initial_scene['timestamps']['end_timestamp'] = adjusted_end_timecode
            adjusted_scenes.append(initial_scene)

            # Ensure continuity between scenes: force every scene to start exactly
            # one frame after the previous scene's end. Applied unconditionally -
            # an overlap check was present but commented out in the original.
            if idx > 0:
                previous_scene_end = FrameTimecode(timecode=adjusted_scenes[idx - 1]['timestamps']['end_timestamp'], fps=fps)
                current_scene_start = FrameTimecode(timecode=adjusted_start_timecode, fps=fps)
                new_start_timecode = previous_scene_end.get_frames() + 1
                adjusted_scenes[idx]['timestamps']['start_timestamp'] = FrameTimecode(timecode=new_start_timecode, fps=fps).get_timecode()
                frame_adjustment = abs(current_scene_start.get_frames() - new_start_timecode)
                if frame_adjustment > 25:
                    print(f"\t\tWARNING: adjusting a scene start by {frame_adjustment} frames")
                    if frame_adjustment > 125:
                        raise ValueError(f"Large frame adjustment ({frame_adjustment} frames) required for scene {idx+1}")
    finally:
        video_manager.release()

    return fps, adjusted_scenes


def update_timestamps_in_json(data: dict, fps: float, video_id: str, worker_number: str) -> dict:
    """Update all timestamp fields in the JSON data to FrameTimecode format and
    ensure they stay within their scene's boundaries."""

    def enforce_within_boundaries(timestamp, start, end):
        # Clamp a converted timecode into [start, end]; None passes through.
        if timestamp is None:
            return None
        frame_timecode = FrameTimecode(timecode=timestamp, fps=fps)
        if frame_timecode.get_frames() < start.get_frames():
            return start.get_timecode()
        elif frame_timecode.get_frames() > end.get_frames():
            return end.get_timecode()
        else:
            return timestamp

    def convert_field(container, key, start, end):
        # Convert container[key] in place (if present) and clamp it to [start, end].
        if key in container:
            container[key] = enforce_within_boundaries(
                time_to_frametimecode(container[key], fps, filename=video_id,
                                      scene_end_time=end, worker_number=worker_number),
                start, end
            )

    # scene_start/scene_end keep the boundaries of the last scene after the
    # loop; the video-level sections below reuse them (pre-existing behavior).
    scene_start = None
    scene_end = None

    for scene in data.get('scenes', []):
        scene_start = FrameTimecode(timecode=scene['timestamps']['start_timestamp'], fps=fps)
        scene_end = FrameTimecode(timecode=scene['timestamps']['end_timestamp'], fps=fps)

        # Activities and props keep start/end under 'timestamp'; video editing
        # details use the plural 'timestamps' key.
        for activity in scene.get('activities', []):
            if 'timestamp' in activity:
                convert_field(activity['timestamp'], 'start_timestamp', scene_start, scene_end)
                convert_field(activity['timestamp'], 'end_timestamp', scene_start, scene_end)
        for prop in scene.get('props', []):
            if 'timestamp' in prop:
                convert_field(prop['timestamp'], 'start_timestamp', scene_start, scene_end)
                convert_field(prop['timestamp'], 'end_timestamp', scene_start, scene_end)
        for video_editing in scene.get('videoEditingDetails', []):
            if 'timestamps' in video_editing:
                convert_field(video_editing['timestamps'], 'start_timestamp', scene_start, scene_end)
                convert_field(video_editing['timestamps'], 'end_timestamp', scene_start, scene_end)

        # Single-timestamp entries.
        for key_moment in scene.get('mood', {}).get('keyMoments', []):
            convert_field(key_moment, 'timestamp', scene_start, scene_end)
        for narrative in scene.get('narrativeProgression', []):
            convert_field(narrative, 'timestamp', scene_start, scene_end)

    # Fixed: these two sections previously relied on the loop variables leaking
    # out of the scene loop and raised NameError when 'scenes' was empty; they
    # are now skipped in that case.
    if scene_end is not None:
        # Update storylines climax timestamp (video-level; intentionally not clamped)
        if 'storylines' in data and 'climax' in data['storylines'] and 'timestamp' in data['storylines']['climax']:
            data['storylines']['climax']['timestamp'] = time_to_frametimecode(
                data['storylines']['climax']['timestamp'], fps, filename=video_id,
                scene_end_time=scene_end, worker_number=worker_number)

        # Update trimming suggestions timestamps (clamped to the last scene's boundaries)
        for trimming in data.get('trimmingSuggestions', []):
            if 'timestamps' in trimming:
                convert_field(trimming['timestamps'], 'start_timestamp', scene_start, scene_end)
                convert_field(trimming['timestamps'], 'end_timestamp', scene_start, scene_end)

    return data


def result_exists(video_filename, output_directory):
    """Return True if a result or an error file already exists for this video."""
    video_id = os.path.splitext(video_filename)[0]
    result_file = os.path.join(output_directory, f"{video_id}.json")
    error_file = os.path.join(output_directory, f"errors_{video_id}.json")
    return os.path.exists(result_file) or os.path.exists(error_file)


def process_single_video(video_id, worker_number):
    """Download one video, align its annotation JSON, write the result, and clean up."""
    s3_folder_videos = 'videos/'
    # NOTE(review): s3_folder_videos already ends in '/', so this key contains a
    # double slash ('videos//<id>.mp4'). Kept as-is because S3 keys are literal
    # and existing objects may live under that exact key - confirm before fixing.
    video_key = f'{s3_folder_videos}/{video_id}.mp4'
    video_filename = f'{video_id}.mp4'
    video_local_path = os.path.join(video_folder_path, video_filename)

    # Idempotence: skip videos that already have a result or a recorded error.
    if result_exists(video_filename, output_folder_path):
        print(f"Skipping {video_filename}, result already exists.")
        return

    # Download video from S3 (failure is recorded, not raised, so the chunk continues)
    if not download_video_from_s3(video_key, video_local_path):
        handle_error(video_id, "File not found in S3", output_folder_path, worker_number)
        return

    # Construct paths
    json_path = os.path.join(json_folder_path, f"{video_id}.json")
    json_result_path = os.path.join(output_folder_path, f"{video_id}.json")

    # Load JSON file
    with open(json_path, 'r') as json_file:
        video_data = json.load(json_file)

    try:
        # Adjust scene boundaries using PySceneDetect to determine FPS
        fps, adjusted_scenes = adjust_scene_boundaries(video_local_path, video_data['scenes'], video_id, str(worker_number))

        # Update scenes in the original data
        video_data['scenes'] = adjusted_scenes
        video_data['fps'] = fps

        # Update all timestamps to FrameTimecode format
        video_data = update_timestamps_in_json(video_data, fps, video_id, str(worker_number))

        # Write updated JSON back to file
        with open(json_result_path, 'w') as json_file:
            json.dump(video_data, json_file, indent=4)
        print(f"Processed video {video_id}.")

        # Append the success status report
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        status_report = f"{timestamp} - {video_id} - complete\n"
        print(status_report)
        if worker_number is None:
            with open("status_alignment.txt", "a") as f:
                f.write(status_report)
        else:
            with open(f"status/status_alignment_{worker_number}.txt", "a") as f:
                f.write(status_report)
    except Exception as e:
        # Any error in adjusting scenes or updating timestamps is recorded via the
        # shared error handler (the original duplicated handle_error's body inline).
        handle_error(video_id, f"Error during processing: {str(e)}", output_folder_path, worker_number)
    finally:
        # Remove the video file after processing, even if an error occurred
        if os.path.exists(video_local_path):
            os.remove(video_local_path)
            print(f"Deleted local file {video_local_path} after processing.")


def process_chunk(videos_to_process, size_chunk, worker_number):
    """Process this worker's slice of the video list."""
    # Calculate start and end indices for this worker's chunk
    start_index = worker_number * size_chunk
    end_index = min(start_index + size_chunk, len(videos_to_process))

    # Process videos in this worker's chunk
    for video_id in videos_to_process[start_index:end_index]:
        process_single_video(video_id, worker_number)


if __name__ == "__main__":
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='Process videos in chunks.')
    parser.add_argument('size_chunk', type=int, help='Size of each chunk to process')
    parser.add_argument('worker_number', type=int, help='Worker number (zero-indexed)')
    parser.add_argument('--video_list', type=str, help='Optional video list file in JSON format')
    args = parser.parse_args()

    # Load the list of videos
    if args.video_list:
        with open(args.video_list, 'r') as f:
            videos_to_process = json.load(f)
        print(f"Using provided video list: {args.video_list}")
    else:
        with open('video_alignment_to_process.json', 'r') as f:
            videos_to_process = json.load(f)
        print("Using default video list: video_alignment_to_process.json")

    # Process the assigned chunk
    process_chunk(videos_to_process, args.size_chunk, args.worker_number)