in source/dataplaneapi/app.py [0:0]
def put_asset_metadata(asset_id):
"""
Adds operation metadata for an asset.
If the results are paginated, such as those returned by Rekognition, you must set the "paginated" query param to
"true" on every put metadata call made for a page of results. For the final page, the "end" query param
must also be set to "true", which tells the dataplane that the paginated session is over and that the
pointer for that metadata type should be updated. An example call sequence is shown below.
Query String Params:
:param paginated: Boolean to tell the dataplane that the results will arrive in pages.
:param end: Boolean to declare the last page in a set of paginated results.
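Example (illustrative only) of the query strings sent for a three-page result set; the body shape is the
same on every call, only the "Results" page changes:
.. code-block:: python
# page 1:     ?paginated=true
# page 2:     ?paginated=true
# final page: ?paginated=true&end=true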
Body:
.. code-block:: python
{
"OperatorName": "{some_operator}",
"Results": "{json_formatted_results}",
"WorkflowId": "workflow-id"
}
Returns:
Dictionary containing the status of the PUT metadata operation. If a pointer is updated, the response will also
include the S3 Bucket and S3 Key that the data was written to.
.. code-block:: python
{
"Status": "$status", "Bucket": $bucket, "Key": $metadata_key
}
Raises:
BadRequestError - 400
NotFoundError - 404
ChaliceViewError - 500
"""
# TODO: Maybe add some enforcement around only being able to end paginated calls if called from the same workflow
bucket = dataplane_s3_bucket
table_name = dataplane_table_name
asset = asset_id
body = json.loads(app.current_request.raw_body.decode())
query_params = app.current_request.query_params
paginated = False
end_pagination = False
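# Pagination is opt-in: callers send paginated=true on every page and additionally end=true on the
# final page, which is what triggers the pointer update in DynamoDB further below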
if query_params is not None:
try:
paginated = query_params["paginated"]
except KeyError:
raise BadRequestError("Must pass required query parameter: paginated")
else:
# Query params arrive as strings; coerce to a real boolean so a value like "false" isn't truthy later
paginated = (paginated == "true")
if paginated is True:
try:
end_pagination = query_params["end"]
except KeyError:
logger.info("Not the end of paginated results")
else:
if end_pagination == "true":
logger.info("Storing the last page of results")
end_pagination = True
else:
raise BadRequestError("Query param end only supports a value of: true")
try:
operator_name = body['OperatorName']
workflow_id = body['WorkflowId']
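# Re-parse the results with parse_float=Decimal so floating point values are carried as Decimal
# rather than Python floats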
results = json.loads(json.dumps(body['Results']), parse_float=Decimal)
except KeyError as e:
logger.error("Exception occurred while storing metadata for {asset}: {e}".format(asset=asset, e=e))
raise BadRequestError("Missing required inputs for storing metadata: {e}".format(e=e))
except Exception as e:
logger.error("Exception occurred while storing metadata for {asset}: {e}".format(asset=asset, e=e))
raise ChaliceViewError("Unknown exception when storing asset metadata: {e}".format(e=e))
else:
# Check that the results are a dict
if not isinstance(results, dict):
logger.error("Exception occurred while storing metadata for {asset}".format(asset=asset))
raise BadRequestError(
"Exception occurred while storing metadata for {asset}: results are not the required data type, dict".format(
asset=asset))
else:
logger.info("Storing metadata for {asset}".format(asset=asset))
# S3 key that we'll write the results to: {base_s3_uri}{asset_id}/workflows/{workflow_id}/{operator_name}.json
metadata_key = base_s3_uri + asset + '/' + 'workflows' + '/' + workflow_id + '/' + operator_name + '.json'
# Verify asset exists before adding metadata and check if pointers exist for this operator
# TODO: This check runs on every call, including each additional call made while storing paginated
# results; this could likely be refactored to avoid the repeated lookup
try:
table = dynamo_resource.Table(table_name)
response = table.get_item(
Key={
"AssetId": asset
}
)
except ClientError as e:
error = e.response['Error']['Message']
logger.error("Exception occurred while storing metadata for {asset}: {e}".format(asset=asset, e=error))
raise ChaliceViewError("Exception occurred while verifying asset exists: {e}".format(e=error))
except Exception as e:
logger.error("Exception occurred while storing metadata for {asset}: {e}".format(asset=asset, e=e))
raise ChaliceViewError("Exception occurred while verifying asset exists: {e}".format(e=e))
else:
if 'Item' not in response:
raise NotFoundError(
"Exception occurred while verifying asset exists: {asset} does not exist".format(asset=asset))
else:
try:
pointers = response['Item'][operator_name]
except KeyError:
logger.info("No pointers have been stored for this operator")
pointers = []
else:
logger.info("Retrieved existing pointers")
# We won't update DynamoDB unless we successfully write to S3
wrote_to_s3 = False
# write results to s3
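# Three write paths: the first page of a paginated result set (create a new list in S3), a subsequent
# page (append to the existing list), or a single non-paginated result (written as-is)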
if paginated:
# Check if the operator already generated metadata in this workflow execution
check_existing = read_metadata_from_s3(bucket, metadata_key)
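# An Error status from the read is treated as "no pages stored yet for this operator in this
# workflow"; otherwise the new page is appended to the existing object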
if check_existing['Status'] == 'Error':
# Write the first page directly, format it as a list
logger.info("Operator has not stored results during this worfklow execution, writing first page to S3")
formatted_result = [results]
store_results = write_metadata_to_s3(bucket, metadata_key, formatted_result)
if store_results['Status'] == 'Success':
logger.info(
'Wrote {operator} metadata page to S3 for asset: {asset}'.format(asset=asset, operator=operator_name))
wrote_to_s3 = True
else:
logger.error('Unable to write paginated metadata to S3 for asset: {asset}'.format(asset=asset))
raise ChaliceViewError("Exception occurred while writing metadata to S3: {e}".format(e=store_results["Message"]))
else:
# Pull in the existing metadata
existing_results = json.loads(check_existing['Object'])
# Append new data
existing_results.append(results)
# Write back to s3
store_results = write_metadata_to_s3(bucket, metadata_key, existing_results)
if store_results['Status'] == 'Success':
logger.info(
'Wrote {operator} metadata page to S3 for asset: {asset}'.format(asset=asset, operator=operator_name))
wrote_to_s3 = True
else:
logger.error('Unable to write paginated metadata to S3 for asset: {asset}'.format(asset=asset))
raise ChaliceViewError("Exception occurred while writing metadata to S3: {e}".format(e=store_results["Message"]))
else:
store_results = write_metadata_to_s3(bucket, metadata_key, results)
if store_results['Status'] == 'Success':
logger.info(
'Wrote {operator} metadata to S3 for asset: {asset}'.format(asset=asset, operator=operator_name))
wrote_to_s3 = True
else:
logger.error('Unable to write metadata to S3 for asset: {asset}'.format(asset=asset))
raise ChaliceViewError(
"Exception occurred while writing metadata to S3: {e}".format(e=store_results["Message"]))
# We only update pointer for results if all the pages are written successfully
if wrote_to_s3 and (not paginated or end_pagination):
# update the pointer list in dynamo
# we store pointers as a list to keep references to results from different workflow executions of the same operator
pointer = {"workflow": workflow_id, "pointer": metadata_key}
pointers.insert(0, pointer)
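# Alias the operator name via ExpressionAttributeNames since it is caller-supplied and could
# otherwise collide with a DynamoDB reserved word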
update_expression = "SET #operator_result = :result"
expression_attr_name = {"#operator_result": operator_name}
expression_attr_val = {":result": pointers}
try:
table = dynamo_resource.Table(table_name)
table.update_item(
Key={
"AssetId": asset
},
UpdateExpression=update_expression,
ExpressionAttributeNames=expression_attr_name,
ExpressionAttributeValues=expression_attr_val,
)
except ClientError as e:
error = e.response['Error']['Message']
logger.error("Exception occurred during metadata pointer update: {e}".format(e=error))
raise ChaliceViewError("Unable to update metadata pointer: {e}".format(e=error))
except Exception as e:
logger.error("Exception updating pointer in dynamo {e}".format(e=e))
raise ChaliceViewError("Exception: {e}".format(e=e))
else:
logger.info("Successfully stored {operator} metadata for asset: {asset} in the dataplane".format(
operator=operator_name, asset=asset_id))
return {"Status": "Success", "Bucket": bucket, "Key": metadata_key}
elif paginated and not end_pagination and wrote_to_s3:
return {"Status": "Success"}
else:
return {"Status": "Failed"}