def lambda_handler()

in sdlf-utils/pipeline-examples/manifests/stageA/lambda/stage-a-postupdate-metadata/src/lambda_function.py

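The helper objects used in the handler (logger, octagon, peh, and the DynamoDB/S3/SQS configuration and interface classes) come from the SDLF datalake_library package. A typical import preamble for this file, with module paths assumed from the standard SDLF layout rather than taken from this source, would be:

from datalake_library import octagon
from datalake_library.commons import init_logger
from datalake_library.configuration.resource_configs import (
    DynamoConfiguration, S3Configuration, SQSConfiguration)
from datalake_library.interfaces.dynamo_interface import DynamoInterface
from datalake_library.interfaces.s3_interface import S3Interface
from datalake_library.interfaces.sqs_interface import SQSInterface
from datalake_library.octagon import peh

logger = init_logger(__name__)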

def lambda_handler(event, context):
    """Updates the S3 objects metadata catalog

    Arguments:
        event {dict} -- Dictionary with details on previous processing step
        context {dict} -- Dictionary with details on Lambda context

    Returns:
        {dict} -- Dictionary with outcome of the process
    """
    ddb_key = None
    try:
        logger.info('Fetching event data from previous step')
        processed_keys = event['body']['processedKeys']
        team = event['body']['team']
        pipeline = event['body']['pipeline']
        stage = event['body']['pipeline_stage']
        dataset = event['body']['dataset']
        peh_id = event['body']['peh_id']
        manifest_enabled = event['body']['manifest_enabled']
        manifest_file_flag = event['body']['is_manifest_file']
        # Only present for manifest-driven datasets; left as None otherwise so
        # the failure path below can check it safely
        ddb_key = event['body'].get('manifest_ddb_key')

        logger.info('Initializing Octagon client')
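        # Component name is derived from the Lambda function name
        # (second-to-last hyphen-separated token, title-cased)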
        component = context.function_name.split('-')[-2].title()
        octagon_client = (
            octagon.OctagonClient()
            .with_run_lambda(True)
            .with_configuration_instance(event['body']['env'])
            .build()
        )
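        # Attach to the pipeline execution started earlier in the pipeline, identified by peh_id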
        peh.PipelineExecutionHistoryAPI(
            octagon_client).retrieve_pipeline_execution(peh_id)

        logger.info('Initializing DynamoDB config and Interface')
        dynamo_config = DynamoConfiguration()
        dynamo_interface = DynamoInterface(dynamo_config)

        logger.info('Storing metadata to DynamoDB')
        bucket = S3Configuration().stage_bucket
        s3_interface = S3Interface()
        for key in processed_keys:
            object_metadata = {
                'bucket': bucket,
                'key': key,
                'size': s3_interface.get_size(bucket, key),
                'last_modified_date': s3_interface.get_last_modified(bucket, key),
                'org': event['body']['org'],
                'app': event['body']['app'],
                'env': event['body']['env'],
                'team': team,
                'pipeline': pipeline,
                'dataset': dataset,
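                # 'stage' denotes the stage-tier bucket holding the object;
                # 'pipeline_stage' below is the pipeline step that produced it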
                'stage': 'stage',
                'pipeline_stage': stage,
                'peh_id': peh_id
            }

            dynamo_interface.update_object_metadata_catalog(object_metadata)
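            # For manifest-driven datasets, mark this data file as COMPLETED in the
            # manifests control table (the manifest file itself is skipped)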
            if (manifest_enabled == "True" and manifest_file_flag == "False"):
                dynamo_interface.update_manifests_control_table_stagea(ddb_key, "COMPLETED", key)

        if (manifest_enabled == "True" and manifest_file_flag == "True"):
            logger.info('Sending messages to next SQS queue if it exists')
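            # The next stage's queue name is built by incrementing the trailing
            # letter of the current stage (e.g. stageA -> stageB)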
            sqs_config = SQSConfiguration(team, dataset, ''.join(
                [stage[:-1], chr(ord(stage[-1]) + 1)]))
            sqs_interface = SQSInterface(sqs_config.get_stage_queue_name)
            sqs_interface.send_batch_messages_to_fifo_queue(
                processed_keys, 10, '{}-{}'.format(team, dataset))

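        # Record the stage outcome in the pipeline execution history and close it as successful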
        octagon_client.update_pipeline_execution(status="{} {} Processing".format(stage, component),
                                                 component=component)
        octagon_client.end_pipeline_execution_success()
    except Exception as e:
        logger.error("Fatal error", exc_info=True)
        octagon_client.end_pipeline_execution_failed(component=component,
                                                     issue_comment="{} {} Error: {}".format(stage, component, repr(e)))
        if ddb_key:
            dynamo_interface.update_manifests_control_table_stagea(
                ddb_key, "FAILED")
        raise e
    return 200
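
For reference, the handler only reads fields from event['body']. An illustrative input event, with all values below being placeholders rather than values taken from the source, would look like:

example_event = {
    'body': {
        'processedKeys': ['pre-stage/engineering/legislators/persons.json'],
        'team': 'engineering',
        'pipeline': 'main',
        'pipeline_stage': 'StageA',
        'dataset': 'legislators',
        'peh_id': '00000000-0000-0000-0000-000000000000',
        'manifest_enabled': 'True',
        'is_manifest_file': 'False',
        'manifest_ddb_key': 'engineering-legislators-manifest.json',
        'org': 'awslabs',
        'app': 'datalake',
        'env': 'dev'
    }
}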