experiments/veo-app/pages/veo.py (394 lines of code) (raw):

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Veo 2 mesop ui page"""

import time

import mesop as me
import requests

from common.metadata import add_video_metadata
from common.storage import store_to_gcs
from common.utils import print_keys
from components.dialog import dialog, dialog_actions
from components.header import header
from components.page_scaffold import (
    page_frame,
    page_scaffold,
)
from config.default import Default
from models.model_setup import VeoModelSetup
from models.veo import image_to_video, text_to_video
from pages.styles import _BOX_STYLE_CENTER_DISTRIBUTED

config = Default()

veo_model = VeoModelSetup.init()


@me.stateclass
class PageState:
    """Mesop Page State"""

    # Prompt text area: current input, initial value and a key that is bumped
    # to force the text area to be re-created.
    veo_prompt_input: str = ""
    veo_prompt_placeholder: str = ""
    veo_prompt_textarea_key: int = 0

    prompt: str
    original_prompt: str

    # Generation parameters
    aspect_ratio: str = "16:9"
    video_length: int = 5

    # I2V reference Image
    reference_image_file: me.UploadedFile = None
    reference_image_file_key: int = 0
    reference_image_gcs: str
    reference_image_uri: str

    # Rewriter
    auto_enhance_prompt: bool = False

    rewriter_name: str

    # Request progress, error dialog and result
    is_loading: bool = False
    show_error_dialog: bool = False
    error_message: str = ""
    result_video: str
    timing: str


def veo_content(app_state: me.state):
    """Veo 2 Mesop Page

    Renders the prompt controls, the optional reference image, the generated
    video (or a spinner while loading) and the error dialog.

    NOTE(review): the nesting of the layout boxes below was reconstructed from
    a whitespace-collapsed copy of this file; confirm it against the rendered
    page.

    Args:
        app_state: Application state handed in by the caller; not read here.
    """
    state = me.state(PageState)

    with page_scaffold():  # pylint: disable=not-context-manager
        with page_frame():  # pylint: disable=not-context-manager
            header("Veo 2", "movie")

            # tricolumn
            with me.box(
                style=me.Style(
                    display="flex",
                    flex_direction="row",
                    gap=10,
                    height=250,
                )
            ):
                # Controls
                with me.box(
                    style=me.Style(
                        # flex_basis="450px",
                        flex_basis="max(480px, calc(60% - 48px))",
                        display="flex",
                        flex_direction="column",
                        align_items="stretch",
                        justify_content="space-between",
                        gap=10,
                    )
                ):
                    subtle_veo_input()
                    # me.box(style=me.Style(height=12))
                    # me.text("no video generated")

                    # "row" is a flex-direction value, not a flex-basis one;
                    # the original passed it as flex_basis by mistake.
                    with me.box(
                        style=me.Style(display="flex", flex_direction="row", gap=5)
                    ):
                        me.select(
                            label="aspect",
                            appearance="outline",
                            options=[
                                me.SelectOption(label="16:9 widescreen", value="16:9"),
                                me.SelectOption(label="9:16 portrait", value="9:16"),
                            ],
                            value=state.aspect_ratio,
                            on_selection_change=on_selection_change_aspect,
                        )
                        me.select(
                            label="length",
                            options=[
                                me.SelectOption(label="5 seconds", value="5"),
                                me.SelectOption(label="6 seconds", value="6"),
                                me.SelectOption(label="7 seconds", value="7"),
                                me.SelectOption(label="8 seconds", value="8"),
                            ],
                            appearance="outline",
                            style=me.Style(),
                            # Option values are strings, the state holds an int.
                            value=f"{state.video_length}",
                            on_selection_change=on_selection_change_length,
                        )
                        me.checkbox(
                            label="auto-enhance prompt",
                            checked=state.auto_enhance_prompt,
                            on_change=on_change_auto_enhance_prompt,
                        )

                # Uploaded image
                with me.box(style=_BOX_STYLE_CENTER_DISTRIBUTED):
                    me.text("Reference Image (optional)")

                    if state.reference_image_uri:
                        output_url = state.reference_image_uri
                        # output_url = f"https://storage.mtls.cloud.google.com/{state.reference_image_uri}"
                        # output_url = "https://storage.mtls.cloud.google.com/ghchinoy-genai-sa-assets-flat/edits/image (30).png"
                        print(f"displaying {output_url}")
                        me.image(
                            src=output_url,
                            style=me.Style(
                                height=150,
                                border_radius=12,
                            ),
                            key=str(state.reference_image_file_key),
                        )
                    else:
                        # Empty image keeps the column height reserved.
                        me.image(src=None, style=me.Style(height=200))

                    with me.box(
                        style=me.Style(display="flex", flex_direction="row", gap=5)
                    ):
                        # me.button(label="Upload", type="flat", disabled=True)
                        me.uploader(
                            label="Upload",
                            accepted_file_types=["image/jpeg", "image/png"],
                            on_upload=on_click_upload,
                            type="flat",
                            color="primary",
                            style=me.Style(font_weight="bold"),
                        )
                        me.button(
                            label="Clear", on_click=on_click_clear_reference_image
                        )

            me.box(style=me.Style(height=30))

            # Generated video
            with me.box(style=_BOX_STYLE_CENTER_DISTRIBUTED):
                me.text("Generated Video")
                me.box(style=me.Style(height=8))
                with me.box(style=me.Style(height="100%")):
                    if state.is_loading:
                        me.progress_spinner()
                    elif state.result_video:
                        # The result is stored as a gs:// URI; rewrite it to an
                        # https URL the video element can load.
                        video_url = state.result_video.replace(
                            "gs://",
                            "https://storage.mtls.cloud.google.com/",
                        )
                        print(f"video_url: {video_url}")
                        me.video(src=video_url, style=me.Style(border_radius=6))
                        me.text(state.timing)

    with dialog(is_open=state.show_error_dialog):  # pylint: disable=not-context-manager
        # Content within the dialog box
        me.text(
            "Generation Error",
            type="headline-6",
            style=me.Style(color=me.theme_var("error")),
        )
        me.text(state.error_message, style=me.Style(margin=me.Margin(top=16)))
        # Use the dialog_actions component for the button
        with dialog_actions():  # pylint: disable=not-context-manager
            me.button("Close", on_click=on_close_error_dialog, type="flat")


def on_change_auto_enhance_prompt(e: me.CheckboxChangeEvent):
    """Toggle auto-enhance prompt"""
    state = me.state(PageState)
    state.auto_enhance_prompt = e.checked


def on_click_upload(e: me.UploadEvent):
    """Upload image to GCS

    Stores the uploaded file through `store_to_gcs` and records both its
    gs:// URI (used for generation) and its https URL (used for display).
    """
    state = me.state(PageState)
    state.reference_image_file = e.file
    contents = e.file.getvalue()
    destination_blob_name = store_to_gcs(
        "uploads", e.file.name, e.file.mime_type, contents
    )
    # gcs
    state.reference_image_gcs = f"gs://{destination_blob_name}"
    # url
    state.reference_image_uri = (
        f"https://storage.mtls.cloud.google.com/{destination_blob_name}"
    )
    # log
    print(
        f"{destination_blob_name} with contents len {len(contents)} of type {e.file.mime_type} uploaded to {config.GENMEDIA_BUCKET}."
    )


def on_click_clear_reference_image(e: me.ClickEvent):  # pylint: disable=unused-argument
    """Clear reference image"""
    state = me.state(PageState)
    state.reference_image_file = None
    state.reference_image_file_key += 1
    # Reset to empty strings (not None) so the `str`-typed fields keep their
    # declared type; readers of these fields only test truthiness.
    state.reference_image_uri = ""
    state.reference_image_gcs = ""
    state.is_loading = False


def on_selection_change_length(e: me.SelectSelectionChangeEvent):
    """Adjust the video duration length in seconds based on user event"""
    state = me.state(PageState)
    state.video_length = int(e.value)


def on_selection_change_aspect(e: me.SelectSelectionChangeEvent):
    """Adjust aspect ratio based on user event."""
    state = me.state(PageState)
    state.aspect_ratio = e.value


def on_click_clear(e: me.ClickEvent):  # pylint: disable=unused-argument
    """Clear prompt and video"""
    state = me.state(PageState)
    # Empty strings rather than None: these fields are declared `str`, and
    # `veo_prompt_input` is passed on as the prompt by `on_click_veo`.
    state.result_video = ""
    state.prompt = ""
    state.veo_prompt_input = ""
    state.original_prompt = ""
    # A new key makes the prompt text area render afresh.
    state.veo_prompt_textarea_key += 1
    state.video_length = 5
    state.aspect_ratio = "16:9"
    state.is_loading = False
    state.auto_enhance_prompt = False
    yield


def _extract_video_uri(op: dict) -> tuple[str, str]:
    """Pull the generated video URI, or an error message, out of an operation.

    Args:
        op: Operation dict returned by `text_to_video` / `image_to_video`.

    Returns:
        A `(gcs_uri, error_message)` pair. `gcs_uri` is empty whenever an
        error, a content filter hit or an unexpected structure was detected.
    """
    # Explicit error reported by the API.
    if op.get("done") and op.get("error"):
        error_message = op["error"].get("message", "Unknown API error")
        print(f"API Error Detected: {error_message}")
        return "", error_message

    # 'done' is false or there is no usable response payload.
    if not (op.get("done") and op.get("response")):
        error_message = "Unexpected API response structure or operation not done."
        print(f"Error: {error_message}")
        return "", error_message

    response_data = op["response"]
    print(f"Response: {response_data}")
    print_keys(response_data)

    if response_data.get("raiMediaFilteredCount", 0) > 0 and response_data.get(
        "raiMediaFilteredReasons"
    ):
        # Extract the first reason provided
        filter_reason = response_data["raiMediaFilteredReasons"][0]
        error_message = f"Content Filtered: {filter_reason}"
        print(f"Filtering Detected: {error_message}")
        return "", error_message

    # Extract GCS URI from different possible locations
    gcs_uri = ""
    if response_data.get("generatedSamples"):
        print(f"Generated Samples: {response_data['generatedSamples']}")
        gcs_uri = response_data["generatedSamples"][0].get("video", {}).get("uri", "")
    elif response_data.get("videos"):
        print(f"Videos: {response_data['videos']}")
        gcs_uri = response_data["videos"][0].get("gcsUri", "")

    if not gcs_uri:
        # Success reported, but no video URI found - treat as an error
        error_message = (
            "API reported success but no video URI was found in the response."
        )
        print(f"Error: {error_message}")
        return "", error_message

    file_name = gcs_uri.split("/")[-1]
    print("Video generated - use the following to copy locally")
    print(f"gsutil cp {gcs_uri} {file_name}")
    return gcs_uri, ""


def on_click_veo(e: me.ClickEvent):  # pylint: disable=unused-argument
    """Veo generate request handler

    Runs image-to-video when a reference image is present, text-to-video
    otherwise, then stores the result (or the error) in the page state and
    records the attempt through `add_video_metadata`.

    Yields twice: once after the loading state is set, once after the request
    has finished.
    """
    state = me.state(PageState)
    state.is_loading = True
    state.show_error_dialog = False  # Reset error state before starting
    state.error_message = ""
    state.result_video = ""  # Clear previous result
    state.timing = ""  # Clear previous timing
    yield

    print(f"Lights, camera, action!:\n{state.veo_prompt_input}")

    aspect_ratio = state.aspect_ratio  # @param ["16:9", "9:16"]
    seed = 120
    sample_count = 1
    rewrite_prompt = state.auto_enhance_prompt
    if rewrite_prompt:
        print("Default auto-enhance prompt is ON")
    duration_seconds = state.video_length

    start_time = time.time()  # Record the starting time
    gcs_uri = ""
    current_error_message = ""

    try:
        if state.reference_image_gcs:
            print(f"I2V invoked. I see you have an image! {state.reference_image_gcs} ")
            op = image_to_video(
                state.veo_prompt_input,
                state.reference_image_gcs,
                seed,
                aspect_ratio,
                sample_count,
                f"gs://{config.VIDEO_BUCKET}",
                rewrite_prompt,
                duration_seconds,
            )
        else:
            print("T2V invoked.")
            op = text_to_video(
                state.veo_prompt_input,
                seed,
                aspect_ratio,
                sample_count,
                f"gs://{config.VIDEO_BUCKET}",
                rewrite_prompt,
                duration_seconds,
            )

        print(f"Operation result: {op}")

        gcs_uri, current_error_message = _extract_video_uri(op)
        # Empty on any error, so no stale or partial video is shown.
        state.result_video = gcs_uri

    # Catch specific exceptions you anticipate
    except ValueError as err:
        print(f"ValueError caught: {err}")
        current_error_message = f"Input Error: {err}"
    except requests.exceptions.HTTPError as err:
        print(f"HTTPError caught: {err}")
        current_error_message = f"Network/API Error: {err}"
    # Catch any other unexpected exceptions
    except Exception as err:  # pylint: disable=broad-exception-caught
        print(f"Generic Exception caught: {type(err).__name__}: {err}")
        current_error_message = f"An unexpected error occurred: {err}"

    finally:
        end_time = time.time()  # Record the ending time
        execution_time = end_time - start_time  # Calculate the elapsed time
        print(f"Execution time: {execution_time} seconds")  # Print the execution time
        state.timing = f"Generation time: {round(execution_time)} seconds"

        # If an error occurred, update the state to show the dialog
        if current_error_message:
            state.error_message = current_error_message
            state.show_error_dialog = True
            # Ensure no result video is displayed on error
            state.result_video = ""

        # Metadata is recorded for failed attempts too (with the error text).
        try:
            add_video_metadata(
                gcs_uri,
                state.veo_prompt_input,
                aspect_ratio,
                veo_model,
                execution_time,
                state.video_length,
                state.reference_image_gcs,
                rewrite_prompt,
                error_message=current_error_message,
                comment="veo2 default generation",
            )
        except Exception as meta_err:  # pylint: disable=broad-exception-caught
            # Handle potential errors during metadata storage itself
            print(f"CRITICAL: Failed to store metadata: {meta_err}")
            # Optionally, display another error or log this critical failure
            if not state.show_error_dialog:  # Avoid overwriting primary error
                state.error_message = f"Failed to store video metadata: {meta_err}"
                state.show_error_dialog = True

    state.is_loading = False
    yield
    print("Cut! That's a wrap!")


def on_blur_veo_prompt(e: me.InputBlurEvent):
    """Veo prompt blur event"""
    me.state(PageState).veo_prompt_input = e.value


@me.component
def subtle_veo_input():
    """veo input

    Prompt text area with a column of icon buttons: Create, Rewriter
    (disabled) and Clear.
    """
    pagestate = me.state(PageState)

    icon_style = me.Style(
        display="flex",
        flex_direction="column",
        gap=3,
        font_size=10,
        align_items="center",
    )
    with me.box(
        style=me.Style(
            border_radius=16,
            padding=me.Padding.all(8),
            background=me.theme_var("secondary-container"),
            display="flex",
            width="100%",
        )
    ):
        with me.box(
            style=me.Style(
                flex_grow=1,
            )
        ):
            me.native_textarea(
                autosize=True,
                min_rows=10,
                max_rows=13,
                placeholder="video creation instructions",
                style=me.Style(
                    padding=me.Padding(top=16, left=16),
                    background=me.theme_var("secondary-container"),
                    outline="none",
                    width="100%",
                    overflow_y="auto",
                    border=me.Border.all(
                        me.BorderSide(style="none"),
                    ),
                    color=me.theme_var("foreground"),
                    flex_grow=1,
                ),
                # The typed text is captured on blur, not per keystroke.
                on_blur=on_blur_veo_prompt,
                key=str(pagestate.veo_prompt_textarea_key),
                value=pagestate.veo_prompt_placeholder,
            )
        with me.box(
            style=me.Style(
                display="flex",
                flex_direction="column",
                gap=15,
            )
        ):
            # do the veo
            with me.content_button(
                type="icon",
                on_click=on_click_veo,
            ):
                with me.box(style=icon_style):
                    me.icon("play_arrow")
                    me.text("Create")
            # invoke gemini
            with me.content_button(
                type="icon",
                disabled=True,
            ):
                with me.box(style=icon_style):
                    me.icon("auto_awesome")
                    me.text("Rewriter")
            # clear all of this
            with me.content_button(
                type="icon",
                on_click=on_click_clear,
            ):
                with me.box(style=icon_style):
                    me.icon("clear")
                    me.text("Clear")


def on_close_error_dialog(e: me.ClickEvent):  # pylint: disable=unused-argument
    """Handler to close the error dialog."""
    state = me.state(PageState)
    state.show_error_dialog = False
    yield  # Update UI to hide dialog