in contentannotation/video2annotation.py [0:0]
def _append_status_report(status_report, worker_number):
    """Append one status line to the status file selected by *worker_number*.

    With ``worker_number`` set to ``None`` the line goes to ``status.txt``;
    otherwise it goes to ``status/status_<worker_number>.txt``.
    """
    if worker_number is None:
        status_path = "status.txt"
    else:
        # Create the per-worker directory on demand so the first write of a
        # fresh run does not fail with FileNotFoundError.
        os.makedirs("status", exist_ok=True)
        status_path = f"status/status_{worker_number}.txt"
    with open(status_path, "a", encoding="utf-8") as f:
        f.write(status_report)


def process_single_video(video_id, worker_number):
    """Download one video, run ``VideoProcessor`` on it and record the outcome.

    Steps, in order:

    1. Return immediately if ``result_exists`` reports a result for the video.
    2. Download ``<video_id>.mp4`` via ``download_video_from_s3``. On failure,
       write ``errors_<video_id>.json`` to ``output_directory``, append a
       "failed" status line and return.
    3. Run ``VideoProcessor.process`` on the local file. A non-``None``
       ``"final_answer"`` is written to ``<video_id>.json`` (status
       "successful"); otherwise the status is "failed". If the ``"gemini"`` or
       ``"instructor"`` section carries an ``"error"``, an
       ``errors_<video_id>.json`` file is written as well.
    4. Append a status line and delete the local video file, even when
       processing raised.

    Args:
        video_id: Identifier used to build the S3 key, the local file name and
            the output file names.
        worker_number: Selects the status file; ``None`` means the shared
            ``status.txt``, any other value means
            ``status/status_<worker_number>.txt``.

    Returns:
        None. All results are written to disk.
    """
    videos_path = 'path/'
    # NOTE(review): videos_path already ends with '/', so this key contains
    # a double slash ('path//<id>.mp4') -- confirm that matches the bucket layout.
    video_key = f'{videos_path}/{video_id}.mp4'
    video_filename = f'{video_id}.mp4'
    local_path = os.path.join(input_directory, video_filename)

    if result_exists(video_filename):
        print(f"Skipping {video_filename}, result already exists.")
        return

    # Download video from S3
    if not download_video_from_s3(video_key, local_path):
        error_data = {"error": "File not found in S3"}
        error_file_path = os.path.join(output_directory, f"errors_{video_id}.json")
        with open(error_file_path, "w", encoding="utf-8") as f:
            json.dump(error_data, f, indent=4)

        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        status_report = f"{timestamp} - {video_id} - failed - File not found in S3\n"
        print(status_report)
        # Route through the same helper as the success path so per-worker
        # status files also receive download failures.
        _append_status_report(status_report, worker_number)
        return

    try:
        # Process the video using VideoProcessor class
        processor = VideoProcessor(gemini_api_key_path=GEMINI_PATH, openai_api_key_path=OPENAI_PATH)
        result = processor.process(local_path)
        # Output names are derived from the local file name, as before, but
        # without rebinding the ``video_id`` parameter.
        output_id = os.path.splitext(os.path.basename(local_path))[0]

        # Save final answer to JSON if available
        final_answer = result.get("final_answer")
        if final_answer is not None:
            with open(os.path.join(output_directory, f"{output_id}.json"), "w", encoding="utf-8") as f:
                json.dump(final_answer, f, indent=4)
            status = "successful"
        else:
            status = "failed"

        # Either section may be absent (or None); treat that as "no error"
        # instead of raising KeyError while building the error report.
        gemini_result = result.get("gemini") or {}
        instructor_result = result.get("instructor") or {}
        gemini_error = gemini_result.get("error")
        instructor_error = instructor_result.get("error")

        # Save errors to JSON if any errors exist
        if gemini_error is not None or instructor_error is not None:
            errors = {
                "gemini_error": gemini_error,
                "instructor_error": instructor_error,
                "gemini_raw_result": gemini_result.get("raw_result"),
            }
            with open(os.path.join(output_directory, f"errors_{output_id}.json"), "w", encoding="utf-8") as f:
                json.dump(errors, f, indent=4)

        # Prepare the status report; str() keeps join() from raising when an
        # error value is not already a string.
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        error_details = ', '.join(
            str(err) for err in (gemini_error, instructor_error) if err
        )
        status_report = f"{timestamp} - {output_id} - {status} - {error_details if error_details else 'None'}\n"
        print(status_report)

        # Append the status report to the shared or per-worker status file
        _append_status_report(status_report, worker_number)
    finally:
        # Remove the video file after processing, including when processing
        # raised, so failed runs do not leave downloaded videos behind.
        if os.path.exists(local_path):
            os.remove(local_path)
            print(f"Deleted local file {local_path} after processing.")