in emails/management/commands/process_emails_from_sqs.py [0:0]
def process_message_batch(self, message_batch: list[SQSMessage]) -> dict[str, Any]:
    """
    Process each message in a batch and summarize the outcome.

    Arguments:
    * message_batch - a list of SQS messages, possibly empty

    Returns a dict suitable for logging context, with these keys:
    * process_s: Timed duration minus pause time, omitted if no messages
    * pause_count: How many pauses were taken for temporary errors, omitted if 0
    * pause_s: How long pauses took, omitted if no pauses
    * failed_count: How many messages failed to process, omitted if 0

    Times are in seconds, rounded to millisecond precision.

    A message is deleted when its result reports success, or when
    self.delete_failed_messages is truthy. One "Message processed" INFO log
    entry is emitted per message, carrying that message's result data plus
    a message_process_time_s entry.
    """
    # An empty batch returns before any healthcheck, timing, or logging.
    if not message_batch:
        return {}

    failures = 0
    pauses = 0
    paused_seconds = 0.0
    elapsed_seconds = 0.0

    for message in message_batch:
        # Called once per message, before processing starts.
        self.write_healthcheck()

        # The timed span covers processing, the optional delete, and the
        # pause bookkeeping.
        with Timer(logger=None) as timer:
            result = self.process_message(message)
            succeeded = result["success"]
            if not succeeded:
                failures += 1
            if succeeded or self.delete_failed_messages:
                message.delete()
            paused_seconds += result.get("pause_s", 0.0)
            pauses += result.get("pause_count", 0)

        # The timer reading is taken only after the timed block has exited.
        message_seconds = timer.last
        result["message_process_time_s"] = round(message_seconds, 3)
        elapsed_seconds += message_seconds
        logger.log(logging.INFO, "Message processed", extra=result)

    # Reported pause time is subtracted from the timed total
    # (presumably pauses happen inside process_message - TODO confirm).
    summary: dict[str, Any] = {
        "process_s": round(elapsed_seconds - paused_seconds, 3)
    }
    if pauses:
        summary.update(pause_count=pauses, pause_s=round(paused_seconds, 3))
    if failures:
        summary["failed_count"] = failures
    return summary