movie_search_metadata/demo_app/frontend/main.py (85 lines of code) (raw):

import os import requests import streamlit as st from google.auth.transport.requests import Request from google.oauth2.id_token import fetch_id_token BACKEND_URL = os.getenv('BACKEND_URL', 'http://localhost:8080') def get_start_end_seconds(timestamp): """ 04:57-05:58 のような形式のタイムスタンプを受け取り、 開始秒と終了秒をタプルで返す """ start, end = timestamp.split('-') start_mm, start_ss = start.split(':') end_mm, end_ss = end.split(':') start_seconds = int(start_mm) * 60 + int(start_ss) end_seconds = int(end_mm) * 60 + int(end_ss) return start_seconds, end_seconds def make_request(endpoint: str, params: dict = None): """Backend API に認証情報を付与してリクエストを送信する""" auth_req = Request() id_token = fetch_id_token(auth_req, BACKEND_URL) headers = {'Authorization': f'Bearer {id_token}'} response = requests.get( f'{BACKEND_URL}/{endpoint}', params=params, headers=headers) response.raise_for_status() # Raise error on error status code return response.json() def file_search(): st.write('# File Search') file_search_query = st.text_input('Search Query', key='file_search') if st.button('Search', key='file_search_button'): st.write(f'Searching for: {file_search_query}') if file_search_query: try: results = make_request( 'file_search', params={'query': file_search_query})['results'] if results: st.write('## Result') for c, result in enumerate(results): if c == 0: st.write(result['summary']) continue st.write(f"Video ID: {result['id']}") st.write(f"Title: {result['title']}") st.video(result['signed_url']) st.divider() else: st.write('No results found.') except requests.exceptions.RequestException as e: st.error(f'An error occurred while searching: {e}') def scene_search(): st.write('# Scene Search') # Show "Search Query" and "Top N" in a single row col1, col2 = st.columns((7, 1)) with col1: scene_search_query = st.text_input('Search Query', key='scene_search') with col2: top_n = st.number_input( 'Top N', min_value=1, max_value=3, value=1, help='Top N とは ... シーンの検索対象最大動画数' '\n\n' 'まず検索文の回答に適した動画が全動画を対象に検索され\n' 'その後、上位 N 個の動画からシーンが検索されます。' ) if st.button('Search', key='scene_search_button'): st.write(f'Searching for: {scene_search_query}') if scene_search_query: try: results = make_request( 'scene_search', params={'query': scene_search_query, 'top_n': top_n})['results'] if results: st.write('## Result') for c, result in enumerate(results): video_id = c + 1 st.write(f'Video ID: {video_id}') st.write(f"Title: {result['title']}") st.write(f"Description: {result['Description']}") st.write(f"Timestamp: {result['Timestamp']}") start_time, end_time = get_start_end_seconds(result['Timestamp']) signed_url = result['signed_url'] st.video(signed_url, start_time=start_time, end_time=end_time) st.divider() else: st.write('No results found.') except requests.exceptions.RequestException as e: st.error(f'An error occurred while searching: {e}') # __main__ # Search options search_option = st.selectbox('Search Option', ('File Search', 'Scene Search')) if search_option == 'File Search': file_search() elif search_option == 'Scene Search': scene_search()