in source/operators/rekognition/generic_data_lookup.py [0:0]
import json
import boto3
# Helper classes assumed to come from the Media Insights Lambda layer, as in the
# other operators in this repo.
from MediaInsightsEngineLambdaHelper import MediaInsightsOperationHelper
from MediaInsightsEngineLambdaHelper import MasExecutionError
from MediaInsightsEngineLambdaHelper import DataPlane


def lambda_handler(event, context):
    print("We got the following event:\n", event)
    operator_object = MediaInsightsOperationHelper(event)
    # Get operator parameters
    try:
        workflow_id = str(event["WorkflowExecutionId"])
        asset_id = event["AssetId"]
        if "Video" in operator_object.input["Media"]:
            bucket = operator_object.input["Media"]["Video"]["S3Bucket"]
            key = operator_object.input["Media"]["Video"]["S3Key"]
        elif "Audio" in operator_object.input["Media"]:
            bucket = operator_object.input["Media"]["Audio"]["S3Bucket"]
            key = operator_object.input["Media"]["Audio"]["S3Key"]
        elif "Image" in operator_object.input["Media"]:
            bucket = operator_object.input["Media"]["Image"]["S3Bucket"]
            key = operator_object.input["Media"]["Image"]["S3Key"]
        elif "Text" in operator_object.input["Media"]:
            bucket = operator_object.input["Media"]["Text"]["S3Bucket"]
            key = operator_object.input["Media"]["Text"]["S3Key"]
        else:
            # No supported media type in the input; fail through the except path below.
            raise KeyError("No valid media input found in event")
        file_type = key.split('.')[-1]  # file extension of the input media
    except Exception:
        operator_object.update_workflow_status("Error")
        operator_object.add_workflow_metadata(GenericDataLookupError="No valid inputs")
        raise MasExecutionError(operator_object.return_output_object())
    # Get the metadata filename
    print("Looking up metadata for s3://" + bucket + "/" + key)
    # Get user-defined location for generic data file
    if "Key" in operator_object.configuration:
        metadata_filename = operator_object.configuration["Key"]
    else:
        operator_object.add_workflow_metadata(
            GenericDataLookupError="Missing S3 key for data file.")
        operator_object.update_workflow_status("Error")
        raise MasExecutionError(operator_object.return_output_object())
    if "Bucket" in operator_object.configuration:
        metadata_bucket = operator_object.configuration["Bucket"]
    else:
        operator_object.add_workflow_metadata(
            GenericDataLookupError="Missing S3 bucket for data file.")
        operator_object.update_workflow_status("Error")
        raise MasExecutionError(operator_object.return_output_object())
    # Get metadata
    s3 = boto3.client('s3')
    try:
        print("Getting data from s3://" + metadata_bucket + "/" + metadata_filename)
        data = s3.get_object(Bucket=metadata_bucket, Key=metadata_filename)
        metadata_json = json.loads(data['Body'].read().decode('utf-8'))
    except Exception as e:
        operator_object.update_workflow_status("Error")
        operator_object.add_workflow_metadata(GenericDataLookupError="Unable to read data file. " + str(e))
        raise MasExecutionError(operator_object.return_output_object())
    # Verify that the metadata is a dict, as required by the dataplane
    if not isinstance(metadata_json, dict):
        operator_object.update_workflow_status("Error")
        operator_object.add_workflow_metadata(
            GenericDataLookupError="Metadata must be of type dict. Found " + str(type(metadata_json)) + " instead.")
        raise MasExecutionError(operator_object.return_output_object())
    # Save metadata to dataplane
    operator_object.add_workflow_metadata(AssetId=asset_id, WorkflowExecutionId=workflow_id)
    dataplane = DataPlane()
    metadata_upload = dataplane.store_asset_metadata(asset_id, operator_object.name, workflow_id, metadata_json)
    # Validate that the metadata was saved to the dataplane
    if "Status" not in metadata_upload:
        operator_object.add_workflow_metadata(
            GenericDataLookupError="Unable to upload metadata for asset: {asset}".format(asset=asset_id))
        operator_object.update_workflow_status("Error")
        raise MasExecutionError(operator_object.return_output_object())
    else:
        # Update the workflow status
        if metadata_upload["Status"] == "Success":
            print("Uploaded metadata for asset: {asset}".format(asset=asset_id))
            operator_object.update_workflow_status("Complete")
            return operator_object.return_output_object()
        else:
            operator_object.add_workflow_metadata(
                GenericDataLookupError="Unable to upload metadata for asset: {asset}".format(asset=asset_id))
            operator_object.update_workflow_status("Error")
            raise MasExecutionError(operator_object.return_output_object())
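

# Illustrative only: a minimal local test harness sketching the event shape this
# operator appears to expect. It assumes the helper reads media pointers from
# event["Input"]["Media"] and the data-file location from event["Configuration"];
# the bucket and key names below are hypothetical placeholders, and the real
# event envelope built by the workflow engine contains additional fields.
# Running this requires the Media Insights Lambda layer and AWS credentials.
if __name__ == "__main__":
    sample_event = {
        "Name": "GenericDataLookup",
        "WorkflowExecutionId": "example-workflow-id",
        "AssetId": "example-asset-id",
        "Input": {
            "Media": {
                "Text": {
                    "S3Bucket": "example-media-bucket",
                    "S3Key": "uploads/transcript.txt"
                }
            }
        },
        "Configuration": {
            "Bucket": "example-metadata-bucket",
            "Key": "metadata/generic_data.json"
        }
    }
    print(lambda_handler(sample_event, None))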