def lambda_handler()

in sdlf-utils/pipeline-examples/event-dataset-dependencies/sdlf-engineering-stageB/lambda/stage-b-postupdate-metadata/src/lambda_function.py [0:0]


def lambda_handler(event, context):
    """Updates the S3 objects metadata catalog

    Arguments:
        event {dict} -- Dictionary with details on Bucket and Keys
        context {dict} -- Dictionary with details on Lambda context

    Returns:
        {dict} -- Dictionary with response
    """
    def replace_decimals(obj):
        if isinstance(obj, list):
            for i in range(len(obj)):
                obj[i] = replace_decimals(obj[i])
            return obj
        elif isinstance(obj, dict):
            for k, v in obj.items():
                obj[k] = replace_decimals(v)
            return obj
        elif isinstance(obj, set):
            return set(replace_decimals(i) for i in obj)
        elif isinstance(obj, decimal.Decimal):
            if obj % 1 == 0:
                return int(obj)
            else:
                return float(obj)
        else:
            return obj

    def get_table_partitions(db, tbl):
        glue_response = glue_client.get_table(
            DatabaseName=db,
            Name=tbl)
        logger.debug('Glue get_table response: {}'.format(glue_response))
        return glue_response['Table']['PartitionKeys']

    try:
        logger.info('Fetching event data from previous step')
        bucket = event['body']['bucket']
        processed_keys_path = event['body']['job']['processedKeysPath']
        processed_keys = S3Interface().list_objects(bucket, processed_keys_path)
        team = event['body']['team']
        pipeline = event['body']['pipeline']
        stage = event['body']['pipeline_stage']
        dataset1 = event['body']['dataset']
        peh_id = event['body']['job']['peh_id']
        env = event['body']['env']

        logger.info('Initializing Octagon client')
        component = context.function_name.split('-')[-2].title()
        octagon_client = (
            octagon.OctagonClient()
            .with_run_lambda(True)
            .with_configuration_instance(event['body']['env'])
            .build()
        )
        peh.PipelineExecutionHistoryAPI(
            octagon_client).retrieve_pipeline_execution(peh_id)

        logger.info('Initializing DynamoDB config and Interface')
        dynamo_config = DynamoConfiguration()
        dynamo_interface = DynamoInterface(dynamo_config)

        logger.info('Storing metadata to DynamoDB and tagging resulting S3 Objects')
        for key in processed_keys:
            object_metadata = {
                'bucket': bucket,
                'key': key,
                'size': S3Interface().get_size(bucket, key),
                'last_modified_date': S3Interface().get_last_modified(bucket, key),
                'org': event['body']['org'],
                'app': event['body']['app'],
                'env': event['body']['env'],
                'team': team,
                'pipeline': pipeline,
                'dataset': dataset1,
                'stage': 'stage',
                'pipeline_stage': stage,
                'peh_id': peh_id
            }
            dynamo_interface.update_object_metadata_catalog(object_metadata)

            tag_keys = ['org', 'app', 'env', 'team', 'dataset']
            tag_dict = {key: object_metadata[key] for key in tag_keys}
            S3Interface().tag_object(bucket, key, tag_dict)

        # Only uncomment if a queue for the next stage exists
        # logger.info('Sending messages to next SQS queue if it exists')
        # sqs_config = SQSConfiguration(team, dataset, ''.join([stage[:-1], chr(ord(stage[-1]) + 1)]))
        # sqs_interface = SQSInterface(sqs_config.get_stage_queue_name)
        # sqs_interface.send_batch_messages_to_fifo_queue(processed_keys, 10, '{}-{}'.format(team, dataset))
        prestage_table = event['body']['dest_table']['name']
        prestage_db = event['body']['dest_db']
        dest_part_name = event['body']['dest_table']['part_name']
        dest_part_value = event['body']['dest_table']['part_value']
        processOutput = {}
        if dest_part_name is not '' and dest_part_value is not '':
            partitions = []
            part_dict = {"name": dest_part_name,
                         "value": dest_part_value}
            partitions.append(part_dict)
            processOutput['partitions'] = partitions
        processOutput['processed_keys'] = processed_keys
        ssmresponse = ssmcli.get_parameter(
            Name=f'/SDLF/DDB/{team}/{pipeline}/DependenciesByTable'
        )
        ddb_dependencies_by_table = ssmresponse['Parameter']['Value']
        ddb_table = dynamodb.Table(ddb_dependencies_by_table)
        ssmresponse = ssmcli.get_parameter(
            Name=f'/SDLF/DDB/{team}/{pipeline}/Dependencies'
        )
        ddb_dependencies = ssmresponse['Parameter']['Value']
        consulta = f'{prestage_db.lower()}.{prestage_table.lower()}'
        logger.info(consulta)
        response = ddb_table.get_item(Key={'table_name': consulta})
        logger.info(f'Response {response}')
        if 'Item' in response:
            list_transforms = response['Item']['list_transforms']
            num_of_transforms = len(list_transforms)
            logger.debug(f'Response {response}')
            logger.info(f'This table triggers {num_of_transforms} datasets')
            next_stage = 'B'
            stage_b_message = {}
            for dataset in list_transforms:
                ddb_steps = dynamodb.Table(ddb_dependencies)
                logger.info(dataset)
                response = ddb_steps.get_item(Key={'dataset': dataset})
                logger.info(f'Response {response}')
                num_of_transforms = len(list_transforms)
                item = response['Item']
                dest_table = item['dataset'].split('.')[1]
                dest_db = item['dataset'].split('.')[0]
                dependencies = item['dependencies']
                date_substitutions = replace_decimals(item.get('date_substitutions',[]))
                logger.info(f'Dependencies: {dependencies}')
                partition = item.get('partitionColumn', '')
                partition_mask = item.get('partitionPythonMask', None)
                partition_value_formatted = None
                table_check = []
                for table in dependencies:
                    table_name = table['TableName'].split('.')[1]
                    table_db = table['TableName'].split('.')[0]
                    table_partition = table.get('FieldColumn', '')
                    table_partition_format = table.get('DateExpression', None)
                    relativedelta_attributes = replace_decimals(table.get('relativedelta_attributes', None))
                    table_partitions = processOutput.get('partitions', [])
                    usage = table.get('Usage', 'validate').lower()
                    if usage == 'validate':
                        if prestage_db == table_db and prestage_table == table_name:
                            logger.info(f'This table does not update/overwrite {dataset} dataset')
                            break
                        else:
                            logger.debug(f'Table {table_db}.{table_name} is not the trigger table')
                    else:
                        if prestage_db.lower() == table_db.lower() and prestage_table.lower() == table_name.lower():
                            # dst_tbl_partitions = get_table_partitions(prestage_db,prestage_table)
                            partition_value_formatted = ''
                            # If dest table has partitions and source table has partitions
                            logger.debug(f'Partition: {partition}, table_partitions: {table_partitions}')
                            if table_partitions and table_partition_format is not None:
                                table_partition_value = table_partitions[0]['value']
                                value = datetime.strptime(table_partition_value, table_partition_format)
                                target_value = value - relativedelta(**relativedelta_attributes)
                                partition_value_formatted = target_value.strftime(partition_mask)
                                logger.info(f'This table {usage.upper()} dataset {dest_table} '
                                            f' Partition {partition} = {partition_value_formatted}')
                                # validate(table_db, table_name, table_partitions)
                            stage_b_message['prev_stage_processed_keys'] = processed_keys
                            stage_b_message['team'] = team
                            stage_b_message['pipeline'] = pipeline
                            stage_b_message['pipeline_stage'] = ''.join([stage[:-1], next_stage])
                            stage_b_message['dataset'] = dataset1
                            stage_b_message['org'] = event['body']['org']
                            stage_b_message['app'] = event['body']['app']
                            stage_b_message['env'] = event['body']['env']
                            stage_b_message['behaviour'] = table['Usage'].lower()
                            stage_b_message['dest_db'] = dest_db
                            stage_b_message['dest_table'] = {}
                            stage_b_message['dest_table']['name'] = dest_table
                            stage_b_message['dest_table']['part_name'] = partition
                            stage_b_message['dest_table']['part_value'] = partition_value_formatted
                            stage_b_message['steps'] = item['steps']
                            stage_b_message['date_substitutions'] = date_substitutions
                            logger.info('Sending messages to next SQS queue if it exists')
                            # GEt queue by SSM
                            logger.info(stage_b_message)
                            sqs_config = SQSConfiguration(team, pipeline, stage)
                            sqs_interface = SQSInterface(sqs_config.get_stage_queue_name)
                            sqs_interface.send_message_to_fifo_queue(
                                json.dumps(stage_b_message), '{}-{}'.format(team, pipeline))
                            break

        else:
            logger.info(f'This table triggers 0 datasets')

        octagon_client.update_pipeline_execution(status=f'{stage} {component} Processing',
                                                 component=component)
        octagon_client.end_pipeline_execution_success()
    except Exception as e:
        logger.error("Fatal error", exc_info=True)
        octagon_client.end_pipeline_execution_failed(component=component,
                                                     issue_comment=f'{stage} {component} Error: {repr(e)}')
        raise e
    return 200