in emails/management/commands/process_emails_from_sqs.py [0:0]
def process_message(self, message: SQSMessage) -> dict[str, Any]:
"""
Process an SQS message, which may include sending an email.
Return is a dict suitable for logging context, with these keys:
* success: True if message was processed successfully
* error: The processing error, omitted on success
* message_body_quoted: Set if the message was non-JSON, omitted for valid JSON
* pause_count: Set to 1 if paused due to temporary error, or omitted
with no error
* pause_s: The pause in seconds (ms precision) for temp error, or omitted
* pause_error: The temporary error, or omitted if no temp error
* client_error_code: The error code for non-temp or retry error,
omitted on success
"""
incr_if_enabled("process_message_from_sqs", 1)
results = {"success": True, "sqs_message_id": message.message_id}
raw_body = message.body
try:
json_body = json.loads(raw_body)
except ValueError as e:
results["success"] = False
results["error"] = f"Failed to load message.body: {e}"
results["message_body_quoted"] = shlex.quote(raw_body)
return results
try:
verified_json_body = verify_from_sns(json_body)
except (KeyError, VerificationFailed) as e:
logger.error("Failed SNS verification", extra={"error": str(e)})
results["success"] = False
results["error"] = f"Failed SNS verification: {e}"
return results
topic_arn = verified_json_body["TopicArn"]
message_type = verified_json_body["Type"]
error_details = validate_sns_arn_and_type(topic_arn, message_type)
if error_details:
results["success"] = False
results.update(error_details)
return results
def success_callback(result: HttpResponse) -> None:
"""Handle return from successful call to _sns_inbound_logic"""
# TODO: extract data from _sns_inbound_logic return
pass
def error_callback(exc_info: BaseException) -> None:
"""Handle exception raised by _sns_inbound_logic"""
capture_exception(exc_info)
results["success"] = False
if isinstance(exc_info, ClientError):
incr_if_enabled("message_from_sqs_error")
err = exc_info.response["Error"]
logger.error("sqs_client_error", extra=err)
results["error"] = err
results["client_error_code"] = err["Code"].lower()
else:
incr_if_enabled("email_processing_failure")
results["error"] = str(exc_info)
results["error_type"] = type(exc_info).__name__
# Run in a multiprocessing Pool
# This will start a subprocess, which needs to run django.setup
# The benefit is that the subprocess can be terminated
# The penalty is that is is slower to start
pool_start_time = time.monotonic()
with Pool(1, initializer=setup) as pool:
future = pool.apply_async(
run_sns_inbound_logic,
[topic_arn, message_type, verified_json_body],
callback=success_callback,
error_callback=error_callback,
)
setup_time = time.monotonic() - pool_start_time
results["subprocess_setup_time_s"] = round(setup_time, 3)
message_start_time = time.monotonic()
message_duration = 0.0
while message_duration < self.max_seconds_per_message:
self.write_healthcheck()
future.wait(1.0)
message_duration = time.monotonic() - message_start_time
if future.ready():
break
results["message_process_time_s"] = round(message_duration, 3)
if not future.ready():
error = f"Timed out after {self.max_seconds_per_message:0.1f} seconds."
results["success"] = False
results["error"] = error
return results