colab-enterprise/Create-Campaign-Quality-Control-ABCD.ipynb
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "VRLzn_0Bw5VJ"
},
"source": [
"## <font color='#4285f4'>Overview</font>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook demonstrates an AI-powered method of evaluating campaign videos based on the ABCDs framework built and maintained by the Google Creative Works team. It leverages a [GitHub project](https://github.com/google-marketing-solutions/abcds-detector) built by Ana Esqueda to score the auto-generated brand videos created by Veo and suggest areas for improvement.\n",
"\n",
"Process Flow: \n",
"1. ABCD Criteria & Assessment Functions:\n",
" - Define functions to evaluate each of the 23 ABCD features, using both Video Intelligence API annotations (e.g., shot detection, text detection) and Gemini LLM's understanding of video content.\n",
" - The LLM assesses video features through prompts tailored for each criterion.\n",
" - The functions calculate scores and provide detailed explanations for the assessments.\n",
"2. Execute Assessment:\n",
" - Generates video annotations using the Video Intelligence API for all brand videos in the GCS bucket.\n",
" - Trims the videos to create 5-second versions for certain assessments.\n",
" - Executes the ABCD assessment for each video, combining API annotations and LLM evaluations.\n",
" - Parses the assessment results and prints a summary for each video, including score, overall result, and evaluation of each feature.\n",
"3. Save Results:\n",
" - Saves the parsed results (brand name, video name, score, result text, feature details, feature_timestamps etc.) to the BigQuery table (campaign_abcd_results) for long-term analysis.\n",
"\n",
"\n",
"Author: Ana Esqueda (with small integration edits by Paul Ramsey)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "HMsUvoF4BP7Y"
},
"source": [
"## <font color='#4285f4'>License</font>\n",
"\n",
"\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "jQgQkbOvj55d"
},
"source": [
"```\n",
"# Copyright 2024 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "liRvAabsyNZb"
},
"source": [
"## <font color='#4285f4'>Pre-req's</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "UmyL-Rg4Dr_f"
},
"source": [
"### Initialize"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "5MaWM6H5i6rX"
},
"outputs": [],
"source": [
"!pip install --upgrade google-cloud-videointelligence \\\n",
" google-auth==2.23.0 \\\n",
" google-cloud-aiplatform \\\n",
" google-cloud-storage \\\n",
" moviepy \\\n",
" google-api-python-client\n",
"\n",
"# Install gcsfuse\n",
"# From https://cloud.google.com/storage/docs/gcsfuse-quickstart-mount-bucket\n",
"!echo \"deb https://packages.cloud.google.com/apt gcsfuse-`lsb_release -c -s` main\" | sudo tee /etc/apt/sources.list.d/gcsfuse.list\n",
"!curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -\n",
"!sudo apt-get -q update\n",
"!sudo apt-get -q install fuse gcsfuse\n",
"!gcsfuse -v"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Restart session - You will get an error saying \"Your session crashed for an unknown reason\"\n",
"# This is expected, and the runtime will automatically reconnect. \n",
"\n",
"import os\n",
"os.kill(os.getpid(), 9)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "xOYsEVSXp6IP"
},
"outputs": [],
"source": [
"import json\n",
"import random\n",
"import time\n",
"import datetime\n",
"import base64\n",
"import vertexai\n",
"import os\n",
"import urllib\n",
"from google.cloud import storage\n",
"from google.cloud import videointelligence\n",
"#from moviepy.editor import VideoFileClip -> looks like package has changed\n",
"from moviepy import VideoFileClip\n",
"import vertexai.preview.generative_models as generative_models\n",
"from vertexai.preview.generative_models import GenerativeModel, Part\n",
"from googleapiclient.errors import HttpError\n",
"from IPython.display import HTML\n",
"from IPython.display import Video\n",
"from base64 import b64encode\n",
"from IPython.display import YouTubeVideo\n",
"from google.cloud import videointelligence_v1 as videointelligence2\n",
"from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception\n",
"import logging\n",
"from IPython.display import HTML\n",
"from IPython.display import Audio\n",
"import markdown\n",
"import re\n",
"\n",
"\n",
"from google.cloud import bigquery\n",
"client = bigquery.Client()\n",
"\n",
"# Set these (run this cell to verify the output)\n",
"bigquery_location = \"${bigquery_location}\"\n",
"region = \"${region}\"\n",
"\n",
"# Get some values using gcloud\n",
"project_id = !(gcloud config get-value project)\n",
"user = !(gcloud auth list --filter=status:ACTIVE --format=\"value(account)\")\n",
"\n",
"\n",
"if len(project_id) != 1:\n",
" raise RuntimeError(f\"project_id is not set: {project_id}\")\n",
"project_id = project_id[0]\n",
"\n",
"if len(user) != 1:\n",
" raise RuntimeError(f\"user is not set: {user}\")\n",
"user = user[0]\n",
"\n",
"bucket_name = \"${chocolate_ai_bucket}\"\n",
"project_number = !(gcloud projects describe $project_id --format=\"value(projectNumber)\")\n",
"project_number = project_number[0]\n",
"\n",
"print(f\"project_id = {project_id}\")\n",
"print(f\"project_number = {project_number}\")\n",
"print(f\"user = {user}\")\n",
"print(f\"bucket_name = {bucket_name}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "9gekgDPnnUFR"
},
"source": [
"### Create Table\n",
"\n",
"Create the `campaign_abcd_results` table to store results of the ABCDs assessments for long-term analysis."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "57I6qTTJLFp2"
},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"CREATE TABLE IF NOT EXISTS `${project_id}.${bigquery_chocolate_ai_dataset}.campaign_abcd_results`\n",
"(\n",
" assessment_id STRING DEFAULT GENERATE_UUID() OPTIONS(description=\"Unique identifier for the assessment.\"),\n",
" assessment_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP() OPTIONS(description=\"The date and time the assessment was run.\"),\n",
" brand_name STRING NOT NULL OPTIONS(description=\"The name of the brand.\"),\n",
" video_name STRING NOT NULL OPTIONS(description=\"The name of the video.\"),\n",
" video_url STRING OPTIONS(description=\"URL of the video being assessed.\"),\n",
" score FLOAT64 OPTIONS(description=\"Overall score of the assessment.\"),\n",
" result_text STRING OPTIONS(description=\"Summary text of the assessment result.\"),\n",
" passed_features_count FLOAT64 OPTIONS(description=\"Number of features that passed the assessment.\"),\n",
" total_features_count FLOAT64 OPTIONS(description=\"Total number of features assessed.\"),\n",
" features_detail JSON OPTIONS(description=\"Detailed information about each feature and its assessment result.\"),\n",
" feature_timestamps JSON OPTIONS(description=\"Timestamps for each feature and its assessment result.\"),\n",
") CLUSTER BY assessment_id;"
]
},
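{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once the assessment step later in this notebook has populated the table, the stored results can be queried for analysis. A minimal sketch, assuming the `client` BigQuery client created above and an already-populated table:\n",
"\n",
"```python\n",
"# Illustrative query: view saved ABCD assessments, most recent first\n",
"sql = \"\"\"\n",
"SELECT video_name, score, result_text, assessment_date\n",
"FROM `${project_id}.${bigquery_chocolate_ai_dataset}.campaign_abcd_results`\n",
"ORDER BY assessment_date DESC\n",
"\"\"\"\n",
"df = client.query(sql).to_dataframe()\n",
"df.head()\n",
"```"
]
},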
{
"cell_type": "markdown",
"metadata": {
"id": "EYRHDPdVKBzd"
},
"source": [
"## <font color='#4285f4'>Setup ABCDs Environment</font>\n",
"\n",
"NOTE: This notebook is based on the work of Ana Esqueda in the [google-marketing-solutions/abdcs-detector](https://github.com/google-marketing-solutions/abcds-detector) repo. Please refer to the source repo for updates and enhancements to this product."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "V98rJWlRa9Yj"
},
"source": [
"### Define environment variables"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "XjTkztgBUsVU"
},
"outputs": [],
"source": [
"\"\"\"Module that defines the colab parameters\"\"\"\n",
"# @markdown ### Knowledge Graph Key\n",
"# @markdown Generate an API Key to connect to the Knowledge Graph API to find entities such as brands, products, etc., to match with video annotation results.\n",
"\n",
"# @markdown To generate an API key, please follow the steps [here](https://support.google.com/googleapi/answer/6158862?hl=en), then enter the key in the box below.\n",
"\n",
"\n",
"KNOWLEDGE_GRAPH_API_KEY = \"\" # @param {type:\"string\"}\n",
"\n",
"# @markdown ### Brand and Product Details\n",
"# @markdown Providing video hints helps the AI model perform a better evaluation.\n",
"\n",
"brand_name = \"chocolate_ai\" # @param {type:\"string\"}\n",
"brand_variations_str = \"chocolateai\" # @param {type:\"string\"}\n",
"branded_products_str = \"Chocolate Tasting Flight, Chocolate Decadence, Molten Caramel Surprise\" # @param {type:\"string\"}\n",
"branded_products_categories_str = \"chocolate, cake, coffee\" # @param {type:\"string\"}\n",
"branded_call_to_actions_str = \"Indulge in the artistry of Chocolate AI\" # @param {type:\"string\"}\n",
"\n",
"# @markdown ### Solution Setup\n",
"# @markdown Advanced options that allow only parts of the solution to run.\n",
"\n",
"VIDEO_SIZE_LIMIT_MB = 40 # @param {type:\"number\"}\n",
"VERBOSE = True # @param {type:\"boolean\"}\n",
"use_llms = True # @param {type:\"boolean\"}\n",
"use_annotations = True # @param {type:\"boolean\"}\n",
"# For local testing outside colab ONLY, set to False for colab\n",
"STORE_ASSESSMENT_RESULTS_LOCALLY = True # @param {type:\"boolean\"}\n",
"TEST_RESULTS = []\n",
"\n",
"# @markdown ### ABCD Framework Details\n",
"# @markdown Video analysis parameters to generate text, runs first.\n",
"\n",
"early_time_seconds = 5\n",
"confidence_threshold = 0.5 # @param {type:\"number\"}\n",
"face_surface_threshold = 0.15 # @param {type:\"number\"}\n",
"logo_size_threshold = 3.5 # @param {type:\"number\"}\n",
"avg_shot_duration_seconds = 2 # @param {type:\"number\"}\n",
"dynamic_cutoff_ms = 3000 # @param {type:\"number\"}\n",
"\n",
"\n",
"# @markdown ### LLM Configuration\n",
"# @markdown Tune the text analysis model, runs second.\n",
"\n",
"GEMINI_PRO = \"gemini-2.0-flash\" # @param {type:\"string\"}\n",
"llm_location = \"${location}\" # @param {type:\"string\"}\n",
"max_output_tokens = 8192 # @param {type:\"number\"}\n",
"temperature = 1 # @param {type:\"number\"}\n",
"top_p = 0.95 # @param {type:\"number\"}\n",
"top_k = 32 # @param {type:\"number\"}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "xFkXuunavOYN"
},
"source": [
"### Mount Bucket and Transfer Video Files"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "5RFDkudKvNoE"
},
"outputs": [],
"source": [
"# Copy the sample ad video\n",
"!gsutil -m cp gs://data-analytics-golden-demo/chocolate-ai/v1/Campaign-Assets-Text-to-Video-01/story-01/full-video-with-audio-en-GB.mp4 gs://\"$bucket_name/chocolate_ai/videos/chocolate-ai_story-HITL-01_full-video-with-audio-en-GB.mp4\"\n",
"!gsutil -m cp gs://data-analytics-golden-demo/chocolate-ai/v1/Campaign-Assets-Text-to-Video-01/story-02/full-video-with-audio-en-GB.mp4 gs://\"$bucket_name/chocolate_ai/videos/chocolate-ai_story-HITL-02_full-video-with-audio-en-GB.mp4\"\n",
"!gsutil -m cp gs://data-analytics-golden-demo/chocolate-ai/v1/Campaign-Assets-Text-to-Video-01/story-03/full-video-with-audio-en-GB.mp4 gs://\"$bucket_name/chocolate_ai/videos/chocolate-ai_story-HITL-03_full-video-with-audio-en-GB.mp4\"\n",
"!gsutil -m cp gs://data-analytics-golden-demo/chocolate-ai/v1/Campaign-Assets-Text-to-Video-02/story-01/full-video-with-audio-en-GB.mp4 gs://\"$bucket_name/chocolate_ai/videos/chocolate-ai_story-No-HITL-03_full-video-with-audio-en-GB.mp4\"\n",
"\n",
"# Mount the Google Cloud Storage Bucket\n",
"%env bucket_name={bucket_name}\n",
"\n",
"!mkdir /content/$bucket_name\n",
"!gcsfuse --implicit-dirs $bucket_name /content/$bucket_name"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "I0xXYGNqbCyK"
},
"source": [
"### Load Helper Methods"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "KIks6X7iMHxk"
},
"source": [
"#### restAPIHelper()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "40wlwnY4kM11"
},
"outputs": [],
"source": [
"def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:\n",
" \"\"\"Calls the Google Cloud REST API passing in the current users credentials\"\"\"\n",
"\n",
" import requests\n",
" import google.auth\n",
" import json\n",
"\n",
" # Get an access token based upon the current user\n",
" creds, project = google.auth.default()\n",
" auth_req = google.auth.transport.requests.Request()\n",
" creds.refresh(auth_req)\n",
" access_token=creds.token\n",
"\n",
" headers = {\n",
" \"Content-Type\" : \"application/json\",\n",
" \"Authorization\" : \"Bearer \" + access_token\n",
" }\n",
"\n",
" if http_verb == \"GET\":\n",
" response = requests.get(url, headers=headers)\n",
" elif http_verb == \"POST\":\n",
" response = requests.post(url, json=request_body, headers=headers)\n",
" elif http_verb == \"PUT\":\n",
" response = requests.put(url, json=request_body, headers=headers)\n",
" elif http_verb == \"PATCH\":\n",
" response = requests.patch(url, json=request_body, headers=headers)\n",
" elif http_verb == \"DELETE\":\n",
" response = requests.delete(url, headers=headers)\n",
" else:\n",
" raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n",
"\n",
" if response.status_code == 200:\n",
" return json.loads(response.content)\n",
" #image_data = json.loads(response.content)[\"predictions\"][0][\"bytesBase64Encoded\"]\n",
" else:\n",
" error = f\"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'\"\n",
" raise RuntimeError(error)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "jF-08l7rMRk8"
},
"source": [
"#### RunQuery(sql)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "KC3nKweMMUNM"
},
"outputs": [],
"source": [
"def RunQuery(sql, job_config = None):\n",
" import time\n",
"\n",
" if (sql.startswith(\"SELECT\") or sql.startswith(\"WITH\")):\n",
" df_result = client.query(sql).to_dataframe()\n",
" return df_result\n",
" else:\n",
" if job_config == None:\n",
" job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)\n",
" query_job = client.query(sql, job_config=job_config)\n",
"\n",
" # Check on the progress by getting the job's updated state.\n",
" query_job = client.get_job(\n",
" query_job.job_id, location=query_job.location\n",
" )\n",
" print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n",
"\n",
" while query_job.state != \"DONE\":\n",
" time.sleep(2)\n",
" query_job = client.get_job(\n",
" query_job.job_id, location=query_job.location\n",
" )\n",
" print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n",
"\n",
" if query_job.error_result == None:\n",
" return True\n",
" else:\n",
" return False"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "0bltMx4TNXXq"
},
"source": [
"#### RetryCondition(error)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "zJUHxZ3VNYib"
},
"outputs": [],
"source": [
"def RetryCondition(error):\n",
" error_string = str(error)\n",
" print(error_string)\n",
"\n",
" retry_errors = [\n",
" \"RESOURCE_EXHAUSTED\",\n",
" \"No content in candidate\",\n",
" \"429 Unable to submit request because the service is temporarily out of capacity\",\n",
" # Add more error messages here as needed\n",
" ]\n",
"\n",
" for retry_error in retry_errors:\n",
" if retry_error in error_string:\n",
" print(\"Retrying...\")\n",
" return True\n",
"\n",
" return False"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "wezkgJBy-hCP"
},
"source": [
"#### ExtractTimestampsFromText()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "xG5yqvqH-gsI"
},
"outputs": [],
"source": [
"def ExtractTimestampsFromText(parsed_abcd_result):\n",
" timestamp_array = []\n",
" timestamp_pattern = r\"\\b\\d+:\\d{2}(?:(?:\\s*(?:and|[-,])\\s*)?\\d+:\\d{2})*\\b\"\n",
" timestamp_video = parsed_abcd_result['video_name']\n",
" for detail in parsed_abcd_result['features_detail']:\n",
" timestamp_feature = detail['feature']\n",
" timestamp_feature_detected = detail['feature_detected']\n",
" \n",
" # Initialize an empty list to store timestamps for the current feature\n",
" all_timestamps = []\n",
" all_explanations = []\n",
"\n",
" for llm_detail in detail['llm_details']:\n",
" try:\n",
" timestamps = re.findall(timestamp_pattern, llm_detail['llm_explanation'])\n",
" # Extend the list with timestamps found in this llm_detail\n",
" all_timestamps.extend(timestamps)\n",
" all_explanations.append(llm_detail['llm_explanation'])\n",
" except:\n",
" pass\n",
"\n",
" # Create a single entry for the feature with all timestamps\n",
" timestamp_array.append({\n",
" 'feature': timestamp_feature,\n",
" 'feature_detected': timestamp_feature_detected,\n",
" 'timestamps': all_timestamps,\n",
" 'explanation': all_explanations # Store all explanations\n",
" })\n",
"\n",
" return {'video': timestamp_video, 'feature_timestamps': timestamp_array}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "HxHiHCdoeg6H"
},
"source": [
"#### Video Annotations\n",
"\n",
"Generate video annotations using Video Intelligence API\n",
"\n",
"Note: No output is expected from this section."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "oafMiR49etR4"
},
"outputs": [],
"source": [
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def generate_video_annotations(brand_name: str):\n",
" \"\"\"Generates video annotations for videos in Google Cloud Storage\n",
" Args:\n",
" brand_name: the brand to generate the video annotations for\n",
" \"\"\"\n",
" # Get videos from GCS\n",
" bucket = get_bucket()\n",
" brand_videos_folder = f\"{brand_name}/videos\"\n",
" blobs = bucket.list_blobs(prefix=brand_videos_folder)\n",
" # Video processing\n",
" for video in blobs:\n",
" if video.name == f\"{brand_videos_folder}/\" or \"1st_5_secs\" in video.name:\n",
" # Skip parent folder and trimmed versions of videos\n",
" continue\n",
" video_name, video_name_with_format = get_file_name_from_gcs_url(video.name)\n",
" video_location = f\"gs://{bucket_name}/{video.name}\"\n",
" video_annotations = get_existing_annotations_from_gcs(brand_name)\n",
" # Generate video annotations\n",
" generate_annotations_for_video(\n",
" brand_name,\n",
" video_name,\n",
" video_name_with_format,\n",
" video_location,\n",
" video_annotations,\n",
" )\n",
"\n",
"\n",
"def generate_annotations_for_video(\n",
" brand_name: str,\n",
" video_name: str,\n",
" video_name_with_format: str,\n",
" video_location: str,\n",
" existing_video_annotations: list[str],\n",
"):\n",
" \"\"\"Generates video annotations only if the video hasn't been processed\n",
" Args:\n",
" brand_name: the brand to generate the video annotations for\n",
" video_name: the name of the video to generate the annotations for\n",
" video_name_with_format: video name and format\n",
" existing_video_annotations: a list of existing annotations to avoid generating\n",
" them for the same video\n",
" \"\"\"\n",
"\n",
" # Label Detection\n",
" label_detection_output = (\n",
" f\"gs://{bucket_name}/{brand_name}/annotations/{video_name}/label-detection.json\"\n",
" )\n",
" if label_detection_output not in existing_video_annotations:\n",
" detect_labels(video_location, label_detection_output)\n",
" else:\n",
" print(\n",
" f\"Label annotations for video {video_name_with_format} already exist, API request skipped.\\n\"\n",
" )\n",
"\n",
" # Face Detection\n",
" face_detection_output = (\n",
" f\"gs://{bucket_name}/{brand_name}/annotations/{video_name}/face-detection.json\"\n",
" )\n",
" if face_detection_output not in existing_video_annotations:\n",
" detect_faces(video_location, face_detection_output)\n",
" else:\n",
" print(\n",
" f\"Face annotations for video {video_name_with_format} already exist, API request skipped.\\n\"\n",
" )\n",
"\n",
" # People Detection\n",
" people_detection_output = f\"gs://{bucket_name}/{brand_name}/annotations/{video_name}/people-detection.json\"\n",
" if people_detection_output not in existing_video_annotations:\n",
" detect_people(video_location, people_detection_output)\n",
" else:\n",
" print(\n",
" f\"People annotations for video {video_name_with_format} already exist, API request skipped.\\n\"\n",
" )\n",
"\n",
" # Shot Detection\n",
" shot_detection_output = (\n",
" f\"gs://{bucket_name}/{brand_name}/annotations/{video_name}/shot-detection.json\"\n",
" )\n",
" if shot_detection_output not in existing_video_annotations:\n",
" detect_shots(video_location, shot_detection_output)\n",
" else:\n",
" print(\n",
" f\"Shot annotations for video {video_name_with_format} already exist, API request skipped.\\n\"\n",
" )\n",
"\n",
" # Text Detection\n",
" text_detection_output = (\n",
" f\"gs://{bucket_name}/{brand_name}/annotations/{video_name}/text-detection.json\"\n",
" )\n",
" if text_detection_output not in existing_video_annotations:\n",
" detect_text(video_location, text_detection_output)\n",
" else:\n",
" print(\n",
" f\"Text annotations for video {video_name_with_format} already exist, API request skipped.\\n\"\n",
" )\n",
"\n",
" # Logo Detection\n",
" logo_detection_output = (\n",
" f\"gs://{bucket_name}/{brand_name}/annotations/{video_name}/logo-detection.json\"\n",
" )\n",
" if logo_detection_output not in existing_video_annotations:\n",
" detect_logos(video_location, logo_detection_output)\n",
" else:\n",
" print(\n",
" f\"Logo annotations for video {video_name_with_format} already exist, API request skipped.\\n\"\n",
" )\n",
"\n",
" # Speech Detection\n",
" speech_detection_output = f\"gs://{bucket_name}/{brand_name}/annotations/{video_name}/speech-detection.json\"\n",
" if speech_detection_output not in existing_video_annotations:\n",
" detect_speech(video_location, speech_detection_output)\n",
" else:\n",
" print(\n",
" f\"Speech annotations for video {video_name_with_format} already exist, API request skipped.\\n\"\n",
" )\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "jQTAVTvlexxY"
},
"source": [
"#### Label Detection\n",
"\n",
"The Video Intelligence API can identify entities shown in video footage using the LABEL_DETECTION feature. This feature identifies objects, locations, activities, animal species, products, and more.\n",
"\n",
"For more information visit the official Google Cloud documentation: [https://cloud.google.com/video-intelligence/docs/analyze-labels]\n",
"\n",
"Note: No output is expected from this cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "bGkJcEUhe2_Q"
},
"outputs": [],
"source": [
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_labels(input_gs_file_name: str, output_gs_file_name: str) -> None:\n",
" \"\"\"Detect labels in a video\n",
" Args:\n",
" input_gs_file_name: gcs bucket where the video is located\n",
" output_gs_file_name: gcs bucket output for the video annotations\n",
" \"\"\"\n",
" video_client = videointelligence.VideoIntelligenceServiceClient()\n",
"\n",
" features = [videointelligence.Feature.LABEL_DETECTION]\n",
" operation = video_client.annotate_video(\n",
" request={\n",
" \"features\": features,\n",
" \"input_uri\": input_gs_file_name,\n",
" \"output_uri\": output_gs_file_name,\n",
" }\n",
" )\n",
" print(f\"\\nProcessing video {input_gs_file_name} for label annotations...\")\n",
"\n",
" result = operation.result(timeout=800)\n",
"\n",
" print(\n",
" f\"\\nFinished processing video {input_gs_file_name} for label annotations...\\n\"\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Z9nSDCz3e5Cu"
},
"source": [
"#### Face Detection\n",
"\n",
"The Video Intelligence API Face detection feature looks for faces in a video.\n",
"\n",
"For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/face-detection\n",
"\n",
"Note: No output is expected from this cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "YAX5YQg6e9nX"
},
"outputs": [],
"source": [
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_faces(input_gs_file_name: str, output_gs_file_name: str) -> None:\n",
" \"\"\"Detects faces in a video.\n",
" Args:\n",
" input_gs_file_name: gcs bucket where the video is located\n",
" output_gs_file_name: gcs bucket output for the video annotations\n",
" \"\"\"\n",
"\n",
" video_client = videointelligence.VideoIntelligenceServiceClient()\n",
"\n",
" # Configure the request\n",
" config = videointelligence.FaceDetectionConfig(\n",
" include_bounding_boxes=True, include_attributes=True\n",
" )\n",
" context = videointelligence.VideoContext(face_detection_config=config)\n",
"\n",
" # Start the asynchronous request\n",
" operation = video_client.annotate_video(\n",
" request={\n",
" \"features\": [videointelligence.Feature.FACE_DETECTION],\n",
" \"input_uri\": input_gs_file_name,\n",
" \"output_uri\": output_gs_file_name,\n",
" \"video_context\": context,\n",
" }\n",
" )\n",
"\n",
" print(f\"\\nProcessing video {input_gs_file_name} for face annotations...\")\n",
"\n",
" result = operation.result(timeout=800)\n",
"\n",
" print(f\"\\nFinished processing video {input_gs_file_name} for face annotations...\\n\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "lSu26Nmle_6T"
},
"source": [
"#### People Detection\n",
"\n",
"Video Intelligence can detect the presence of humans in a video file and track individuals across a video or video segment.\n",
"\n",
"For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/people-detection\n",
"\n",
"Note: No output is expected from this cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "bIcleXqje_OI"
},
"outputs": [],
"source": [
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_people(input_gs_file_name: str, output_gs_file_name: str) -> None:\n",
" \"\"\"Detects people in a video\n",
" Args:\n",
" input_gs_file_name: gcs bucket where the video is located\n",
" output_gs_file_name: gcs bucket output for the video annotations\n",
" \"\"\"\n",
" video_client = videointelligence2.VideoIntelligenceServiceClient()\n",
"\n",
" # Configure the request\n",
" config = videointelligence2.types.PersonDetectionConfig(\n",
" include_bounding_boxes=True,\n",
" include_attributes=True,\n",
" include_pose_landmarks=True,\n",
" )\n",
" context = videointelligence2.types.VideoContext(person_detection_config=config)\n",
"\n",
" # Start the asynchronous request\n",
" operation = video_client.annotate_video(\n",
" request={\n",
" \"features\": [videointelligence2.Feature.PERSON_DETECTION],\n",
" \"input_uri\": input_gs_file_name,\n",
" \"video_context\": context,\n",
" \"output_uri\": output_gs_file_name,\n",
" }\n",
" )\n",
"\n",
" print(f\"\\nProcessing video {input_gs_file_name} for people annotations...\")\n",
"\n",
" result = operation.result(timeout=800)\n",
"\n",
" print(\n",
" f\"\\nFinished processing video {input_gs_file_name} for people annotations...\\n\"\n",
" )\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "oUX_iDObfJA0"
},
"source": [
"#### Shot Detection\n",
"Shot change analysis detects shot changes in a video.\n",
"\n",
"For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/analyze-shots\n",
"\n",
"Note: No output is expected from this cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "ZdYWtNGyfIkO"
},
"outputs": [],
"source": [
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_shots(input_gs_file_name: str, output_gs_file_name: str) -> None:\n",
" \"\"\"Detects camera shot changes in a video.\n",
" Args:\n",
" input_gs_file_name: gcs bucket where the video is located\n",
" output_gs_file_name: gcs bucket output for the video annotations\n",
" \"\"\"\n",
" video_client = videointelligence.VideoIntelligenceServiceClient()\n",
" features = [videointelligence.Feature.SHOT_CHANGE_DETECTION]\n",
" operation = video_client.annotate_video(\n",
" request={\n",
" \"features\": features,\n",
" \"input_uri\": input_gs_file_name,\n",
" \"output_uri\": output_gs_file_name,\n",
" }\n",
" )\n",
" print(f\"\\nProcessing video {input_gs_file_name} for shot annotations...\")\n",
"\n",
" result = operation.result(timeout=800)\n",
"\n",
" print(f\"\\nFinished processing video {input_gs_file_name} for shot annotations...\\n\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Awp8MmgUfNdQ"
},
"source": [
"#### Object Detection\n",
"\n",
"Object tracking tracks objects detected in an input video. To make an object tracking request, call the annotate method and specify OBJECT_TRACKING in the features field.\n",
"\n",
"For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/object-tracking\n",
"\n",
"Note: No output is expected from this cell.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "nq_kwJ64fNVv"
},
"outputs": [],
"source": [
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_objects(input_gs_file_name: str, output_gs_file_name: str) -> None:\n",
" \"\"\"Detects objects in a video.\n",
" Args:\n",
" input_gs_file_name: gcs bucket where the video is located\n",
" output_gs_file_name: gcs bucket output for the video annotations\n",
" \"\"\"\n",
" video_client = videointelligence.VideoIntelligenceServiceClient()\n",
" features = [videointelligence.Feature.OBJECT_TRACKING]\n",
" operation = video_client.annotate_video(\n",
" request={\n",
" \"features\": features,\n",
" \"input_uri\": input_gs_file_name,\n",
" \"output_uri\": output_gs_file_name,\n",
" }\n",
" )\n",
" print(f\"\\nProcessing video {input_gs_file_name} for object annotations...\")\n",
"\n",
" result = operation.result(timeout=800)\n",
"\n",
" print(\n",
" f\"\\nFinished processing video {input_gs_file_name} for object annotations...\\n\"\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "hMX1en7LfTEG"
},
"source": [
"#### Text Detection\n",
"Text Detection performs Optical Character Recognition (OCR), which detects and extracts text within an input video.\n",
"\n",
"Text detection is available for all the languages supported by the Cloud Vision API.\n",
"\n",
"For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/text-detection\n",
"\n",
"Note: No output is expected from this cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "kd9MnDtTfS7G"
},
"outputs": [],
"source": [
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_text(input_gs_file_name: str, output_gs_file_name: str) -> None:\n",
" \"\"\"Detects text in a video.\n",
" Args:\n",
" input_gs_file_name: gcs bucket where the video is located\n",
" output_gs_file_name: gcs bucket output for the video annotations\n",
" \"\"\"\n",
" video_client = videointelligence.VideoIntelligenceServiceClient()\n",
" features = [videointelligence.Feature.TEXT_DETECTION]\n",
"\n",
" operation = video_client.annotate_video(\n",
" request={\n",
" \"features\": features,\n",
" \"input_uri\": input_gs_file_name,\n",
" \"output_uri\": output_gs_file_name,\n",
" }\n",
" )\n",
"\n",
" print(f\"\\nProcessing video {input_gs_file_name} for text annotations...\")\n",
"\n",
" result = operation.result(timeout=800)\n",
"\n",
" print(f\"\\nFinished processing video {input_gs_file_name} for text annotations...\\n\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "7g_ythv1fWZn"
},
"source": [
"#### Logo Detection\n",
"The Video Intelligence API can detect, track, and recognize the presence of over 100,000 brands and logos in video content.\n",
"\n",
"For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/logo-recognition\n",
"\n",
"Note: No output is expected from this cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "3qcEPHRTfWQP"
},
"outputs": [],
"source": [
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_logos(input_gs_file_name: str, output_gs_file_name: str) -> None:\n",
" \"\"\"Detect logos in a video.\n",
" Args:\n",
" input_gs_file_name: gcs bucket where the video is located\n",
" output_gs_file_name: gcs bucket output for the video annotations\n",
" \"\"\"\n",
" video_client = videointelligence.VideoIntelligenceServiceClient()\n",
" features = [videointelligence.Feature.LOGO_RECOGNITION]\n",
"\n",
" operation = video_client.annotate_video(\n",
" request={\n",
" \"features\": features,\n",
" \"input_uri\": input_gs_file_name,\n",
" \"output_uri\": output_gs_file_name,\n",
" }\n",
" )\n",
"\n",
" print(f\"\\nProcessing video {input_gs_file_name} for logo annotations...\")\n",
"\n",
" response = operation.result(timeout=800)\n",
"\n",
" print(f\"\\nFinished processing video {input_gs_file_name} for logo annotations...\\n\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "1qa_ADmMfat6"
},
"source": [
"#### Speech Detection\n",
"\n",
"The Video Intelligence API transcribes speech to text from supported video files. There are two supported models, \"default\" and \"video.\"\n",
"\n",
"For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/transcription\n",
"\n",
"Note: No output is expected from this cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "-d5R-mMGfalm"
},
"outputs": [],
"source": [
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_speech(input_gs_file_name: str, output_gs_file_name: str) -> None:\n",
" \"\"\"Detects speech in a video.\n",
" Args:\n",
" input_gs_file_name: gcs bucket where the video is located\n",
" output_gs_file_name: gcs bucket output for the video annotations\n",
" \"\"\"\n",
"\n",
" video_client = videointelligence.VideoIntelligenceServiceClient()\n",
" features = [videointelligence.Feature.SPEECH_TRANSCRIPTION]\n",
"\n",
" config = videointelligence.SpeechTranscriptionConfig(\n",
" language_code=\"en-US\", enable_automatic_punctuation=True\n",
" )\n",
" video_context = videointelligence.VideoContext(speech_transcription_config=config)\n",
"\n",
" operation = video_client.annotate_video(\n",
" request={\n",
" \"features\": features,\n",
" \"input_uri\": input_gs_file_name,\n",
" \"output_uri\": output_gs_file_name,\n",
" \"video_context\": video_context,\n",
" }\n",
" )\n",
"\n",
" print(f\"\\nProcessing video {input_gs_file_name} for speech annotations...\")\n",
"\n",
" result = operation.result(timeout=800)\n",
"\n",
" print(\n",
" f\"\\nFinished processing video {input_gs_file_name} for speech annotations...\\n\"\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "5PNuYy3eMZTH"
},
"source": [
"#### Misc. ABCD Helpers"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "vzvScZ_La0I0"
},
"outputs": [],
"source": [
"### DO NOT EDIT, vars built from user's input ###\n",
"def convert_string_to_list(list_str: str):\n",
" \"\"\"Converts a string to a list and\n",
" removes white spaces from strings in list\n",
" Args:\n",
" list_str\n",
" \"\"\"\n",
" cleaned_list = []\n",
" for item in list_str.split(\",\"):\n",
" cleaned_list.append(item.strip())\n",
" return cleaned_list\n",
"\n",
"brand_variations = convert_string_to_list(brand_variations_str)\n",
"brand_variations.append(brand_name)\n",
"branded_products = convert_string_to_list(branded_products_str)\n",
"branded_products_categories = convert_string_to_list(branded_products_categories_str)\n",
"branded_call_to_actions = convert_string_to_list(branded_call_to_actions_str)\n",
"\n",
"if VERBOSE:\n",
" print(\"ABCD Detector parameters:\")\n",
" print(f\"Brand Variations: {brand_variations}\")\n",
" print(f\"Brand products: {branded_products}\")\n",
" print(f\"Brand categories: {branded_products_categories}\")\n",
" print(f\"Brand call to actions: {branded_call_to_actions}\")\n",
"\n",
"llm_generation_config = {\n",
" \"max_output_tokens\": max_output_tokens,\n",
" \"temperature\": temperature,\n",
" \"top_p\": top_p,\n",
" \"top_k\": top_k,\n",
"}\n",
"\n",
"context_and_examples = \"\"\"Only base your answers strictly on what information is available in the video attached.\n",
"Do not make up any information that is not part of the video.\n",
"Explain in a very detailed way the reasoning behind your answer.\n",
"Please present the extracted information in a VALID JSON format like this:\n",
"{\n",
" \"feature_detected\": \"True/False\",\n",
" \"explanation\": \"...\"\n",
"}\n",
"\"\"\"\n",
"\n",
"\n",
"\n",
"\n",
"def calculate_time_seconds(part_obj: dict, part: str) -> float:\n",
" \"\"\"Calculate time of the provided part of the video\n",
" Args:\n",
" part_obj: part of the video to calculate the time\n",
" part: either start_time_offset or end_time_offset\n",
" Returns:\n",
" time_seconds: the time in seconds\n",
" \"\"\"\n",
" if part not in part_obj:\n",
" if VERBOSE:\n",
" print(f\"There is no part time {part} in {part_obj}\")\n",
" return 0\n",
" time_seconds = (\n",
" (part_obj.get(part).get(\"seconds\") or 0)\n",
" + ((part_obj.get(part).get(\"microseconds\") or 0) / 1e6)\n",
" + ((part_obj.get(part).get(\"nanos\") or 0) / 1e9)\n",
" )\n",
" return time_seconds\n",
"\n",
"\n",
"def detected_text_in_first_5_seconds(annotation: dict) -> tuple[bool, any]:\n",
" \"\"\"Detect if the text feature appears in the first 5 seconds\n",
" Args:\n",
" annotation: the text annotation\n",
" Returns:\n",
" True if the text is found in the 1st 5 secs, False otherwise\n",
" frame: the frame where the feature was found\n",
" \"\"\"\n",
" for segment in annotation.get(\"segments\"):\n",
" start_time_secs = calculate_time_seconds(\n",
" segment.get(\"segment\"), \"start_time_offset\"\n",
" )\n",
" if start_time_secs > early_time_seconds:\n",
" continue # Ignore a segment > 5 secs\n",
" frames = segment.get(\"frames\")\n",
" for frame in frames:\n",
" start_time_seconds = calculate_time_seconds(frame, \"time_offset\")\n",
" if start_time_seconds <= early_time_seconds:\n",
" return True, frame\n",
" return False, None\n",
"\n",
"\n",
"def find_elements_in_transcript(\n",
" speech_transcriptions: list[dict],\n",
" elements: list[str],\n",
" elements_categories: list[str],\n",
" apply_condition: bool,\n",
") -> tuple[bool, bool]:\n",
" \"\"\"Finds a list of elements in the video transcript\n",
" Args:\n",
" speech_transcriptions: the speech annotations\n",
" elements: list of elements to find in the transcript\n",
" elements_categories: list of element categories to find in the transcript\n",
" apply_condition: flag to filter out text with less than x chars. This is\n",
" only needed when elements come from text annotations since words are\n",
" sometimes 1 character only.\n",
" Returns:\n",
" True if the elements are found, False otherwise\n",
" \"\"\"\n",
" words_1st_5_secs = []\n",
" element_mention_speech = False\n",
" element_mention_speech_1st_5_secs = False\n",
" for speech_transcription in speech_transcriptions:\n",
" # The number of alternatives for each transcription is limited by\n",
" # SpeechTranscriptionConfig.max_alternatives.\n",
" # Each alternative is a different possible transcription\n",
" # and has its own confidence score.\n",
" for alternative in speech_transcription.get(\"alternatives\"):\n",
" # Check confidence against user defined threshold\n",
" if alternative and alternative.get(\"confidence\") >= confidence_threshold:\n",
" transcript = alternative.get(\"transcript\")\n",
" # Check if elements or elements categories are found in transcript\n",
" if apply_condition:\n",
" found_elements = find_text_annotation_elements_in_transcript(\n",
" elements, transcript\n",
" )\n",
" else:\n",
" found_elements = [\n",
" element\n",
" for element in elements\n",
" if element.lower() in transcript.lower()\n",
" ]\n",
" found_elements_categories = [\n",
" elements_category\n",
" for elements_category in elements_categories\n",
" if elements_category.lower() in transcript.lower()\n",
" ]\n",
" if len(found_elements) > 0 or len(found_elements_categories) > 0:\n",
" element_mention_speech = True\n",
" # For 1st 5 secs, check elements and elements_categories in words\n",
" # since only the words[] contain times\n",
" words = alternative.get(\"words\") if \"words\" in alternative else []\n",
" # Sort words by time to construct correct transcript later\n",
" sorted_words = sorted(\n",
" words,\n",
" key=lambda x: calculate_time_seconds(x, \"start_time\"),\n",
" reverse=False,\n",
" )\n",
" for word_info in sorted_words:\n",
" start_time_secs = calculate_time_seconds(word_info, \"start_time\")\n",
" # Consider only words in the 1st 5 secs\n",
" if start_time_secs <= early_time_seconds:\n",
" words_1st_5_secs.append(word_info.get(\"word\"))\n",
"\n",
" # Evaluate 1st 5 secs - Construct transcript from words\n",
" transcript_1st_5_secs = \" \".join(words_1st_5_secs)\n",
" if apply_condition:\n",
" found_elements_1st_5_seconds = find_text_annotation_elements_in_transcript(\n",
" elements, transcript_1st_5_secs\n",
" )\n",
" else:\n",
" found_elements_1st_5_seconds = [\n",
" element\n",
" for element in elements\n",
" if element.lower() in transcript_1st_5_secs.lower()\n",
" ]\n",
" found_elements_categories_1st_5_seconds = [\n",
" elements_category\n",
" for elements_category in elements_categories\n",
" if elements_category.lower() in transcript_1st_5_secs.lower()\n",
" ]\n",
" if (\n",
" len(found_elements_1st_5_seconds) > 0\n",
" or len(found_elements_categories_1st_5_seconds) > 0\n",
" ):\n",
" element_mention_speech_1st_5_secs = True\n",
"\n",
" return element_mention_speech, element_mention_speech_1st_5_secs\n",
"\n",
"\n",
"def find_text_annotation_elements_in_transcript(elements: list[str], transcript: str):\n",
" \"\"\"Checks if text annotation elements in an array are found in transcript\n",
" Args:\n",
" elements: list of elements to find in the transcript\n",
" transcript: the transcript to find the elements in\n",
" This is only needed when elements come from text annotations since\n",
" words are sometimes 1 character only.\n",
" \"\"\"\n",
" found_elements = [\n",
" element\n",
" for element in elements\n",
" # filter out words with less than 3 chars? - DONE\n",
" if len(element) > 3 and element.lower() in transcript.lower()\n",
" ]\n",
" return found_elements\n",
"\n",
"\n",
"def get_speech_transcript(speech_transcriptions: list[dict]) -> str:\n",
" \"\"\"Get transcript built from transcript alternatives\n",
" Args:\n",
" speech_transcriptions: the speech annotations\n",
" Returns\n",
" final_transcript: the constructured transcript\n",
" \"\"\"\n",
" transcript_alternatives = []\n",
" transcript_alt_confidence = []\n",
" for speech_transcription in speech_transcriptions:\n",
" # The number of alternatives for each transcription is limited by\n",
" # SpeechTranscriptionConfig.max_alternatives.\n",
" # Each alternative is a different possible transcription\n",
" # and has its own confidence score.\n",
" for alternative in speech_transcription.get(\"alternatives\"):\n",
" # Check confidence against user defined threshold\n",
" transcript = alternative.get(\"transcript\")\n",
" if alternative and alternative.get(\"confidence\") >= confidence_threshold:\n",
" transcript_alternatives.append(transcript)\n",
" transcript_alt_confidence.append(alternative)\n",
"\n",
" sorted_transcript_by_confidence = sorted(\n",
" transcript_alt_confidence,\n",
" key=lambda x: x.get(\"confidence\"),\n",
" reverse=True,\n",
" ) # don't use this for now\n",
" highest_confidence_trascript = (\n",
" sorted_transcript_by_confidence[0].get(\"transcript\")\n",
" if len(sorted_transcript_by_confidence) > 0\n",
" else \"\"\n",
" ) # don't use this for now\n",
" final_transcript = \" \".join(transcript_alternatives)\n",
" return final_transcript\n",
"\n",
"\n",
"def get_speech_transcript_1st_5_secs(speech_transcriptions: list[dict]):\n",
" \"\"\"Get transcript with highest confidence\n",
" Args:\n",
" speech_transcriptions: the speech annotations\n",
" Returns\n",
" transcript_1st_5_secs: the transcript in the 1st 5 secs\n",
" \"\"\"\n",
" words_1st_5_secs = []\n",
" for speech_transcription in speech_transcriptions:\n",
" # The number of alternatives for each transcription is limited by\n",
" # SpeechTranscriptionConfig.max_alternatives.\n",
" # Each alternative is a different possible transcription\n",
" # and has its own confidence score.\n",
" for alternative in speech_transcription.get(\"alternatives\"):\n",
" # Check confidence against user defined threshold\n",
" if alternative and alternative.get(\"confidence\") >= confidence_threshold:\n",
" # For 1st 5 secs get transcript from words\n",
" # since only the words[] contain times\n",
" words = alternative.get(\"words\") if \"words\" in alternative else []\n",
" # Sort words by time to construct correct transcript later\n",
" sorted_words = sorted(\n",
" words,\n",
" key=lambda x: calculate_time_seconds(x, \"start_time\"),\n",
" reverse=False,\n",
" )\n",
" for word_info in sorted_words:\n",
" start_time_secs = calculate_time_seconds(word_info, \"start_time\")\n",
" # Consider only words in the 1st 5 secs\n",
" if start_time_secs <= early_time_seconds:\n",
" words_1st_5_secs.append(word_info.get(\"word\"))\n",
" # Construct transcript from words\n",
" transcript_1st_5_secs = \" \".join(words_1st_5_secs)\n",
" return transcript_1st_5_secs\n",
"\n",
"\n",
"def get_existing_annotations_from_gcs(brand_name: str) -> list[str]:\n",
" \"\"\"Get existing annotations from Cloud Storage\n",
" Args:\n",
" brand_name: the parent folder in Cloud Storage\n",
" Returns:\n",
" video_annotations: array of annotation url/names\n",
" \"\"\"\n",
" bucket = get_bucket()\n",
" blobs = bucket.list_blobs(prefix=f\"{brand_name}/annotations/\")\n",
" video_annotations = []\n",
" for blob in blobs:\n",
" video_annotations.append(f\"gs://{bucket_name}/{blob.name}\")\n",
" return video_annotations\n",
"\n",
"\n",
"def download_video_annotations(\n",
" brand_name: str, video_name: str\n",
") -> tuple[dict, dict, dict, dict, dict, dict, dict]:\n",
" \"\"\"Download video annotations from Google Cloud Storage\n",
" Args:\n",
" brand_name: the brand to generate the video annotations for\n",
" video_name: Full video name\n",
" Returns:\n",
" text_annotation_results (tuple): Text annotations tuple\n",
" \"\"\"\n",
" annotation_location = f\"{brand_name}/annotations/{video_name}\"\n",
" bucket = get_bucket()\n",
"\n",
" # Label Annotations\n",
" blob_label = bucket.blob(f\"{annotation_location}/label-detection.json\")\n",
" data_label = json.loads(blob_label.download_as_string(client=None))\n",
" # Get label annotations. The first result is retrieved because a single video was processed.\n",
" label_annotation_results = data_label.get(\"annotation_results\")[0]\n",
"\n",
" # Face Annotations\n",
" blob_face = bucket.blob(f\"{annotation_location}/face-detection.json\")\n",
" data_face = json.loads(blob_face.download_as_string(client=None))\n",
" # Get face annotations. The first result is retrieved because a single video was processed.\n",
" face_annotation_results = data_face.get(\"annotation_results\")[0]\n",
"\n",
" # People Annotations\n",
" blob_people = bucket.blob(f\"{annotation_location}/people-detection.json\")\n",
" data_people = json.loads(blob_people.download_as_string(client=None))\n",
" # Get people annotations. The first result is retrieved because a single video was processed.\n",
" people_annotation_results = data_people.get(\"annotation_results\")[0]\n",
"\n",
" # Shot Annotations\n",
" blob_shot = bucket.blob(f\"{annotation_location}/shot-detection.json\")\n",
" data_shot = json.loads(blob_shot.download_as_string(client=None))\n",
" # Get logo annotations. The first result is retrieved because a single video was processed.\n",
" shot_annotation_results = data_shot.get(\"annotation_results\")[0]\n",
"\n",
" # Text Annotations\n",
" blob_text = bucket.blob(f\"{annotation_location}/text-detection.json\")\n",
" data_text = json.loads(blob_text.download_as_string(client=None))\n",
" # Get text annotations. The first result is retrieved because a single video was processed.\n",
" text_annotation_results = data_text.get(\"annotation_results\")[0]\n",
"\n",
" # Logo Annotations\n",
" blob_logo = bucket.blob(f\"{annotation_location}/logo-detection.json\")\n",
" data_logo = json.loads(blob_logo.download_as_string(client=None))\n",
" # Get logo annotations. The first result is retrieved because a single video was processed.\n",
" logo_annotation_results = data_logo.get(\"annotation_results\")[0]\n",
"\n",
" # Speech Annotations\n",
" blob_speech = bucket.blob(f\"{annotation_location}/speech-detection.json\")\n",
" data_speech = json.loads(blob_speech.download_as_string(client=None))\n",
" # Get speech annotations. The first result is retrieved because a single video was processed.\n",
" speech_annotation_results = data_speech.get(\"annotation_results\")[0]\n",
"\n",
" return (\n",
" label_annotation_results,\n",
" face_annotation_results,\n",
" people_annotation_results,\n",
" shot_annotation_results,\n",
" text_annotation_results,\n",
" logo_annotation_results,\n",
" speech_annotation_results,\n",
" )\n",
"\n",
"class LLMParameters:\n",
" \"\"\"Class that represents the required params to make a prediction to the LLM\"\"\"\n",
"\n",
" model_name: str\n",
" location: str\n",
" modality: dict\n",
" generation_config: dict = { # Default model config\n",
" \"max_output_tokens\": 2048,\n",
" \"temperature\": 0.5,\n",
" \"top_p\": 1,\n",
" \"top_k\": 40,\n",
" }\n",
"\n",
" def __init__(\n",
" self,\n",
" model_name: str,\n",
" location: str,\n",
" generation_config: dict,\n",
" modality: dict = None,\n",
" ):\n",
" self.model_name = model_name\n",
" self.location = location\n",
" self.generation_config = generation_config\n",
" self.modality = modality\n",
"\n",
" def set_modality(self, modality: dict) -> None:\n",
" \"\"\"Sets the modal to use in the LLM\n",
" The modality object changes depending on the type.\n",
" For video:\n",
" {\n",
" \"type\": \"video\", # prompt is handled separately\n",
" \"video_uri\": \"\"\n",
" }\n",
" For text:\n",
" {\n",
" \"type\": \"text\" # prompt is handled separately\n",
" }\n",
" \"\"\"\n",
" self.modality = modality\n",
"\n",
"\n",
"class VertexAIService:\n",
" \"\"\"Vertex AI Service to leverage the Vertex APIs for inference\"\"\"\n",
"\n",
" def __init__(self, project_id: str):\n",
" self.project_id = project_id\n",
"\n",
" def execute_gemini_model(self, prompt: str, params: LLMParameters) -> str:\n",
" \"\"\"Makes a request to Gemini to get a prediction based on the provided prompt\n",
" and multi-modal params\n",
" Args:\n",
" prompt: a string with the prompt for LLM\n",
" params: llm params model_name, location, modality and generation_config\n",
" Returns:\n",
" response.text: a string with the generated response\n",
" \"\"\"\n",
" retries = 4\n",
" for this_retry in range(retries):\n",
" try:\n",
" vertexai.init(project=self.project_id, location=params.location)\n",
" model = GenerativeModel(params.model_name)\n",
" modality_params = self._get_modality_params(prompt, params)\n",
" response = model.generate_content(\n",
" modality_params,\n",
" generation_config=params.generation_config,\n",
" safety_settings={\n",
" generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,\n",
" generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,\n",
" generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,\n",
" generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,\n",
" },\n",
" stream=False,\n",
" )\n",
" return response.text if response else \"\"\n",
" except AttributeError as ex:\n",
" error_message = str(ex)\n",
" if (\n",
" this_retry == retries - 1\n",
" or \"Content has no parts\" not in error_message\n",
" ):\n",
" # Raise exception for other attribute errors\n",
" raise\n",
" # Retry request\n",
" if \"Content has no parts\" in error_message:\n",
" print(\n",
" f\"Error: {ex} Gemini might be blocking the response due to safety issues.\\n\"\n",
" )\n",
" wait = 10 * 2**this_retry\n",
" time.sleep(wait)\n",
" except Exception as ex:\n",
" print(\"GENERAL EXCEPTION...\\n\")\n",
" error_message = str(ex)\n",
" # Check quota issues for now\n",
" if (\n",
" this_retry == retries - 1\n",
" or \"429 Quota exceeded\" not in error_message\n",
" or \"503 The service is currently unavailable\" not in error_message\n",
" or \"500 Internal error encountered\" not in error_message\n",
" or \"403\" not in error_message\n",
" ):\n",
" if VERBOSE:\n",
" print(f\"{error_message}\\n\")\n",
" # Raise exception for non-retriable errors\n",
" raise\n",
" # Retry request\n",
" if VERBOSE:\n",
" print(\n",
" f\"Error {error_message}. Retrying {retries - 1} times using exponential backoff. Retry number {this_retry}...\\n\"\n",
" )\n",
" wait = 10 * 2**this_retry\n",
" time.sleep(wait)\n",
" return \"\"\n",
"\n",
" def _get_modality_params(self, prompt: str, params: LLMParameters) -> list[any]:\n",
" \"\"\"Build the modality params based on the type of llm capability to use\n",
" Args:\n",
" prompt: a string with the prompt for LLM\n",
" model_params: the model params for inference, see defaults above\n",
" Returns:\n",
" modality_params: list of modality params based on the model capability to use\n",
" \"\"\"\n",
" if params.modality[\"type\"] == \"video\":\n",
" mime_type = f\"video/{get_video_format(params.modality['video_uri'])}\"\n",
" video = Part.from_uri(uri=params.modality[\"video_uri\"], mime_type=mime_type)\n",
" return [video, prompt]\n",
" elif params.modality[\"type\"] == \"text\":\n",
" return [prompt]\n",
" return []\n",
"\n",
"\n",
"def get_vertex_ai_service():\n",
" \"\"\"Gets Vertex AI service to interact with Gemini\"\"\"\n",
" vertex_ai_service = VertexAIService(project_id)\n",
" return vertex_ai_service\n",
"\n",
"\n",
"def detect_feature_with_llm(\n",
" feature: str, prompt: str, llm_params: LLMParameters\n",
") -> tuple[bool, str]:\n",
" \"\"\"Detect feature using LLM\n",
" Args:\n",
" feature: the feature to evaluate\n",
" prompt: prompt for the llm\n",
" llm_params: object with llm params\n",
" Returns:\n",
" feature_detected: True if the feature is detected, False otherwise\n",
" \"\"\"\n",
" try:\n",
" vertex_ai_service = get_vertex_ai_service()\n",
" if llm_params.model_name == GEMINI_PRO:\n",
" # Gemini 1.5 does not support top_k param\n",
" if \"top_k\" in llm_params.generation_config:\n",
" del llm_params.generation_config[\"top_k\"]\n",
" llm_response = vertex_ai_service.execute_gemini_model(\n",
" prompt=prompt, params=llm_params\n",
" )\n",
" else:\n",
" print(f\"LLM {llm_params.model_name} not supported.\")\n",
" return False\n",
" # Parse response\n",
" llm_response_json = json.loads(clean_llm_response(llm_response))\n",
" if (\n",
" \"feature_detected\" in llm_response_json\n",
" and \"explanation\" in llm_response_json\n",
" ):\n",
" if VERBOSE:\n",
" print(\"***Powered by LLMs***\")\n",
" print(\n",
" f\"Feature detected: {feature}: {llm_response_json.get('feature_detected')}\"\n",
" )\n",
" print(f\"Explanation: {llm_response_json.get('explanation')}\\n\")\n",
" feature_detected = (\n",
" llm_response_json.get(\"feature_detected\") == \"True\"\n",
" or llm_response_json.get(\"feature_detected\") == \"true\"\n",
" )\n",
" return feature_detected, llm_response_json.get(\"explanation\")\n",
" else:\n",
" if VERBOSE:\n",
" print(\"***Powered by LLMs***\")\n",
" print(\n",
" \"JSON parse was successful but the JSON keys: feature_detected and explanation were not found.\"\n",
" )\n",
" print(\"Using string version...\\n\")\n",
" print(llm_response)\n",
" feature_detected = is_feature_detected(llm_response)\n",
" return feature_detected, llm_response\n",
" except json.JSONDecodeError as ex:\n",
" if VERBOSE:\n",
" print(f\"LLM response could not be parsed. Error: {ex}.\\n\")\n",
" print(\"Using string version...\\n\")\n",
" if llm_response:\n",
" print(\"***Powered by LLMs***\")\n",
" print(f\"{feature}: {llm_response}\")\n",
" except Exception as ex:\n",
" print(ex)\n",
" raise\n",
" feature_detected = is_feature_detected(llm_response)\n",
" return feature_detected, llm_response\n",
"\n",
"def is_feature_detected(llm_response: str):\n",
" \"\"\"Checks if feature is detected\n",
"\n",
" Args:\n",
" llm_response: string llm respose\n",
" Returns:\n",
" detected: whether the feature was detected or not\n",
"\n",
" \"\"\"\n",
" detected = llm_response and (\n",
" '\"feature_detected\" : \"True\"' in llm_response\n",
" or '\"feature_detected\" : \"true\"' in llm_response\n",
" or '\"feature_detected\": \"True\"' in llm_response\n",
" or '\"feature_detected\": \"true\"' in llm_response\n",
" )\n",
" return detected\n",
"\n",
"\n",
"def clean_llm_response(response: str) -> str:\n",
" \"\"\"Cleans LLM response\n",
" Args:\n",
" response: llm response to clean\n",
" Returns:\n",
" reponse: without extra characters\n",
" \"\"\"\n",
" return response.replace(\"```\", \"\").replace(\"json\", \"\")\n",
"\n",
"def get_bucket() -> any:\n",
" \"\"\"Builds GCS bucket\"\"\"\n",
" # Init cloud storage bucket\n",
" storage_client = storage.Client()\n",
" bucket = storage_client.get_bucket(bucket_name)\n",
" return bucket\n",
"\n",
"\n",
"# Knowledge Graph module\n",
"\n",
"\n",
"def get_knowledge_graph_entities(queries: list[str]) -> dict[str, dict]:\n",
" \"\"\"Get the knowledge Graph Entities for a list of queries\n",
" Args:\n",
" queries: a list of entities to find in KG\n",
" Returns:\n",
" kg_entities: entities found in KG\n",
" Format example: entity id is the key and entity details the value\n",
" kg_entities = {\n",
" \"mcy/12\": {} (ae) add here\n",
" }\n",
" \"\"\"\n",
" kg_entities = {}\n",
" try:\n",
" for query in queries:\n",
" service_url = \"https://kgsearch.googleapis.com/v1/entities:search\"\n",
" params = {\n",
" \"query\": query,\n",
" \"limit\": 10,\n",
" \"indent\": True,\n",
" \"key\": KNOWLEDGE_GRAPH_API_KEY,\n",
" }\n",
" url = f\"{service_url}?{urllib.parse.urlencode(params)}\"\n",
" response = json.loads(urllib.request.urlopen(url).read())\n",
" for element in response[\"itemListElement\"]:\n",
" kg_entity_name = element[\"result\"][\"name\"]\n",
" # To only add the exact KG entity\n",
" if query.lower() == kg_entity_name.lower():\n",
" kg_entities[element[\"result\"][\"@id\"][3:]] = element[\"result\"]\n",
" return kg_entities\n",
" except Exception as ex:\n",
" print(\n",
" f\"\\n\\x1b[31mERROR: There was an error fetching the Knowledge Graph entities. Please check that your API key is correct. ERROR: {ex}\\x1b[0m\"\n",
" )\n",
" raise\n",
"\n",
"\n",
"def get_file_name_from_gcs_url(gcs_url: str) -> tuple[str]:\n",
" \"\"\"Get file name from GCS url\n",
" Args:\n",
" gcs_url: the gcs url with the file name\n",
" Returns:\n",
" file_name_with_format: the file name with its format\n",
" file_name: the file name\n",
" \"\"\"\n",
" url_parts = gcs_url.split(\"/\")\n",
" if len(url_parts) == 3:\n",
" file_name = url_parts[2].split(\".\")[0]\n",
" file_name_with_format = url_parts[2]\n",
" return file_name, file_name_with_format\n",
" return \"\"\n",
"\n",
"\n",
"def get_video_format(video_location: str):\n",
" \"\"\"Gets video format from gcs url\n",
" Args:\n",
" video_location: gcs video location\n",
" Returns:\n",
" video_format: video format\n",
" \"\"\"\n",
" gcs_parts = video_location.split(\".\")\n",
" if len(gcs_parts) == 2:\n",
" video_format = gcs_parts[1]\n",
" return video_format\n",
" return \"\"\n",
"\n",
"\n",
"def get_n_secs_video_uri_from_uri(video_uri: str, new_name_part: str):\n",
" \"\"\"Get uri for the n seconds video\n",
" Args:\n",
" video_uri: str\n",
" Return:\n",
" video_name_n_secs\n",
" \"\"\"\n",
" gcs_parts = video_uri.split(\".\")\n",
" if len(gcs_parts) == 2:\n",
" video_format = gcs_parts[1]\n",
" long_video_name_parts = gcs_parts[0].split(\"/\")\n",
" if len(long_video_name_parts) == 6:\n",
" gcs = long_video_name_parts[0]\n",
" bucket_name = long_video_name_parts[2]\n",
" brand = long_video_name_parts[3]\n",
" videos_folder = long_video_name_parts[4]\n",
" # Last element is the video name\n",
" video_name = f\"{long_video_name_parts[-1]}_{new_name_part}.{video_format}\"\n",
" n_secs_video_uri = (\n",
" f\"{gcs}//{bucket_name}/{brand}/{videos_folder}/{video_name}\"\n",
" )\n",
" return n_secs_video_uri\n",
" return \"\"\n",
"\n",
"\n",
"def store_assessment_results_locally(brand_name: str, assessment: any) -> None:\n",
" \"\"\"Store test results in a file\"\"\"\n",
" file_name = f\"results/{brand_name}_{assessment.get('video_uri')}.json\"\n",
" assessment = {\n",
" \"brand_name\": brand_name,\n",
" \"assessment\": assessment\n",
" }\n",
" os.makedirs(os.path.dirname(file_name), exist_ok=True)\n",
" with open(file_name, \"w\", encoding=\"utf-8\") as f:\n",
" json.dump(assessment, f, ensure_ascii=False, indent=4)\n",
"\n",
"\n",
"def trim_videos(brand_name: str):\n",
" \"\"\"Trims videos to create new versions of 5 secs\n",
" Args:\n",
" brand_name: the brand to trim the videos for\n",
" \"\"\"\n",
" local_videos_path = \"abcd_videos\"\n",
" # Check if the directory exists\n",
" if not os.path.exists(local_videos_path):\n",
" os.makedirs(local_videos_path)\n",
" # Get videos from GCS\n",
" brand_videos_folder = f\"{brand_name}/videos\"\n",
" bucket = get_bucket()\n",
" blobs = bucket.list_blobs(prefix=brand_videos_folder)\n",
" # Video processing\n",
" for video in blobs:\n",
" if video.name == f\"{brand_videos_folder}/\" or \"1st_5_secs\" in video.name:\n",
" # Skip parent folder and trimmed versions of videos\n",
" continue\n",
" video_name, video_name_with_format = get_file_name_from_gcs_url(video.name)\n",
" video_name_1st_5_secs = (\n",
" f\"{video_name}_1st_5_secs.{get_video_format(video_name_with_format)}\"\n",
" )\n",
" video_name_1st_5_secs_parent_folder = (\n",
" f\"{brand_videos_folder}/{video_name_1st_5_secs}\"\n",
" )\n",
" video_1st_5_secs_metadata = bucket.get_blob(video_name_1st_5_secs_parent_folder)\n",
" # Only process the video if it was not previously trimmed\n",
" if not video_1st_5_secs_metadata:\n",
" # Download the video from GCS\n",
" download_and_save_video(\n",
" output_path=local_videos_path,\n",
" video_name_with_format=video_name_with_format,\n",
" video_uri=video.name,\n",
" )\n",
" # Trim the video\n",
" trim_and_push_video_to_gcs(\n",
" local_videos_path=local_videos_path,\n",
" gcs_output_path=brand_videos_folder,\n",
" video_name_with_format=video_name_with_format,\n",
" new_video_name=video_name_1st_5_secs,\n",
" trim_start=0,\n",
" trim_end=5,\n",
" )\n",
" else:\n",
" print(f\"Video {video.name} has already been trimmed. Skipping...\\n\")\n",
"\n",
"\n",
"def download_and_save_video(\n",
" output_path: str, video_name_with_format: str, video_uri: str\n",
") -> None:\n",
" \"\"\"Downloads a video from Google Cloud Storage\n",
" and saves it locally\n",
" Args:\n",
" output_path: the path to store the video\n",
" video_name_with_format: the video name with format\n",
" video_uri: the video location\n",
" \"\"\"\n",
" bucket = get_bucket()\n",
" video_blob = bucket.blob(video_uri)\n",
" video = video_blob.download_as_string(client=None)\n",
" with open(f\"{output_path}/{video_name_with_format}\", \"wb\") as f:\n",
" f.write(video) # writing content to file\n",
" if VERBOSE:\n",
" print(f\"Video {video_uri} downloaded and saved!\\n\")\n",
"\n",
"\n",
"def trim_and_push_video_to_gcs(\n",
" local_videos_path: str,\n",
" gcs_output_path: str,\n",
" video_name_with_format: str,\n",
" new_video_name: str,\n",
" trim_start: int,\n",
" trim_end: int,\n",
") -> None:\n",
" \"\"\"Trims a video to generate a 5 secs version\n",
" Args:\n",
" local_videos_path: where the videos are stored locally\n",
" gcs_output_path: the path to store the video in Google Cloud storage\n",
" video_name_with_format: the original video name with format\n",
" new_video_name: the new name for the trimmed video\n",
" trim_start: the start time to trim the video\n",
" trim_end: the end time to trim the video\n",
" \"\"\"\n",
" bucket = get_bucket()\n",
" # Load video dsa gfg intro video\n",
" local_video_path = f\"{local_videos_path}/{video_name_with_format}\"\n",
" clip = VideoFileClip(local_video_path)\n",
" # Get only first N seconds\n",
" clip = clip.subclip(trim_start, trim_end)\n",
" # Save the clip\n",
" new_video_name_path = f\"{local_videos_path}/{new_video_name}\"\n",
" clip.write_videofile(new_video_name_path)\n",
" # Upload back to Google Cloud Storage\n",
" blob = bucket.blob(f\"{gcs_output_path}/{new_video_name}\")\n",
" # Optional: set a generation-match precondition to avoid potential race conditions\n",
" # and data corruptions.\n",
" generation_match_precondition = 0\n",
" blob.upload_from_filename(\n",
" new_video_name_path, if_generation_match=generation_match_precondition\n",
" )\n",
" if VERBOSE:\n",
" print(f\"File {new_video_name} uploaded to {gcs_output_path}.\\n\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "s1NWFRvqMgk0"
},
"source": [
"#### player(video_url)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "9uU0Ns_EcUWa"
},
"outputs": [],
"source": [
"# Define video player\n",
"def player(video_url):\n",
" # Loads a video file and plays it\n",
" print(f\"Displaying Video URL: {video_url}\")\n",
" HTML(f\"\"\"\n",
" <video width=600 height=337 controls>\n",
" <source src=\"{video_url}\" type=\"video/mp4\">\n",
" </video>\n",
" \"\"\")\n",
" return"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "2Xbfw8kQHFJu"
},
"source": [
"## <font color='#4285f4'>Define ABDCs Criteria</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "eZ-9XFzxfpSG"
},
"outputs": [],
"source": [
"# @title 1, 2) Attract: Quick Pacing & Quick Pacing (First 5 seconds)\n",
"\n",
"# @markdown **Features:**\n",
"\n",
"# @markdown **Quick Pacing:** Within ANY 5 consecutive seconds there are 5 or more shots in the video. These include hard cuts, soft transitions and camera changes such as camera pans, swipes, zooms, depth of field changes, tracking shots and movement of the camera.\n",
"\n",
"# @markdown **Quick Pacing (First 5 seconds):** There are at least 5 shot changes or visual cuts detected within the first 5 seconds (up to 4.99s) of the video. These include hard cuts, soft transitions and camera changes such as camera pans, swipes, zooms, depth of field changes, tracking shots and movement of the camera.\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_quick_pacing(\n",
" shot_annotation_results: any, video_uri: str\n",
") -> tuple[bool, bool]:\n",
" \"\"\"Detect Quick Pacing & Quick Pacing (First 5 seconds)\n",
" Args:\n",
" shot_annotation_results: shot annotations\n",
" video_uri: video location in gcs\n",
" Returns:\n",
" quick_pacing, quick_pacing_1st_5_secs: quick pacing evaluation tuple\n",
" \"\"\"\n",
" required_secs_for_quick_pacing = 5\n",
" required_shots_for_quick_pacing = 5\n",
" # Feature Quick Pacing\n",
" quick_pacing_feature = \"Quick Pacing\"\n",
" quick_pacing = False\n",
" quick_pacing_criteria = \"\"\"Within ANY 5 consecutive seconds there are 5 or more shots in the video.\n",
" These include hard cuts, soft transitions and camera changes such as camera pans, swipes, zooms,\n",
" depth of field changes, tracking shots and movement of the camera.\"\"\"\n",
" total_shots_count = 0\n",
" total_time_all_shots = 0\n",
" quick_pacing_eval_details = {\n",
" \"feature\": quick_pacing_feature,\n",
" \"feature_description\": quick_pacing_criteria,\n",
" \"feature_detected\": quick_pacing,\n",
" \"llm_details\": [],\n",
" }\n",
" # Feature Quick Pacing (First 5 secs)\n",
" quick_pacing_1st_5_secs_feature = \"Quick Pacing (First 5 seconds)\"\n",
" quick_pacing_1st_5_secs = False\n",
" quick_pacing_1st_5_secs_criteria = \"\"\"There are at least 5 shot changes or visual cuts detected in the video.\n",
" These include hard cuts, soft transitions and camera changes such as camera pans, swipes, zooms, depth of\n",
" field changes, tracking shots and movement of the camera.\"\"\"\n",
" total_shots_count_1st_5_secs = 0\n",
" quick_pacing_1st_5_secs_eval_details = {\n",
" \"feature\": quick_pacing_1st_5_secs_feature,\n",
" \"feature_description\": quick_pacing_1st_5_secs_criteria,\n",
" \"feature_detected\": quick_pacing_1st_5_secs,\n",
" \"llm_details\": [],\n",
" }\n",
"\n",
" # Video API: Evaluate quick_pacing_feature and quick_pacing_1st_5_secs_feature\n",
" if use_annotations:\n",
" if \"shot_annotations\" in shot_annotation_results:\n",
" sorted_shots = sorted(\n",
" shot_annotation_results.get(\"shot_annotations\"),\n",
" key=lambda x: calculate_time_seconds(x, \"start_time_offset\"),\n",
" reverse=False,\n",
" )\n",
" # Video API: Evaluate quick_pacing_feature & quick_pacing_1st_5_secs_feature\n",
" for shot in sorted_shots:\n",
" start_time_secs = calculate_time_seconds(shot, \"start_time_offset\")\n",
" end_time_secs = calculate_time_seconds(shot, \"end_time_offset\")\n",
" shot_total_time = end_time_secs - start_time_secs\n",
" # Quick Pacing calculation\n",
" total_time_all_shots += shot_total_time\n",
" if total_time_all_shots < required_secs_for_quick_pacing:\n",
" total_shots_count += 1\n",
" # Quick Pacing (First 5 secs) calculation\n",
" if start_time_secs < early_time_seconds:\n",
" total_shots_count_1st_5_secs += 1\n",
" else:\n",
" # To start counting shot time and # shots again\n",
" if total_shots_count >= required_shots_for_quick_pacing:\n",
" quick_pacing = True\n",
" # Quick Pacing (First 5 secs) calculation\n",
" if total_shots_count_1st_5_secs >= required_shots_for_quick_pacing:\n",
" quick_pacing_1st_5_secs = True\n",
" total_time_all_shots = 0\n",
" total_shots_count = 0\n",
" else:\n",
" print(\n",
" f\"No Shot annotations found. Skipping {quick_pacing_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate quick_pacing_feature and quick_pacing_1st_5_secs_feature\n",
" if use_llms:\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
" # 1. Evaluate quick_pacing_feature\n",
" prompt = (\n",
" \"\"\"Are there 5 or more shots within ANY 5 consecutive seconds in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Provide the shot changes count in the following format:\n",
" Number of shots: #\n",
" Provide the exact timestamp when the shot changes happen and the shot description.\n",
" Return False if the number of shots identified is less than 5.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{feature}\", quick_pacing_feature\n",
" )\n",
" .replace(\"{criteria}\", quick_pacing_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" quick_pacing_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" quick_pacing = True\n",
"\n",
" # Include llm details\n",
" quick_pacing_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # 2. Evaluate quick_pacing_1st_5_secs_feature\n",
" # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs\n",
" prompt = (\n",
" \"\"\"Are there at least 5 shot changes or visual cuts detected in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Provide the shot changes count in the following format:\n",
" Number of shots: #\n",
" Provide the exact timestamp when the shot changes happen and the shot description.\n",
" Return False if the number of shots identified is less than 5.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{feature}\", quick_pacing_1st_5_secs_feature\n",
" )\n",
" .replace(\"{criteria}\", quick_pacing_1st_5_secs_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use first 5 secs video for this feature\n",
" video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, \"1st_5_secs\")\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri_1st_5_secs})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" quick_pacing_1st_5_secs_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" quick_pacing_1st_5_secs = True\n",
"\n",
" # Include llm details\n",
" quick_pacing_1st_5_secs_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" print(f\"{quick_pacing_feature}: {quick_pacing}\")\n",
" quick_pacing_eval_details[\"feature_detected\"] = quick_pacing\n",
" print(f\"{quick_pacing_1st_5_secs_feature}: {quick_pacing_1st_5_secs}\")\n",
" quick_pacing_1st_5_secs_eval_details[\"feature_detected\"] = quick_pacing_1st_5_secs\n",
"\n",
" return quick_pacing_eval_details, quick_pacing_1st_5_secs_eval_details"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "zVwTfzTnfqCI"
},
"outputs": [],
"source": [
"# @title 3) Attract: Dynamic Start\n",
"\n",
"# @markdown **Features:**\n",
"\n",
"# @markdown **Dynamic Start:** The first shot in the video changes in less than 3 seconds.\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_dynamic_start(shot_annotation_results: any, video_uri: str) -> dict:\n",
" \"\"\"Detects Dynamic Start\n",
" Args:\n",
" shot_annotation_results: shot annotations\n",
" video_uri: video location in gcs\n",
" Returns:\n",
" dynamic_start_eval_details: dynamic start evaluation\n",
" \"\"\"\n",
" # Feature Dynamic Start\n",
" dynamic_start_feature = \"Dynamic Start\"\n",
" dynamic_start = False\n",
" dynamic_start_criteria = (\n",
" \"\"\"The first shot in the video changes in less than 3 seconds.\"\"\"\n",
" )\n",
" dynamic_start_eval_details = {\n",
" \"feature\": dynamic_start_feature,\n",
" \"feature_description\": dynamic_start_criteria,\n",
" \"feature_detected\": dynamic_start,\n",
" \"llm_details\": [],\n",
" }\n",
"\n",
" # Video API: Evaluate dynamic_start_feature\n",
" if use_annotations:\n",
" if \"shot_annotations\" in shot_annotation_results:\n",
" first_shot_end_time_off_set = shot_annotation_results.get(\n",
" \"shot_annotations\"\n",
" )[0]\n",
" nanos = first_shot_end_time_off_set.get(\"end_time_offset\").get(\"nanos\")\n",
" seconds = first_shot_end_time_off_set.get(\"end_time_offset\").get(\"seconds\")\n",
" if nanos:\n",
" if seconds:\n",
" total_ms_first_shot = (nanos + seconds * 1e9) / 1e6\n",
" else:\n",
" total_ms_first_shot = nanos / 1e6\n",
" else:\n",
" if seconds:\n",
" total_ms_first_shot = (seconds * 1e9) / 1e6\n",
"\n",
" if total_ms_first_shot < dynamic_cutoff_ms:\n",
" dynamic_start = True\n",
" else:\n",
" print(\n",
" f\"No Shot annotations found. Skipping {dynamic_start_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate dynamic_start_feature\n",
" if use_llms:\n",
" # 1. Evaluate dynamic_start_feature\n",
" prompt = (\n",
" \"\"\"Does the first shot in the video change in less than 3 seconds?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Provide the exact timestamp when the first shot in the video changes.\n",
" Return True if and only if the first shot in the video changes in less than 3 seconds.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{feature}\", dynamic_start_feature\n",
" )\n",
" .replace(\"{criteria}\", dynamic_start_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
" # Use first 5 secs video for this feature\n",
" video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, \"1st_5_secs\")\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri_1st_5_secs})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" dynamic_start_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" dynamic_start = True\n",
"\n",
" # Include llm details\n",
" dynamic_start_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" print(f\"{dynamic_start_feature}: {dynamic_start}\")\n",
" dynamic_start_eval_details[\"feature_detected\"] = dynamic_start\n",
"\n",
" return dynamic_start_eval_details"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "SlDCXfhEfzT9"
},
"outputs": [],
"source": [
"# @title 4 & 5) Attract: Supers & Supers with Audio\n",
"\n",
"# @markdown **Features:**\n",
"\n",
"# @markdown 1. **Supers:** Any supers (text overlays) have been incorporated at any time in the video.\n",
"\n",
"# @markdown 2. **Supers with Audio**: The speech heard in the audio of the video matches OR is contextually supportive of the overlaid text shown on screen.\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_supers(text_annotation_results: any, video_uri: str) -> dict:\n",
" \"\"\"Detect Supers\n",
" Args:\n",
" text_annotation_results: text annotations\n",
" video_uri: video location in gcs\n",
" Returns:\n",
" supers_eval_details: supers evaluation\n",
" \"\"\"\n",
" # Feature Supers\n",
" supers = False\n",
" supers_feature = \"Supers\"\n",
" supers_criteria = \"\"\"Any supers (text overlays) have been incorporated at any time in the video.\"\"\"\n",
" supers_eval_details = {\n",
" \"feature\": supers_feature,\n",
" \"feature_description\": supers_criteria,\n",
" \"feature_detected\": supers,\n",
" \"llm_details\": None,\n",
" }\n",
"\n",
" # Video API: Evaluate supers_feature\n",
" if use_annotations:\n",
" if \"text_annotations\" in text_annotation_results:\n",
" if len(text_annotation_results.get(\"text_annotations\")) > 0:\n",
" supers = True\n",
" else:\n",
" print(\n",
" f\"No Text annotations found. Skipping {supers_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate supers_feature\n",
" if use_llms:\n",
" # 1. Evaluate supers_feature\n",
" prompt = (\n",
" \"\"\"Are there any supers (text overlays) at any time in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Provide the exact timestamp where supers are found as well as the list of supers.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{feature}\", supers_feature\n",
" )\n",
" .replace(\"{criteria}\", supers_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" supers_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" supers = True\n",
"\n",
" # Include llm details\n",
" supers_eval_details[\"llm_details\"] = {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
"\n",
" print(f\"{supers_feature}: {supers}\")\n",
" supers_eval_details[\"feature_detected\"] = supers\n",
"\n",
" return supers_eval_details\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_supers_with_audio(\n",
" text_annotation_results: any,\n",
" speech_annotation_results: any,\n",
" video_uri: str,\n",
") -> dict:\n",
" \"\"\"Detect Supers with Audio\n",
" Args:\n",
" text_annotation_results: text annotations\n",
" speech_annotation_results: speech annotations\n",
" video_uri: video location in gcs\n",
" Returns:\n",
" supers_with_audio_eval_details: supers with audio evaluation\n",
" \"\"\"\n",
" # Feature Supers with Audio\n",
" supers_with_audio_feature = \"Supers with Audio\"\n",
" supers_with_audio = False\n",
" supers_with_audio_criteria = \"\"\"The speech heard in the audio of the video matches OR is contextually\n",
" supportive of the overlaid text shown on screen.\"\"\"\n",
" supers_with_audio_eval_details = {\n",
" \"feature\": supers_with_audio_feature,\n",
" \"feature_description\": supers_with_audio_criteria,\n",
" \"feature_detected\": supers_with_audio,\n",
" \"llm_details\": [],\n",
" }\n",
" detected_text_list = []\n",
"\n",
" # Video API: Evaluate supers_with_audio_feature\n",
" if use_annotations:\n",
" if (\n",
" \"text_annotations\" in text_annotation_results\n",
" and \"speech_transcriptions\" in speech_annotation_results\n",
" ):\n",
" # Build list of found supers\n",
" for text_annotation in text_annotation_results.get(\"text_annotations\"):\n",
" detected_text_list.append(text_annotation.get(\"text\"))\n",
"\n",
" # Video API: Evaluate supers_with_audio\n",
" (\n",
" supers_with_audio,\n",
" na,\n",
" ) = find_elements_in_transcript(\n",
" speech_transcriptions=speech_annotation_results.get(\n",
" \"speech_transcriptions\"\n",
" ),\n",
" elements=detected_text_list,\n",
" elements_categories=[],\n",
" apply_condition=True, # flag to filter out text with less than x chars. This is\n",
" # only needed when elements come from text annotations since words are sometimes\n",
" # 1 character only.\n",
" )\n",
" else:\n",
" print(\n",
" f\"No Text or Speech annotations found. Skipping {supers_with_audio_feature} evaluation.\"\n",
" )\n",
"\n",
" # LLM: Evaluate supers_with_audio_feature\n",
" if use_llms:\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
"\n",
" # LLM Only\n",
" # 1. Evaluate supers_with_audio_feature\n",
" prompt = (\n",
" \"\"\"Does the speech match any supers (text overlays) in the video or is the speech\n",
" contextually supportive of the overlaid text shown on the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Provide the exact timestamp where supers are found and the timestamp when the speech matches\n",
" the supers or is contextually supportive of the overlaid text shown on the video.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{feature}\", supers_with_audio_feature\n",
" )\n",
" .replace(\"{criteria}\", supers_with_audio_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" supers_with_audio_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" supers_with_audio = True\n",
"\n",
" # Include llm details\n",
" supers_with_audio_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # Combination of Annotations + LLM\n",
" if use_annotations:\n",
" if \"speech_transcriptions\" in speech_annotation_results:\n",
" # 1. Evaluate supers_with_audio_feature\n",
" transcript = get_speech_transcript(\n",
" speech_annotation_results.get(\"speech_transcriptions\")\n",
" )\n",
" prompt = (\n",
" \"\"\"Does the provided speech transcript matches any supers (text overlays) in the video or is the speech transcript\n",
" contextually supportive of the overlaid text shown on the video?\n",
" This is the speech transcript: \"{transcript}\"\n",
" Consider the following criteria for your answer: {criteria}\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{feature}\", supers_with_audio_feature\n",
" )\n",
" .replace(\"{transcript}\", transcript)\n",
" .replace(\"{criteria}\", supers_with_audio_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" # If transcript is empty, this feature should be False\n",
" if transcript:\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" supers_with_audio_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" supers_with_audio = True\n",
"\n",
" # Include llm details\n",
" supers_with_audio_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
" else:\n",
" supers_with_audio = False\n",
" # Include default details\n",
" supers_with_audio_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": \"Annotations + LLM: Speech was not found in annotations.\",\n",
" }\n",
" )\n",
" else:\n",
" print(\n",
" f\"No Speech annotations found. Skipping {supers_with_audio_feature} evaluation with Annotations + LLM.\"\n",
" )\n",
"\n",
" print(f\"{supers_with_audio_feature}: {supers_with_audio}\")\n",
" supers_with_audio_eval_details[\"feature_detected\"] = supers_with_audio\n",
"\n",
" return supers_with_audio_eval_details"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "1FpSt8Y0f2mF"
},
"outputs": [],
"source": [
"# @title 6 & 7) Brand: Brand Visuals & Brand Visuals (First 5 seconds)\n",
"\n",
"# @markdown **Features:**\n",
"\n",
"# @markdown 1. **Brand Visuals:** Branding, defined as the brand name or brand logo are shown in-situation or overlaid at any time in the video.\n",
"\n",
"# @markdown 2. **Brand Visuals (First 5 seconds):** Branding, defined as the brand name or brand logo are shown in-situation or overlaid in the first 5 seconds (up to 4.99s) of the video.\n",
"# @markdown Including Logo Big & Logo Early. Is Logo larger than x% (3.5% default) of screen in the first 5 seconds?\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def calculate_surface_area(points) -> float:\n",
" \"\"\"Calculate surface area of an object\"\"\"\n",
" if len(points) != 4:\n",
" return 0\n",
" area1 = 0.5 * abs(points[0][0] * points[1][1] - points[1][0] * points[0][1])\n",
" area2 = 0.5 * abs(points[1][0] * points[2][1] - points[2][0] * points[1][1])\n",
" area3 = 0.5 * abs(points[2][0] * points[3][1] - points[3][0] * points[2][1])\n",
" area4 = 0.5 * abs(points[3][0] * points[0][1] - points[0][0] * points[3][1])\n",
"\n",
" # Add the areas of the four triangles to get the total surface area.\n",
" surface_area = area1 + area2 + area3 + area4\n",
" return surface_area * 100\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_brand_visuals(\n",
" text_annotation_results: any,\n",
" logo_annotation_results: any,\n",
" video_uri: str,\n",
" brand_name: str,\n",
" brand_variations: list[str],\n",
") -> tuple[dict, dict, bool]:\n",
" \"\"\"Detect Brand Visuals & Brand Visuals (First 5 seconds)\n",
" Args:\n",
" text_annotation_results: text annotations\n",
" logo_annotation_results: logo annotations\n",
" video_uri: video location in gcs\n",
" brand_name: name of the brand\n",
" brand_variations: a list of brand name variations\n",
" Returns:\n",
" brand_visuals_eval_details,\n",
" brand_visuals_1st_5_secs_eval_details,\n",
" brand_visuals_logo_big_1st_5_secs: brand visuals evaluation\n",
" \"\"\"\n",
" # Feature Brand Visuals\n",
" brand_visuals_feature = \"Brand Visuals\"\n",
" brand_visuals = False\n",
" brand_visuals_criteria = \"\"\"Branding, defined as the brand name or brand logo are shown\n",
" in-situation or overlaid at any time in the video.\"\"\"\n",
" brand_visuals_eval_details = {\n",
" \"feature\": brand_visuals_feature,\n",
" \"feature_description\": brand_visuals_criteria,\n",
" \"feature_detected\": brand_visuals,\n",
" \"llm_details\": [],\n",
" }\n",
" # Feature Brand Visuals (First 5 seconds)\n",
" brand_visuals_1st_5_secs_feature = \"Brand Visuals (First 5 seconds)\"\n",
" brand_visuals_1st_5_secs = False\n",
" # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs\n",
" brand_visuals_1st_5_secs_criteria = \"\"\"Branding, defined as the brand name or brand logo are shown in-situation\n",
" or overlaid in the video\"\"\"\n",
" brand_visuals_1st_5_secs_eval_details = {\n",
" \"feature\": brand_visuals_1st_5_secs_feature,\n",
" \"feature_description\": brand_visuals_1st_5_secs_criteria,\n",
" \"feature_detected\": brand_visuals_1st_5_secs,\n",
" \"llm_details\": [],\n",
" }\n",
" # Feature Logo Big (First 5 seconds)\n",
" brand_visuals_logo_big_1st_5_secs = False\n",
"\n",
" # Video API: Evaluate brand_visuals_feature and brand_visuals_1st_5_secs_feature 1st_5_secs\n",
" if use_annotations:\n",
" # Evaluate brand_visuals_brand_feature & brand_visuals_brand_1st_5_secs\n",
" # in text annotations\n",
" if \"text_annotations\" in text_annotation_results:\n",
" for text_annotation in text_annotation_results.get(\"text_annotations\"):\n",
" text = text_annotation.get(\"text\")\n",
" found_brand = [\n",
" brand for brand in brand_variations if brand.lower() in text.lower()\n",
" ]\n",
" if found_brand:\n",
" brand_visuals = True\n",
" found_brand_1st_5_secs, frame = detected_text_in_first_5_seconds(\n",
" text_annotation\n",
" )\n",
" if found_brand_1st_5_secs:\n",
" brand_visuals_1st_5_secs = True\n",
" # Check surface area\n",
" if brand_visuals_1st_5_secs and frame:\n",
" coordinates = []\n",
" for vertex in frame.get(\"rotated_bounding_box\").get(\"vertices\"):\n",
" coordinates.append(\n",
" ((float(vertex.get(\"x\"))), float(vertex.get(\"y\")))\n",
" )\n",
" surface_area = calculate_surface_area(coordinates)\n",
" if surface_area > logo_size_threshold:\n",
" brand_visuals_logo_big_1st_5_secs = True\n",
" else:\n",
" print(\n",
" f\"No Text annotations found. Skipping {brand_visuals_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # Evaluate brand_visuals_feature & brand_visuals_1st_5_secs in logo annotations\n",
" brand_kg_entities = get_knowledge_graph_entities(brand_variations)\n",
" brand_kg_entities_list = []\n",
" for key, value in brand_kg_entities.items():\n",
" entity_id = value[\"@id\"][3:] if \"@id\" in value else \"\"\n",
" entity_name = value[\"name\"] if \"name\" in value else \"\"\n",
" entity_description = value[\"description\"] if \"description\" in value else \"\"\n",
" brand_kg_entities_list.append(\n",
" {\n",
" \"entity_id\": entity_id,\n",
" \"entity_name\": entity_name,\n",
" \"entity_description\": entity_description,\n",
" }\n",
" )\n",
"\n",
" if \"logo_recognition_annotations\" in logo_annotation_results:\n",
" for logo_recognition_annotation in logo_annotation_results.get(\n",
" \"logo_recognition_annotations\"\n",
" ):\n",
" entity_id = logo_recognition_annotation.get(\"entity\").get(\"entity_id\")\n",
" entity_description = logo_recognition_annotation.get(\"entity\").get(\n",
" \"description\"\n",
" )\n",
" found_entities = [\n",
" ent\n",
" for ent in brand_kg_entities_list\n",
" if ent[\"entity_id\"] == entity_id\n",
" or ent[\"entity_description\"].lower() == entity_description.lower()\n",
" ]\n",
" if len(found_entities) > 0:\n",
" # All logo tracks where the recognized logo appears. Each track corresponds\n",
" # to one logo instance appearing in consecutive frames.\n",
" for track in logo_recognition_annotation.get(\"tracks\"):\n",
" # Check confidence against user defined threshold\n",
" if track.get(\"confidence\") >= confidence_threshold:\n",
" brand_visuals = True\n",
" # Video segment of a track.\n",
" start_time_secs = calculate_time_seconds(\n",
" track.get(\"segment\"), \"start_time_offset\"\n",
" )\n",
" if start_time_secs <= early_time_seconds:\n",
" brand_visuals_1st_5_secs = True\n",
" # The object with timestamp and attributes per frame in the track.\n",
" for timestamped_object in track.get(\n",
" \"timestamped_objects\"\n",
" ):\n",
" # Normalized Bounding box in a frame, where the object is located.\n",
" normalized_bounding_box = timestamped_object.get(\n",
" \"normalized_bounding_box\"\n",
" )\n",
" bottom_top = (\n",
" normalized_bounding_box.get(\"bottom\") or 0\n",
" ) - (normalized_bounding_box.get(\"top\") or 0)\n",
" right_left = (\n",
" normalized_bounding_box.get(\"right\") or 0\n",
" ) - (normalized_bounding_box.get(\"left\") or 0)\n",
" surface = bottom_top * right_left * 100\n",
" if surface > logo_size_threshold:\n",
" brand_visuals_logo_big_1st_5_secs = True\n",
"\n",
" # All video segments where the recognized logo appears. There might be\n",
" # multiple instances of the same logo class appearing in one VideoSegment.\n",
" # Since there is no confidence here, just check 1st 5 mins feature - CHECK\n",
" for segment in logo_recognition_annotation.get(\"segments\"):\n",
" start_time_secs = calculate_time_seconds(\n",
" segment, \"start_time_offset\"\n",
" )\n",
" if start_time_secs <= early_time_seconds:\n",
" brand_visuals_1st_5_secs = True\n",
" else:\n",
" print(\n",
" f\"No Logo annotations found. Skipping {brand_visuals_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate brand_visuals_feature and brand_visuals_1st_5_secs_feature 1st_5_secs\n",
" if use_llms:\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
" # 1. Evaluate brand_visuals_feature\n",
" prompt = (\n",
" \"\"\"Is the brand {brand_name} or brand logo {brand_name} visible at any time in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Provide the exact timestamp when the brand {brand_name} or brand logo {brand_name} is found.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{brand_name}\", brand_name\n",
" )\n",
" .replace(\"{feature}\", brand_visuals_feature)\n",
" .replace(\"{criteria}\", brand_visuals_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" brand_visuals_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" brand_visuals = True\n",
"\n",
" # Include llm details\n",
" brand_visuals_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # 2. Evaluate brand_visuals_1st_5_secs_feature\n",
" prompt = (\n",
" \"\"\"Is the brand {brand_name} or brand logo {brand_name} visible in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Provide the exact timestamp when the brand {brand_name} or brand logo {brand_name} is found.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{brand_name}\", brand_name\n",
" )\n",
" .replace(\"{feature}\", brand_visuals_1st_5_secs_feature)\n",
" .replace(\"{criteria}\", brand_visuals_1st_5_secs_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use first 5 secs video for this feature\n",
" video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, \"1st_5_secs\")\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri_1st_5_secs})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" brand_visuals_1st_5_secs_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" brand_visuals_1st_5_secs = True\n",
"\n",
" # Include llm details\n",
" brand_visuals_1st_5_secs_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" print(f\"{brand_visuals_feature}: {brand_visuals}\")\n",
" brand_visuals_eval_details[\"feature_detected\"] = brand_visuals\n",
" print(\n",
" f\"\"\"{brand_visuals_1st_5_secs_feature}: {brand_visuals_1st_5_secs}\n",
" Logo Big: {brand_visuals_logo_big_1st_5_secs}\"\"\"\n",
" )\n",
" brand_visuals_1st_5_secs_eval_details[\"feature_detected\"] = brand_visuals_1st_5_secs\n",
"\n",
" return (\n",
" brand_visuals_eval_details,\n",
" brand_visuals_1st_5_secs_eval_details,\n",
" brand_visuals_logo_big_1st_5_secs,\n",
" )\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "bKQFasy-f5Yw"
},
"outputs": [],
"source": [
"# @title 8 & 9) Brand: Brand Mention (Speech) & Brand Mention (Speech) (First 5 seconds)\n",
"\n",
"# @markdown **Features:**\n",
"\n",
"# @markdown **Brand Mention (Speech):** The brand name is heard in the audio or speech at any time in the video.\n",
"\n",
"# @markdown **Brand Mention (Speech) (First 5 seconds):** The brand name is heard in the audio or speech in the first 5 seconds (up to 4.99s) of the video.\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_brand_mention_speech(\n",
" speech_annotation_results: any,\n",
" video_uri: str,\n",
" brand_name: str,\n",
" brand_variations: list[str],\n",
") -> tuple[dict, dict]:\n",
" \"\"\"Detect Brand Mention (Speech) & Brand Mention (Speech) (First 5 seconds)\n",
" Args:\n",
" speech_annotation_results: speech annotations\n",
" video_uri: video location in gcs\n",
" brand_name: name of the brand\n",
" brand_variations: a list of brand name variations\n",
" Retirns:\n",
" brand_mention_speech_eval_details,\n",
" brand_mention_speech_1st_5_secs_eval_details: brand mention speech evaluation\n",
" \"\"\"\n",
" # Feature Brand Mention (Speech)\n",
" brand_mention_speech_feature = \"Brand Mention (Speech)\"\n",
" brand_mention_speech = False\n",
" brand_mention_speech_criteria = (\n",
" \"\"\"The brand name is heard in the audio or speech at any time in the video.\"\"\"\n",
" )\n",
" brand_mention_speech_eval_details = {\n",
" \"feature\": brand_mention_speech_feature,\n",
" \"feature_description\": brand_mention_speech_criteria,\n",
" \"feature_detected\": brand_mention_speech,\n",
" \"llm_details\": [],\n",
" }\n",
" # Feature Brand Mention (Speech) (First 5 seconds)\n",
" brand_mention_speech_1st_5_secs_feature = \"Brand Mention (Speech) (First 5 seconds)\"\n",
" brand_mention_speech_1st_5_secs = False\n",
" # remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs\n",
" brand_mention_speech_1st_5_secs_criteria = (\n",
" \"\"\"The brand name is heard in the audio or speech in the video.\"\"\"\n",
" )\n",
" brand_mention_speech_1st_5_secs_eval_details = {\n",
" \"feature\": brand_mention_speech_1st_5_secs_feature,\n",
" \"feature_description\": brand_mention_speech_1st_5_secs_criteria,\n",
" \"feature_detected\": brand_mention_speech_1st_5_secs,\n",
" \"llm_details\": [],\n",
" }\n",
"\n",
" # Video API: Evaluate brand_mention_speech and brand_mention_speech_1st_5_secs\n",
" if use_annotations:\n",
" if \"speech_transcriptions\" in speech_annotation_results:\n",
" # Video API: Evaluate brand_mention & brand_mention_speech_1st_5_secs\n",
" (\n",
" brand_mention_speech,\n",
" brand_mention_speech_1st_5_secs,\n",
" ) = find_elements_in_transcript(\n",
" speech_transcriptions=speech_annotation_results.get(\n",
" \"speech_transcriptions\"\n",
" ),\n",
" elements=brand_variations,\n",
" elements_categories=[],\n",
" apply_condition=False,\n",
" )\n",
" else:\n",
" print(\n",
" f\"No Speech annotations found. Skipping {brand_mention_speech_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate brand_mention_speech and brand_mention_speech_1st_5_secs\n",
" if use_llms:\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
"\n",
" # LLM Only\n",
" # 1. Evaluate brand_mention_speech_feature\n",
" prompt = (\n",
" \"\"\"Does the speech mention the brand {brand_name} at any time on the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Provide the exact timestamp when the brand {brand_name} is heard in the speech of the video.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{brand_name}\", brand_name\n",
" )\n",
" .replace(\"{feature}\", brand_mention_speech_feature)\n",
" .replace(\"{criteria}\", brand_mention_speech_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" brand_mention_speech_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" brand_mention_speech = True\n",
"\n",
" # Include llm details\n",
" brand_mention_speech_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # 2. Evaluate brand_mention_speech_feature_1st_5_secs\n",
" prompt = (\n",
" \"\"\"Does the speech mention the brand {brand_name} in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Provide the exact timestamp when the brand {brand_name} is heard in the speech of the video.\n",
" Return True if and only if the brand {brand_name} is heard in the speech of the video.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{brand_name}\", brand_name\n",
" )\n",
" .replace(\"{feature}\", brand_mention_speech_1st_5_secs_feature)\n",
" .replace(\"{criteria}\", brand_mention_speech_1st_5_secs_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use first 5 secs video for this feature\n",
" video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, \"1st_5_secs\")\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri_1st_5_secs})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" brand_mention_speech_1st_5_secs_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" brand_mention_speech_1st_5_secs = True\n",
"\n",
" # Include llm details\n",
" brand_mention_speech_1st_5_secs_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # Combination of Annotations + LLM\n",
" if use_annotations:\n",
" if \"speech_transcriptions\" in speech_annotation_results:\n",
" # 1. Evaluate brand_mention_speech_feature\n",
" transcript = get_speech_transcript(\n",
" speech_annotation_results.get(\"speech_transcriptions\")\n",
" )\n",
" prompt = (\n",
" \"\"\"Does the provided speech transcript mention the brand {brand_name}?\n",
" This is the speech transcript: \"{transcript}\"\n",
" Consider the following criteria for your answer: {criteria}\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{brand_name}\", brand_name\n",
" )\n",
" .replace(\"{transcript}\", transcript)\n",
" .replace(\"{feature}\", brand_mention_speech_feature)\n",
" .replace(\"{criteria}\", brand_mention_speech_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Set modality to text since we are not using video for Annotations + LLM\n",
" llm_params.set_modality({\"type\": \"text\"})\n",
" # If transcript is empty, this feature should be False\n",
" if transcript:\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" brand_mention_speech_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" brand_mention_speech = True\n",
"\n",
" # Include llm details\n",
" brand_mention_speech_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
" else:\n",
" brand_mention_speech = False\n",
" # Include default details\n",
" brand_mention_speech_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": \"Annotations + LLM: Speech was not found in annotations.\",\n",
" }\n",
" )\n",
"\n",
" # 2. Evaluate brand_mention_speech_feature_1st_5_secs\n",
" transcript_1st_5_secs = get_speech_transcript_1st_5_secs(\n",
" speech_annotation_results.get(\"speech_transcriptions\")\n",
" )\n",
" prompt = (\n",
" \"\"\"Does the provided speech transcript mention the brand {brand_name}?\n",
" This is the speech transcript: \"{transcript}\"\n",
" Consider the following criteria for your answer: {criteria}\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{brand_name}\", brand_name\n",
" )\n",
" .replace(\"{transcript}\", transcript_1st_5_secs)\n",
" .replace(\"{feature}\", brand_mention_speech_1st_5_secs_feature)\n",
" .replace(\"{criteria}\", brand_mention_speech_1st_5_secs_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Set modality to text since we are not using video for Annotations + LLM\n",
" llm_params.set_modality({\"type\": \"text\"})\n",
" # If transcript is empty, this feature should be False\n",
" if transcript_1st_5_secs:\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" brand_mention_speech_1st_5_secs_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" brand_mention_speech_1st_5_secs = True\n",
"\n",
" # Include llm details\n",
" brand_mention_speech_1st_5_secs_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
" else:\n",
" brand_mention_speech_1st_5_secs = False\n",
" # Include default details\n",
" brand_mention_speech_1st_5_secs_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": \"Annotations + LLM: Speech was not found in annotations.\",\n",
" }\n",
" )\n",
" else:\n",
" print(\n",
" f\"No Speech annotations found. Skipping {brand_mention_speech_feature} evaluation with LLM.\"\n",
" )\n",
"\n",
" print(f\"{brand_mention_speech_feature}: {brand_mention_speech}\")\n",
" brand_mention_speech_eval_details[\"feature_detected\"] = brand_mention_speech\n",
" print(\n",
" f\"{brand_mention_speech_1st_5_secs_feature}: {brand_mention_speech_1st_5_secs}\"\n",
" )\n",
" brand_mention_speech_1st_5_secs_eval_details[\"feature_detected\"] = (\n",
" brand_mention_speech_1st_5_secs\n",
" )\n",
"\n",
" return (\n",
" brand_mention_speech_eval_details,\n",
" brand_mention_speech_1st_5_secs_eval_details,\n",
" )\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "OS7kYDoFf8qE"
},
"outputs": [],
"source": [
"# @title 10 & 11) Brand: Product Visuals & Product Visuals (First 5 seconds)\n",
"\n",
"# @markdown **Features:**\n",
"\n",
"# @markdown 1. **Product Visuals:** A product or branded packaging is visually present at any time in the video. Where the product is a service a relevant substitute should be shown such as via a branded app or branded service personnel.\n",
"\n",
"# @markdown 2. **Product Visuals (First 5 seconds):** A product or branded packaging is visually present in the first 5 seconds (up to 4.99s) of the video. Where the product is a service a relevant substitute should be shown such as via a branded app or branded service personnel.\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect(\n",
" entity: dict,\n",
" segment: dict,\n",
" branded_products_kg_entities: dict,\n",
" branded_products: list[str],\n",
" branded_products_categories: list[str],\n",
"):\n",
" \"\"\"Detect Product Visuals & Product Visuals (First 5 seconds)\n",
" Args:\n",
" entity: entity found in annotations\n",
" segment: segment of the video\n",
" branded_products_kg_entities\n",
" branded_products: list of products\n",
" branded_products_categories: list of products categories\n",
" Returns:\n",
" product_visuals,\n",
" product_visuals_1st_5_secs: evaluation\n",
" \"\"\"\n",
" product_visuals = False\n",
" product_visuals_1st_5_secs = False\n",
" entity_id = entity.get(\"entity_id\")\n",
" entity_description = entity.get(\"description\")\n",
" # Check if any of the provided products or categories\n",
" # match the label segment description\n",
" found_branded_products = [\n",
" bp for bp in branded_products if bp.lower() == entity_description.lower()\n",
" ]\n",
" found_branded_product_categories = [\n",
" bp\n",
" for bp in branded_products_categories\n",
" if bp.lower() == entity_description.lower()\n",
" ]\n",
" if (\n",
" entity_id in branded_products_kg_entities\n",
" or len(found_branded_products) > 0\n",
" or len(found_branded_product_categories) > 0\n",
" ):\n",
" # Check confidence against user defined threshold\n",
" if segment.get(\"confidence\") >= confidence_threshold:\n",
" product_visuals = True\n",
" start_time_secs = calculate_time_seconds(\n",
" segment.get(\"segment\"), \"start_time_offset\"\n",
" )\n",
" if start_time_secs <= early_time_seconds:\n",
" product_visuals_1st_5_secs = True\n",
"\n",
" return product_visuals, product_visuals_1st_5_secs\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_product_visuals(\n",
" label_annotation_results: any,\n",
" video_uri: str,\n",
" branded_products: list[str],\n",
" branded_products_categories: list[str],\n",
") -> tuple[dict, dict]:\n",
" \"\"\"Detect Product Visuals & Product Visuals (First 5 seconds)\n",
" Args:\n",
" label_annotation_results: label annotations\n",
" video_location: video location in gcs\n",
" branded_products: list of products\n",
" branded_products_categories: list of products categories\n",
" Returns:\n",
" product_visuals_eval_details,\n",
" product_visuals_1st_5_secs_eval_details: product visuals evaluation\n",
" \"\"\"\n",
" # Feature Product Visuals\n",
" product_visuals_feature = \"Product Visuals\"\n",
" product_visuals = False\n",
" product_visuals_criteria = \"\"\"A product or branded packaging is visually present at any time in the video.\n",
" Where the product is a service a relevant substitute should be shown such as via a branded app or branded\n",
" service personnel.\"\"\"\n",
" product_visuals_eval_details = {\n",
" \"feature\": product_visuals_feature,\n",
" \"feature_description\": product_visuals_criteria,\n",
" \"feature_detected\": product_visuals,\n",
" \"llm_details\": [],\n",
" }\n",
" # Feature Product Visuals (First 5 seconds)\n",
" product_visuals_1st_5_secs_feature = \"Product Visuals (First 5 seconds)\"\n",
" product_visuals_1st_5_secs = False\n",
" # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs\n",
" product_visuals_1st_5_secs_criteria = \"\"\"A product or branded packaging is visually present the video.\n",
" Where the product is a service a relevant substitute should be shown such as via a\n",
" branded app or branded service personnel.\"\"\"\n",
" product_visuals_1st_5_secs_eval_details = {\n",
" \"feature\": product_visuals_1st_5_secs_feature,\n",
" \"feature_description\": product_visuals_1st_5_secs_criteria,\n",
" \"feature_detected\": product_visuals_1st_5_secs,\n",
" \"llm_details\": [],\n",
" }\n",
"\n",
" branded_products_kg_entities = get_knowledge_graph_entities(branded_products)\n",
"\n",
" # Video API: Evaluate product_visuals_feature and product_visuals_1st_5_secs_feature\n",
" if use_annotations:\n",
" # Video API: Evaluate product_visuals and product_visuals_1st_5_secs\n",
" # Check in annotations at segment level\n",
" if \"segment_label_annotations\" in label_annotation_results:\n",
" # Process video/segment level label annotations\n",
" for segment_label in label_annotation_results.get(\n",
" \"segment_label_annotations\"\n",
" ):\n",
" for segment in segment_label.get(\"segments\"):\n",
" pv, pv_1st_5_secs = detect(\n",
" segment_label.get(\"entity\"),\n",
" segment,\n",
" branded_products_kg_entities,\n",
" branded_products,\n",
" branded_products_categories,\n",
" )\n",
" if pv:\n",
" product_visuals = True\n",
" if pv_1st_5_secs:\n",
" product_visuals_1st_5_secs = True\n",
" else:\n",
" print(\n",
" f\"No Segment Label annotations found. Skipping {product_visuals_feature} Segment Label evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # Check in annotations at shot level\n",
" if \"shot_label_annotations\" in label_annotation_results:\n",
" # Process shot level label annotations\n",
" for shot_label in label_annotation_results.get(\"shot_label_annotations\"):\n",
" for segment in shot_label.get(\"segments\"):\n",
" pv, pv_1st_5_secs = detect(\n",
" shot_label.get(\"entity\"),\n",
" segment,\n",
" branded_products_kg_entities,\n",
" branded_products,\n",
" branded_products_categories,\n",
" )\n",
" if pv:\n",
" product_visuals = True\n",
" if pv_1st_5_secs:\n",
" product_visuals_1st_5_secs = True\n",
" else:\n",
" print(\n",
" f\"No Shot Label annotations found. Skipping {product_visuals_feature} Shot Label evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # Check in annotations at frame level\n",
" if \"frame_label_annotations\" in label_annotation_results:\n",
" # Process frame level label annotations\n",
" for frame_label in label_annotation_results.get(\"frame_label_annotations\"):\n",
" for frame in frame_label.get(\"frames\"):\n",
" pv, pv_1st_5_secs = detect(\n",
" frame_label.get(\"entity\"),\n",
" frame,\n",
" branded_products_kg_entities,\n",
" branded_products,\n",
" branded_products_categories,\n",
" )\n",
" if pv:\n",
" product_visuals = True\n",
" if pv_1st_5_secs:\n",
" product_visuals_1st_5_secs = True\n",
" else:\n",
" print(\n",
" f\"No Frame Label annotations found. Skipping {product_visuals_feature} Frame Label evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate product_visuals_feature and product_visuals_1st_5_secs_feature\n",
" if use_llms:\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
" # 1. Evaluate product_visuals_feature\n",
" prompt = (\n",
" \"\"\"Is any of the following products: {branded_products}\n",
" or product categories: {branded_products_categories}\n",
" visually present at any time in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Provide the exact timestamp when the products {branded_products}\n",
" or product categories: {branded_products_categories} are found.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{branded_products}\", \", \".join(branded_products)\n",
" )\n",
" .replace(\n",
" \"{branded_products_categories}\", \", \".join(branded_products_categories)\n",
" )\n",
" .replace(\"{feature}\", product_visuals_feature)\n",
" .replace(\"{criteria}\", product_visuals_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" product_visuals_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" product_visuals = True\n",
"\n",
" # Include llm details\n",
" product_visuals_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # 2. Evaluate product_visuals_1st_5_secs_feature\n",
" prompt = (\n",
" \"\"\"Is any of the following products: {branded_products}\n",
" or product categories: {branded_products_categories}\n",
" visually present in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Provide the exact timestamp when the products {branded_products}\n",
" or product categories: {branded_products_categories} are visually present.\n",
" Return True if and only if the branded producs or product categories are\n",
" visually present in the video.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{branded_products}\", \", \".join(branded_products)\n",
" )\n",
" .replace(\n",
" \"{branded_products_categories}\", \", \".join(branded_products_categories)\n",
" )\n",
" .replace(\"{feature}\", product_visuals_1st_5_secs_feature)\n",
" .replace(\"{criteria}\", product_visuals_1st_5_secs_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use first 5 secs video for this feature\n",
" video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, \"1st_5_secs\")\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri_1st_5_secs})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" product_visuals_1st_5_secs_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" product_visuals_1st_5_secs = True\n",
"\n",
" product_visuals_1st_5_secs_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" print(f\"{product_visuals_feature}: {product_visuals}\")\n",
" product_visuals_eval_details[\"feature_detected\"] = product_visuals\n",
" print(f\"{product_visuals_1st_5_secs_feature}: {product_visuals_1st_5_secs}\")\n",
" product_visuals_1st_5_secs_eval_details[\"feature_detected\"] = (\n",
" product_visuals_1st_5_secs\n",
" )\n",
"\n",
" return product_visuals_eval_details, product_visuals_1st_5_secs_eval_details"
]
},
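{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sketch only (hypothetical values, not part of the assessment pipeline):\n",
"# it shows the shape of the entity/segment dicts that the detect() helper in the\n",
"# previous cell receives from the Video Intelligence label annotations. The brand name\n",
"# and entity_id below are made up.\n",
"example_entity = {\"entity_id\": \"/m/0_example\", \"description\": \"Acme Cola\"}\n",
"example_segment = {\n",
"    \"segment\": {},  # start/end offsets in whatever form calculate_time_seconds expects\n",
"    \"confidence\": 0.92,\n",
"}\n",
"# With branded_products=[\"Acme Cola\"], the exact case-insensitive description match (or a hit\n",
"# in branded_products_kg_entities) plus a confidence >= confidence_threshold would set\n",
"# product_visuals=True; a start time <= early_time_seconds also sets the 1st-5-secs flag.\n",
"# Uncomment to try once the earlier setup cells have defined the thresholds:\n",
"# pv, pv_1st_5_secs = detect(example_entity, example_segment, {}, [\"Acme Cola\"], [\"soda\"])"
]
},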
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "zRsrckZOf_mm"
},
"outputs": [],
"source": [
"# @title 12, 13) Brand: Product Mention (Text) & Product Mention (Text) (First 5 seconds)\n",
"\n",
"# @markdown **Features:**\n",
"\n",
"# @markdown **Product Mention (Text):** The branded product names or generic product categories are present in any text or overlay at any time in the video.\n",
"\n",
"# @markdown **Product Mention (Text) (First 5 seconds):** The branded product names or generic product categories are present in any text or overlay in the first 5 seconds (up to 4.99s) of the video.\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_product_mention_text(\n",
" text_annotation_results: any,\n",
" video_uri: str,\n",
" branded_products: list[str],\n",
" branded_products_categories: list[str],\n",
") -> tuple[dict, dict]:\n",
" \"\"\"Detect Product Mention (Text) & Product Mention (Text) (First 5 seconds)\n",
" Args:\n",
" text_annotation_results: text annotations\n",
" video_uri: video location in gcs\n",
" branded_products: list of products\n",
" branded_products_categories: list of products categories\n",
" Returns:\n",
" product_mention_text_eval_details,\n",
" product_mention_text_1st_5_secs_eval_details: product mention text evaluation\n",
" \"\"\"\n",
" # Feature Product Mention (Text)\n",
" product_mention_text_feature = \"Product Mention (Text)\"\n",
" product_mention_text = False\n",
" product_mention_text_criteria = \"\"\"The branded product names or generic product categories\n",
" are present in any text or overlay at any time in the video.\"\"\"\n",
" product_mention_text_eval_details = {\n",
" \"feature\": product_mention_text_feature,\n",
" \"feature_description\": product_mention_text_criteria,\n",
" \"feature_detected\": product_mention_text,\n",
" \"llm_details\": [],\n",
" }\n",
" # Feature Product Mention (Text) (First 5 seconds)\n",
" product_mention_text_1st_5_secs_feature = \"Product Mention (Text) (First 5 seconds)\"\n",
" product_mention_text_1st_5_secs = False\n",
" # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs\n",
" product_mention_text_1st_5_secs_criteria = \"\"\"The branded product names or generic product categories\n",
" are present in any text or overlay in the video.\"\"\"\n",
" product_mention_text_1st_5_secs_eval_details = {\n",
" \"feature\": product_mention_text_1st_5_secs_feature,\n",
" \"feature_description\": product_mention_text_1st_5_secs_criteria,\n",
" \"feature_detected\": product_mention_text_1st_5_secs,\n",
" \"llm_details\": [],\n",
" }\n",
"\n",
" # Video API: Evaluate product_mention_text_feature and product_mention_text_1st_5_secs_feature\n",
" if use_annotations:\n",
" if \"text_annotations\" in text_annotation_results:\n",
" # Video API: Evaluate product_mention_text_feature and product_mention_text_1st_5_secs_feature\n",
" for text_annotation in text_annotation_results.get(\"text_annotations\"):\n",
" text = text_annotation.get(\"text\")\n",
" found_branded_products = [\n",
" prod for prod in branded_products if prod.lower() in text.lower()\n",
" ]\n",
" found_branded_products_categories = [\n",
" prod\n",
" for prod in branded_products_categories\n",
" if prod.lower() in text.lower()\n",
" ]\n",
" if (\n",
" len(found_branded_products) > 0\n",
" or len(found_branded_products_categories) > 0\n",
" ):\n",
" product_mention_text = True\n",
" pmt_1st_5_secs, frame = detected_text_in_first_5_seconds(\n",
" text_annotation\n",
" )\n",
" if pmt_1st_5_secs:\n",
" product_mention_text_1st_5_secs = True\n",
" else:\n",
" print(\n",
" f\"No Text annotations found. Skipping {product_mention_text_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate product_mention_text_feature and product_mention_text_1st_5_secs_feature\n",
" if use_llms:\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
" # 1. Evaluate product_mention_text_feature\n",
" prompt = (\n",
" \"\"\"Is any of the following products: {branded_products}\n",
" or product categories: {branded_products_categories}\n",
" present in any text or overlay at any time in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Provide the exact timestamp when the products {branded_products}\n",
" or product categories: {branded_products_categories} are found\n",
" in any text or overlay in the video.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{branded_products}\", f\"{', '.join(branded_products)}\"\n",
" )\n",
" .replace(\n",
" \"{branded_products_categories}\",\n",
" f\"{', '.join(branded_products_categories)}\",\n",
" )\n",
" .replace(\"{feature}\", product_mention_text_feature)\n",
" .replace(\"{criteria}\", product_mention_text_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" product_mention_text_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" product_mention_text = True\n",
"\n",
" # Include llm details\n",
" product_mention_text_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # 2. Evaluate product_mention_text_1st_5_secs_feature\n",
" prompt = (\n",
" \"\"\"Is any of the following products: {branded_products}\n",
" or product categories: {branded_products_categories}\n",
" present in any text or overlay in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Provide the exact timestamp when the products {branded_products}\n",
" or product categories: {branded_products_categories} are found\n",
" in any text or overlay in the video.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{branded_products}\", f\"{', '.join(branded_products)}\"\n",
" )\n",
" .replace(\n",
" \"{branded_products_categories}\",\n",
" f\"{', '.join(branded_products_categories)}\",\n",
" )\n",
" .replace(\"{feature}\", product_mention_text_1st_5_secs_feature)\n",
" .replace(\"{criteria}\", product_mention_text_1st_5_secs_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use first 5 secs video for this feature\n",
" video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, \"1st_5_secs\")\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri_1st_5_secs})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" product_mention_text_1st_5_secs_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" product_mention_text_1st_5_secs = True\n",
"\n",
" product_mention_text_1st_5_secs_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" print(f\"{product_mention_text_feature}: {product_mention_text}\")\n",
" product_mention_text_eval_details[\"feature_detected\"] = product_mention_text\n",
" print(\n",
" f\"{product_mention_text_1st_5_secs_feature}: {product_mention_text_1st_5_secs}\"\n",
" )\n",
" product_mention_text_1st_5_secs_eval_details[\"feature_detected\"] = (\n",
" product_mention_text_1st_5_secs\n",
" )\n",
"\n",
" return (\n",
" product_mention_text_eval_details,\n",
" product_mention_text_1st_5_secs_eval_details,\n",
" )"
]
},
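{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Quick illustration (hypothetical values) of the substring matching that\n",
"# detect_product_mention_text() applies to each OCR'd text block above: the detected\n",
"# text is compared case-insensitively against the configured names and categories.\n",
"sample_text = \"Try the new Acme Cola today\"  # pretend OCR output from a video frame\n",
"sample_products = [\"Acme Cola\"]  # stand-in for branded_products\n",
"sample_categories = [\"soda\", \"energy drink\"]  # stand-in for branded_products_categories\n",
"hits = [p for p in sample_products + sample_categories if p.lower() in sample_text.lower()]\n",
"print(hits)  # ['Acme Cola'] -> product_mention_text would be set to True"
]
},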
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "CrChmUgZgB-u"
},
"outputs": [],
"source": [
"# @title 14, 15) Brand: Product Mention (Speech), Product Mention (Speech) (First 5 seconds)\n",
"\n",
"# @markdown **Features:**\n",
"\n",
"# @markdown **Product Mention (Speech):** The branded product names or generic product categories are heard or mentioned in the audio or speech at any time in the video.\n",
"\n",
"# @markdown **Product Mention (Speech) (First 5 seconds):** The branded product names or generic product categories are heard or mentioned in the audio or speech in the first 5 seconds (up to 4.99s) of the video.\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_product_mention_speech(\n",
" speech_annotation_results: any,\n",
" video_uri: str,\n",
" branded_products: list[str],\n",
" branded_products_categories: list[str],\n",
") -> tuple[dict, dict]:\n",
" \"\"\"Detect Product Mention (Speech) & Product Mention (Speech) (First 5 seconds)\n",
" Args:\n",
" speech_annotation_results: peech annotations\n",
" video_uri: video location in gcs\n",
" branded_products: list of products\n",
" branded_products_categories: list of products categories\n",
" Returns:\n",
" product_mention_speech_eval_details,\n",
" product_mention_speech_1st_5_secs_eval_details: product mention speech evaluation\n",
" \"\"\"\n",
" # Feature Product Mention (Speech)\n",
" product_mention_speech_feature = \"Product Mention (Speech)\"\n",
" product_mention_speech = False\n",
" product_mention_speech_criteria = \"\"\"The branded product names or generic product categories\n",
" are heard or mentioned in the audio or speech at any time in the video.\"\"\"\n",
" product_mention_speech_eval_details = {\n",
" \"feature\": product_mention_speech_feature,\n",
" \"feature_description\": product_mention_speech_criteria,\n",
" \"feature_detected\": product_mention_speech,\n",
" \"llm_details\": [],\n",
" }\n",
" # Feature Product Mention (Speech) (First 5 seconds)\n",
" product_mention_speech_1st_5_secs_feature = (\n",
" \"Product Mention (Speech) (First 5 seconds)\"\n",
" )\n",
" product_mention_speech_1st_5_secs = False\n",
" # remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs\n",
" product_mention_speech_1st_5_secs_criteria = \"\"\"The branded product names or generic product categories\n",
" are heard or mentioned in the audio or speech in the the video.\"\"\"\n",
" product_mention_speech_1st_5_secs_eval_details = {\n",
" \"feature\": product_mention_speech_1st_5_secs_feature,\n",
" \"feature_description\": product_mention_speech_1st_5_secs_criteria,\n",
" \"feature_detected\": product_mention_speech_1st_5_secs,\n",
" \"llm_details\": [],\n",
" }\n",
"\n",
" # Video API: Evaluate product_mention_speech_feature and product_mention_speech_1st_5_secs_feature\n",
" if use_annotations:\n",
" if \"speech_transcriptions\" in speech_annotation_results:\n",
" # Video API: Evaluate product_mention_speech & product_mention_speech_1st_5_secs\n",
" (\n",
" product_mention_speech,\n",
" product_mention_speech_1st_5_secs,\n",
" ) = find_elements_in_transcript(\n",
" speech_transcriptions=speech_annotation_results.get(\n",
" \"speech_transcriptions\"\n",
" ),\n",
" elements=branded_products,\n",
" elements_categories=branded_products_categories,\n",
" apply_condition=False,\n",
" )\n",
" else:\n",
" print(\n",
" f\"No Speech annotations found. Skipping {product_mention_speech_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate product_mention_speech_feature and product_mention_speech_1st_5_secs_feature\n",
" if use_llms:\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
"\n",
" # LLM Only\n",
" # 1. Evaluate product_mention_speech_feature\n",
" prompt = (\n",
" \"\"\"Are any of the following products: {branded_products}\n",
" or product categories: {branded_products_categories} heard\n",
" at any time in the speech of the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Provide the exact timestamp when the products {branded_products}\n",
" or product categories {branded_products_categories} are heard in the speech of the video.\n",
" Return False if the products or product categories are not heard in the speech.\n",
" Only strictly use the speech of the video to answer, don't consider visual elements.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{branded_products}\", f\"{', '.join(branded_products)}\"\n",
" )\n",
" .replace(\n",
" \"{branded_products_categories}\",\n",
" f\"{', '.join(branded_products_categories)}\",\n",
" )\n",
" .replace(\"{feature}\", product_mention_speech_feature)\n",
" .replace(\"{criteria}\", product_mention_speech_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" product_mention_speech_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" product_mention_speech = True\n",
"\n",
" # Include llm details\n",
" product_mention_speech_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # 2. Evaluate product_mention_speech_feature_1st_5_secs\n",
" prompt = (\n",
" \"\"\"Are any of the following products: {branded_products}\n",
" or product categories: {branded_products_categories} heard in the speech of the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Provide the exact timestamp when the products {branded_products}\n",
" or product categories {branded_products_categories} are heard in the speech of the video.\n",
" Return False if the products or product categories are not heard in the speech.\n",
" Only strictly use the speech of the video to answer, don't consider visual elements.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{branded_products}\", f\"{', '.join(branded_products)}\"\n",
" )\n",
" .replace(\n",
" \"{branded_products_categories}\",\n",
" f\"{', '.join(branded_products_categories)}\",\n",
" )\n",
" .replace(\"{feature}\", product_mention_speech_1st_5_secs_feature)\n",
" .replace(\"{criteria}\", product_mention_speech_1st_5_secs_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use first 5 secs video for this feature\n",
" video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, \"1st_5_secs\")\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri_1st_5_secs})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" product_mention_speech_1st_5_secs_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" product_mention_speech_1st_5_secs = True\n",
"\n",
" # Include llm details\n",
" product_mention_speech_1st_5_secs_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # Combination of Annotations + LLM\n",
" if use_annotations:\n",
" if \"speech_transcriptions\" in speech_annotation_results:\n",
" # 1. Evaluate product_mention_speech_feature\n",
" transcript = get_speech_transcript(\n",
" speech_annotation_results.get(\"speech_transcriptions\")\n",
" )\n",
" prompt = (\n",
" \"\"\"Does the provided speech transcript mention any of the following products: {branded_products}\n",
" or product categories: {branded_products_categories} at any time in the video?\n",
" This is the speech transcript: \"{transcript}\"\n",
" Consider the following criteria for your answer: {criteria}\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{branded_products}\", f\"{', '.join(branded_products)}\"\n",
" )\n",
" .replace(\n",
" \"{branded_products_categories}\",\n",
" f\"{', '.join(branded_products_categories)}\",\n",
" )\n",
" .replace(\"{transcript}\", transcript)\n",
" .replace(\"{feature}\", product_mention_speech_feature)\n",
" .replace(\"{criteria}\", product_mention_speech_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Set modality to text since we are not using video for Annotations + LLM\n",
" llm_params.set_modality({\"type\": \"text\"})\n",
" # If transcript is empty, this feature should be False\n",
" if transcript:\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" product_mention_speech_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" product_mention_speech = True\n",
"\n",
" # Include llm details\n",
" product_mention_speech_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
" else:\n",
" product_mention_speech = False\n",
" # Include default details\n",
" product_mention_speech_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": \"Annotations + LLM: Speech was not found in annotations.\",\n",
" }\n",
" )\n",
"\n",
" # 2. Evaluate product_mention_speech_feature_1st_5_secs\n",
" transcript_1st_5_secs = get_speech_transcript_1st_5_secs(\n",
" speech_annotation_results.get(\"speech_transcriptions\")\n",
" )\n",
" prompt = (\n",
" \"\"\"Does the provided speech transcript mention any of the following products: {branded_products}\n",
" or product categories: {branded_products_categories} in the video?\n",
" This is the speech transcript: \"{transcript}\"\n",
" Consider the following criteria for your answer: {criteria}\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{branded_products}\", f\"{', '.join(branded_products)}\"\n",
" )\n",
" .replace(\n",
" \"{branded_products_categories}\",\n",
" f\"{', '.join(branded_products_categories)}\",\n",
" )\n",
" .replace(\"{transcript}\", transcript_1st_5_secs)\n",
" .replace(\"{feature}\", product_mention_speech_1st_5_secs_feature)\n",
" .replace(\"{criteria}\", product_mention_speech_1st_5_secs_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Set modality to text since we are not using video for Annotations + LLM\n",
" llm_params.set_modality({\"type\": \"text\"})\n",
" # If transcript is empty, this feature should be False\n",
" if transcript_1st_5_secs:\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" product_mention_speech_1st_5_secs_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" product_mention_speech_1st_5_secs = True\n",
"\n",
" # Include llm details\n",
" product_mention_speech_1st_5_secs_eval_details[\n",
" \"llm_details\"\n",
" ].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
" else:\n",
" product_mention_speech_1st_5_secs = False\n",
" # Include default details\n",
" product_mention_speech_1st_5_secs_eval_details[\n",
" \"llm_details\"\n",
" ].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": \"Annotations + LLM: Speech was not found in annotations.\",\n",
" }\n",
" )\n",
" else:\n",
" print(\n",
" f\"No Speech annotations found. Skipping {product_mention_speech_feature} evaluation with LLM.\"\n",
" )\n",
"\n",
" print(f\"{product_mention_speech_feature}: {product_mention_speech}\")\n",
" product_mention_speech_eval_details[\"feature_detected\"] = product_mention_speech\n",
" print(\n",
" f\"{product_mention_speech_1st_5_secs_feature}: {product_mention_speech_1st_5_secs}\"\n",
" )\n",
" product_mention_speech_1st_5_secs_eval_details[\"feature_detected\"] = (\n",
" product_mention_speech_1st_5_secs\n",
" )\n",
"\n",
" return (\n",
" product_mention_speech_eval_details,\n",
" product_mention_speech_1st_5_secs_eval_details,\n",
" )"
]
},
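{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Usage sketch (placeholder names and URI) for the function defined above: the speech\n",
"# annotations come from the Video Intelligence annotation step elsewhere in the notebook,\n",
"# and the function derives the trimmed \"1st_5_secs\" clip URI internally for the\n",
"# first-5-seconds variant, combining annotation matches with LLM checks on both the\n",
"# video and the extracted transcript.\n",
"# details, details_1st_5_secs = detect_product_mention_speech(\n",
"#     speech_annotations,  # dict parsed from the Video Intelligence output (placeholder name)\n",
"#     \"gs://my-bucket/videos/acme_cola.mp4\",  # hypothetical GCS URI\n",
"#     branded_products=[\"Acme Cola\"],\n",
"#     branded_products_categories=[\"soda\"],\n",
"# )\n",
"# Each returned dict carries feature_detected plus the prompts and LLM explanations\n",
"# collected along the way under llm_details."
]
},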
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "iPTNST3BgEeG"
},
"outputs": [],
"source": [
"# @title 16 & 17) Connect: Visible Face (First 5 seconds) & Visible Face (Close Up)\n",
"\n",
"# @markdown **Features:**\n",
"\n",
"# @markdown **Visible Face (First 5 seconds):** At least one human face is present in the first 5 seconds (up to 4.99s) of the video. Alternate representations of people such as Animations or Cartoons ARE acceptable.\n",
"\n",
"# @markdown **Visible Face (Close Up):** There is a close up of a human face at any time in the video.\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_visible_face(\n",
" face_annotation_results: any, video_uri: str\n",
") -> tuple[bool, bool]:\n",
" \"\"\"Detect Visible Face (First 5 seconds) & Visible Face (Close Up)\n",
" Args:\n",
" face_annotation_results: face annotations\n",
" video_uri: video location in gcs\n",
" Returns:\n",
" visible_face_1st_5_secs_eval_details,\n",
" visible_face_close_up_eval_details: visible face evaluation\n",
" \"\"\"\n",
" # Feature Visible Face (First 5 seconds)\n",
" visible_face_1st_5_secs_feature = \"Visible Face (First 5 seconds)\"\n",
" visible_face_1st_5_secs = False\n",
" # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs\n",
" visible_face_1st_5_secs_criteria = \"\"\"At least one human face is present in the video.\n",
" Alternate representations of people such as Animations or Cartoons ARE acceptable.\"\"\"\n",
" visible_face_1st_5_secs_eval_details = {\n",
" \"feature\": visible_face_1st_5_secs_feature,\n",
" \"feature_description\": visible_face_1st_5_secs_criteria,\n",
" \"feature_detected\": visible_face_1st_5_secs,\n",
" \"llm_details\": [],\n",
" }\n",
" # Feature Visible Face (Close Up)\n",
" visible_face_close_up_feature = \"Visible Face (Close Up)\"\n",
" visible_face_close_up = False\n",
" visible_face_close_up_criteria = (\n",
" \"\"\"There is a close up of a human face at any time in the video.\"\"\"\n",
" )\n",
" visible_face_close_up_eval_details = {\n",
" \"feature\": visible_face_close_up_feature,\n",
" \"feature_description\": visible_face_close_up_criteria,\n",
" \"feature_detected\": visible_face_close_up,\n",
" \"llm_details\": [],\n",
" }\n",
"\n",
" # Video API: Evaluate visible_face_1st_5_secs_feature and visible_face_close_up_feature\n",
" if use_annotations:\n",
" if \"face_detection_annotations\" in face_annotation_results:\n",
" # Video API: Evaluate visible_face_1st_5_secs_feature and visible_face_close_up_feature\n",
" if face_annotation_results.get(\"face_detection_annotations\"):\n",
" for annotation in face_annotation_results.get(\n",
" \"face_detection_annotations\"\n",
" ):\n",
" for track in annotation.get(\"tracks\"):\n",
" start_time_secs = calculate_time_seconds(\n",
" track.get(\"segment\"), \"start_time_offset\"\n",
" )\n",
" # Check confidence against user defined threshold\n",
" if track.get(\"confidence\") >= confidence_threshold:\n",
" if start_time_secs < early_time_seconds:\n",
" visible_face_1st_5_secs = True\n",
" for face_object in track.get(\"timestamped_objects\"):\n",
" box = face_object.get(\"normalized_bounding_box\")\n",
" left = box.get(\"left\") or 0\n",
" right = box.get(\"right\") or 1\n",
" top = box.get(\"top\") or 0\n",
" bottom = box.get(\"bottom\") or 1\n",
" width = right - left\n",
" height = bottom - top\n",
" surface = width * height\n",
" if surface >= face_surface_threshold:\n",
" visible_face_close_up = True\n",
" else:\n",
" print(\n",
" f\"No Face annotations found. Skipping {visible_face_1st_5_secs_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate visible_face_1st_5_secs_feature and visible_face_close_up_feature\n",
" if use_llms:\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
" # 1. Evaluate visible_face_1st_5_secs_feature\n",
" prompt = (\n",
" \"\"\"Is there a human face present in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Provide the exact timestamp when the human face is present.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{feature}\", visible_face_1st_5_secs_feature\n",
" )\n",
" .replace(\"{criteria}\", visible_face_1st_5_secs_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use first 5 secs video for this feature\n",
" video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, \"1st_5_secs\")\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri_1st_5_secs})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" visible_face_1st_5_secs_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" visible_face_1st_5_secs = True\n",
"\n",
" # Include llm details\n",
" visible_face_1st_5_secs_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # 2. Evaluate visible_face_close_up_feature\n",
" prompt = (\n",
" \"\"\"Is there a close up of a human face present at any time the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Provide the exact timestamp when there is a close up of a human face.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{feature}\", visible_face_close_up_feature\n",
" )\n",
" .replace(\"{criteria}\", visible_face_close_up_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" visible_face_close_up_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" visible_face_close_up = True\n",
"\n",
" # Include llm details\n",
" visible_face_close_up_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" print(f\"{visible_face_1st_5_secs_feature}: {visible_face_1st_5_secs}\")\n",
" visible_face_1st_5_secs_eval_details[\"feature_detected\"] = visible_face_1st_5_secs\n",
" print(f\"{visible_face_close_up_feature}: {visible_face_close_up}\")\n",
" visible_face_close_up_eval_details[\"feature_detected\"] = visible_face_close_up\n",
"\n",
" return visible_face_1st_5_secs_eval_details, visible_face_close_up_eval_details\n"
]
},
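{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Worked example (made-up numbers) of the close-up test used above: the face bounding\n",
"# box in a Video Intelligence face track is normalized to [0, 1], so width * height is\n",
"# the fraction of the frame covered by the face, which detect_visible_face() compares\n",
"# against face_surface_threshold.\n",
"box = {\"left\": 0.30, \"right\": 0.72, \"top\": 0.15, \"bottom\": 0.80}  # hypothetical values\n",
"surface = (box[\"right\"] - box[\"left\"]) * (box[\"bottom\"] - box[\"top\"])\n",
"print(f\"face covers {surface:.0%} of the frame\")  # about 27% here\n",
"# visible_face_close_up would be True if surface >= face_surface_threshold."
]
},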
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "nOMtIO9hgGtu"
},
"outputs": [],
"source": [
"# @title 18 & 19) Connect: Presence of People & Presence of People (First 5 seconds)\n",
"\n",
"# @markdown **Features:**\n",
"\n",
"# @markdown **Presence of People:** People are shown in any capacity at any time in the video. Any human body parts are acceptable to pass this guideline. Alternate representations of people such as Animations or Cartoons ARE acceptable.\n",
"\n",
"# @markdown **Presence of People (First 5 seconds):** People are shown in any capacity in the first 5 seconds (up to 4.99s) of the video. Any human body parts are acceptable to pass this guideline. Alternate representations of people such as Animations or Cartoons ARE acceptable.\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_presence_of_people(\n",
" people_annotation_results: any, video_uri: str\n",
") -> tuple[dict, dict]:\n",
" \"\"\"Detect Presence of People & Presence of People (First 5 seconds)\n",
" Args:\n",
" people_annotation_results: people annotations\n",
" video_uri: video location in gcs\n",
" Returns:\n",
" presence_of_people_eval_details,\n",
" presence_of_people_1st_5_secs_eval_details: presence of people evaluation\n",
" \"\"\"\n",
" # Feature Presence of People\n",
" presence_of_people_feature = \"Presence of People\"\n",
" presence_of_people = False\n",
" presence_of_people_criteria = \"\"\"People are shown in any capacity at any time in the video.\n",
" Any human body parts are acceptable to pass this guideline. Alternate representations of\n",
" people such as Animations or Cartoons ARE acceptable.\"\"\"\n",
" presence_of_people_eval_details = {\n",
" \"feature\": presence_of_people_feature,\n",
" \"feature_description\": presence_of_people_criteria,\n",
" \"feature_detected\": presence_of_people,\n",
" \"llm_details\": [],\n",
" }\n",
" # Feature Presence of People (First 5 seconds)\n",
" presence_of_people_1st_5_secs_feature = \"Presence of People (First 5 seconds)\"\n",
" presence_of_people_1st_5_secs = False\n",
" # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs\n",
" presence_of_people_1st_5_secs_criteria = \"\"\"People are shown in any capacity in the video.\n",
" Any human body parts are acceptable to pass this guideline. Alternate\n",
" representations of people such as Animations or Cartoons ARE acceptable.\"\"\"\n",
" presence_of_people_1st_5_secs_eval_details = {\n",
" \"feature\": presence_of_people_1st_5_secs_feature,\n",
" \"feature_description\": presence_of_people_1st_5_secs_criteria,\n",
" \"feature_detected\": presence_of_people_1st_5_secs,\n",
" \"llm_details\": [],\n",
" }\n",
"\n",
" # Video API: Evaluate presence_of_people_feature and presence_of_people_1st_5_secs_feature\n",
" if use_annotations:\n",
" if \"person_detection_annotations\" in people_annotation_results:\n",
" # Video API: Evaluate presence_of_people_feature and presence_of_people_1st_5_secs_feature\n",
" for people_annotation in people_annotation_results.get(\n",
" \"person_detection_annotations\"\n",
" ):\n",
" for track in people_annotation.get(\"tracks\"):\n",
" # Check confidence against user defined threshold\n",
" if track.get(\"confidence\") >= confidence_threshold:\n",
" presence_of_people = True\n",
" start_time_secs = calculate_time_seconds(\n",
" track.get(\"segment\"), \"start_time_offset\"\n",
" )\n",
" if start_time_secs < early_time_seconds:\n",
" presence_of_people_1st_5_secs = True\n",
" # Each segment includes track.get(\"timestamped_objects\") that include\n",
" # characteristics - -e.g.clothes, posture of the person detected.\n",
" else:\n",
" print(\n",
" f\"No People annotations found. Skipping {presence_of_people_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate presence_of_people_feature and presence_of_people_1st_5_secs_feature\n",
" if use_llms:\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
" # 1. Evaluate presence_of_people_feature\n",
" prompt = (\n",
" \"\"\"Are there people present at any time in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Provide the exact timestamp when people are present in the video.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{feature}\", presence_of_people_feature\n",
" )\n",
" .replace(\"{criteria}\", presence_of_people_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" presence_of_people_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" presence_of_people = True\n",
"\n",
" # Include llm details\n",
" presence_of_people_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # 2. Evaluate presence_of_people_1st_5_secs_feature\n",
" prompt = (\n",
" \"\"\"Are there people present in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Provide the exact timestamp when people are present in the video.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{feature}\", presence_of_people_1st_5_secs_feature\n",
" )\n",
" .replace(\"{criteria}\", presence_of_people_1st_5_secs_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use first 5 secs video for this feature\n",
" video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, \"1st_5_secs\")\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri_1st_5_secs})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" presence_of_people_1st_5_secs_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" presence_of_people_1st_5_secs = True\n",
"\n",
" # Include llm details\n",
" presence_of_people_1st_5_secs_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" print(f\"{presence_of_people_feature}: {presence_of_people}\")\n",
" presence_of_people_eval_details[\"feature_detected\"] = presence_of_people\n",
" print(f\"{presence_of_people_1st_5_secs_feature}: {presence_of_people_1st_5_secs}\")\n",
" presence_of_people_1st_5_secs_eval_details[\"feature_detected\"] = (\n",
" presence_of_people_1st_5_secs\n",
" )\n",
"\n",
" return presence_of_people_eval_details, presence_of_people_1st_5_secs_eval_details"
]
},
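{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# For reference, every detect_* function in this section returns dicts shaped like the\n",
"# illustrative example below (all values here are made up); these dicts are what the\n",
"# notebook's later parsing and saving steps consume.\n",
"example_eval_details = {\n",
"    \"feature\": \"Presence of People\",\n",
"    \"feature_description\": \"People are shown in any capacity at any time in the video.\",\n",
"    \"feature_detected\": True,\n",
"    \"llm_details\": [\n",
"        {\"llm_params\": {}, \"prompt\": \"...\", \"llm_explanation\": \"People appear at 00:02.\"}\n",
"    ],\n",
"}"
]
},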
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "ZJFgfBmBgJHu"
},
"outputs": [],
"source": [
"# @title 20) Direct: Audio Speech Early\n",
"\n",
"# @markdown **Features**\n",
"\n",
"# @markdown **Audio Early (First 5 seconds):** Speech is detected in the audio in the first 5 seconds (up to 4.99s) of the video\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_audio_speech_early(speech_annotation_results: any, video_uri: str) -> bool:\n",
" \"\"\"Detect Audio Early (First 5 seconds)\n",
" Args:\n",
" speech_annotation_results: speech annotations\n",
" video_uri: video location in gcs\n",
" Returns:\n",
" audio_speech_early_eval_details: audio early evaluation\n",
" \"\"\"\n",
" # Feature Audio Early (First 5 seconds)\n",
" audio_speech_early_feature = \"Audio Early (First 5 seconds)\"\n",
" audio_speech_early = False\n",
" # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs\n",
" audio_speech_early_criteria = \"\"\"Speech is detected in the audio of the video.\"\"\"\n",
" audio_speech_early_eval_details = {\n",
" \"feature\": audio_speech_early_feature,\n",
" \"feature_description\": audio_speech_early_criteria,\n",
" \"feature_detected\": audio_speech_early,\n",
" \"llm_details\": [],\n",
" }\n",
"\n",
" # Video API: Evaluate audio_speech_early_feature\n",
" if use_annotations:\n",
" if \"speech_transcriptions\" in speech_annotation_results:\n",
" # Video API: Evaluate audio_speech_early_feature\n",
" for speech_transcription in speech_annotation_results.get(\n",
" \"speech_transcriptions\"\n",
" ):\n",
" for alternative in speech_transcription.get(\"alternatives\"):\n",
" # Check confidence against user defined threshold\n",
" if (\n",
" alternative\n",
" and alternative.get(\"confidence\") >= confidence_threshold\n",
" ):\n",
" # For 1st 5 secs, check elements and elements_categories in words\n",
" # since only the words[] contain times\n",
" words = (\n",
" alternative.get(\"words\") if \"words\" in alternative else []\n",
" )\n",
" for word_info in words:\n",
" start_time_secs = calculate_time_seconds(\n",
" word_info, \"start_time\"\n",
" )\n",
" if start_time_secs <= early_time_seconds:\n",
" audio_speech_early = True\n",
" else:\n",
" print(\n",
" f\"No Speech annotations found. Skipping {audio_speech_early_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate audio_speech_early_feature\n",
" if use_llms:\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
"\n",
" # LLM Only\n",
" # 1. Evaluate product_mention_speech_feature\n",
" prompt = (\n",
" \"\"\"Is speech detected in the audio of the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Only strictly use the speech of the video to answer.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{feature}\", audio_speech_early_feature\n",
" )\n",
" .replace(\"{criteria}\", audio_speech_early_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use first 5 secs video for this feature\n",
" video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, \"1st_5_secs\")\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri_1st_5_secs})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" audio_speech_early_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" audio_speech_early = True\n",
"\n",
" # Include llm details\n",
" audio_speech_early_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # Combination of Annotations + LLM\n",
" if use_annotations:\n",
" if \"speech_transcriptions\" in speech_annotation_results:\n",
" # 1. Evaluate product_mention_speech_feature\n",
" transcript_1st_5_secs = get_speech_transcript_1st_5_secs(\n",
" speech_annotation_results.get(\"speech_transcriptions\")\n",
" )\n",
" prompt = (\n",
" \"\"\"Does the provided speech transcript mention any words?\n",
" This is the speech transcript: \"{transcript}\"\n",
" Consider the following criteria for your answer: {criteria}\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{transcript}\", transcript_1st_5_secs\n",
" )\n",
" .replace(\"{feature}\", audio_speech_early_feature)\n",
" .replace(\"{criteria}\", audio_speech_early_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Set modality to text since we are not using video for Annotations + LLM\n",
" llm_params.set_modality({\"type\": \"text\"})\n",
" # If transcript is empty, this feature should be False\n",
" if transcript_1st_5_secs:\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" audio_speech_early_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" audio_speech_early = True\n",
"\n",
" # Include llm details\n",
" audio_speech_early_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
" else:\n",
" audio_speech_early = False\n",
" # Include default details\n",
" audio_speech_early_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": \"Annotations + LLM: Speech was not found in annotations.\",\n",
" }\n",
" )\n",
" else:\n",
" print(\n",
" f\"No Speech annotations found. Skipping {audio_speech_early_feature} evaluation with LLM.\"\n",
" )\n",
"\n",
" print(f\"{audio_speech_early_feature}: {audio_speech_early}\")\n",
" audio_speech_early_eval_details[\"feature_detected\"] = audio_speech_early\n",
"\n",
" return audio_speech_early_eval_details"
]
},
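{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Small illustration (fabricated word timings) of the annotation-based check above:\n",
"# speech counts as \"early\" when any transcribed word starts within early_time_seconds.\n",
"# In the real function the per-word start times come from calculate_time_seconds().\n",
"sample_words = [\n",
"    {\"word\": \"introducing\", \"start_secs\": 1.4},\n",
"    {\"word\": \"Acme\", \"start_secs\": 2.1},\n",
"]\n",
"early_cutoff = 5  # stand-in for early_time_seconds\n",
"print(any(w[\"start_secs\"] <= early_cutoff for w in sample_words))  # True"
]
},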
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "R0DPLM0KgLC7"
},
"outputs": [],
"source": [
"# @title 21) Connect: Overall Pacing\n",
"\n",
"# @markdown **Features:**\n",
"\n",
"# @markdown **Overall Pacing:** The pace of the video is greater than 2 seconds per shot/frame\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_overall_pacing(shot_annotation_results: any, video_uri: str) -> dict:\n",
" \"\"\"Detect Overall Pacing\n",
" Args:\n",
" shot_annotation_results: shot annotations\n",
" video_uri: video location in gcs\n",
" Returns:\n",
" overall_pacing_eval_details: overall pacing evaluation\n",
" \"\"\"\n",
" # Feature Overall Pacing\n",
" overall_pacing_feature = \"Overall Pacing\"\n",
" overall_pacing = False\n",
" overall_pacing_criteria = (\n",
" \"\"\"The pace of the video is greater than 2 seconds per shot/frame\"\"\"\n",
" )\n",
" overall_pacing_eval_details = {\n",
" \"feature\": overall_pacing_feature,\n",
" \"feature_description\": overall_pacing_criteria,\n",
" \"feature_detected\": overall_pacing,\n",
" \"llm_details\": [],\n",
" }\n",
" total_time_all_shots = 0\n",
" total_shots = 0\n",
"\n",
" # Video API: Evaluate overall_pacing_feature\n",
" if use_annotations:\n",
" if \"shot_annotations\" in shot_annotation_results:\n",
" # Video API: Evaluate overall_pacing_feature\n",
" for shot in shot_annotation_results.get(\"shot_annotations\"):\n",
" start_time_secs = calculate_time_seconds(shot, \"start_time_offset\")\n",
" end_time_secs = calculate_time_seconds(shot, \"end_time_offset\")\n",
" total_shot_time = end_time_secs - start_time_secs\n",
" total_time_all_shots += total_shot_time\n",
" total_shots += 1\n",
" avg_pacing = total_time_all_shots / total_shots\n",
" if avg_pacing <= avg_shot_duration_seconds:\n",
" overall_pacing = True\n",
" else:\n",
" print(\n",
" f\"No Shot annotations found. Skipping {overall_pacing_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate overall_pacing_feature\n",
" if use_llms:\n",
" # 1. Evaluate overall_pacing_feature\n",
" prompt = (\n",
" \"\"\"Is the pace of video greater than 2 seconds per shot/frame?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Return True if and only if the pace of video greater than 2 seconds per shot/frame\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{feature}\", overall_pacing_feature\n",
" )\n",
" .replace(\"{criteria}\", overall_pacing_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" overall_pacing_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" overall_pacing = True\n",
"\n",
" # Include llm details\n",
" overall_pacing_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" print(f\"{overall_pacing_feature}: {overall_pacing}\")\n",
" overall_pacing_eval_details[\"feature_detected\"] = overall_pacing\n",
"\n",
" return overall_pacing_eval_details\n"
]
},
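{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Worked example (fabricated shot durations) of the pacing calculation above: the\n",
"# average shot duration across the whole video is compared against\n",
"# avg_shot_duration_seconds (assumed here to be 2, matching the criteria text).\n",
"shot_durations = [1.5, 2.0, 1.0, 2.5]  # seconds per shot, hypothetical\n",
"avg_pacing = sum(shot_durations) / len(shot_durations)\n",
"print(avg_pacing)  # 1.75\n",
"# With a 2-second threshold, avg_pacing <= 2 would set overall_pacing = True."
]
},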
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "HKVoPBQ5gLvW"
},
"outputs": [],
"source": [
"# @title 22 & 23) Direct: Call To Action (Text) & Call To Action (Speech)\n",
"\n",
"# @markdown: **Features**\n",
"\n",
"# @markdown **Call To Action (Text):** A 'Call To Action' phrase is detected in the video supers (overlaid text) at any time in the video.\n",
"\n",
"# @markdown **Call To Action (Speech):** A 'Call To Action' phrase is heard or mentioned in the audio or speech at any time in the video.\n",
"\n",
"\n",
"call_to_action_api_list = [\n",
" \"LEARN MORE\",\n",
" \"GET QUOTE\",\n",
" \"APPLY NOW\",\n",
" \"SIGN UP\",\n",
" \"CONTACT US\",\n",
" \"SUBSCRIBE\",\n",
" \"DOWNLOAD\",\n",
" \"BOOK NOW\",\n",
" \"SHOP NOW\",\n",
" \"BUY NOW\",\n",
" \"DONATE NOW\",\n",
" \"ORDER NOW\",\n",
" \"PLAY NOW\",\n",
" \"SEE MORE\",\n",
" \"START NOW\",\n",
" \"VISIT SITE\",\n",
" \"WATCH NOW\",\n",
"]\n",
"call_to_action_verbs_api_list = [\n",
" \"LEARN\",\n",
" \"QUOTE\",\n",
" \"APPLY\",\n",
" \"SIGN UP\",\n",
" \"CONTACT\",\n",
" \"SUBSCRIBE\",\n",
" \"DOWNLOAD\",\n",
" \"BOOK\",\n",
" \"SHOP\",\n",
" \"BUY\",\n",
" \"DONATE\",\n",
" \"ORDER\",\n",
" \"PLAY\",\n",
" \"SEE\",\n",
" \"START\",\n",
" \"VISIT\",\n",
" \"WATCH\",\n",
"]\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_call_to_action_speech(\n",
" speech_annotation_results: any,\n",
" video_uri: str,\n",
" branded_call_to_actions: list[str],\n",
") -> bool:\n",
" \"\"\"Detect Call To Action (Speech)\n",
" Args:\n",
" speech_annotation_results: speech annotations\n",
" video_uri: video location in gcs\n",
" branded_call_to_actions: list of branded call to actions\n",
" Returns:\n",
" call_to_action_speech_eval_details: call to action speech evaluation\n",
" \"\"\"\n",
" # Feature Call To Action (Speech)\n",
" call_to_action_speech_feature = \"Call To Action (Speech)\"\n",
" call_to_action_speech = False\n",
" call_to_action_speech_criteria = \"\"\"A 'Call To Action' phrase is heard or mentioned in the audio or speech\n",
" at any time in the video.\"\"\"\n",
" call_to_action_speech_eval_details = {\n",
" \"feature\": call_to_action_speech_feature,\n",
" \"feature_description\": call_to_action_speech_criteria,\n",
" \"feature_detected\": call_to_action_speech,\n",
" \"llm_details\": [],\n",
" }\n",
" call_to_action_api_list.extend(branded_call_to_actions)\n",
"\n",
" # Video API: Evaluate call_to_action_speech_feature\n",
" if use_annotations:\n",
" if \"speech_transcriptions\" in speech_annotation_results:\n",
" # Video API: Evaluate call_to_action_speech_feature\n",
" (\n",
" call_to_action_speech,\n",
" na,\n",
" ) = find_elements_in_transcript(\n",
" speech_transcriptions=speech_annotation_results.get(\n",
" \"speech_transcriptions\"\n",
" ),\n",
" elements=call_to_action_api_list,\n",
" elements_categories=[],\n",
" apply_condition=False,\n",
" )\n",
" else:\n",
" print(\n",
" f\"No Speech annotations found. Skipping {call_to_action_speech} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate call_to_action_speech_feature\n",
" if use_llms:\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
"\n",
" # LLM Only\n",
" prompt = (\n",
" \"\"\"Is any call to action heard or mentioned in the speech of the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Some examples of call to actions are: {call_to_actions}\n",
" Provide the exact timestamp when the call to actions are heard or mentioned in the\n",
" speech of the video.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{call_to_actions}\", \", \".join(call_to_action_api_list)\n",
" )\n",
" .replace(\"{feature}\", call_to_action_speech_feature)\n",
" .replace(\"{criteria}\", call_to_action_speech_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" call_to_action_speech_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" call_to_action_speech = True\n",
"\n",
" # Include llm details\n",
" call_to_action_speech_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" # Combination of Annotations + LLM\n",
" if use_annotations:\n",
" if \"speech_transcriptions\" in speech_annotation_results:\n",
" # Evaluate call_to_action_speech_feature\n",
" transcript = get_speech_transcript(\n",
" speech_annotation_results.get(\"speech_transcriptions\")\n",
" )\n",
" prompt = (\n",
" \"\"\"Does the provided speech transcript mention any call to actions in the video?\n",
" This is the speech transcript: \"{transcript}\"\n",
" Consider the following criteria for your answer: {criteria}\n",
" Some examples of call to actions are: {call_to_actions}\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{call_to_actions}\", \", \".join(call_to_action_api_list)\n",
" )\n",
" .replace(\"{transcript}\", transcript)\n",
" .replace(\"{feature}\", call_to_action_speech_feature)\n",
" .replace(\"{criteria}\", call_to_action_speech_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Set modality to text since we are not using video for Annotations + LLM\n",
" llm_params.set_modality({\"type\": \"text\"})\n",
" # If transcript is empty, this feature should be False\n",
" if transcript:\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" call_to_action_speech_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" call_to_action_speech = True\n",
"\n",
" # Include llm details\n",
" call_to_action_speech_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
" else:\n",
" call_to_action_speech = False\n",
" # Include default details\n",
" call_to_action_speech_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": \"Annotations + LLM: Speech was not found in annotations.\",\n",
" }\n",
" )\n",
" else:\n",
" print(\n",
" f\"No Speech annotations found. Skipping {call_to_action_speech_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" print(f\"{call_to_action_speech_feature}: {call_to_action_speech}\")\n",
" call_to_action_speech_eval_details[\"feature_detected\"] = call_to_action_speech\n",
"\n",
" return call_to_action_speech_eval_details\n",
"\n",
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def detect_call_to_action_text(\n",
" text_annotation_results: any,\n",
" video_uri: str,\n",
" branded_call_to_actions: list[str],\n",
") -> bool:\n",
" \"\"\"Detect Call To Action (Text)\n",
" Args:\n",
" text_annotation_results: text annotations\n",
" video_uri: video location in gcs\n",
" branded_call_to_actions: list of branded call to actions\n",
" Returns:\n",
" call_to_action_text_eval_details: call to action text evaluation\n",
" \"\"\"\n",
" # Feature Call To Action (Text)\n",
" call_to_action_text_feature = \"Call To Action (Text)\"\n",
" call_to_action_text = False\n",
" call_to_action_text_criteria = \"\"\"A 'Call To Action' phrase is detected in the video supers (overlaid text)\n",
" at any time in the video.\"\"\"\n",
" call_to_action_text_eval_details = {\n",
" \"feature\": call_to_action_text_feature,\n",
" \"feature_description\": call_to_action_text_criteria,\n",
" \"feature_detected\": call_to_action_text,\n",
" \"llm_details\": [],\n",
" }\n",
" call_to_action_api_list.extend(branded_call_to_actions)\n",
"\n",
" # Video API: Evaluate call_to_action_text_feature\n",
" if use_annotations:\n",
" if \"text_annotations\" in text_annotation_results:\n",
" # Video API: Evaluate call_to_action_text_feature\n",
" for text_annotation in text_annotation_results.get(\"text_annotations\"):\n",
" text = text_annotation.get(\"text\")\n",
" found_call_to_actions = [\n",
" cta\n",
" for cta in call_to_action_api_list\n",
" if cta.lower() in text.lower()\n",
" ]\n",
" if len(found_call_to_actions) > 0:\n",
" call_to_action_text = True\n",
" else:\n",
" print(\n",
" f\"No Text annotations found. Skipping {call_to_action_text_feature} evaluation with Video Intelligence API.\"\n",
" )\n",
"\n",
" # LLM: Evaluate call_to_action_text_feature\n",
" if use_llms:\n",
" llm_params = LLMParameters(\n",
" model_name=GEMINI_PRO,\n",
" location=llm_location,\n",
" generation_config=llm_generation_config,\n",
" )\n",
" # 1. Evaluate call_to_action_text_feature\n",
" prompt = (\n",
" \"\"\"Is any call to action detected in any text overlay at any time in the video?\n",
" Consider the following criteria for your answer: {criteria}\n",
" Some examples of call to actions are: {call_to_actions}\n",
" Look through each frame in the video carefully and answer the question.\n",
" Provide the exact timestamp when the call to action is detected in any text overlay in the video.\n",
" {context_and_examples}\n",
" \"\"\".replace(\n",
" \"{call_to_actions}\", \", \".join(call_to_action_api_list)\n",
" )\n",
" .replace(\"{feature}\", call_to_action_text_feature)\n",
" .replace(\"{criteria}\", call_to_action_text_criteria)\n",
" .replace(\"{context_and_examples}\", context_and_examples)\n",
" )\n",
" # Use full video for this feature\n",
" llm_params.set_modality({\"type\": \"video\", \"video_uri\": video_uri})\n",
" feature_detected, llm_explanation = detect_feature_with_llm(\n",
" call_to_action_text_feature, prompt, llm_params\n",
" )\n",
" if feature_detected:\n",
" call_to_action_text = True\n",
"\n",
" # Include llm details\n",
" call_to_action_text_eval_details[\"llm_details\"].append(\n",
" {\n",
" \"llm_params\": llm_params.__dict__,\n",
" \"prompt\": prompt,\n",
" \"llm_explanation\": llm_explanation,\n",
" }\n",
" )\n",
"\n",
" print(f\"{call_to_action_text_feature}: {call_to_action_text}\")\n",
" call_to_action_text_eval_details[\"feature_detected\"] = call_to_action_text\n",
"\n",
" return call_to_action_text_eval_details"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NAZX9CzAgP_3"
},
"source": [
"## <font color='#4285f4'>Execute ABCD Assessment</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "G-COzBpaxwjb"
},
"source": [
"### Define Assessment Functions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "OjS5QOGqgPj1"
},
"outputs": [],
"source": [
"def parse_abcd_assessment_results(abcd_assessment: dict) -> None:\n",
" \"\"\"Print ABCD Assessments\n",
" Args:\n",
" abcd_assessments: dict of video abcd assessments\n",
" \"\"\"\n",
"\n",
" result_array = []\n",
"\n",
" for video_assessment in abcd_assessment.get(\"video_assessments\"):\n",
" intermediate_result_text = \"\"\n",
"\n",
" video_url = f\"/content/{bucket_name}/{brand_name}/videos/{video_assessment.get('video_name')}\"\n",
" intermediate_result_text = f\"\\nAsset name: {video_assessment.get('video_name')}\\n\"\n",
" passed_features_count = video_assessment.get(\"passed_features_count\")\n",
" total_features = len(video_assessment.get(\"features\"))\n",
"\n",
" intermediate_result_text = intermediate_result_text + f\"Video score: {round(video_assessment.get('score'), 2)}%, adherence ({passed_features_count}/{total_features})\\n\"\n",
"\n",
" if video_assessment.get(\"score\") >= 80:\n",
" intermediate_result_text = intermediate_result_text + \"Asset result: ✅ Excellent \\n\"\n",
" elif video_assessment.get(\"score\") >= 65 and video_assessment.get(\"score\") < 80:\n",
" intermediate_result_text = intermediate_result_text + \"Asset result: ⚠ Might Improve \\n\"\n",
" else:\n",
" intermediate_result_text = intermediate_result_text + \"Asset result: ❌ Needs Review \\n\"\n",
"\n",
" intermediate_result_text = intermediate_result_text + \"Evaluated Features:\\n\"\n",
" for feature in video_assessment.get(\"features\"):\n",
" if feature.get(\"feature_detected\"):\n",
" intermediate_result_text = intermediate_result_text + f\" * ✅ {feature.get('feature')}\\n\"\n",
" else:\n",
" intermediate_result_text = intermediate_result_text + f\" * ❌ {feature.get('feature')}\\n\"\n",
"\n",
" result_array.append({\n",
" 'brand_name': brand_name,\n",
" 'video_name': video_assessment.get('video_name'),\n",
" 'video_url': video_url,\n",
" 'score': video_assessment.get('score'),\n",
" 'result_text': intermediate_result_text,\n",
" 'passed_features_count': passed_features_count,\n",
" 'total_features_count': total_features,\n",
" 'features_detail': video_assessment.get('features'),\n",
" })\n",
"\n",
" return result_array\n",
"\n",
"\n",
"def execute_abcd_assessment_for_videos():\n",
" \"\"\"Execute ABCD Assessment for all brand videos in GCS\"\"\"\n",
"\n",
" assessments = {\"brand_name\": brand_name, \"video_assessments\": []}\n",
"\n",
" # Get videos for ABCD Assessment\n",
" brand_videos_folder = f\"{brand_name}/videos\"\n",
" bucket = get_bucket()\n",
" blobs = bucket.list_blobs(prefix=brand_videos_folder)\n",
"\n",
" # Video processing\n",
" for video in blobs:\n",
" if video.name == f\"{brand_videos_folder}/\" or \"1st_5_secs\" in video.name:\n",
" # Skip parent folder\n",
" continue\n",
" video_name, video_name_with_format = get_file_name_from_gcs_url(video.name)\n",
" if not video_name or not video_name_with_format:\n",
" print(f\"Video name not resolved for {video.name}... Skipping execution\")\n",
" continue\n",
" # Check size of video to avoid processing videos > 7MB\n",
" video_metadata = bucket.get_blob(video.name)\n",
" size_mb = video_metadata.size / 1e6\n",
" if use_llms and size_mb > VIDEO_SIZE_LIMIT_MB:\n",
" print(\n",
" f\"The size of video {video.name} is greater than {VIDEO_SIZE_LIMIT_MB} MB. Skipping execution.\"\n",
" )\n",
" continue\n",
"\n",
" print(f\"\\n\\nProcessing ABCD Assessment for video {video.name}...\")\n",
"\n",
" label_annotation_results = {}\n",
" face_annotation_results = {}\n",
" people_annotation_results = {}\n",
" shot_annotation_results = {}\n",
" text_annotation_results = {}\n",
" logo_annotation_results = {}\n",
" speech_annotation_results = {}\n",
"\n",
" if use_annotations:\n",
" # 2) Download generated video annotations\n",
" (\n",
" label_annotation_results,\n",
" face_annotation_results,\n",
" people_annotation_results,\n",
" shot_annotation_results,\n",
" text_annotation_results,\n",
" logo_annotation_results,\n",
" speech_annotation_results,\n",
" ) = download_video_annotations(brand_name, video_name)\n",
"\n",
" # 3) Execute ABCD Assessment\n",
" video_uri = f\"gs://{bucket_name}/{video.name}\"\n",
" features = []\n",
"\n",
" # Quick pacing\n",
" quick_pacing, quick_pacing_1st_5_secs = detect_quick_pacing(\n",
" shot_annotation_results, video_uri\n",
" )\n",
" features.append(quick_pacing)\n",
" features.append(quick_pacing_1st_5_secs)\n",
"\n",
" # Dynamic Start\n",
" dynamic_start = detect_dynamic_start(shot_annotation_results, video_uri)\n",
" features.append(dynamic_start)\n",
"\n",
" # Supers and Supers with Audio\n",
" supers = detect_supers(text_annotation_results, video_uri)\n",
" supers_with_audio = detect_supers_with_audio(\n",
" text_annotation_results, speech_annotation_results, video_uri\n",
" )\n",
" features.append(supers)\n",
" features.append(supers_with_audio)\n",
"\n",
" # Brand Visuals & Brand Visuals (First 5 seconds)\n",
" (\n",
" brand_visuals,\n",
" brand_visuals_1st_5_secs,\n",
" brand_visuals_logo_big_1st_5_secs,\n",
" ) = detect_brand_visuals(\n",
" text_annotation_results,\n",
" logo_annotation_results,\n",
" video_uri,\n",
" brand_name,\n",
" brand_variations,\n",
" )\n",
" features.append(brand_visuals)\n",
" features.append(brand_visuals_1st_5_secs)\n",
"\n",
" # Brand Mention (Speech) & Brand Mention (Speech) (First 5 seconds)\n",
" (\n",
" brand_mention_speech,\n",
" brand_mention_speech_1st_5_secs,\n",
" ) = detect_brand_mention_speech(\n",
" speech_annotation_results, video_uri, brand_name, brand_variations\n",
" )\n",
" features.append(brand_mention_speech)\n",
" features.append(brand_mention_speech_1st_5_secs)\n",
"\n",
" # Product Visuals & Product Visuals (First 5 seconds)\n",
" product_visuals, product_visuals_1st_5_secs = detect_product_visuals(\n",
" label_annotation_results,\n",
" video_uri,\n",
" branded_products,\n",
" branded_products_categories,\n",
" )\n",
" features.append(product_visuals)\n",
" features.append(product_visuals_1st_5_secs)\n",
"\n",
" # Product Mention (Text) & Product Mention (Text) (First 5 seconds)\n",
" (\n",
" product_mention_text,\n",
" product_mention_text_1st_5_secs,\n",
" ) = detect_product_mention_text(\n",
" text_annotation_results,\n",
" video_uri,\n",
" branded_products,\n",
" branded_products_categories,\n",
" )\n",
" features.append(product_mention_text)\n",
" features.append(product_mention_text_1st_5_secs)\n",
"\n",
" # Product Mention (Speech) & Product Mention (Speech) (First 5 seconds)\n",
" (\n",
" product_mention_speech,\n",
" product_mention_speech_1st_5_secs,\n",
" ) = detect_product_mention_speech(\n",
" speech_annotation_results,\n",
" video_uri,\n",
" branded_products,\n",
" branded_products_categories,\n",
" )\n",
" features.append(product_mention_speech)\n",
" features.append(product_mention_speech_1st_5_secs)\n",
"\n",
" # Visible Face (First 5s) & Visible Face (Close Up)\n",
" visible_face_1st_5_secs, visible_face_close_up = detect_visible_face(\n",
" face_annotation_results, video_uri\n",
" )\n",
" features.append(visible_face_1st_5_secs)\n",
" features.append(visible_face_close_up)\n",
"\n",
" # Presence of People & Presence of People (First 5 seconds)\n",
" presence_of_people, presence_of_people_1st_5_secs = detect_presence_of_people(\n",
" people_annotation_results, video_uri\n",
" )\n",
" features.append(presence_of_people)\n",
" features.append(presence_of_people_1st_5_secs)\n",
"\n",
" # Audio Early (First 5 seconds)\n",
" audio_speech_early = detect_audio_speech_early(\n",
" speech_annotation_results, video_uri\n",
" )\n",
" features.append(audio_speech_early)\n",
"\n",
" # Overall Pacing\n",
" overall_pacing = detect_overall_pacing(shot_annotation_results, video_uri)\n",
" features.append(overall_pacing)\n",
"\n",
" # Call To Action (Speech)\n",
" call_to_action_speech = detect_call_to_action_speech(\n",
" speech_annotation_results, video_uri, branded_call_to_actions\n",
" )\n",
" features.append(call_to_action_speech)\n",
"\n",
" # Call To Action (Text)\n",
" call_to_action_text = detect_call_to_action_text(\n",
" text_annotation_results, video_uri, branded_call_to_actions\n",
" )\n",
" features.append(call_to_action_text)\n",
"\n",
" # Calculate ABCD final score\n",
" total_features = len(features)\n",
" passed_features_count = 0\n",
" for feature in features:\n",
" if feature.get(\"feature_detected\"):\n",
" passed_features_count += 1\n",
" # Get score\n",
" score = (passed_features_count * 100) / total_features\n",
" video_assessment = {\n",
" \"video_name\": video_name_with_format,\n",
" \"video_uri\": video_uri,\n",
" \"features\": features,\n",
" \"passed_features_count\": passed_features_count,\n",
" \"score\": score,\n",
" }\n",
" assessments.get(\"video_assessments\").append(video_assessment)\n",
"\n",
" if STORE_ASSESSMENT_RESULTS_LOCALLY:\n",
" # Store assessment results locally\n",
" store_assessment_results_locally(brand_name, video_assessment)\n",
"\n",
" return assessments\n",
"\n",
"\n",
"def execute_abcd_detector():\n",
" \"\"\"Main ABCD Assessment execution\"\"\"\n",
"\n",
" if use_annotations:\n",
" generate_video_annotations(brand_name)\n",
"\n",
" trim_videos(brand_name)\n",
"\n",
" abcd_assessments = execute_abcd_assessment_for_videos()\n",
" if len(abcd_assessments.get(\"video_assessments\")) == 0:\n",
" print(\"There are no videos to display.\")\n",
"\n",
" return abcd_assessments\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "gbiOYeq8x8KL"
},
"source": [
"### Run Assessment"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "2bYnZ4kSWnMZ"
},
"outputs": [],
"source": [
"# Run the assessments\n",
"assessments = execute_abcd_detector()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "bTPtY4ROr0yz"
},
"source": [
"### Parse and Display Results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "EZgKVQQwxlOl"
},
"outputs": [],
"source": [
"# Parse ABCD Assessments Results\n",
"parsed_results = parse_abcd_assessment_results(assessments)\n",
"\n",
"# Display Assessment Results\n",
"result_html = \"\"\n",
"for result in parsed_results:\n",
" # Reformat URI\n",
" video_uri = result['video_url']\n",
" content_index = video_uri.index('/content/') + len('/content/')\n",
" video_uri = 'https://storage.cloud.google.com/' + video_uri[content_index:]\n",
"\n",
" #Build result HTML\n",
" result_html = result_html + f\"\"\"\n",
" <br><br><b>ABCDs Result For: {result['video_name']}</b><br>\n",
"\n",
" <video width=600 height=337 controls>\n",
" <source src=\"{video_uri}\" type=\"video/mp4\">\n",
" </video>\n",
" \"\"\"\n",
"\n",
" split_text_result = result['result_text'].split('\\n')\n",
" for line in split_text_result:\n",
" result_html = result_html + f\"\"\"\n",
" <p>{line}</p>\n",
" \"\"\"\n",
"\n",
"HTML(result_html)\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "riZ8DVYlyAPp"
},
"source": [
"### Save Results to BigQuery"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "Vb2SaJJRL3op"
},
"outputs": [],
"source": [
"for res in parsed_results:\n",
"\n",
" # Extract timestamps for features\n",
" timestamps = ExtractTimestampsFromText(res)\n",
"\n",
" # Save profile to database\n",
" job_config = bigquery.QueryJobConfig(\n",
" query_parameters=[\n",
" bigquery.ScalarQueryParameter(\"features_detail\", \"JSON\", res['features_detail']),\n",
" bigquery.ScalarQueryParameter(\"result_text\", \"STRING\", res['result_text']),\n",
" bigquery.ScalarQueryParameter(\"feature_timestamps\", \"JSON\", timestamps)\n",
" ],\n",
" priority=bigquery.QueryPriority.INTERACTIVE\n",
" )\n",
"\n",
" sql=f\"\"\"INSERT INTO `chocolate-ai-demo-b2kvrbnkb3.chocolate_ai.campaign_abcd_results`\n",
" (assessment_id, assessment_date, brand_name, video_name, video_url, score, result_text, passed_features_count, total_features_count, features_detail, feature_timestamps)\n",
" VALUES(GENERATE_UUID(), CURRENT_TIMESTAMP(),'{res['brand_name']}','{res['video_name']}','{res['video_url']}',{res['score']},@result_text,{res['passed_features_count']},{res['total_features_count']},@features_detail,@feature_timestamps);\"\"\"\n",
"\n",
" RunQuery(sql, job_config)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "42IxhtRRrvR-"
},
"source": [
"## <font color='#4285f4'>Clean Up</font>\n",
"\n",
"Uncomment the lines below to cleanup resources created by this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "6lF2Z7skFbvf"
},
"outputs": [],
"source": [
"# Unmount bucket\n",
"#!fusermount -u /content/$bucket_name\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ASQ2BPisXDA0"
},
"source": [
"## <font color='#4285f4'>Reference Links</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "BrfYANXA9h83"
},
"source": [
"- [ABCDs Detector Solution on GitHub](https://github.com/google-marketing-solutions/abcds-detector/blob/main/%5BGitHub%5D_ABCDs_Detector.ipynb)\n",
"- [YouTube ABCDs: Best practices for effective creative on YouTube](https://www.thinkwithgoogle.com/future-of-marketing/creativity/youtube-video-ad-creative/)"
]
}
],
"metadata": {
"colab": {
"name": "Create-Campaign-Quality-Control-ABCD-1.ipynb",
"private_outputs": true,
"provenance": [],
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}