in sdlf-utils/pipeline-examples/manifests/stageA/lambda/stage-a-postupdate-metadata/src/lambda_function.py [0:0]
def lambda_handler(event, context):
    """Updates the S3 objects metadata catalog

    Reads the outcome of the previous step from event['body'], records one
    catalog entry per processed key, updates the manifests control table
    and, for manifest files, forwards the processed keys to the queue of
    the following stage.

    Arguments:
        event {dict} -- Dictionary with details on previous processing step
        context {object} -- Lambda context (only function_name is read)

    Returns:
        {int} -- 200 when every step completed

    Raises:
        Exception -- any error is logged, reported to Octagon and to the
            manifests control table (when those clients were already
            created), and then re-raised unchanged
    """
    # Pre-bind every name the error handler reads. Without this, a failure
    # early in the try block (e.g. a key missing from the event) would be
    # masked by an UnboundLocalError raised from inside the handler itself.
    octagon_client = None
    dynamo_interface = None
    ddb_key = None
    try:
        logger.info('Fetching event data from previous step')
        body = event['body']
        processed_keys = body['processedKeys']
        team = body['team']
        pipeline = body['pipeline']
        stage = body['pipeline_stage']
        dataset = body['dataset']
        peh_id = body['peh_id']
        manifest_enabled = body['manifest_enabled']
        manifest_file_flag = body['is_manifest_file']
        # Optional in the event; stays None when absent.
        ddb_key = body.get('manifest_ddb_key')

        logger.info('Initializing Octagon client')
        # Component name is the second-to-last dash-separated token of the
        # Lambda function name, title-cased.
        component = context.function_name.split('-')[-2].title()
        octagon_client = (
            octagon.OctagonClient()
            .with_run_lambda(True)
            .with_configuration_instance(body['env'])
            .build()
        )
        peh.PipelineExecutionHistoryAPI(
            octagon_client).retrieve_pipeline_execution(peh_id)

        logger.info('Initializing DynamoDB config and Interface')
        dynamo_config = DynamoConfiguration()
        dynamo_interface = DynamoInterface(dynamo_config)

        logger.info('Storing metadata to DynamoDB')
        bucket = S3Configuration().stage_bucket
        # One interface for the whole loop instead of two per key.
        s3_interface = S3Interface()
        last_key = None
        for key in processed_keys:
            object_metadata = {
                'bucket': bucket,
                'key': key,
                'size': s3_interface.get_size(bucket, key),
                'last_modified_date': s3_interface.get_last_modified(bucket, key),
                'org': body['org'],
                'app': body['app'],
                'env': body['env'],
                'team': team,
                'pipeline': pipeline,
                'dataset': dataset,
                'stage': 'stage',
                'pipeline_stage': stage,
                'peh_id': peh_id
            }
            dynamo_interface.update_object_metadata_catalog(object_metadata)
            last_key = key

        # Data file tracked by a manifest: mark its control-table entry as
        # completed, recording the last processed key.
        # NOTE(review): the update is issued once, after the loop; confirm
        # that a single write per ddb_key is the intended behaviour.
        if manifest_enabled == "True" and manifest_file_flag == "False":
            if ddb_key is None:
                # The control table cannot be updated without its key.
                raise KeyError('manifest_ddb_key')
            if last_key is None:
                logger.warning(
                    'No processed keys; manifests control table not updated')
            else:
                dynamo_interface.update_manifests_control_table_stagea(
                    ddb_key, "COMPLETED", last_key)

        # Manifest file itself: hand the processed keys to the next stage.
        if manifest_enabled == "True" and manifest_file_flag == "True":
            logger.info('Sending messages to next SQS queue if it exists')
            # The next stage name is the current one with its final
            # character advanced by one code point.
            next_stage = ''.join([stage[:-1], chr(ord(stage[-1]) + 1)])
            sqs_config = SQSConfiguration(team, dataset, next_stage)
            sqs_interface = SQSInterface(sqs_config.get_stage_queue_name)
            sqs_interface.send_batch_messages_to_fifo_queue(
                processed_keys, 10, '{}-{}'.format(team, dataset))

        octagon_client.update_pipeline_execution(
            status="{} {} Processing".format(stage, component),
            component=component)
        octagon_client.end_pipeline_execution_success()
    except Exception as e:
        logger.error("Fatal error", exc_info=True)
        # Report only through clients that were actually created; component
        # and stage are always bound once octagon_client is.
        if octagon_client is not None:
            octagon_client.end_pipeline_execution_failed(
                component=component,
                issue_comment="{} {} Error: {}".format(stage, component, repr(e)))
        if dynamo_interface is not None and ddb_key is not None:
            dynamo_interface.update_manifests_control_table_stagea(
                ddb_key, "FAILED")
        # Bare raise keeps the original exception and traceback intact.
        raise
    return 200