movie_search_metadata/demo_app/backend/utils.py (27 lines of code) (raw):
import datetime
import os
import re
from google.cloud import storage
from google import auth
credentials, project_id = auth.default()
PROJECT_ID = os.getenv('PROJECT_ID', project_id)
DATASTORE_ID = os.getenv('DATASTORE_ID')
LOCATION = os.getenv('LOCATION', 'global')
def get_bucket_and_blobnames(metadata_uri: str) -> tuple[str, str, str]:
bucket, blob_metadata = re.findall(r'gs://([^/]+)/(.+)', metadata_uri)[0]
blob_mp4 = 'mp4/s_' + blob_metadata.lstrip('metadata/').rstrip('.txt') + '.mp4'
return bucket, blob_metadata, blob_mp4
def generate_download_signed_url_v4(bucket_name: str, blob_name: str) -> str:
if not credentials.valid:
credentials.refresh(auth.transport.requests.Request())
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
url = blob.generate_signed_url(
version='v4',
expiration=datetime.timedelta(minutes=15),
method='GET',
access_token=credentials.token,
service_account_email=credentials.service_account_email
)
return url