in sdlf-utils/pipeline-examples/manifests/stageB/lambda/stage-b-postupdate-metadata/src/lambda_function.py [0:0]
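# Module-level context (not shown in this excerpt): the handler relies on the
# SDLF datalake_library helpers and the octagon client. A minimal sketch of
# the imports it needs, assuming the standard sdlf-utils layout; exact module
# paths may differ in your checkout.
import octagon
from octagon import peh

from datalake_library.commons import init_logger
from datalake_library.configuration.resource_configs import (
    DynamoConfiguration, KMSConfiguration, SQSConfiguration)
from datalake_library.interfaces.dynamo_interface import DynamoInterface
from datalake_library.interfaces.s3_interface import S3Interface
from datalake_library.interfaces.sqs_interface import SQSInterface

logger = init_logger(__name__)
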
def lambda_handler(event, context):
"""Updates the S3 objects metadata catalog
Arguments:
event {dict} -- Dictionary with details on Bucket and Keys
context {dict} -- Dictionary with details on Lambda context
Returns:
{dict} -- Dictionary with response
"""
try:
logger.info('Fetching event data from previous step')
bucket = event['body']['bucket']
processed_keys_path = event['body']['job']['processedKeysPath']
processed_keys = S3Interface().list_objects(bucket, processed_keys_path)
team = event['body']['team']
pipeline = event['body']['pipeline']
stage = event['body']['pipeline_stage']
dataset = event['body']['dataset']
peh_id = event['body']['job']['peh_id']
keys_to_process = event['body']['keysToProcess']
s3_path = "post-stage/{}/manifests/{}/{}".format(
team, dataset, keys_to_process[0].split("/")[-1])
logger.info('Initializing Octagon client')
component = context.function_name.split('-')[-2].title()
octagon_client = (
octagon.OctagonClient()
.with_run_lambda(True)
.with_configuration_instance(event['body']['env'])
.build()
)
peh.PipelineExecutionHistoryAPI(
octagon_client).retrieve_pipeline_execution(peh_id)
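        # octagon records pipeline execution history; re-attaching to the
        # peh_id started earlier in the pipeline lets this Lambda report
        # status against the same execution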
logger.info('Initializing DynamoDB config and Interface')
dynamo_config = DynamoConfiguration()
dynamo_interface = DynamoInterface(dynamo_config)
        logger.info('Storing metadata to DynamoDB')
        # Reuse a single S3 client for the size/last-modified lookups rather
        # than constructing one per attribute
        s3_interface = S3Interface()
        for key in processed_keys:
            object_metadata = {
                'bucket': bucket,
                'key': key,
                'size': s3_interface.get_size(bucket, key),
                'last_modified_date': s3_interface.get_last_modified(bucket, key),
                'org': event['body']['org'],
                'app': event['body']['app'],
                'env': event['body']['env'],
                'team': team,
                'pipeline': pipeline,
                'dataset': dataset,
                # 'stage' records the data lake zone the object lives in (the
                # stage bucket), as distinct from 'pipeline_stage' (the
                # pipeline step) below
                'stage': 'stage',
                'pipeline_stage': stage,
                'peh_id': peh_id
            }
            dynamo_interface.update_object_metadata_catalog(object_metadata)
logger.info("Updating manifests control table")
items = get_manifest_data(bucket, team, dataset, keys_to_process[0])
ddb_keys = get_ddb_keys(items)
for ddb_key in ddb_keys:
dynamo_interface.update_manifests_control_table_stageb(
ddb_key, "COMPLETED")
logger.info("Move manifest file to post stage")
kms_key = KMSConfiguration(team).get_kms_arn
s3_interface = S3Interface()
s3_interface.copy_object(
bucket, keys_to_process[0], bucket, s3_path, kms_key=kms_key)
logger.info("Removing manifest file from pre-stage")
s3_interface.delete_objects(bucket, keys_to_process[0])
        # Only uncomment if a queue for the next stage exists
        # Note: ''.join([stage[:-1], chr(ord(stage[-1]) + 1)]) increments the
        # trailing stage letter, e.g. 'StageB' -> 'StageC'
        # logger.info('Sending messages to next SQS queue if it exists')
        # sqs_config = SQSConfiguration(team, dataset, ''.join([stage[:-1], chr(ord(stage[-1]) + 1)]))
        # sqs_interface = SQSInterface(sqs_config.get_stage_queue_name)
        # sqs_interface.send_batch_messages_to_fifo_queue(processed_keys, 10, '{}-{}'.format(team, dataset))
octagon_client.update_pipeline_execution(
status="{} {} Processing".format(stage, component), component=component)
octagon_client.end_pipeline_execution_success()
    except Exception as e:
        logger.error("Fatal error", exc_info=True)
        # Note: this failure path assumes the exception was raised after the
        # octagon client, component name and ddb_keys were initialized; an
        # earlier failure would surface here as a NameError instead
        octagon_client.end_pipeline_execution_failed(
            component=component,
            issue_comment="{} {} Error: {}".format(stage, component, repr(e)))
        for ddb_key in ddb_keys:
            dynamo_interface.update_manifests_control_table_stageb(
                ddb_key, "FAILED", None, "Failed in Post Update")
        # Bare raise preserves the original traceback
        raise
return 200
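
# The handler calls get_manifest_data and get_ddb_keys, which are defined
# elsewhere in this module: get_manifest_data(bucket, team, dataset,
# manifest_key) is expected to read the manifest from S3 and return the
# matching manifests-control-table items. A minimal sketch of the contract
# the handler assumes for the second helper (illustrative field names, not
# the actual implementation):
def get_ddb_keys_sketch(items):
    """Reduces manifests-control-table items to the key attributes consumed
    by update_manifests_control_table_stageb (sketch only)"""
    ddb_keys = []
    for item in items:
        ddb_keys.append({
            'dataset_name': item['dataset_name'],
            'datafile_name': item['datafile_name']
        })
    return ddb_keys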