in contentannotation/video2annotation.py [0:0]
def process(self, file_path: str) -> Dict[str, Any]:
# Upload video to Gemini
gemini_result = self.upload_video(file_path)
if gemini_result.get("error"):
return self.prep_return(gemini_error=gemini_result['error'])
# Process video with Gemini
gemini_text = self.process_video(gemini_result)
if gemini_text.get("error"):
return self.prep_return(gemini_error=gemini_text['error'])
gemini_out_text = gemini_text.get("gemini_text")
if gemini_out_text is None or gemini_text == "":
return self.prep_return(gemini_error="Empty gemini answer",
gemini_raw_result=gemini_out_text)
# Obtain JSON from the instructor
instructor_result = self.obtain_json(gemini_out_text)
if "IncompleteOutputException" in instructor_result.get("error", "") or "ValidationError" in instructor_result.get("error", ""):
print(f"\tRetrying full pipeline due to Instructor exception: {instructor_result['error']}")
# Retry processing the video
gemini_result = self.upload_video(file_path)
gemini_text = self.process_video(gemini_result, addition_to_prompt=" be concise with your answer")
print("\t retry completed")
# Check if there was an error during reprocessing
if gemini_text.get("error"):
return self.prep_return(gemini_error=gemini_text['error'])
gemini_out_text = gemini_text.get("gemini_text")
if gemini_out_text is None or gemini_text == "":
return self.prep_return(gemini_error="Empty gemini answer",
gemini_raw_result=gemini_out_text)
# Retry obtaining JSON from the instructor with the new Gemini text
instructor_result = self.obtain_json(gemini_text.get("gemini_text"))
# Prepare the final response
final_answer = json.loads(instructor_result["json_result"]) if instructor_result["json_result"] and not instructor_result.get("error") else None
return self.prep_return(final_answer=final_answer,
gemini_raw_result=gemini_text.get("gemini_text"),
instructor_raw_result=instructor_result.get("json_result",None),
instructor_error=instructor_result.get("error",None))