def _handle_bounce()

in emails/views.py [0:0]


def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    Each entry in the notification's bounce.bouncedRecipients list is handled
    independently. The entries are read from a shallow copy, so message_json
    is not modified.

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: 'permanent', 'transient', 'undetermined', 'none' if omitted
    * bounce_subtype: 'undetermined', 'general', etc., 'none' if omitted
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    The 'no_address' state covers both a recipient without an emailAddress and
    an emailAddress that has no "@" after parsing.

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, '' if omitted
    * bounce_status: 'status' from bounced recipient data, '' if omitted
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, '' if omitted
    * bounce_extra: Extra data from bounce_recipient data, if any
    * domain: User's real email address domain, if a usable address was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the
      user. Only present when a user was found; '' if the user has no account or
      has metrics disabled.
    """
    bounce = message_json.get("bounce", {})
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    bounced_recipients = bounce.get("bouncedRecipients", [])

    now = datetime.now(UTC)
    bounce_data = []
    for recipient in bounced_recipients:
        # Pop from a shallow copy so the caller's notification stays intact.
        # Whatever is left after the known keys are removed is the extra data.
        details = dict(recipient)
        raw_address = details.pop("emailAddress", None)
        data = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": details.pop("action", ""),
            "bounce_status": details.pop("status", ""),
            "bounce_diagnostic": details.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if details:
            data["bounce_extra"] = details
        bounce_data.append(data)

        if raw_address is None:
            continue

        recipient_address = parseaddr(raw_address)[1]
        if "@" not in recipient_address:
            # parseaddr returns "" for input it cannot parse. Without a domain
            # there is nothing to look up, so keep the "no_address" state
            # instead of failing the whole notification.
            continue
        data["domain"] = recipient_address.split("@")[1]

        # Keep the try body to the single call that raises User.DoesNotExist
        try:
            user = User.objects.get(email=recipient_address)
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        profile = user.profile
        data["user_match"] = "found"
        # Only log the account ID for users who have metrics enabled
        fxa = profile.fxa
        data["fxa_id"] = fxa.uid if fxa and profile.metrics_enabled else ""

        # A diagnosticCode that is present but null must not break the check
        diagnostic = data["bounce_diagnostic"] or ""
        action = None
        if "spam" in diagnostic.lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now
        if action:
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    # One metric increment and one log line per recipient entry
    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)