def process_video()

in contentannotation/video2annotation.py [0:0]


    def process_video(self, video_file: Any, addition_to_prompt=None) -> Dict[str, Optional[str]]:
        if "error" in video_file:
            return {"error": "video_file error: " + video_file["error"], "gemini_text": None}

        max_retries = 5
        attempt = 0
        delay = 2  # in seconds

        while attempt < max_retries:
            try:
                print(f"Processing {video_file['video_file'].display_name} (Attempt {attempt + 1})")
                prompt = open("gemini_prompt.txt", "r").read()
                if addition_to_prompt:
                    print(f"\t adding addition to prompt: {addition_to_prompt}")
                    prompt = prompt + addition_to_prompt
                model = genai.GenerativeModel(model_name="gemini-1.5-pro-latest")
                response = model.generate_content(
                    [video_file['video_file'], prompt],
                    request_options={"timeout": 600},
                    safety_settings=[
                        {"category": genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
                        "threshold": genai.types.HarmBlockThreshold.BLOCK_NONE},
                        {"category": genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT,
                        "threshold": genai.types.HarmBlockThreshold.BLOCK_NONE},
                        {"category": genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
                        "threshold": genai.types.HarmBlockThreshold.BLOCK_NONE},
                        {"category": genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
                        "threshold": genai.types.HarmBlockThreshold.BLOCK_NONE}
                    ]
                )

                if not response.candidates:
                    return {"error": "No candidates returned. Feedback: " + str(response.prompt_feedback), "gemini_text": None}
                #Cleaning up the analyzed file
                genai.delete_file(video_file['video_file'].name)
                
                return {"gemini_text": response.text}

            except google.api_core.exceptions.InternalServerError as e:
                print(f"InternalServerError occurred: {e}. Retrying in {delay} seconds...")
                attempt += 1
                time.sleep(delay)
                delay *= 2  # Exponential backoff

            except Exception as e:
                if "The read operation timed out" in str(e) or "record layer failure" in str(e):
                    print(f"Gemini error: {str(e)}. Retrying in {delay} seconds...")
                    attempt +=1
                    time.sleep(delay)
                    delay *= 2
                else:
                    return {"error": str(e), "gemini_text": None}

        # If all retries fail
        return {"error": f"Failed after {max_retries} attempts due to InternalServerError / timeouts / SSL.", "gemini_text": None}