contentannotation/video2annotation.py (293 lines of code) (raw):

import boto3 import google.generativeai as genai from openai import OpenAI import json import time from datetime import datetime from typing import List, Dict, Any, Optional, TypedDict import instructor import os import google.api_core.exceptions import argparse # # Given an input list of videos, this script downloads them from S3 and annotates the videos with Gemini # and structures the data with instructor using GPT4o underneath. # # The code is prepared to run as a standalone application: # The first parameter is size_chunk: it basically divide the list of videos in sublists of length size_chunk # The worker_number decides in which sublist of size size_chunk the current execution will be working on # --video-list is to specify the json file that contains a list of videoids as a JSON list. If that is not provided, it defaults to oracle_videos_server.json # ### CONFIG ### # Directories to download input videos and output annotation results input_directory = 'videos_minioracle/' output_directory = 'videos_minioracle_results/' bucket_name = '<bucket_name>' GEMINI_PATH="/path/to/your/key/file" OPENAI_PATH="/path/to/your/key/file" ### ### Data Schema ### class Character(TypedDict): characterId: str name: str description: str class Timestamps(TypedDict): start_timestamp: str end_timestamp: str class Activity(TypedDict): description: str timestamp: Timestamps class Prop(TypedDict): name: str timestamp: Timestamps class VideoEditingDetail(TypedDict): description: str timestamps: Timestamps class KeyMoment(TypedDict): timestamp: str changeDescription: str class Mood(TypedDict): description: str keyMoments: List[KeyMoment] class NarrativeProgression(TypedDict): description: str timestamp: str class CharacterInteraction(TypedDict): characters: List[str] description: str class Scene(TypedDict): sceneId: int title: str timestamps: Timestamps cast: List[str] activities: List[Activity] props: List[Prop] videoEditingDetails: List[VideoEditingDetail] mood: Mood narrativeProgression: List[NarrativeProgression] characterInteraction: List[CharacterInteraction] thematicElements: str contextualRelevance: str dynamismScore: float audioVisualCorrelation: float class Climax(TypedDict): description: str timestamp: str class Storyline(TypedDict): description: str scenes: List[int] climax: Climax class QAndA(TypedDict): question: str answer: str class TrimmingSuggestion(TypedDict, total=False): timestamps: Timestamps description: str class Schema(TypedDict): title: str description: str characterList: List[Character] scenes: List[Scene] storylines: Storyline qAndA: List[QAndA] trimmingSuggestions: List[TrimmingSuggestion] ### class VideoProcessor: def __init__(self, gemini_api_key_path: str, openai_api_key_path: str): # Initialize API keys and clients self.gemini_apikey = self._read_api_key(gemini_api_key_path) self.openai_apikey = self._read_api_key(openai_api_key_path) genai.configure(api_key=self.gemini_apikey) self.clientOpenAI = OpenAI(api_key=self.openai_apikey) def _read_api_key(self, path: str) -> str: with open(path, "r") as file: return file.read().strip() def upload_video(self, file_path: str) -> Dict[str, Any]: print(f"Uploading file... {file_path}") try: video_file = genai.upload_file(path=file_path) while video_file.state.name == "PROCESSING": time.sleep(10) video_file = genai.get_file(video_file.name) if video_file.state.name == "FAILED": return {"error": "Upload failed", "video_file": None} return {"video_file": video_file} except Exception as e: return {"error": str(e), "video_file": None} def process_video(self, video_file: Any, addition_to_prompt=None) -> Dict[str, Optional[str]]: if "error" in video_file: return {"error": "video_file error: " + video_file["error"], "gemini_text": None} max_retries = 5 attempt = 0 delay = 2 # in seconds while attempt < max_retries: try: print(f"Processing {video_file['video_file'].display_name} (Attempt {attempt + 1})") prompt = open("gemini_prompt.txt", "r").read() if addition_to_prompt: print(f"\t adding addition to prompt: {addition_to_prompt}") prompt = prompt + addition_to_prompt model = genai.GenerativeModel(model_name="gemini-1.5-pro-latest") response = model.generate_content( [video_file['video_file'], prompt], request_options={"timeout": 600}, safety_settings=[ {"category": genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, "threshold": genai.types.HarmBlockThreshold.BLOCK_NONE}, {"category": genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT, "threshold": genai.types.HarmBlockThreshold.BLOCK_NONE}, {"category": genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, "threshold": genai.types.HarmBlockThreshold.BLOCK_NONE}, {"category": genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, "threshold": genai.types.HarmBlockThreshold.BLOCK_NONE} ] ) if not response.candidates: return {"error": "No candidates returned. Feedback: " + str(response.prompt_feedback), "gemini_text": None} #Cleaning up the analyzed file genai.delete_file(video_file['video_file'].name) return {"gemini_text": response.text} except google.api_core.exceptions.InternalServerError as e: print(f"InternalServerError occurred: {e}. Retrying in {delay} seconds...") attempt += 1 time.sleep(delay) delay *= 2 # Exponential backoff except Exception as e: if "The read operation timed out" in str(e) or "record layer failure" in str(e): print(f"Gemini error: {str(e)}. Retrying in {delay} seconds...") attempt +=1 time.sleep(delay) delay *= 2 else: return {"error": str(e), "gemini_text": None} # If all retries fail return {"error": f"Failed after {max_retries} attempts due to InternalServerError / timeouts / SSL.", "gemini_text": None} def obtain_json(self, gemini_answer: Optional[str]) -> Dict[str, Optional[str]]: if gemini_answer is None or (isinstance(gemini_answer, dict) and "error" in gemini_answer): return {"error": gemini_answer.get("error") if gemini_answer else "No Gemini answer", "json_result": None} try: # Patch the OpenAI client client = instructor.from_openai(self.clientOpenAI) promptOpenAI = gemini_answer completion = client.chat.completions.create( model="gpt-4o-2024-08-06", response_model=Schema, messages=[ {"role": "user", "content": promptOpenAI}, ] ) return {"json_result": completion.json()} except Exception as e: return {"error": str(e), "json_result": None} def prep_return(self, final_answer=None, gemini_error = None, gemini_raw_result=None, instructor_error = None, instructor_raw_result=None): return { "final_answer": final_answer, "gemini": { "error": gemini_error, "raw_result": gemini_raw_result }, "instructor": { "error": instructor_error, "raw_result": instructor_raw_result } } def process(self, file_path: str) -> Dict[str, Any]: # Upload video to Gemini gemini_result = self.upload_video(file_path) if gemini_result.get("error"): return self.prep_return(gemini_error=gemini_result['error']) # Process video with Gemini gemini_text = self.process_video(gemini_result) if gemini_text.get("error"): return self.prep_return(gemini_error=gemini_text['error']) gemini_out_text = gemini_text.get("gemini_text") if gemini_out_text is None or gemini_text == "": return self.prep_return(gemini_error="Empty gemini answer", gemini_raw_result=gemini_out_text) # Obtain JSON from the instructor instructor_result = self.obtain_json(gemini_out_text) if "IncompleteOutputException" in instructor_result.get("error", "") or "ValidationError" in instructor_result.get("error", ""): print(f"\tRetrying full pipeline due to Instructor exception: {instructor_result['error']}") # Retry processing the video gemini_result = self.upload_video(file_path) gemini_text = self.process_video(gemini_result, addition_to_prompt=" be concise with your answer") print("\t retry completed") # Check if there was an error during reprocessing if gemini_text.get("error"): return self.prep_return(gemini_error=gemini_text['error']) gemini_out_text = gemini_text.get("gemini_text") if gemini_out_text is None or gemini_text == "": return self.prep_return(gemini_error="Empty gemini answer", gemini_raw_result=gemini_out_text) # Retry obtaining JSON from the instructor with the new Gemini text instructor_result = self.obtain_json(gemini_text.get("gemini_text")) # Prepare the final response final_answer = json.loads(instructor_result["json_result"]) if instructor_result["json_result"] and not instructor_result.get("error") else None return self.prep_return(final_answer=final_answer, gemini_raw_result=gemini_text.get("gemini_text"), instructor_raw_result=instructor_result.get("json_result",None), instructor_error=instructor_result.get("error",None)) # AWS S3 Configuration session = boto3.Session() s3_client = session.client('s3') # Ensure the input and output directories exist os.makedirs(input_directory, exist_ok=True) os.makedirs(output_directory, exist_ok=True) # Function to check if a result or error file exists for a video def result_exists(video_filename): video_id = os.path.splitext(video_filename)[0] result_file = os.path.join(output_directory, f"{video_id}.json") error_file = os.path.join(output_directory, f"errors_{video_id}.json") return os.path.exists(result_file) or os.path.exists(error_file) # Function to download video from S3 def download_video_from_s3(video_key, local_path): bucket_name = '<bucket_name>' try: s3_client.download_file(bucket_name, video_key, local_path) print(f"Downloaded {video_key} to {local_path}") return True except Exception as e: print(f"Failed to download {video_key} from S3: {e}") return False def process_single_video(video_id, worker_number): videos_path = 'path/' video_key = f'{videos_path}/{video_id}.mp4' video_filename = f'{video_id}.mp4' local_path = os.path.join(input_directory, video_filename) if result_exists(video_filename): print(f"Skipping {video_filename}, result already exists.") return # Download video from S3 if not download_video_from_s3(video_key, local_path): # Handle download failure error_data = {"error": "File not found in S3"} error_file_path = os.path.join(output_directory, f"errors_{video_id}.json") with open(error_file_path, "w") as f: json.dump(error_data, f, indent=4) # Update status report for download failure timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') status_report = f"{timestamp} - {video_id} - failed - File not found in S3\n" print(status_report) with open("status.txt", "a") as f: f.write(status_report) return # Process the video using VideoProcessor class processor = VideoProcessor(gemini_api_key_path=GEMINI_PATH, openai_api_key_path=OPENAI_PATH) result = processor.process(local_path) video_id = os.path.splitext(os.path.basename(local_path))[0] # Save final answer to JSON if available if result.get("final_answer") is not None: with open(os.path.join(output_directory, f"{video_id}.json"), "w") as f: json.dump(result["final_answer"], f, indent=4) status = "successful" else: status = "failed" # Save errors to JSON if any errors exist errors = {} if (result.get("gemini", {}).get("error") is not None) or (result.get("instructor", {}).get("error") is not None): gemini_raw = result["gemini"].get("raw_result") errors = { "gemini_error": result["gemini"].get("error"), "instructor_error": result["instructor"].get("error"), "gemini_raw_result": gemini_raw } with open(os.path.join(output_directory, f"errors_{video_id}.json"), "w") as f: json.dump(errors, f, indent=4) # Prepare the status report timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') error_details = ', '.join(filter(None, [result.get("gemini", {}).get("error"), result.get("instructor", {}).get("error")])) status_report = f"{timestamp} - {video_id} - {status} - {error_details if error_details else 'None'}\n" print(status_report) # Append the status report to status.txt if worker_number is None: with open("status.txt", "a") as f: f.write(status_report) else: with open(f"status/status_{worker_number}.txt", "a") as f: f.write(status_report) # Remove the video file after processing os.remove(local_path) print(f"Deleted local file {local_path} after processing.") def process_chunk(videos_to_process, size_chunk, worker_number): # Calculate start and end indices for this worker's chunk start_index = worker_number * size_chunk end_index = min(start_index + size_chunk, len(videos_to_process)) # Process videos in this worker's chunk for video_id in videos_to_process[start_index:end_index]: process_single_video(video_id, worker_number) if __name__ == "__main__": # Parse command-line arguments parser = argparse.ArgumentParser(description='Process videos in chunks.') parser.add_argument('size_chunk', type=int, help='Size of each chunk to process') parser.add_argument('worker_number', type=int, help='Worker number (zero-indexed)') parser.add_argument('--video_list', type=str, help='Optional video list file in JSON format') args = parser.parse_args() # Load the list of videos if args.video_list: with open(args.video_list, 'r') as f: videos_to_process = json.load(f) print(f"Using provided video list: {args.video_list}") else: with open('oracle_videos_server.json', 'r') as f: videos_to_process = json.load(f) print("Using default video list: oracle_videos_server.json") # Process the assigned chunk process_chunk(videos_to_process, args.size_chunk, args.worker_number)