in infrastructure/emr_trigger/lambda_source/trigger.py [0:0]
def process_sns_message(record, table, current_batch_id):
    """
    Parse an S3 event record, add the file to the current batch, and update the LATEST batch counters.

    The file is recorded on the item keyed by ``(current_batch_id, bag_file)``:
    its ``s3://bucket/key`` URI is appended to ``files`` and its topic to
    ``topics``. Both lists are created on first use. Afterwards the ``LATEST``
    item's ``FileSizeKb`` and ``NumFiles`` counters are incremented. A counter
    that does not exist yet starts from 0.

    :param record: a single event record, passed unchanged to ``parse_s3_event``.
        That helper must return a mapping with the keys ``bag_file``,
        ``bucket``, ``key``, ``topic`` and ``size``.
    :param table: DynamoDB table handle exposing ``update_item``. Presumably a
        boto3 ``Table`` resource — confirm against the caller.
    :param current_batch_id: ``BatchId`` of the batch the file is assigned to.
    :return: tuple ``(latest, updated_item)``. ``latest`` holds the attributes of
        the ``LATEST`` item and ``updated_item`` those of the batch item, both as
        they are after the update (``ReturnValues="ALL_NEW"``).
    """
    message = parse_s3_event(record)
    s3_uri = f"s3://{message['bucket']}/{message['key']}"

    # Add the new file to the current batch. if_not_exists() lets the first
    # file for a bag create the `files` / `topics` lists.
    # NOTE(review): list_append is not idempotent. If the LATEST update below
    # fails and the event is redelivered, this file is appended a second time.
    # Consider a TransactWriteItems call or a `NOT contains(files, ...)`
    # condition if duplicates matter.
    updated_item = table.update_item(
        Key={
            "BatchId": current_batch_id,
            "Name": message["bag_file"],
        },
        UpdateExpression=(
            "SET bag_file=:bf, "
            "files = list_append(if_not_exists(files, :empty_list), :new_object), "
            "topics = list_append(if_not_exists(topics, :empty_list), :topic)"
        ),
        ExpressionAttributeValues={
            ":empty_list": [],
            ":new_object": [s3_uri],
            ":topic": [message["topic"]],
            ":bf": message["bag_file"],
        },
        ReturnValues="ALL_NEW",
    )["Attributes"]

    # Bump the LATEST batch counters. A bare `FileSizeKb + :s` raises a
    # ValidationException when the attribute is absent, so each counter is
    # seeded with 0 via if_not_exists().
    # NOTE(review): assumes message["size"] is already in KB and is an int or
    # Decimal (boto3 rejects Python floats) — confirm in parse_s3_event.
    latest = table.update_item(
        Key={
            "BatchId": "LATEST",
            "Name": "LATEST",
        },
        UpdateExpression=(
            "set FileSizeKb = if_not_exists(FileSizeKb, :zero) + :s, "
            "NumFiles = if_not_exists(NumFiles, :zero) + :n"
        ),
        ExpressionAttributeValues={":s": message["size"], ":n": 1, ":zero": 0},
        ReturnValues="ALL_NEW",
    )["Attributes"]
    return latest, updated_item