experiments/babel/app/pages/explore.py (301 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Explore Mesop Page""" from dataclasses import field import logging import json import random # from typing import List, TypedDict, Any, cast import urllib import google.auth import google.auth.transport.requests as googlerequests import google.oauth2.id_token import mesop as me from state.state import AppState from config.default import Default, BabelMetadata # from set_up.set_up import VoicesSetup # from components.page_scaffold import page_scaffold, page_frame from components.styles import CONTENT_STYLE, BACKGROUND_COLOR from set_up.set_up import Voice logging.basicConfig(level=logging.DEBUG) config = Default() BUCKET_PATH = "https://storage.mtls.cloud.google.com/" + config.STATIC_PUBLIC_BUCKET @me.stateclass class PageState: """Local Page State""" # pylint: disable=invalid-field-call location: int = 1 voices: list[Voice] = field(default_factory=lambda: []) is_loading: bool = False statement: str = "" audio_output_uri: str = "" audio_output_infos: list[str] = field(default_factory=lambda: []) audio_output_metadata: list[BabelMetadata] = field(default_factory=lambda: []) audio_status: str = "" loaded: bool = False # pylint: disable=invalid-field-call def get_chosen_voices(): """ Filters a list of Voice dictionaries, keeping only those whose name contains "Puck" or "Leda". Args: voices: A list of Voice dictionaries. Returns: A new list of Voice dictionaries, filtered based on the name. """ app_state = me.state(AppState) print(f"there are {len(app_state.voices)} total voices") voices = app_state.voices filtered_voices = [ voice for voice in voices if "Puck" in voice["name"] or "Leda" in voice["name"] ] return filtered_voices def filter_babel_metadata(filepath: str) -> list[BabelMetadata]: """ Reads a JSON file, filters the 'audio_metadata' list to keep only entries with voice_name containing "Puck" or "Leda", and returns the filtered data as a List[BabelMetadata]. Args: filepath: The path to the JSON file. Returns: A List[BabelMetadata] containing the filtered data. """ try: with open(filepath, "r") as f: data = json.load(f) except FileNotFoundError: print(f"Error: File '{filepath}' not found.") return [] except json.JSONDecodeError: print(f"Error: Invalid JSON format in '{filepath}'.") return [] if "audio_metadata" not in data or not isinstance(data["audio_metadata"], list): print(f"Warning: 'audio_metadata' key not found or not a list in '{filepath}'.") return [] filtered_metadata: list[BabelMetadata] = [ { "voice_name": item["voice_name"], "language_code": item["language_code"], "gender": item["gender"], "text": item["text"], "audio_path": item["audio_path"], } for item in data["audio_metadata"] if "voice_name" in item and ("Puck" in item["voice_name"] or "Leda" in item["voice_name"]) ] return filtered_metadata photos = [ { "photo": "local_assets/free-photo-of-iconic-big-ben-and-red-buses-in-london.jpeg", "audio": "pages/explore_london.json", "credit": "Laura Meinhardt, Pexels" }, { "photo": "local_assets/pexels-thorsten-technoman-109353-338515.jpg", "audio": "pages/explore_paris.json", "credit": "Thorsten technoman, Pexels" }, ] def change_location(e: me.ClickEvent): """Change location""" state = me.state(PageState) print(f"changing {e.key}") if e.key == "back": state.location -= 1 if state.location < 0: state.location = 0 else: if state.location + 1 >= len(photos): state.location = len(photos) - 1 else: state.location += 1 print(f"index: {state.location}") def explore_page(app_state: me.state): """Describe an image page""" state = me.state(PageState) state.voices = get_chosen_voices() if not state.loaded: print("There're no voices to display") state.audio_output_metadata = filter_babel_metadata("pages/explore_paris.json") state.loaded = True print(f"loaded {len(state.audio_output_metadata)} voices") with me.box(style=CONTENT_STYLE): with me.box( # on_click=regenerate_welcome, ): me.text( # state.welcome_statement, "Explore", type="headline-4", style=me.Style( # text_align="center", # color="transparent", # background=( # "linear-gradient(74deg,#4285f4 0%,#9b72cb 9%,#d96570 20%,#d96570 24%,#9b72cb 35%,#4285f4 44%,#9b72cb 50%,#d96570 56%, #fff 75%, #fff 100%)" # " text" # ), ), ) with me.box( style=me.Style(text_align="center", flex_direction="row", display="flex", justify_content="center", align_items="center") ): with me.content_button( on_click=change_location, key="back", ): me.icon("navigate_before") location_image = photos[state.location]["photo"].replace( "local_assets/", "static/" ) location_credit = photos[state.location]["credit"] location_audio = photos[state.location]["audio"] print(f"{location_audio} & {location_image}") state.audio_output_metadata = filter_babel_metadata(location_audio) with me.box(style=me.Style(display="flex", flex_direction="column", gap=5 )): me.image( src=location_image, style=me.Style( width="600px", #height="400px", border_radius="16px", ), ) me.text(location_credit, style=me.Style(font_style="italic", font_size="10pt")) with me.content_button( on_click=change_location, key="forward", ): me.icon("navigate_next") # me.text("Enter text to voice", type="headline-6") # me.text( # f"Using {len(state.voices)} Chirp 3: HD voices", # style=me.Style(font_style="italic"), # ) # subtle_chat_input_journey() if state.is_loading: with me.box(style=me.Style(text_align="center")): me.progress_spinner() elif state.audio_output_metadata: with me.box( style=me.Style( display="grid", grid_template_columns="1fr 1fr", text_align="center" ) ): # for uri in state.audio_output_infos: # me.audio(src=uri) sorted_metadata = sorted( state.audio_output_metadata, key=lambda voice: voice["language_code"], ) for item in sorted_metadata: # print(item) audio_url = f"{BUCKET_PATH}/{item['audio_path']}" # print(audio_url) with me.box( style=me.Style( display="flex", flex_direction="column", gap=5, padding=me.Padding(top=10, left=10, right=10, bottom=12), ) ): me.text( f"{item["language_code"]} ({item["gender"].lower()}, {item["voice_name"]})", style=me.Style(font_weight="bold"), ) me.audio(src=audio_url) me.text(item["text"]) @me.component def subtle_chat_input_journey(): """input component""" with me.box( style=me.Style( border_radius=16, padding=me.Padding.all(8), background=BACKGROUND_COLOR, display="flex", width="100%", ) ): with me.box( style=me.Style( flex_grow=1, ) ): me.native_textarea( autosize=True, min_rows=4, placeholder="Statement to voice with Chirp 3: HD voices", style=me.Style( padding=me.Padding(top=16, left=16), background=BACKGROUND_COLOR, outline="none", width="100%", overflow_y="auto", border=me.Border.all( me.BorderSide(style="none"), ), color=me.theme_var("on-surface"), ), on_blur=on_blur_statement, ) # with me.content_button(type="icon"): # me.icon("upload") # with me.content_button(type="icon"): # me.icon("photo") with me.box(style=me.Style(display="flex", gap=5, flex_direction="column")): with me.content_button(type="icon", on_click=on_click_babel): me.icon("send") with me.content_button(type="icon", on_click=on_click_clear_babel): me.icon("clear") def on_blur_statement(e: me.InputBlurEvent): """updates the statement to synthesize""" state = me.state(PageState) state.statement = e.value def on_click_clear_babel(e: me.ClickEvent): # pylint: disable=unused-argument """clear babel input event""" state = me.state(PageState) state.is_loading = False state.audio_output_infos.clear() state.audio_output_metadata.clear() def regenerate_welcome(e: me.ClickEvent): # pylint: disable=unused-argument """regenerate welcome statement""" state = me.state(PageState) state.is_loading = True state.audio_output_infos.clear() yield greetings = [ "Welcome!", "Welcome to Chirp 3 H D!", "Welcome, great to see you!", "Welcome to Chirp 3: HD!", "Welcome!", "Greetings!", ] random_greeting = random.choice(greetings) state.welcome_statement = random_greeting yield data = generate_audio(random_greeting) filtered_metadata: list[BabelMetadata] = [ { "voice_name": item["voice_name"], "language_code": item["language_code"], "gender": item["gender"], "text": item["text"], "audio_path": item["audio_path"], } for item in data["audio_metadata"] if "voice_name" in item and ("Puck" in item["voice_name"] or "Leda" in item["voice_name"]) ] state.audio_output_metadata = filtered_metadata state.is_loading = False print(f"Received {len(state.audio_output_metadata)} voices") yield def generate_audio(statement: str): """Generates audio given a statement""" post_object = {"statement": statement} print(post_object) endpoint = f"{config.BABEL_ENDPOINT}/babel" print(f"endpoint: {endpoint}") req = urllib.request.Request(endpoint) if "localhost" not in endpoint: credentials, project_id = google.auth.default() print(f"project id: {project_id}") credentials.refresh(googlerequests.Request()) print(f"credentials.token {credentials.token}") urlinfo = urllib.parse.urlparse(endpoint) audience = f"{urlinfo.scheme}://{urlinfo.netloc}/" print(f"audience: {audience}") auth_req = google.auth.transport.requests.Request() id_token = google.oauth2.id_token.fetch_id_token(auth_req, audience) print(f"id token {id_token}") req.add_header("Authorization", f"Bearer {id_token}") req.add_header("Content-Type", "application/json; charset=utf-8") bindata = str(json.dumps(post_object)).encode("utf-8") response = urllib.request.urlopen(req, bindata) response_as_string = response.read().decode("utf-8") # print(response_as_string) data = json.loads(response_as_string) return data def on_click_babel(e: me.ClickEvent): # pylint: disable=unused-argument """invokes the babel endpoint Args: e (me.ClickEvent): event click """ state = me.state(PageState) state.is_loading = True if not state.statement: print("no statement provided. not synthesizing.") return state.audio_output_infos.clear() yield post_object = {"statement": state.statement} print(post_object) endpoint = f"{config.BABEL_ENDPOINT}/babel" print(f"endpoint: {endpoint}") req = urllib.request.Request(endpoint) if "localhost" not in endpoint: credentials, project_id = google.auth.default() print(f"project id: {project_id}") credentials.refresh(googlerequests.Request()) print(f"credentials.token {credentials.token}") urlinfo = urllib.parse.urlparse(endpoint) audience = f"{urlinfo.scheme}://{urlinfo.netloc}/" print(f"audience: {audience}") auth_req = google.auth.transport.requests.Request() id_token = google.oauth2.id_token.fetch_id_token(auth_req, audience) print(f"id token {id_token}") req.add_header("Authorization", f"Bearer {id_token}") req.add_header("Content-Type", "application/json; charset=utf-8") bindata = str(json.dumps(post_object)).encode("utf-8") response = urllib.request.urlopen(req, bindata) response_as_string = response.read().decode("utf-8") print(response_as_string) data = json.loads(response_as_string) # state.audio_output_uri = f"{BUCKET_PATH}{data.get("outputfiles")[0]}" # state.audio_output_infos.clear() # for f in data.get("audio_metadata"): # state.audio_output_infos.append(f"{BUCKET_PATH}{f}") print(data.get("audio_metadata")) state.audio_output_metadata.clear() state.audio_output_metadata = [ BabelMetadata(item) for item in data.get("audio_metadata") ] state.is_loading = False yield