in backend/lambdas/jobs/stats_updater.py [0:0]
def _aggregate_stats(events):
    """Fold a sequence of events into a ``Counter`` of running totals.

    Each event is a mapping with a required ``"EventName"`` entry and an
    optional ``"EventData"`` entry. Only two families of events contribute:

    * ``QuerySucceeded`` / ``QueryFailed`` bump the query counters and add
      the ``DataScannedInBytes`` and ``EngineExecutionTimeInMillis`` values
      found under ``EventData["Statistics"]`` (each defaulting to 0).
    * ``ObjectUpdated`` / ``ObjectUpdateSkipped`` / ``ObjectUpdateFailed`` /
      ``ObjectRollbackFailed`` bump the matching object counter.

    Events with any other name are ignored.

    Args:
        events: Iterable of event mappings.

    Returns:
        A ``Counter`` keyed by statistic name. Because every event is merged
        with ``Counter``'s in-place addition, entries that are zero or
        negative after a merge are dropped, so a statistic that never
        received a positive contribution is absent from the result.

    Raises:
        KeyError: If an event has no ``"EventName"`` entry.
    """
    query_event_names = ("QuerySucceeded", "QueryFailed")
    # (event name, counter key) pairs for the per-object outcome events.
    object_event_counters = (
        ("ObjectUpdated", "TotalObjectUpdatedCount"),
        ("ObjectUpdateSkipped", "TotalObjectUpdateSkippedCount"),
        ("ObjectUpdateFailed", "TotalObjectUpdateFailedCount"),
        ("ObjectRollbackFailed", "TotalObjectRollbackFailedCount"),
    )
    object_event_names = tuple(evt for evt, _ in object_event_counters)

    totals = Counter()
    for record in events:
        name = record["EventName"]
        payload = record.get("EventData", {})

        if name in query_event_names:
            query_stats = payload.get("Statistics", {})
            increment = {
                "TotalQueryCount": 1,
                "TotalQuerySucceededCount": int(name == "QuerySucceeded"),
                "TotalQueryFailedCount": int(name == "QueryFailed"),
                "TotalQueryScannedInBytes": query_stats.get(
                    "DataScannedInBytes", 0
                ),
                "TotalQueryTimeInMillis": query_stats.get(
                    "EngineExecutionTimeInMillis", 0
                ),
            }
        elif name in object_event_names:
            # Exactly one of these is 1; the zero entries are pruned by the
            # Counter merge below.
            increment = {
                counter_key: int(name == evt)
                for evt, counter_key in object_event_counters
            }
        else:
            continue

        # In-place Counter addition also discards non-positive totals.
        totals += Counter(increment)
    return totals