in backend/lambdas/jobs/stream_processor.py [0:0]
def handler(event, context):
records = event["Records"]
new_jobs = get_records(records, "Job", "INSERT")
deleted_jobs = get_records(records, "Job", "REMOVE", new_image=False)
events = get_records(records, "JobEvent", "INSERT")
grouped_events = groupby(sorted(events, key=itemgetter("Id")), key=itemgetter("Id"))
for job in new_jobs:
process_job(job)
for job in deleted_jobs:
cleanup_manifests(job)
for job_id, group in grouped_events:
group = [i for i in group]
update_stats(job_id, group)
updated_job = update_status(job_id, group)
# Perform cleanup if required
if (
updated_job
and updated_job.get("JobStatus") == "FORGET_COMPLETED_CLEANUP_IN_PROGRESS"
):
try:
clear_deletion_queue(updated_job)
emit_event(
job_id, "CleanupSucceeded", utc_timestamp(), "StreamProcessor"
)
except Exception as e:
emit_event(
job_id,
"CleanupFailed",
{"Error": "Unable to clear deletion queue: {}".format(str(e))},
"StreamProcessor",
)
elif updated_job and updated_job.get("JobStatus") in skip_cleanup_states:
emit_event(job_id, "CleanupSkipped", utc_timestamp(), "StreamProcessor")