in emails/management/commands/process_emails_from_sqs.py [0:0]
def process_queue(self) -> dict[str, Any]:
"""
Process the SQS email queue until an exit condition is reached.
Return is a dict suitable for logging context, with these keys:
* exit_on: Why processing exited - "interrupt", "max_seconds", "unknown"
* cycles: How many polling cycles completed
* total_s: The total execution time, in seconds with millisecond precision
* total_messages: The number of messages processed, with and without errors
* failed_messages: The number of messages that failed with errors,
omitted if none
* pause_count: The number of 1-second pauses due to temporary errors
"""
exit_on = "unknown"
self.cycles = 0
self.total_messages = 0
self.failed_messages = 0
self.pause_count = 0
self.start_time = time.monotonic()
while not self.halt_requested:
try:
cycle_data: dict[str, Any] = {
"cycle_num": self.cycles,
"cycle_s": 0.0,
}
cycle_data.update(self.refresh_and_emit_queue_count_metrics())
self.write_healthcheck()
# Check if we should exit due to time limit
if self.max_seconds is not None:
elapsed = time.monotonic() - self.start_time
if elapsed >= self.max_seconds:
exit_on = "max_seconds"
break
# Request and process a chunk of messages
with Timer(logger=None) as cycle_timer:
message_batch, queue_data = self.poll_queue_for_messages()
cycle_data.update(queue_data)
cycle_data.update(self.process_message_batch(message_batch))
# Collect data and log progress
self.total_messages += len(message_batch)
self.failed_messages += int(cycle_data.get("failed_count", 0))
self.pause_count += int(cycle_data.get("pause_count", 0))
cycle_data["message_total"] = self.total_messages
cycle_data["cycle_s"] = round(cycle_timer.last, 3)
logger.log(
(
logging.INFO
if (message_batch or self.verbosity > 1)
else logging.DEBUG
),
(
f"Cycle {self.cycles}: processed"
f" {self.pluralize(len(message_batch), 'message')}"
),
extra=cycle_data,
)
self.cycles += 1
gc.collect() # Force garbage collection of boto3 SQS client resources
except KeyboardInterrupt:
self.halt_requested = True
exit_on = "interrupt"
process_data = {
"exit_on": exit_on,
"cycles": self.cycles,
"total_s": round(time.monotonic() - self.start_time, 3),
"total_messages": self.total_messages,
}
if self.failed_messages:
process_data["failed_messages"] = self.failed_messages
if self.pause_count:
process_data["pause_count"] = self.pause_count
return process_data