def lambda_handler()

in sdlf-utils/pipeline-examples/manifests/stageA/lambda/stage-a-loaddatafile-metadata/src/lambda_function.py [0:0]


def lambda_handler(event, context):
    """ Load Datafile metadata in manifests control table
        Check if manifest file is available within the threshold

    Lists the objects under ``pre-stage/<team>/manifests/<dataset>/`` in the
    stage bucket, keeps the file names matching the manifest regex from the
    event, and looks the incoming data file up in the manifests control
    table for each matching manifest. When an entry is found it is marked
    ``STARTED`` and its key is stored in ``event['body']['manifest_ddb_key']``;
    otherwise the function sleeps for five minutes and reports the manifest
    as not loaded.

    Arguments:
        event {dict} -- Dictionary with details on previous processing step
        context {dict} -- Dictionary with details on Lambda context

    Returns:
        {dict} -- The incoming event, with ``manifest_interval`` and
                  ``manifest_file_loaded`` ("True"/"False" strings) set
                  in ``event['body']``

    Raises:
        Exception -- "Manifest Threshold Breached" when the minutes elapsed
                     since ``manifest_interval`` reach ``manifest_timeout``;
                     any other error is logged and re-raised unchanged
    """
    s3_interface = S3Interface()
    stage_bucket = S3Configuration().stage_bucket

    dynamo_config = DynamoConfiguration()
    dynamo_interface = DynamoInterface(dynamo_config)
    current_time = dt.datetime.utcnow()
    current_timestamp = current_time.timestamp()

    # Seconds to wait before reporting that the manifest is not loaded yet.
    wait_seconds = 300

    # Bound before the try block so the failure handler below can tell
    # whether the Octagon client exists. `component` and `stage` are always
    # assigned before the client is built, so a non-None client implies
    # both are bound.
    octagon_client = None

    try:
        logger.info("Fetching event data from previous step")
        body = event['body']
        team = body['team']
        # 'pipeline' and 'bucket' are not used below; they are still read so
        # that an event missing them fails here, exactly as before.
        pipeline = body['pipeline']
        stage = body['pipeline_stage']
        dataset = body['dataset']
        peh_id = body['peh_id']
        env = body['env']
        bucket = body['bucket']
        input_file_key = body['key']
        input_file_name = input_file_key.split("/")[-1]
        manifest_file_pattern = body['manifest_details']['regex_pattern']
        manifest_timeout = int(body['manifest_details']['manifest_timeout'])

        # On the first pass there is no 'manifest_interval' yet, so the
        # waiting window starts at this invocation's timestamp.
        if 'manifest_interval' in body:
            manifest_interval = body['manifest_interval']
        else:
            manifest_interval = current_timestamp

        logger.info('Initializing Octagon client')
        component = context.function_name.split('-')[-2].title()
        octagon_client = (
            octagon.OctagonClient()
            .with_run_lambda(True)
            .with_configuration_instance(env)
            .build()
        )
        peh.PipelineExecutionHistoryAPI(
            octagon_client).retrieve_pipeline_execution(peh_id)

        octagon_client.update_pipeline_execution(status="{} {} Processing".format(stage, component),
                                                 component=component)

        ### List S3 Objects for the manifest file in the manifest prefix
        ### For this to work the manifest should have been loaded into DynamoDB

        manifest_key = "pre-stage/{}/manifests/{}/".format(team, dataset)
        processed_manifest_keys = s3_interface.list_objects(
            stage_bucket, manifest_key)

        items = []

        if processed_manifest_keys:
            # Compile once; the same pattern is applied to every key.
            manifest_regex = re.compile(manifest_file_pattern)
            matched_keys = []
            for manifest_file_key in processed_manifest_keys:
                manifest_file_name = manifest_file_key.split("/")[-1]
                if manifest_regex.match(manifest_file_name):
                    matched_keys.append(manifest_file_name)

            ### Query Manifests Control table
            # Done once, after the scan: querying inside the scan loop
            # repeated every lookup (and the sleep/update below) for each
            # object found under the prefix.
            dataset_name = team + "-" + dataset
            for manifest_file_name in matched_keys:
                try:
                    items.append(dynamo_interface.get_item_from_manifests_control_table(
                        dataset_name, manifest_file_name, input_file_name))
                except KeyError:
                    logger.info(
                        "No manifests control table entry for {} in manifest {}".format(
                            input_file_name, manifest_file_name))

        ### Update Manifests Control table

        if items:
            ddb_key = {
                'dataset_name': items[0]['dataset_name'],
                'datafile_name': items[0]['datafile_name']}
            dynamo_interface.update_manifests_control_table_stagea(
                ddb_key, "STARTED")
            manifest_file_loaded = "True"
            event['body']['manifest_ddb_key'] = ddb_key
        else:
            logger.info("Manifest File has not been loaded, sleeping for 5 mins")
            time.sleep(wait_seconds)
            manifest_file_loaded = "False"

        ### Check if Manifest threshold has exceeded

        # Equality means the interval was initialised in this invocation;
        # re-read the clock so time spent above (including the sleep) counts.
        if current_timestamp == manifest_interval:
            current_timestamp = dt.datetime.utcnow().timestamp()

        # Elapsed whole minutes since the interval started vs. the timeout.
        if int((current_timestamp - manifest_interval)/60) >= manifest_timeout:
            logger.error("Manifest Threshold Breached")
            raise Exception("Manifest Threshold Breached")

        event['body']['manifest_interval'] = manifest_interval
        event['body']['manifest_file_loaded'] = manifest_file_loaded

    except Exception as e:
        logger.error("Fatal error", exc_info=True)
        # Only report to Octagon when the client was actually built;
        # otherwise a NameError here would hide the original failure.
        if octagon_client is not None:
            octagon_client.end_pipeline_execution_failed(component=component,
                                                         issue_comment="{} {} Error: {}".format(stage, component, repr(e)))
        raise

    return event