in emails/views.py [0:0]
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    The notification itself is not modified: each bounced-recipient entry is
    copied before its fields are extracted.

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: 'permanent', 'transient', 'undetermined', 'none' if omitted
    * bounce_subtype: 'undetermined', 'general', etc., 'none' if omitted
    * user_match: 'found', 'missing', error states 'no_address' (address
      omitted or unparseable) and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, or '' if omitted
    * bounce_status: 'status' from bounced recipient data, or '' if omitted
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data,
      or '' if omitted
    * bounce_extra: Extra data from bounce_recipient data, if any
    * domain: User's real email address domain, if an address was given
      ('' when the address has no "@")
    * fxa_id: The Mozilla account (previously known as Firefox Account) ID of
      the user, or '' if the user has no account or has metrics disabled.
      Only present when the user was found.

    When the notification lists no recipients, a single log entry is emitted
    that carries only user_match='no_recipients' and relay_action='no_action'.
    """
    bounce = message_json.get("bounce", {})
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    bounced_recipients = bounce.get("bouncedRecipients", [])

    now = datetime.now(UTC)
    bounce_data = []
    for recipient in bounced_recipients:
        # Pop the known keys from a shallow copy so the caller's notification
        # is left intact; whatever remains afterwards is the "extra" data.
        extra = dict(recipient)
        recipient_address = extra.pop("emailAddress", None)
        data = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": extra.pop("action", ""),
            "bounce_status": extra.pop("status", ""),
            "bounce_diagnostic": extra.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if extra:
            data["bounce_extra"] = extra
        bounce_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        if not recipient_address:
            # parseaddr() yields an empty address when parsing fails. Looking
            # up email="" could match unrelated users with a blank email, so
            # report this the same way as a missing address.
            continue

        # Split on the LAST "@": a quoted local part may itself contain "@",
        # and a malformed address may contain none at all.
        _, at_sign, recipient_domain = recipient_address.rpartition("@")
        data["domain"] = recipient_domain if at_sign else ""

        # Only the lookup is guarded, so that an unrelated DoesNotExist raised
        # while reading the profile is not misreported as a missing user.
        try:
            user = User.objects.get(email=recipient_address)
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        profile = user.profile
        data["user_match"] = "found"
        if (fxa := profile.fxa) and profile.metrics_enabled:
            data["fxa_id"] = fxa.uid
        else:
            data["fxa_id"] = ""

        # The diagnostic code comes straight from JSON and may be null or a
        # non-string value; only inspect it when it is text.
        diagnostic = data["bounce_diagnostic"]
        action = None
        if isinstance(diagnostic, str) and "spam" in diagnostic.lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now

        if action:
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    # One metric increment and one log line per recipient entry.
    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)