in source/ingestion/main.py [0:0]
def vod_ingestion(cloud_event):
"""Triggered by a change to a Cloud Storage bucket.
Args:
cloud_event: Eventarc cloud event
"""
project_id = getenv('TRANSCODE_PROJECT_ID')
location = getenv('TRANSCODE_REGION')
output_bucket = getenv('TRANSCODE_SERVING_BUCKET')
parent = f"projects/{project_id}/locations/{location}"
bucket = cloud_event.data['bucket']
path = cloud_event.data['name']
mime_type = cloud_event.data['contentType']
# Only process video files
if not mime_type.startswith('video/'):
guessed_mime_type = mimetypes.guess_type(path)[0]
if guessed_mime_type.startswith('video/'):
mime_type = guessed_mime_type
else:
print(f"Ignoring gs://{bucket}/{path} non video "
f"mime_type: {mime_type}; guessed {guessed_mime_type}")
return
# Remove mime type video suffix to form output directory
# e.g. gs://vod_upload/foo/bar/baz/example.mp4 (video/mp4) ->
# gs://vod_serving/foo/bar/baz/example/
output_path = path
for suffix in mimetypes.guess_all_extensions(mime_type):
if output_path.endswith(suffix):
output_path = output_path.removesuffix(suffix)
break
client = TranscoderServiceClient()
job = transcoder_v1.types.Job()
job.input_uri = f"gs://{bucket}/{path}"
job.output_uri = f"gs://{output_bucket}/{output_path}/"
# Transcode job preset
job.template_id = "preset/web-hd"
try:
client.create_job(parent=parent, job=job)
print(f"Transcoding {job.input_uri} -> {job.output_uri}")
except GoogleAPIError as e:
# Capturing the exception - prevents Eventarc retrying
print("ERROR: Transcode job creation failed.", e)
return