def lambda_handler()

in sdlf-utils/pipeline-examples/event-dataset-dependencies/sdlf-engineering-pipeline/nested-stacks/lambda/craete-dependencies-by-table/src/lambda_function.py [0:0]


def lambda_handler(event, context):
    def deserialize(data):
        if isinstance(data, list):
            return [deserialize(v) for v in data]

        if isinstance(data, dict):
            try:
                return deserializer.deserialize(data)
            except TypeError:
                return {k: deserialize(v) for k, v in data.items()}
        else:
            return data

    def serialize(data):
        try:
            return {k: serializer.serialize(v) for k, v in data.items() if v != ""}
        except ClientError as err:
            raise err

    def update_table_transforms(table_name, transform_list):
        expr_attr = serialize({':transforms': transform_list})
        response = dynamodb_client.update_item(
            TableName=DYNAMO_DEPENDENCIES_BYTABLE,
            Key=serialize({'table_name': table_name}),
            UpdateExpression='SET list_transforms = :transforms',
            ExpressionAttributeValues=expr_attr)
        return response

    def delete_table_transforms(table_name):
        response = dynamodb_client.delete_item(
            TableName=DYNAMO_DEPENDENCIES_BYTABLE,
            Key=serialize({'table_name': table_name}))
        return response

    def get_table_transforms(table_name):
        table_dependencies = dynamodb_client.get_item(
            TableName=DYNAMO_DEPENDENCIES_BYTABLE,
            Key=serialize({'table_name': table_name})) # dep['TableName']
        table_dependencies = deserialize(table_dependencies)
        return table_dependencies['Item']['list_transforms']

    print(event)
    dynamodb_client = boto3.client("dynamodb")

   # If new item arrive to dependencies table
    if event['Records'][0]['eventName'] == 'INSERT':
        item = event['Records'][0]['dynamodb']['NewImage']
        deserialized_item = deserialize(item)
        print(deserialized_item)
        transform_name = deserialized_item['dataset']
        dependencies = deserialized_item['dependencies']

        # for each dependency
        for dependency in dependencies:
            list_transforms = []
            table = dependency['TableName']
            print(f'Mapping transform "{transform_name}" to "{table}" table')
            try:
                list_transforms = get_table_transforms(table)
                if list_transforms.count(transform_name) == 0:
                    list_transforms.append(transform_name)
                    update_table_transforms(table, list_transforms)
            except:
                list_transforms.append(transform_name)
                update_table_transforms(table, list_transforms)

    # If dependencies table item was modified
    if event['Records'][0]['eventName'] == 'MODIFY':
        old_item = deserialize(event['Records'][0]['dynamodb']['OldImage'])
        new_item = deserialize(event['Records'][0]['dynamodb']['NewImage'])
        array_old = []
        array_new = []

        for i in old_item['dependencies']:
            array_old.append(i["TableName"])
        for i in new_item['dependencies']:
            array_new.append(i["TableName"])

        diff_old = list(set(array_old) - set(array_new))
        diff_new = list(set(array_new) - set(array_old))
        old_transform = old_item['dataset']
        new_transform = new_item['dataset']
        for table in diff_old:
            if array_old.count(table) != 0:
                print(f'Removing transform "{old_transform}" from "{table}" table')
                try:
                    list_transforms = get_table_transforms(table)
                    transformation_count = len(list_transforms)
                    if list_transforms.count(old_transform) != 0:
                        if transformation_count <= 1:
                            delete_table_transforms(table)
                        else:
                            list_transforms.remove(old_transform)
                            update_table_transforms(table, list_transforms)
                except Exception as exp:
                    exception_type, exception_value, exception_traceback = sys.exc_info()
                    traceback_string = traceback.format_exception(exception_type, exception_value,
                                                                  exception_traceback)
                    err_msg = json.dumps({
                        "errorType": exception_type.__name__,
                        "errorMessage": str(exception_value),
                        "stackTrace": traceback_string
                    })
                    print(err_msg)
        for table in diff_new:
            if array_new.count(table) != 0:
                list_transforms = []
                print(f'Mapping transform "{old_transform}" to "{table}" table')
                try:
                    list_transforms = get_table_transforms(table)
                    if list_transforms.count(new_transform) != 0:
                        list_transforms.append(new_transform)
                        update_table_transforms(table, list_transforms)
                except:
                    list_transforms.append(new_transform)
                    update_table_transforms(table, list_transforms)
               # agregar logica para cuando se agregó un registro                                                                                        transform_name))

    elif event['Records'][0]['eventName'] == 'REMOVE':
        old_item = deserialize(event['Records'][0]['dynamodb']['OldImage'])
        transform_name =old_item['dataset']
        dependencies = old_item['dependencies']
        # print(event['Records'][0]['dynamodb']['OldImage'])
        # tables=list_transforms["Items"]

        for dependency in dependencies:
            table = dependency['TableName']
            try:
                print(f'Removing transform "{transform_name}" from "{table}" table')
                list_transforms = get_table_transforms(table)
                transformation_count = len(list_transforms)
                if list_transforms.count(transform_name) != 0:
                    if transformation_count <= 1:
                        delete_table_transforms(table)
                    else:
                        list_transforms.remove(transform_name)
                        update_table_transforms(table, list_transforms)
            except Exception as exp:
                exception_type, exception_value, exception_traceback = sys.exc_info()
                traceback_string = traceback.format_exception(exception_type, exception_value,
                                                              exception_traceback)
                err_msg = json.dumps({
                    "errorType": exception_type.__name__,
                    "errorMessage": str(exception_value),
                    "stackTrace": traceback_string
                })
                print(err_msg)
                print(f'Error with dependencies-by-table, table {table} does not have dependencies')