in ApplicationCode/dockerfiles/stage-b-postupdate-metadata/src/lambda_function.py [0:0]
def lambda_handler(event, context):
"""Updates the S3 objects metadata catalog
Arguments:
event {dict} -- Dictionary with details on Bucket and Keys
context {dict} -- Dictionary with details on Lambda context
Returns:
{dict} -- Dictionary with response
"""
try:
component = context.function_name.split('-')[-2].title()
peh.PipelineExecutionHistoryAPI(octagon_client).retrieve_pipeline_execution(event['body']['job']['peh_id'])
octagon_client.update_pipeline_execution(status="Post-Stage {} Processing".format(component), component=component)
bucket = event['body']['bucket']
processed_keys_path = event['body']['job']['processedKeysPath']
processed_keys = S3Interface().list_objects(bucket, processed_keys_path)
team = event['body']['team']
pipeline = event['body']['pipeline']
dataset = event['body']['dataset']
logger.info('Initializing DynamoDB config and Interface')
dynamo_config = DynamoConfiguration()
dynamo_interface = DynamoInterface(dynamo_config)
logger.info('Storing metadata to DynamoDB')
for key in processed_keys:
object_metadata = {
'bucket': bucket,
'key': key,
'team': team,
'pipeline': pipeline,
'dataset': dataset,
'stage': 'post-stage'
}
dynamo_interface.update_object_metadata_catalog(object_metadata)
octagon_client.end_pipeline_execution_success()
except Exception as e:
logger.error("Fatal error", exc_info=True)
octagon_client.end_pipeline_execution_failed(component=component,
issue_comment="Post-Stage {} Error: {}".format(component, repr(e)))
raise e
return 200