def process_message()

in emails/management/commands/process_emails_from_sqs.py [0:0]


    def process_message(self, message: SQSMessage) -> dict[str, Any]:
        """
        Process an SQS message, which may include sending an email.

        The body is parsed as JSON, verified with verify_from_sns, and checked
        with validate_sns_arn_and_type. A message that passes these checks is
        handed to run_sns_inbound_logic in a single-worker multiprocessing Pool,
        and is abandoned once self.max_seconds_per_message has elapsed (checked
        in 1 second slices).

        Return is a dict suitable for logging context, with these keys:
        * success: True if message was processed successfully
        * sqs_message_id: The message_id of the SQS message
        * error: The processing error, omitted on success. For a ClientError
          this is the "Error" dict from the response, otherwise a string.
        * message_body_quoted: Set if the message was non-JSON, omitted for valid JSON
        * client_error_code: The lower-cased error code of a ClientError raised
          while processing, omitted otherwise
        * error_type: The class name of a non-ClientError exception raised
          while processing, omitted otherwise
        * subprocess_setup_time_s: Seconds (ms precision) spent creating the
          Pool and submitting the task, set once a subprocess is used
        * message_process_time_s: Seconds (ms precision) spent waiting on the
          subprocess, set once a subprocess is used

        When validate_sns_arn_and_type reports a problem, the details it
        returns are merged into the dict as well.

        NOTE: pause_count, pause_s, and pause_error were previously listed
        here, but nothing in this method sets them.
        """
        incr_if_enabled("process_message_from_sqs", 1)
        results: dict[str, Any] = {
            "success": True,
            "sqs_message_id": message.message_id,
        }
        raw_body = message.body
        try:
            json_body = json.loads(raw_body)
        except ValueError as e:
            results["success"] = False
            results["error"] = f"Failed to load message.body: {e}"
            # Shell-quote the raw body so it is logged as a single safe token
            results["message_body_quoted"] = shlex.quote(raw_body)
            return results
        try:
            verified_json_body = verify_from_sns(json_body)
        except (KeyError, VerificationFailed) as e:
            logger.error("Failed SNS verification", extra={"error": str(e)})
            results["success"] = False
            results["error"] = f"Failed SNS verification: {e}"
            return results

        topic_arn = verified_json_body["TopicArn"]
        message_type = verified_json_body["Type"]
        error_details = validate_sns_arn_and_type(topic_arn, message_type)
        if error_details:
            results["success"] = False
            results.update(error_details)
            return results

        def success_callback(result: HttpResponse) -> None:
            """Handle return from successful call to _sns_inbound_logic"""
            # TODO: extract data from _sns_inbound_logic return
            pass

        def error_callback(exc_info: BaseException) -> None:
            """Handle exception raised by _sns_inbound_logic"""
            capture_exception(exc_info)
            results["success"] = False
            if isinstance(exc_info, ClientError):
                incr_if_enabled("message_from_sqs_error")
                # Look keys up defensively: an exception raised inside a Pool
                # callback is never seen by process_message, so a KeyError on
                # an unexpected response shape would hide the real failure.
                err: dict[str, Any] = exc_info.response.get("Error", {})
                logger.error("sqs_client_error", extra=err)
                results["error"] = err
                results["client_error_code"] = str(err.get("Code", "Unknown")).lower()
            else:
                incr_if_enabled("email_processing_failure")
                results["error"] = str(exc_info)
                results["error_type"] = type(exc_info).__name__

        # Run in a multiprocessing Pool
        # This will start a subprocess, which needs to run django.setup
        # The benefit is that the subprocess can be terminated
        # The penalty is that it is slower to start
        pool_start_time = time.monotonic()
        with Pool(1, initializer=setup) as pool:
            future = pool.apply_async(
                run_sns_inbound_logic,
                [topic_arn, message_type, verified_json_body],
                callback=success_callback,
                error_callback=error_callback,
            )
            # Covers Pool creation and task submission only; apply_async
            # returns before the task has run.
            setup_time = time.monotonic() - pool_start_time
            results["subprocess_setup_time_s"] = round(setup_time, 3)

            # Wait in 1 second slices so the healthcheck keeps being refreshed
            # while the subprocess is working on the message.
            message_start_time = time.monotonic()
            message_duration = 0.0
            while message_duration < self.max_seconds_per_message:
                self.write_healthcheck()
                future.wait(1.0)
                message_duration = time.monotonic() - message_start_time
                if future.ready():
                    break

            results["message_process_time_s"] = round(message_duration, 3)
            if not future.ready():
                # Still running after the time budget: report a timeout.
                # Leaving the `with` block terminates the pool and its worker.
                error = f"Timed out after {self.max_seconds_per_message:0.1f} seconds."
                results["success"] = False
                results["error"] = error
        return results