def lambda_handler()

in sdlf-utils/pipeline-examples/event-dataset-dependencies/sdlf-engineering-stageA/lambda/stage-a-postupdate-metadata/src/lambda_function.py [0:0]


def lambda_handler(event, context):
    """Updates the S3 objects metadata catalog

    Arguments:
        event {dict} -- Dictionary with details on previous processing step
        context {dict} -- Dictionary with details on Lambda context

    Returns:
        {dict} -- Dictionary with outcome of the process
    """

    try:
        logger.info('Fetching event data from previous step')
        body = event['body']
        processOutput = body['job']['processOutput']
        processed_keys = processOutput['processed_keys']
        team = body['team']
        pipeline = body['pipeline']
        stage = body['pipeline_stage']
        dataset1 = body['dataset']
        peh_id = body['peh_id']
        prestage_db = processOutput.get('prestage_db', None)
        prestage_table = processOutput.get('prestage_table', None)
        logger.info('Initializing Octagon client')
        component = context.function_name.split('-')[-2].title()
        octagon_client = (
            octagon.OctagonClient()
            .with_run_lambda(True)
            .with_configuration_instance(body['env'])
            .build()
        )
        peh.PipelineExecutionHistoryAPI(
            octagon_client).retrieve_pipeline_execution(peh_id)

        logger.info('Initializing DynamoDB config and Interface')
        dynamo_config = DynamoConfiguration()
        dynamo_interface = DynamoInterface(dynamo_config)

        logger.info('Storing metadata to DynamoDB and tagging resulting S3 Objects')
        bucket = S3Configuration().stage_bucket
        for key in processed_keys:
            object_metadata = {
                'bucket': bucket,
                'key': key,
                'size': S3Interface().get_size(bucket, key),
                'last_modified_date': S3Interface().get_last_modified(bucket, key),
                'org': body['org'],
                'app': body['app'],
                'env': body['env'],
                'team': team,
                'pipeline': pipeline,
                'dataset': dataset1,
                'stage': 'stage',
                'pipeline_stage': stage,
                'peh_id': peh_id
            }

            dynamo_interface.update_object_metadata_catalog(object_metadata)

            tag_keys = ['org', 'app', 'env', 'team', 'dataset']
            tag_dict = {key: object_metadata[key] for key in tag_keys}
            S3Interface().tag_object(bucket, key, tag_dict)

        # New dependencies
        if body['job']['jobDetails']['num_of_steps'] > 0:
            ssmresponse = ssmcli.get_parameter(
                Name=f'/SDLF/DDB/{team}/{pipeline}/DependenciesByTable'
            )
            ddb_dependencies_by_table = ssmresponse['Parameter']['Value']
            ddb_table = dynamodb.Table(ddb_dependencies_by_table)
            ssmresponse = ssmcli.get_parameter(
                Name=f'/SDLF/DDB/{team}/{pipeline}/Dependencies'
            )
            ddb_dependencies = ssmresponse['Parameter']['Value']
            consulta = f'{prestage_db.lower()}.{prestage_table.lower()}'
            logger.info(consulta)
            response = ddb_table.get_item(Key={'table_name': consulta})
            logger.info(f'Response {response}')
            if 'Item' in response:
                list_transforms = response['Item']['list_transforms']
                num_of_transforms = len(list_transforms)
                logger.debug(f'Response {response}')
                logger.info(f'This table triggers {num_of_transforms} datasets')
                next_stage = chr(ord(stage[-1]) + 1)
                stage_b_message = {}
                dest = {}
                tbls = []
                for dataset in list_transforms:
                    ddb_steps = dynamodb.Table(ddb_dependencies)
                    logger.info(dataset)
                    response = ddb_steps.get_item(Key={'dataset': dataset})
                    logger.info(f'Response {response}')
                    num_of_transforms = len(list_transforms)
                    item = response['Item']
                    dest_table = item['dataset'].split('.')[1]
                    dest_db = item['dataset'].split('.')[0]
                    date_substitutions = replace_decimals(item.get('date_substitutions',[]))
                    dependencies = item['dependencies']
                    logger.info(f'Dependencies: {dependencies}')
                    partition = item.get('partitionColumn', '')
                    partition_mask = item.get('partitionPythonMask', None)
                    partition_value_formatted = None
                    for table in dependencies:
                        table_name = table['TableName'].split('.')[1]
                        table_db = table['TableName'].split('.')[0]
                        table_partition = table.get('FieldColumn', '')
                        table_partition_format = table.get('DateExpression', None)
                        relativedelta_attributes = replace_decimals(table.get('relativedelta_attributes', None))
                        relativedelta_attributes = replace_days(relativedelta_attributes)
                        logger.info(f'relativedelta_attributes={relativedelta_attributes}')
                        table_partitions = processOutput.get('partitions', [])
                        usage = table.get('Usage', 'validate').lower()
                        if usage == 'validate':
                            if prestage_db.lower() == table_db.lower() and prestage_table.lower() == table_name.lower():
                                logger.info(f'This table does not update/overwrite {dataset} dataset')
                                break
                            else:
                                logger.debug(f'Table {table_db}.{table_name} is not the trigger table')
                        elif prestage_db.lower() == table_db.lower() and prestage_table.lower() == table_name.lower():
                            # dst_tbl_partitions = get_table_partitions(prestage_db,prestage_table)
                            partition_value_formatted = ''
                            # If dest table has partitions and source table has partitions
                            logger.debug(f'Partition: {partition}, table_partitions: {table_partitions}')
                            if table_partitions and table_partition_format is not None:
                                table_partition_value = table_partitions[0]['value']
                                value = datetime.strptime(table_partition_value, table_partition_format)
                                target_value = value + relativedelta(**relativedelta_attributes)
                                partition_value_formatted = target_value.strftime(partition_mask)
                                logger.info(f'This table {usage.upper()} dataset {dest_table} ' 
                                            f' Partition {partition} = {partition_value_formatted}')
                                # validate(table_db, table_name, table_partitions)
                            stage_b_message['prev_stage_processed_keys'] = processed_keys
                            stage_b_message['team'] = team
                            stage_b_message['pipeline'] = pipeline
                            stage_b_message['pipeline_stage'] = ''.join([stage[:-1], next_stage])
                            stage_b_message['dataset'] = dataset1
                            stage_b_message['org'] = body['org']
                            stage_b_message['app'] = body['app']
                            stage_b_message['env'] = body['env']
                            stage_b_message['behaviour'] = table['Usage'].lower()
                            stage_b_message['dest_db'] = dest_db
                            stage_b_message['dest_table'] = {}
                            stage_b_message['dest_table']['name'] = dest_table
                            stage_b_message['dest_table']['part_name'] = partition
                            stage_b_message['dest_table']['part_value'] = partition_value_formatted
                            stage_b_message['steps'] = item['steps']
                            stage_b_message['date_substitutions'] = date_substitutions

                    logger.info('Sending messages to next SQS queue if it exists')
                    logger.info(stage_b_message)
                    sqs_config = SQSConfiguration(team, pipeline, ''.join(
                        [stage[:-1], next_stage]))
                    sqs_interface = SQSInterface(sqs_config.get_stage_queue_name)
                    sqs_interface.send_message_to_fifo_queue(
                        json.dumps(stage_b_message), '{}-{}'.format(team, pipeline))

            else:
                logger.info('This table triggers 0 datasets')

        octagon_client.update_pipeline_execution(status=f'{stage} {component} Processing',
                                                 component=component)
        octagon_client.end_pipeline_execution_success()
    except Exception as e:
        logger.error("Fatal error", exc_info=True)
        octagon_client.end_pipeline_execution_failed(component=component,
                                                     issue_comment=f'{stage} {component} Error: {repr(e)}')
        raise e
    return 200