def process_sns_message()

in infrastructure/emr_trigger/lambda_source/trigger.py [0:0]


def process_sns_message(record, table, current_batch_id):
    """
    Parse S3 event record, add metadata to DynamoDB, assign to LATEST batch, and update LATEST batch metadata
    :param record:
    :param table:
    :param current_batch_id:
    :return:
    """
    message = parse_s3_event(record)

    # Add new file to latest batch
    updated_item = table.update_item(
        Key={
            "BatchId": current_batch_id,
            "Name": message["bag_file"],
        },
        UpdateExpression="SET bag_file=:bf, files = list_append(if_not_exists(files, :empty_list), :new_object), topics = list_append(if_not_exists(topics, :empty_list), :topic)",
        ExpressionAttributeValues={
            ":empty_list": [],
            ":new_object": [f"s3://{message['bucket']}/{message['key']}"],
            ":topic": [message["topic"]],
            ":bf": message["bag_file"],
        },
        ReturnValues="ALL_NEW",
    )["Attributes"]

    # Update Latest
    latest = table.update_item(
        Key={
            "BatchId": "LATEST",
            "Name": "LATEST",
        },
        UpdateExpression="set FileSizeKb = FileSizeKb + :s, NumFiles = NumFiles + :n",
        ExpressionAttributeValues={":s": message["size"], ":n": 1},
        ReturnValues="ALL_NEW",
    )["Attributes"]

    return latest, updated_item