def process_queue()

in emails/management/commands/process_emails_from_sqs.py [0:0]


    def process_queue(self) -> dict[str, Any]:
        """
        Poll the SQS email queue in cycles until an exit condition is met.

        Each cycle refreshes the queue-count metrics, writes the healthcheck,
        checks the time limit, and then polls for and processes one batch of
        messages. The loop stops when ``self.halt_requested`` is true, when
        ``self.max_seconds`` has elapsed, or when a KeyboardInterrupt arrives.

        Returns a summary dict meant to be used as logging context:
        * exit_on: The exit reason - "interrupt", "max_seconds", or "unknown"
        * cycles: The number of polling cycles that ran to completion
        * total_s: Total run time in seconds, rounded to milliseconds
        * total_messages: Messages handled, whether or not they had errors
        * failed_messages: Messages that failed with errors (key is absent
          when the count is zero)
        * pause_count: 1-second pauses taken due to temporary errors (key is
          absent when the count is zero)
        """
        exit_reason = "unknown"
        self.cycles = self.total_messages = 0
        self.failed_messages = self.pause_count = 0
        self.start_time = time.monotonic()

        while not self.halt_requested:
            try:
                cycle_info: dict[str, Any] = {"cycle_num": self.cycles, "cycle_s": 0.0}
                cycle_info.update(self.refresh_and_emit_queue_count_metrics())
                self.write_healthcheck()

                # Stop before polling again once the time budget is spent
                out_of_time = (
                    self.max_seconds is not None
                    and time.monotonic() - self.start_time >= self.max_seconds
                )
                if out_of_time:
                    exit_reason = "max_seconds"
                    break

                # Fetch one batch and handle it, timing the poll and processing
                with Timer(logger=None) as cycle_timer:
                    batch, poll_info = self.poll_queue_for_messages()
                    cycle_info.update(poll_info)
                    batch_info = self.process_message_batch(batch)
                    cycle_info.update(batch_info)

                # Fold this cycle's numbers into the running totals
                batch_size = len(batch)
                self.total_messages += batch_size
                self.failed_messages += int(cycle_info.get("failed_count", 0))
                self.pause_count += int(cycle_info.get("pause_count", 0))
                cycle_info["message_total"] = self.total_messages
                cycle_info["cycle_s"] = round(cycle_timer.last, 3)

                # Empty cycles are only logged at INFO when running verbosely
                if batch or self.verbosity > 1:
                    level = logging.INFO
                else:
                    level = logging.DEBUG
                message = (
                    f"Cycle {self.cycles}: processed"
                    f" {self.pluralize(batch_size, 'message')}"
                )
                logger.log(level, message, extra=cycle_info)

                self.cycles += 1
                gc.collect()  # Force garbage collection of boto3 SQS client resources

            except KeyboardInterrupt:
                exit_reason = "interrupt"
                self.halt_requested = True

        summary: dict[str, Any] = {
            "exit_on": exit_reason,
            "cycles": self.cycles,
            "total_s": round(time.monotonic() - self.start_time, 3),
            "total_messages": self.total_messages,
        }
        # Optional counters appear in the summary only when they are non-zero
        optional_counts = (
            ("failed_messages", self.failed_messages),
            ("pause_count", self.pause_count),
        )
        for key, count in optional_counts:
            if count:
                summary[key] = count
        return summary