in handlers/aws/utils.py [0:0]
def summarize_lambda_event(event: dict[str, Any], max_records: int = 10) -> dict[str, Any]:
    """
    Summarize the lambda event to include only the most relevant information.

    For `aws:sqs` records, the summary reports the total number of inner S3
    notification records seen, plus the `s3` section (bucket + object key) of
    at most `max_records` of them, so users can tell whether the summary is
    complete. Any failure while summarizing is logged and surfaced under the
    `"error"` key instead of propagating.
    """
    summary: dict[str, Any] = {}
    try:
        sample_key = f"first_{max_records}_records"
        for record in event.get("Records", []):
            # Only SQS-delivered records are summarized; skip everything else.
            if record.get("eventSource", "unknown") != "aws:sqs":
                continue

            if "aws:sqs" in summary:
                sqs_summary = summary["aws:sqs"]
            else:
                # First SQS record seen: start a fresh section. It is only
                # attached to `summary` after processing succeeds, so a
                # parsing failure below leaves `summary` without it.
                sqs_summary = {"total_records": 0, sample_key: []}

            notifications = json_parser(record["body"])
            inner_records = notifications["Records"]

            # Keep the running total so users can see when the sample below
            # covers only a subset of the records.
            sqs_summary["total_records"] += len(inner_records)

            # Each inner record is an S3 event notification; only its `s3`
            # section (bucket + object key) is worth keeping, e.g.:
            #   {"bucket": {"arn": ..., "name": ...},
            #    "object": {"key": "AWSLogs/1234567890/CloudTrail-Digest/"}}
            sample = sqs_summary[sample_key]
            for inner in inner_records:
                # Stop sampling once the `max_records` cap is reached.
                if len(sample) == max_records:
                    break
                sample.append(inner.get("s3"))

            summary["aws:sqs"] = sqs_summary
    except Exception as exc:
        shared_logger.exception("error summarizing lambda event", exc_info=exc)
        # Record the failure so users know the summary may be incomplete.
        summary["error"] = str(exc)
    return summary