import html
import json
import logging
import re
import shlex
from datetime import UTC, datetime
from email import message_from_bytes
from email.iterators import _structure
from email.message import EmailMessage
from email.utils import parseaddr
from io import StringIO
from json import JSONDecodeError
from textwrap import dedent
from typing import Any, Literal, NamedTuple, TypedDict, TypeVar
from urllib.parse import urlencode
from uuid import uuid4

from django.conf import settings
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.db.models import prefetch_related_objects
from django.http import HttpRequest, HttpResponse
from django.shortcuts import render
from django.template.loader import render_to_string
from django.utils.html import escape
from django.views.decorators.csrf import csrf_exempt

from botocore.exceptions import ClientError
from codetiming import Timer
from decouple import strtobool
from markus.utils import generate_tag
from sentry_sdk import capture_message
from waffle import get_waffle_flag_model, sample_is_active

from privaterelay.ftl_bundles import main as ftl_bundle
from privaterelay.models import Profile
from privaterelay.utils import (
    flag_is_active_in_task,
    get_subplat_upgrade_link_by_language,
    glean_logger,
)

from .exceptions import CannotMakeAddressException
from .models import (
    DeletedAddress,
    DomainAddress,
    RelayAddress,
    Reply,
    address_hash,
    get_domain_numerical,
)
from .policy import relay_policy
from .sns import SUPPORTED_SNS_TYPES, verify_from_sns
from .types import (
    AWS_MailJSON,
    AWS_SNSMessageJSON,
    EmailForwardingIssues,
    EmailHeaderIssues,
    OutgoingHeaders,
)
from .utils import (
    InvalidFromHeader,
    _get_bucket_and_key_from_s3_json,
    b64_lookup_key,
    count_all_trackers,
    decrypt_reply_metadata,
    derive_reply_keys,
    encode_dict_gza85,
    encrypt_reply_metadata,
    generate_from_header,
    get_domains_from_settings,
    get_message_content_from_s3,
    get_message_id_bytes,
    get_reply_to_address,
    histogram_if_enabled,
    incr_if_enabled,
    parse_email_header,
    remove_message_from_s3,
    remove_trackers,
    ses_send_raw_email,
    urlize_and_linebreaks,
)

logger = logging.getLogger("events")
info_logger = logging.getLogger("eventsinfo")


class ReplyHeadersNotFound(Exception):
    def __init__(self, message="No In-Reply-To or References headers."):
        self.message = message


def first_time_user_test(request):
    """
    Demonstrate rendering of the "First time Relay user" email.
    Settings like language can be given in the querystring, otherwise settings
    come from a random free profile.
    """
    in_bundle_country = strtobool(request.GET.get("in_bundle_country", "yes"))
    email_context = {
        "in_bundle_country": in_bundle_country,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    if request.GET.get("format", "html") == "text":
        return render(
            request,
            "emails/first_time_user.txt",
            email_context,
            "text/plain; charset=utf-8",
        )
    return render(request, "emails/first_time_user.html", email_context)


def reply_requires_premium_test(request):
    """
    Demonstrate rendering of the "Reply requires premium" email.

    Settings like language can be given in the querystring, otherwise settings
    come from a random free profile.
    """
    email_context = {
        "sender": "test@example.com",
        "forwarded": True,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    for param in request.GET:
        email_context[param] = request.GET.get(param)
        if param == "forwarded" and request.GET[param] == "True":
            email_context[param] = True

    for param in request.GET:
        if param == "content-type" and request.GET[param] == "text/plain":
            return render(
                request,
                "emails/reply_requires_premium.txt",
                email_context,
                "text/plain; charset=utf-8",
            )
    return render(request, "emails/reply_requires_premium.html", email_context)


def disabled_mask_for_spam_test(request):
    """
    Demonstrate rendering of the "Disabled mask for spam" email.

    Settings like language can be given in the querystring, otherwise settings
    come from a random free profile.
    """
    mask = "abc123456@mozmail.com"
    email_context = {
        "mask": mask,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    for param in request.GET:
        email_context[param] = request.GET.get(param)

    for param in request.GET:
        if param == "content-type" and request.GET[param] == "text/plain":
            return render(
                request,
                "emails/disabled_mask_for_spam.txt",
                email_context,
                "text/plain; charset=utf-8",
            )
    return render(request, "emails/disabled_mask_for_spam.html", email_context)


def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
    # TO DO: Update with correct context when trigger is created
    first_forwarded_email_html = render_to_string(
        "emails/first_forwarded_email.html",
        {
            "SITE_ORIGIN": settings.SITE_ORIGIN,
        },
    )

    wrapped_email = wrap_html_email(
        first_forwarded_email_html,
        "en-us",
        True,
        "test@example.com",
        0,
    )

    return HttpResponse(wrapped_email)


def wrap_html_email(
    original_html: str,
    language: str,
    has_premium: bool,
    display_email: str,
    num_level_one_email_trackers_removed: int | None = None,
    tracker_report_link: str | None = None,
) -> str:
    """Add Relay banners, surveys, etc. to an HTML email"""
    subplat_upgrade_link = get_subplat_upgrade_link_by_language(language)
    email_context = {
        "original_html": original_html,
        "language": language,
        "has_premium": has_premium,
        "subplat_upgrade_link": subplat_upgrade_link,
        "display_email": display_email,
        "tracker_report_link": tracker_report_link,
        "num_level_one_email_trackers_removed": num_level_one_email_trackers_removed,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    content = render_to_string("emails/wrapped_email.html", email_context)
    # Remove empty lines
    content_lines = [line for line in content.splitlines() if line.strip()]
    return "\n".join(content_lines) + "\n"


def wrapped_email_test(request: HttpRequest) -> HttpResponse:
    """
    Demonstrate rendering of forwarded HTML emails.

    Settings like language can be given in the querystring, otherwise settings
    come from a randomly chosen profile.
    """

    if all(key in request.GET for key in ("language", "has_premium")):
        user_profile = None
    else:
        user_profile = Profile.objects.order_by("?").first()

    if "language" in request.GET:
        language = request.GET["language"]
    else:
        if user_profile is None:
            raise ValueError("user_profile must not be None")
        language = user_profile.language

    if "has_premium" in request.GET:
        has_premium = strtobool(request.GET["has_premium"])
    else:
        if user_profile is None:
            raise ValueError("user_profile must not be None")
        has_premium = user_profile.has_premium

    if "num_level_one_email_trackers_removed" in request.GET:
        num_level_one_email_trackers_removed = int(
            request.GET["num_level_one_email_trackers_removed"]
        )
    else:
        num_level_one_email_trackers_removed = 0

    if "has_tracker_report_link" in request.GET:
        has_tracker_report_link = strtobool(request.GET["has_tracker_report_link"])
    else:
        has_tracker_report_link = False
    if has_tracker_report_link:
        if num_level_one_email_trackers_removed:
            trackers = {
                "fake-tracker.example.com": num_level_one_email_trackers_removed
            }
        else:
            trackers = {}
        tracker_report_link = (
            "/tracker-report/#{"
            '"sender": "sender@example.com", '
            '"received_at": 1658434657, '
            f'"trackers": { json.dumps(trackers) }'
            "}"
        )
    else:
        tracker_report_link = ""

    path = "/emails/wrapped_email_test"
    old_query = {
        "language": language,
        "has_premium": "Yes" if has_premium else "No",
        "has_tracker_report_link": "Yes" if has_tracker_report_link else "No",
        "num_level_one_email_trackers_removed": str(
            num_level_one_email_trackers_removed
        ),
    }

    def switch_link(key, value):
        if old_query[key] == value:
            return str(value)
        new_query = old_query.copy()
        new_query[key] = value
        return f'<a href="{path}?{urlencode(new_query)}">{value}</a>'

    html_content = dedent(
        f"""\
    <p>
      <strong>Email rendering Test</strong>
    </p>
    <p>Settings: (<a href="{path}">clear all</a>)</p>
    <ul>
      <li>
        <strong>language</strong>:
        {escape(language)}
        (switch to
        {switch_link("language", "en-us")},
        {switch_link("language", "de")},
        {switch_link("language", "en-gb")},
        {switch_link("language", "fr")},
        {switch_link("language", "ru-ru")},
        {switch_link("language", "es-es")},
        {switch_link("language", "pt-br")},
        {switch_link("language", "it-it")},
        {switch_link("language", "en-ca")},
        {switch_link("language", "de-de")},
        {switch_link("language", "es-mx")})
      </li>
      <li>
        <strong>has_premium</strong>:
        {"Yes" if has_premium else "No"}
        (switch to
        {switch_link("has_premium", "Yes")},
        {switch_link("has_premium", "No")})
      </li>
      <li>
        <strong>has_tracker_report_link</strong>:
        {"Yes" if has_tracker_report_link else "No"}
        (switch to
        {switch_link("has_tracker_report_link", "Yes")},
        {switch_link("has_tracker_report_link", "No")})
      </li>
      <li>
        <strong>num_level_one_email_trackers_removed</strong>:
        {num_level_one_email_trackers_removed}
        (switch to
        {switch_link("num_level_one_email_trackers_removed", "0")},
        {switch_link("num_level_one_email_trackers_removed", "1")},
        {switch_link("num_level_one_email_trackers_removed", "2")})
      </li>
    </ul>
    """
    )

    wrapped_email = wrap_html_email(
        original_html=html_content,
        language=language,
        has_premium=has_premium,
        tracker_report_link=tracker_report_link,
        display_email="test@relay.firefox.com",
        num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
    )
    return HttpResponse(wrapped_email)


def _store_reply_record(
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
) -> AWS_MailJSON:
    # After relaying email, store a Reply record for it
    reply_metadata = {}
    for header in mail["headers"]:
        if header["name"].lower() in ["message-id", "from", "reply-to"]:
            reply_metadata[header["name"].lower()] = header["value"]
    message_id_bytes = get_message_id_bytes(message_id)
    (lookup_key, encryption_key) = derive_reply_keys(message_id_bytes)
    lookup = b64_lookup_key(lookup_key)
    encrypted_metadata = encrypt_reply_metadata(encryption_key, reply_metadata)
    reply_create_args: dict[str, Any] = {
        "lookup": lookup,
        "encrypted_metadata": encrypted_metadata,
    }
    if isinstance(address, DomainAddress):
        reply_create_args["domain_address"] = address
    else:
        if not isinstance(address, RelayAddress):
            raise TypeError("address must be type RelayAddress")
        reply_create_args["relay_address"] = address
    Reply.objects.create(**reply_create_args)
    return mail


@csrf_exempt
def sns_inbound(request):
    incr_if_enabled("sns_inbound", 1)
    # First thing we do is verify the signature
    json_body = json.loads(request.body)
    verified_json_body = verify_from_sns(json_body)

    # Validate ARN and message type
    topic_arn = verified_json_body.get("TopicArn", None)
    message_type = verified_json_body.get("Type", None)
    error_details = validate_sns_arn_and_type(topic_arn, message_type)
    if error_details:
        logger.error("validate_sns_arn_and_type_error", extra=error_details)
        return HttpResponse(error_details["error"], status=400)

    return _sns_inbound_logic(topic_arn, message_type, verified_json_body)


def validate_sns_arn_and_type(
    topic_arn: str | None, message_type: str | None
) -> dict[str, Any] | None:
    """
    Validate Topic ARN and SNS Message Type.

    If an error is detected, the return is a dictionary of error details.
    If no error is detected, the return is None.
    """
    if not topic_arn:
        error = "Received SNS request without Topic ARN."
    elif topic_arn not in settings.AWS_SNS_TOPIC:
        error = "Received SNS message for wrong topic."
    elif not message_type:
        error = "Received SNS request without Message Type."
    elif message_type not in SUPPORTED_SNS_TYPES:
        error = "Received SNS message for unsupported Type."
    else:
        error = None

    if error:
        return {
            "error": error,
            "received_topic_arn": shlex.quote(topic_arn) if topic_arn else topic_arn,
            "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
            "received_sns_type": (
                shlex.quote(message_type) if message_type else message_type
            ),
            "supported_sns_types": SUPPORTED_SNS_TYPES,
        }
    return None


def _sns_inbound_logic(topic_arn, message_type, json_body):
    if message_type == "SubscriptionConfirmation":
        info_logger.info(
            "SNS SubscriptionConfirmation",
            extra={"SubscribeURL": json_body["SubscribeURL"]},
        )
        return HttpResponse("Logged SubscribeURL", status=200)
    if message_type == "Notification":
        incr_if_enabled("sns_inbound_Notification", 1)
        return _sns_notification(json_body)

    logger.error(
        "SNS message type did not fall under the SNS inbound logic",
        extra={"message_type": shlex.quote(message_type)},
    )
    capture_message(
        "Received SNS message with type not handled in inbound log",
        level="error",
        stack=True,
    )
    return HttpResponse(
        "Received SNS message with type not handled in inbound log", status=400
    )


def _sns_notification(json_body):
    try:
        message_json = json.loads(json_body["Message"])
    except JSONDecodeError:
        logger.error(
            "SNS notification has non-JSON message body",
            extra={"content": shlex.quote(json_body["Message"])},
        )
        return HttpResponse("Received SNS notification with non-JSON body", status=400)

    event_type = message_json.get("eventType")
    notification_type = message_json.get("notificationType")
    if notification_type not in {
        "Complaint",
        "Received",
        "Bounce",
    } and event_type not in {"Complaint", "Bounce"}:
        logger.error(
            "SNS notification for unsupported type",
            extra={
                "notification_type": shlex.quote(notification_type),
                "event_type": shlex.quote(event_type),
                "keys": [shlex.quote(key) for key in message_json.keys()],
            },
        )
        return HttpResponse(
            (
                "Received SNS notification for unsupported Type: "
                f"{html.escape(shlex.quote(notification_type))}"
            ),
            status=400,
        )
    response = _sns_message(message_json)
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
    if response.status_code < 500:
        remove_message_from_s3(bucket, object_key)

    return response


def _get_recipient_with_relay_domain(recipients):
    domains_to_check = get_domains_from_settings().values()
    for recipient in recipients:
        for domain in domains_to_check:
            if domain in recipient:
                return recipient
    return None


def _get_relay_recipient_from_message_json(message_json):
    # Go thru all To, Cc, and Bcc fields and
    # return the one that has a Relay domain

    # First check common headers for to or cc match
    headers_to_check = "to", "cc"
    common_headers = message_json["mail"]["commonHeaders"]
    for header in headers_to_check:
        if header in common_headers:
            recipient = _get_recipient_with_relay_domain(common_headers[header])
            if recipient is not None:
                return parseaddr(recipient)[1]

    # SES-SNS sends bcc in a different part of the message
    recipients = message_json["receipt"]["recipients"]
    return _get_recipient_with_relay_domain(recipients)


def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    incr_if_enabled("sns_inbound_Notification_Received", 1)
    init_waffle_flags()
    notification_type = message_json.get("notificationType")
    event_type = message_json.get("eventType")
    if notification_type == "Bounce" or event_type == "Bounce":
        return _handle_bounce(message_json)
    if notification_type == "Complaint" or event_type == "Complaint":
        return _handle_complaint(message_json)
    if notification_type != "Received":
        raise ValueError('notification_type must be "Received"')
    if event_type is not None:
        raise ValueError("event_type must be None")
    return _handle_received(message_json)


# Enumerate the reasons that an email was not forwarded.
# This excludes emails dropped due to mask forwarding settings,
# such as "block all" and "block promotional". Those are logged
# as Glean email_blocked events.
EmailDroppedReason = Literal[
    "auto_block_spam",  # Email identified as spam, user has the auto_block_spam flag
    "dmarc_reject_failed",  # Email failed DMARC check with a reject policy
    "hard_bounce_pause",  # The user recently had a hard bounce
    "soft_bounce_pause",  # The user recently has a soft bounce
    "abuse_flag",  # The user exceeded an abuse limit, like mails forwarded
    "user_deactivated",  # The user account is deactivated
    "reply_requires_premium",  # The email is a reply from a free user
    "content_missing",  # Could not load the email from storage
    "error_from_header",  # Error generating the From: header, retryable
    "error_storage",  # Error fetching the email contents from storage (S3), retryable
    "error_sending",  # Error sending the forwarded email (SES), retryable
]


def log_email_dropped(
    reason: EmailDroppedReason,
    mask: RelayAddress | DomainAddress,
    is_reply: bool = False,
    can_retry: bool = False,
) -> None:
    """
    Log that an email was dropped for a reason other than a mask blocking setting.

    This mirrors the interface of glean_logger().log_email_blocked(), which
    records emails dropped due to the mask's blocking setting.
    """
    extra: dict[str, str | int | bool] = {"reason": reason}
    if mask.user.profile.metrics_enabled:
        if mask.user.profile.fxa is not None:
            extra["fxa_id"] = mask.user.profile.fxa.uid
        extra["mask_id"] = mask.metrics_id
    extra |= {
        "is_random_mask": isinstance(mask, RelayAddress),
        "is_reply": is_reply,
        "can_retry": can_retry,
    }
    info_logger.info("email_dropped", extra=extra)


def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES received notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html

    Returns (may be incomplete):
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
      flagged for abuse, the email is under a bounce pause, the email was suppressed
      for spam, the list email was blocked, or the noreply address was the recipient.
    * 400 if commonHeaders entry is missing, the Relay recipient address is malformed,
      the email failed DMARC with reject policy, or the email is a reply chain to a
      non-premium user.
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
      "CC", or "BCC" fields, or the Relay address is not in the database.
    * 503 if the "From" address is malformed, the S3 client returned an error different
      from "not found", or the SES client fails

    And many other returns conditions if the email is a reply. The HTTP returns are an
    artifact from an earlier time when emails were sent to a webhook. Currently,
    production instead pulls events from a queue.

    TODO: Return a more appropriate status object
    TODO: Document the metrics emitted
    """
    mail = message_json["mail"]
    if "commonHeaders" not in mail:
        logger.error("SNS message without commonHeaders")
        return HttpResponse(
            "Received SNS notification without commonHeaders.", status=400
        )
    common_headers = mail["commonHeaders"]
    receipt = message_json["receipt"]

    _record_receipt_verdicts(receipt, "all")
    to_address = _get_relay_recipient_from_message_json(message_json)
    if to_address is None:
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
        return HttpResponse("Address does not exist", status=404)

    _record_receipt_verdicts(receipt, "relay_recipient")
    from_addresses = parse_email_header(common_headers["from"][0])
    if not from_addresses:
        info_logger.error(
            "_handle_received: no from address",
            extra={
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
            },
        )
        return HttpResponse("Unable to parse From address", status=400)
    from_address = from_addresses[0][1]

    try:
        [to_local_portion, to_domain_portion] = to_address.split("@")
    except ValueError:
        # TODO: Add metric
        return HttpResponse("Malformed to field.", status=400)

    if to_local_portion.lower() == "noreply":
        incr_if_enabled("email_for_noreply_address", 1)
        return HttpResponse("noreply address is not supported.")
    try:
        # FIXME: this ambiguous return of either
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
        # up a bit.
        address = _get_address(to_address)
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
        user_profile = address.user.profile
    except (
        ObjectDoesNotExist,
        CannotMakeAddressException,
        DeletedAddress.MultipleObjectsReturned,
    ):
        if to_local_portion.lower() == "replies":
            response = _handle_reply(from_address, message_json, to_address)
        else:
            response = HttpResponse("Address does not exist", status=404)
        return response

    _record_receipt_verdicts(receipt, "valid_user")
    # if this is spam and the user is set to auto-block spam, early return
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
        log_email_dropped(reason="auto_block_spam", mask=address)
        return HttpResponse("Address rejects spam.")

    if _get_verdict(receipt, "dmarc") == "FAIL":
        policy = receipt.get("dmarcPolicy", "none")
        # TODO: determine action on dmarcPolicy "quarantine"
        if policy == "reject":
            log_email_dropped(reason="dmarc_reject_failed", mask=address)
            incr_if_enabled(
                "email_suppressed_for_dmarc_failure",
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
            )
            return HttpResponse("DMARC failure, policy is reject", status=400)

    # if this user is over bounce limits, early return
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
    if bounce_paused:
        _record_receipt_verdicts(receipt, "user_bounce_paused")
        incr_if_enabled(f"email_suppressed_for_{bounce_type}_bounce", 1)
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
        )
        log_email_dropped(reason=reason, mask=address)
        return HttpResponse("Address is temporarily disabled.")

    # check if this is a reply from an external sender to a Relay user
    try:
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
        user_address = address
        address = reply_record.address
        message_id = _get_message_id_from_headers(mail["headers"])
        # make sure the relay user is premium
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
            log_email_dropped(reason="reply_requires_premium", mask=user_address)
            return HttpResponse("Relay replies require a premium account", status=403)
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
        # match a Reply record, continue to treat this as a regular email from
        # an external sender to a relay user
        pass

    # if account flagged for abuse, early return
    if user_profile.is_flagged:
        log_email_dropped(reason="abuse_flag", mask=address)
        return HttpResponse("Address is temporarily disabled.")

    if not user_profile.user.is_active:
        log_email_dropped(reason="user_deactivated", mask=address)
        return HttpResponse("Account is deactivated.")

    # if address is set to block, early return
    if not address.enabled:
        incr_if_enabled("email_for_disabled_address", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        _record_receipt_verdicts(receipt, "disabled_alias")
        user_profile.last_engagement = datetime.now(UTC)
        user_profile.save()
        glean_logger().log_email_blocked(mask=address, reason="block_all")
        return HttpResponse("Address is temporarily disabled.")

    _record_receipt_verdicts(receipt, "active_alias")
    incr_if_enabled("email_for_active_address", 1)

    # if address is blocking list emails, and email is from list, early return
    if (
        address
        and address.block_list_emails
        and user_profile.has_premium
        and _check_email_from_list(mail["headers"])
    ):
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        user_profile.last_engagement = datetime.now(UTC)
        user_profile.save()
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
        return HttpResponse("Address is not accepting list emails.")

    # Collect new headers
    subject = common_headers.get("subject", "")
    destination_address = user_profile.user.email
    reply_address = get_reply_to_address()
    try:
        from_header = generate_from_header(from_address, to_address)
    except InvalidFromHeader:
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
        header_from = []
        for header in mail["headers"]:
            if header["name"].lower() == "from":
                header_from.append(header)
        info_logger.error(
            "generate_from_header",
            extra={
                "from_address": from_address,
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
                "headers_from": header_from,
            },
        )
        log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
        return HttpResponse("Cannot parse the From address", status=503)

    # Get incoming email
    try:
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(reason="error_storage", mask=address, can_retry=True)
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    # Handle developer overrides, logging
    dev_action = _get_developer_mode_action(address)
    if dev_action:
        if dev_action.new_destination_address:
            destination_address = dev_action.new_destination_address
        _log_dev_notification(
            "_handle_received: developer_mode", dev_action, message_json
        )

    # Convert to new email
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": from_header,
        "To": destination_address,
        "Reply-To": reply_address,
        "Resent-From": from_address,
    }
    sample_trackers = bool(sample_is_active("tracker_sample"))
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
    remove_level_one_trackers = bool(
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
    )
    (
        forwarded_email,
        issues,
        level_one_trackers_removed,
        has_html,
        has_text,
    ) = _convert_to_forwarded_email(
        incoming_email_bytes=incoming_email_bytes,
        headers=headers,
        to_address=to_address,
        from_address=from_address,
        language=user_profile.language,
        has_premium=user_profile.has_premium,
        sample_trackers=sample_trackers,
        remove_level_one_trackers=remove_level_one_trackers,
    )
    if has_html:
        incr_if_enabled("email_with_html_content", 1)
    if has_text:
        incr_if_enabled("email_with_text_content", 1)
    if issues:
        info_logger.info(
            "_handle_received: forwarding issues", extra={"issues": issues}
        )

    # Send new email
    try:
        ses_response = ses_send_raw_email(
            source_address=reply_address,
            destination_address=destination_address,
            message=forwarded_email,
        )
    except ClientError:
        # 503 service unavailable response to SNS so it can retry
        log_email_dropped(reason="error_sending", mask=address, can_retry=True)
        return HttpResponse("SES client error on Raw Email", status=503)

    message_id = ses_response["MessageId"]
    _store_reply_record(mail, message_id, address)

    user_profile.update_abuse_metric(
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
    )
    user_profile.last_engagement = datetime.now(UTC)
    user_profile.save()
    address.num_forwarded += 1
    address.last_used_at = datetime.now(UTC)
    if level_one_trackers_removed:
        address.num_level_one_trackers_blocked = (
            address.num_level_one_trackers_blocked or 0
        ) + level_one_trackers_removed
    address.save(
        update_fields=[
            "num_forwarded",
            "last_used_at",
            "block_list_emails",
            "num_level_one_trackers_blocked",
        ]
    )
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
    return HttpResponse("Sent email to final recipient.", status=200)


class DeveloperModeAction(NamedTuple):
    mask_id: str
    action: Literal["log", "simulate_complaint"] = "log"
    new_destination_address: str | None = None


def _get_verdict(receipt, verdict_type):
    return receipt[f"{verdict_type}Verdict"]["status"]


def _check_email_from_list(headers):
    for header in headers:
        if header["name"].lower().startswith("list-"):
            return True
    return False


def _record_receipt_verdicts(receipt, state):
    verdict_tags = []
    for key in sorted(receipt.keys()):
        if key.endswith("Verdict"):
            value = receipt[key]["status"]
            verdict_tags.append(f"{key}:{value}")
            incr_if_enabled(f"relay.emails.verdicts.{key}", 1, [f"state:{state}"])
        elif key == "dmarcPolicy":
            value = receipt[key]
            verdict_tags.append(f"{key}:{value}")
    incr_if_enabled(f"relay.emails.state.{state}", 1, verdict_tags)


def _get_message_id_from_headers(headers):
    message_id = None
    for header in headers:
        if header["name"].lower() == "message-id":
            message_id = header["value"]
    return message_id


def _get_keys_from_headers(headers):
    in_reply_to = None
    for header in headers:
        if header["name"].lower() == "in-reply-to":
            in_reply_to = header["value"]
            message_id_bytes = get_message_id_bytes(in_reply_to)
            return derive_reply_keys(message_id_bytes)

        if header["name"].lower() == "references":
            message_ids = header["value"]
            for message_id in message_ids.split(" "):
                message_id_bytes = get_message_id_bytes(message_id)
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
                try:
                    # FIXME: calling code is likely to duplicate this query
                    _get_reply_record_from_lookup_key(lookup_key)
                    return lookup_key, encryption_key
                except Reply.DoesNotExist:
                    pass
            raise Reply.DoesNotExist
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
    raise ReplyHeadersNotFound


def _get_reply_record_from_lookup_key(lookup_key):
    lookup = b64_lookup_key(lookup_key)
    return Reply.objects.get(lookup=lookup)


def _strip_localpart_tag(address):
    [localpart, domain] = address.split("@")
    subaddress_parts = localpart.split("+")
    return f"{subaddress_parts[0]}@{domain}"


_TransportType = Literal["sns", "s3"]


def _get_email_bytes(
    message_json: AWS_SNSMessageJSON,
) -> tuple[bytes, _TransportType, float]:
    with Timer(logger=None) as load_timer:
        if "content" in message_json:
            # email content in sns message
            message_content = message_json["content"].encode("utf-8")
            transport: Literal["sns", "s3"] = "sns"
        else:
            # assume email content in S3
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
            message_content = get_message_content_from_s3(bucket, object_key)
            transport = "s3"
        histogram_if_enabled("relayed_email.size", len(message_content))
    load_time_s = round(load_timer.last, 3)
    return (message_content, transport, load_time_s)


def _get_developer_mode_action(
    mask: RelayAddress | DomainAddress,
) -> DeveloperModeAction | None:
    """Get the developer mode actions for this mask, if enabled."""

    if not (
        flag_is_active_in_task("developer_mode", mask.user)
        and "DEV:" in mask.description
    ):
        return None

    if "DEV:simulate_complaint" in mask.description:
        action = DeveloperModeAction(
            mask_id=mask.metrics_id,
            action="simulate_complaint",
            new_destination_address=f"complaint+{mask.metrics_id}@simulator.amazonses.com",
        )
    else:
        action = DeveloperModeAction(mask_id=mask.metrics_id, action="log")
    return action


def _log_dev_notification(
    log_message: str, dev_action: DeveloperModeAction, notification: dict[str, Any]
) -> None:
    """
    Log notification JSON

    This will log information beyond our privacy policy, so it should only be used on
    Relay staff accounts with prior permission.

    The notification JSON will be compressed, Ascii85-encoded with padding, and broken
    into 1024-bytes chunks. This will ensure it fits into GCP's log entry, which has a
    64KB limit per label value.
    """

    notification_gza85 = encode_dict_gza85(notification)
    total_parts = notification_gza85.count("\n") + 1
    log_group_id = str(uuid4())
    for partnum, part in enumerate(notification_gza85.splitlines()):
        info_logger.info(
            log_message,
            extra={
                "mask_id": dev_action.mask_id,
                "dev_action": dev_action.action,
                "log_group_id": log_group_id,
                "part": partnum,
                "parts": total_parts,
                "notification_gza85": part,
            },
        )


def _convert_to_forwarded_email(
    incoming_email_bytes: bytes,
    headers: OutgoingHeaders,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
    """
    Convert an email (as bytes) to a forwarded email.

    Return is a tuple:
    - email - The forwarded email
    - issues - Any detected issues in conversion
    - level_one_trackers_removed (int) - Number of trackers removed
    - has_html - True if the email has an HTML representation
    - has_text - True if the email has a plain text representation
    """
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
    # python/typeshed issue 2418
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
    # policy.default.message_factory is EmailMessage
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Replace headers in the original email
    header_issues = _replace_headers(email, headers)

    # Find and replace text content
    text_body = email.get_body("plain")
    text_content = None
    has_text = False
    if text_body:
        has_text = True
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        text_content = text_body.get_content()
        new_text_content = _convert_text_content(text_content, to_address)
        text_body.set_content(new_text_content)

    # Find and replace HTML content
    html_body = email.get_body("html")
    level_one_trackers_removed = 0
    has_html = False
    if html_body:
        has_html = True
        if not isinstance(html_body, EmailMessage):
            raise TypeError("html_body must be type EmailMessage")
        html_content = html_body.get_content()
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
        )
        html_body.set_content(new_content, subtype="html")
    elif text_content:
        # Try to use the text content to generate HTML content
        html_content = urlize_and_linebreaks(text_content)
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
        )
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        try:
            text_body.add_alternative(new_content, subtype="html")
        except TypeError as e:
            out = StringIO()
            _structure(email, fp=out)
            info_logger.error(
                "Adding HTML alternate failed",
                extra={"exception": str(e), "structure": out.getvalue()},
            )

    issues: EmailForwardingIssues = {}
    if header_issues:
        issues["headers"] = header_issues
    return (email, issues, level_one_trackers_removed, has_html, has_text)


def _replace_headers(
    email: EmailMessage, headers: OutgoingHeaders
) -> EmailHeaderIssues:
    """
    Replace the headers in email with new headers.

    This replaces headers in the passed email object, rather than returns an altered
    copy. The primary reason is that the Python email package can read an email with
    non-compliant headers or content, but can't write it. A read/write is required to
    create a copy that we then alter. This code instead alters the passed EmailMessage
    object, making header-specific changes in try / except statements.

    The other reason is the object size. An Email can be up to 10 MB, and we hope to
    support 40 MB emails someday. Modern servers may be OK with this, but it would be
    nice to handle the non-compliant headers without crashing before we add a source of
    memory-related crashes.
    """
    # Look for headers to drop
    to_drop: list[str] = []
    replacements: set[str] = {_k.lower() for _k in headers.keys()}
    issues: EmailHeaderIssues = []

    # Detect non-compliant headers in incoming emails
    for header in email.keys():
        try:
            value = email[header]
        except Exception as e:
            issues.append(
                {"header": header, "direction": "in", "exception_on_read": repr(e)}
            )
            value = None
        if getattr(value, "defects", None):
            issues.append(
                {
                    "header": header,
                    "direction": "in",
                    "defect_count": len(value.defects),
                    "parsed_value": str(value),
                    "raw_value": str(value.as_raw),
                }
            )
        elif getattr(getattr(value, "_parse_tree", None), "all_defects", []):
            issues.append(
                {
                    "header": header,
                    "direction": "in",
                    "defect_count": len(value._parse_tree.all_defects),
                    "parsed_value": str(value),
                    "raw_value": str(value.as_raw),
                }
            )

    # Collect headers that will not be forwarded
    for header in email.keys():
        header_lower = header.lower()
        if (
            header_lower not in replacements
            and header_lower != "mime-version"
            and not header_lower.startswith("content-")
        ):
            to_drop.append(header)

    # Drop headers that should be dropped
    for header in to_drop:
        del email[header]

    # Replace the requested headers
    for header, value in headers.items():
        del email[header]
        try:
            email[header] = value.rstrip("\r\n")
        except Exception as e:
            issues.append(
                {
                    "header": header,
                    "direction": "out",
                    "exception_on_write": repr(e),
                    "value": value,
                }
            )
            continue
        try:
            parsed_value = email[header]
        except Exception as e:
            issues.append(
                {
                    "header": header,
                    "direction": "out",
                    "exception_on_write": repr(e),
                    "value": value,
                }
            )
            continue
        if parsed_value.defects:
            issues.append(
                {
                    "header": header,
                    "direction": "out",
                    "defect_count": len(parsed_value.defects),
                    "parsed_value": str(parsed_value),
                    "raw_value": str(parsed_value.as_raw),
                },
            )

    return issues


def _convert_html_content(
    html_content: str,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[str, int]:
    # frontend expects a timestamp in milliseconds
    now = now or datetime.now(UTC)
    datetime_now_ms = int(now.timestamp() * 1000)

    # scramble alias so that clients don't recognize it
    # and apply default link styles
    display_email = re.sub("([@.:])", r"<span>\1</span>", to_address)

    # sample tracker numbers
    if sample_trackers:
        count_all_trackers(html_content)

    tracker_report_link = ""
    removed_count = 0
    if remove_level_one_trackers:
        html_content, tracker_details = remove_trackers(
            html_content, from_address, datetime_now_ms
        )
        removed_count = tracker_details["tracker_removed"]
        tracker_report_details = {
            "sender": from_address,
            "received_at": datetime_now_ms,
            "trackers": tracker_details["level_one"]["trackers"],
        }
        tracker_report_link = f"{settings.SITE_ORIGIN}/tracker-report/#" + json.dumps(
            tracker_report_details
        )

    wrapped_html = wrap_html_email(
        original_html=html_content,
        language=language,
        has_premium=has_premium,
        display_email=display_email,
        tracker_report_link=tracker_report_link,
        num_level_one_email_trackers_removed=removed_count,
    )
    return wrapped_html, removed_count


def _convert_text_content(text_content: str, to_address: str) -> str:
    relay_header_text = (
        "This email was sent to your alias "
        f"{to_address}. To stop receiving emails sent to this alias, "
        "update the forwarding settings in your dashboard.\n"
        "---Begin Email---\n"
    )
    wrapped_text = relay_header_text + text_content
    return wrapped_text


def _build_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> EmailMessage:
    # If we haven't forwarded a first reply for this user yet, _reply_allowed
    # will forward.  So, tell the user we forwarded it.
    forwarded = not reply_record.address.user.profile.forwarded_first_reply
    sender: str | None = ""
    if decrypted_metadata is not None:
        sender = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
    ctx = {
        "sender": sender or "",
        "forwarded": forwarded,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    html_body = render_to_string("emails/reply_requires_premium.html", ctx)
    text_body = render_to_string("emails/reply_requires_premium.txt", ctx)

    # Create the message
    msg = EmailMessage()
    msg["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
    msg["From"] = get_reply_to_address()
    msg["To"] = from_address
    if message_id:
        msg["In-Reply-To"] = message_id
        msg["References"] = message_id
    msg.set_content(text_body)
    msg.add_alternative(html_body, subtype="html")
    return msg


def _set_forwarded_first_reply(profile):
    profile.forwarded_first_reply = True
    profile.save()


def _send_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> None:
    msg = _build_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    try:
        ses_send_raw_email(
            source_address=get_reply_to_address(premium=False),
            destination_address=from_address,
            message=msg,
        )
        # If we haven't forwarded a first reply for this user yet, _reply_allowed will.
        # So, updated the DB.
        _set_forwarded_first_reply(reply_record.address.user.profile)
    except ClientError as e:
        logger.error("reply_not_allowed_ses_client_error", extra=e.response["Error"])
    incr_if_enabled("free_user_reply_attempt", 1)


def _reply_allowed(
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
):
    stripped_from_address = _strip_localpart_tag(from_address)
    reply_record_email = reply_record.address.user.email
    stripped_reply_record_address = _strip_localpart_tag(reply_record_email)
    if (from_address == reply_record_email) or (
        stripped_from_address == stripped_reply_record_address
    ):
        # This is a Relay user replying to an external sender;

        if not reply_record.profile.user.is_active:
            return False

        if reply_record.profile.is_flagged:
            return False

        if reply_record.owner_has_premium:
            return True

        # if we haven't forwarded a first reply for this user, return True to allow
        # this first reply
        allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
        _send_reply_requires_premium_email(
            from_address, reply_record, message_id, decrypted_metadata
        )
        return allow_first_reply
    else:
        # The From: is not a Relay user, so make sure this is a reply *TO* a
        # premium Relay user
        try:
            address = _get_address(to_address)
            if address.user.profile.has_premium:
                return True
        except ObjectDoesNotExist:
            return False
    incr_if_enabled("free_user_reply_attempt", 1)
    return False


def _handle_reply(
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
) -> HttpResponse:
    """
    Handle a reply from a Relay user to an external email.

    Returns (may be incomplete):
    * 200 if the reply was sent
    * 400 if the In-Reply-To and References headers are missing, none of the References
      headers are a reply record, or the SES client raises an error
    * 403 if the Relay user is not allowed to reply
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
      the database
    * 503 if the S3 client returns an error (other than not found), or the SES client
      returns an error

    TODO: Return a more appropriate status object (see _handle_received)
    TODO: Document metrics emitted
    """
    mail = message_json["mail"]
    try:
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
    except ReplyHeadersNotFound:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
        return HttpResponse("No In-Reply-To header", status=400)

    try:
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
    except Reply.DoesNotExist:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)

    address = reply_record.address
    message_id = _get_message_id_from_headers(mail["headers"])
    decrypted_metadata = json.loads(
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
    )
    if not _reply_allowed(
        from_address, to_address, reply_record, message_id, decrypted_metadata
    ):
        log_email_dropped(reason="reply_requires_premium", mask=address, is_reply=True)
        return HttpResponse("Relay replies require a premium account", status=403)

    outbound_from_address = address.full_address
    incr_if_enabled("reply_email", 1)
    subject = mail["commonHeaders"].get("subject", "")
    to_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": outbound_from_address,
        "To": to_address,
        "Reply-To": outbound_from_address,
    }

    try:
        (email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address, is_reply=True)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(
            reason="error_storage", mask=address, is_reply=True, can_retry=True
        )
        # we are returning a 500 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    email = message_from_bytes(email_bytes, policy=relay_policy)
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Convert to a reply email
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
    _replace_headers(email, headers)

    try:
        ses_send_raw_email(
            source_address=outbound_from_address,
            destination_address=to_address,
            message=email,
        )
    except ClientError:
        log_email_dropped(reason="error_sending", mask=address, is_reply=True)
        return HttpResponse("SES client error", status=400)

    reply_record.increment_num_replied()
    profile = address.user.profile
    profile.update_abuse_metric(replied=True)
    profile.last_engagement = datetime.now(UTC)
    profile.save()
    glean_logger().log_email_forwarded(mask=address, is_reply=True)
    return HttpResponse("Sent email to final recipient.", status=200)


def _get_domain_address(
    local_portion: str, domain_portion: str, create: bool = True
) -> DomainAddress:
    """
    Find or create the DomainAddress for the parts of an email address.

    If the domain_portion is for a valid subdomain, and create=True, a new DomainAddress
    will be created and returned. If create=False, DomainAddress.DoesNotExist is raised.

    If the domain_portion is for an unknown domain, ObjectDoesNotExist is raised.

    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is raised.
    """

    [address_subdomain, address_domain] = domain_portion.split(".", 1)
    if address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]:
        if create:
            incr_if_enabled("email_for_not_supported_domain", 1)
        raise ObjectDoesNotExist("Address does not exist")
    try:
        with transaction.atomic():
            locked_profile = Profile.objects.select_for_update().get(
                subdomain=address_subdomain
            )
            domain_numerical = get_domain_numerical(address_domain)
            # filter DomainAddress because it may not exist
            # which will throw an error with get()
            domain_address = DomainAddress.objects.filter(
                user=locked_profile.user, address=local_portion, domain=domain_numerical
            ).first()
            if domain_address is None:
                if not create:
                    raise DomainAddress.DoesNotExist()
                # TODO: Consider flows when a user generating alias on a fly
                # was unable to receive an email due to user no longer being a
                # premium user as seen in exception thrown on make_domain_address
                domain_address = DomainAddress.make_domain_address(
                    locked_profile.user, local_portion, True
                )
                glean_logger().log_email_mask_created(
                    mask=domain_address,
                    created_by_api=False,
                )
            domain_address.last_used_at = datetime.now(UTC)
            domain_address.save()
            return domain_address
    except Profile.DoesNotExist as e:
        if create:
            incr_if_enabled("email_for_dne_subdomain", 1)
        raise e


def _get_address(address: str, create: bool = True) -> RelayAddress | DomainAddress:
    """
    Find or create the RelayAddress or DomainAddress for an email address.

    If an unknown email address is for a valid subdomain, and create is True,
    a new DomainAddress will be created.

    On failure, raises exception based on Django's ObjectDoesNotExist:
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
    * DomainAddress.DoesNotExist - looks like unknown DomainAddress, create is False
    * ObjectDoesNotExist - Unknown domain
    """

    local_portion, domain_portion = address.split("@")
    local_address = local_portion.lower()
    domain = domain_portion.lower()

    # if the domain is not the site's 'top' relay domain,
    # it may be for a user's subdomain
    email_domains = get_domains_from_settings().values()
    if domain not in email_domains:
        return _get_domain_address(local_address, domain, create)

    # the domain is the site's 'top' relay domain, so look up the RelayAddress
    try:
        domain_numerical = get_domain_numerical(domain)
        relay_address = RelayAddress.objects.get(
            address=local_address, domain=domain_numerical
        )
        return relay_address
    except RelayAddress.DoesNotExist as e:
        if not create:
            raise e
        try:
            DeletedAddress.objects.get(
                address_hash=address_hash(local_address, domain=domain)
            )
            incr_if_enabled("email_for_deleted_address", 1)
            # TODO: create a hard bounce receipt rule in SES
        except DeletedAddress.DoesNotExist:
            incr_if_enabled("email_for_unknown_address", 1)
        except DeletedAddress.MultipleObjectsReturned:
            # not sure why this happens on stage but let's handle it
            incr_if_enabled("email_for_deleted_address_multiple", 1)
        raise e


def _get_address_if_exists(address: str) -> RelayAddress | DomainAddress | None:
    """Get the matching RelayAddress or DomainAddress, or None if it doesn't exist."""
    try:
        return _get_address(address, create=False)
    except (RelayAddress.DoesNotExist, Profile.DoesNotExist, ObjectDoesNotExist):
        return None


def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: 'permanent', 'transient', 'undetermined', 'none' if omitted
    * bounce_subtype: 'undetermined', 'general', etc., 'none' if omitted
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, or None
    * bounce_status: 'status' from bounced recipient data, or None
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or None
    * bounce_extra: Extra data from bounce_recipient data, if any
    * domain: User's real email address domain, if an address was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
    """
    bounce = message_json.get("bounce", {})
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    bounced_recipients = bounce.get("bouncedRecipients", [])

    now = datetime.now(UTC)
    bounce_data = []
    for recipient in bounced_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": recipient.pop("action", ""),
            "bounce_status": recipient.pop("status", ""),
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            data["bounce_extra"] = recipient.copy()
        bounce_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        recipient_domain = recipient_address.split("@")[1]
        data["domain"] = recipient_domain

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
            if (fxa := profile.fxa) and profile.metrics_enabled:
                data["fxa_id"] = fxa.uid
            else:
                data["fxa_id"] = ""
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        action = None
        if "spam" in data["bounce_diagnostic"].lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now
        if action:
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)


def _build_disabled_mask_for_spam_email(
    mask: RelayAddress | DomainAddress,
) -> EmailMessage:
    ctx = {"mask": mask.full_address, "SITE_ORIGIN": settings.SITE_ORIGIN}
    html_body = render_to_string("emails/disabled_mask_for_spam.html", ctx)
    text_body = render_to_string("emails/disabled_mask_for_spam.txt", ctx)

    # Create the message
    msg = EmailMessage()
    msg["Subject"] = ftl_bundle.format("relay-deactivated-mask-email-subject")
    msg["From"] = settings.RELAY_FROM_ADDRESS
    msg["To"] = mask.user.email
    msg.set_content(text_body)
    msg.add_alternative(html_body, subtype="html")
    return msg


def _send_disabled_mask_for_spam_email(mask: RelayAddress | DomainAddress) -> None:
    msg = _build_disabled_mask_for_spam_email(mask)
    if not settings.RELAY_FROM_ADDRESS:
        raise ValueError(
            "Must set settings.RELAY_FROM_ADDRESS to send disabled_mask_for_spam email."
        )
    try:
        ses_send_raw_email(
            source_address=settings.RELAY_FROM_ADDRESS,
            destination_address=mask.user.email,
            message=msg,
        )
    except ClientError as e:
        logger.error("send_disabled_mask_ses_client_error", extra=e.response["Error"])
    incr_if_enabled("send_disabled_mask_email", 1)


def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES complaint notification.

    This looks for Relay users in the complainedRecipients (real email address)
    and the From: header (mask address). We expect both to match the same Relay user,
    and return a 200. If one or the other do not match, a 404 is returned, and errors
    may be logged.

    The first time a user complains, this sets the user's auto_block_spam flag to True.

    The second time a user complains, this disables the mask thru which the spam mail
    was forwarded, and sends an email to the user to notify them the mask is disabled
    and can be re-enabled on their dashboard.

    For more information on the complaint notification, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_complaint" with these tags:
    * complaint_subtype: 'onaccountsuppressionlist', or 'none' if omitted
    * complaint_feedback - feedback enumeration from ISP (usually 'abuse') or 'none'
    * user_match: 'found' or 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', or 'disable_mask'

    Emits an info log "complaint_notification", same data as metric, plus:
    * complaint_user_agent - identifies the client used to file the complaint
    * complaint_extra - Extra data from complainedRecipients data, if any
    * domain - User's domain, if an address was given
    * found_in - "complained_recipients" (real email), "from_header" (email mask),
      or "all" (matching records found in both)
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
    * mask_match - "found" if "From" header contains an email mask, or "not_found"
    """
    complaint_data = _get_complaint_data(message_json)
    complainers, unknown_count = _gather_complainers(complaint_data)

    # Reduce future complaints from complaining Relay users
    actions: list[ComplaintAction] = []
    for complainer in complainers:
        action = _reduce_future_complaints(complainer)
        actions.append(action)

        if (
            flag_is_active_in_task("developer_mode", complainer["user"])
            and action.mask_id
        ):
            _log_dev_notification(
                "_handle_complaint: developer_mode",
                DeveloperModeAction(mask_id=action.mask_id, action="log"),
                message_json,
            )

    # Log complaint and actions taken
    if not actions:
        # Log the complaint but that no action was taken
        actions.append(ComplaintAction(user_match="no_recipients"))
    for action in actions:
        tags = [
            generate_tag(key, val)
            for key, val in {
                "complaint_subtype": complaint_data.subtype or "none",
                "complaint_feedback": complaint_data.feedback_type or "none",
                "user_match": action.user_match,
                "relay_action": action.relay_action,
            }.items()
        ]
        incr_if_enabled("email_complaint", tags=tags)

        log_extra = {
            "complaint_subtype": complaint_data.subtype or None,
            "complaint_user_agent": complaint_data.user_agent or None,
            "complaint_feedback": complaint_data.feedback_type or None,
        }
        log_extra.update(
            {
                key: value
                for key, value in action._asdict().items()
                if (value is not None and key != "mask_id")
            }
        )
        info_logger.info("complaint_notification", extra=log_extra)

    if unknown_count:
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)


class RawComplaintData(NamedTuple):
    complained_recipients: list[tuple[str, dict[str, Any]]]
    from_addresses: list[str]
    subtype: str
    user_agent: str
    feedback_type: str


def _get_complaint_data(message_json: AWS_SNSMessageJSON) -> RawComplaintData:
    """
    Extract complaint data from an AWS SES Complaint Notification.

    This extracts only the data used by _handle_complaint(). It also works on
    complaint events, which have a similar structure and the same data needed
    by _handle_complaint.

    For more information on the complaint notification, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object
    """
    complaint = message_json["complaint"]

    T = TypeVar("T")

    def get_or_log(
        key: str, source: dict[str, T], data_type: type[T]
    ) -> tuple[T, bool]:
        """Get a value from a dictionary, or log if not found"""
        if key in source:
            return source[key], True
        logger.error(
            "_get_complaint_data: Unexpected message format",
            extra={"missing_key": key, "found_keys": ",".join(sorted(source.keys()))},
        )
        return data_type(), False

    raw_recipients, has_cr = get_or_log("complainedRecipients", complaint, list)
    complained_recipients = []
    no_entries = True
    for entry in raw_recipients:
        no_entries = False
        raw_email_address, has_email = get_or_log("emailAddress", entry, str)
        if has_email:
            email_address = parseaddr(raw_email_address)[1]
            extra = {
                key: value for key, value in entry.items() if key != "emailAddress"
            }
            complained_recipients.append((email_address, extra))
    if has_cr and no_entries:
        logger.error("_get_complaint_data: Empty complainedRecipients")

    mail, has_mail = get_or_log("mail", message_json, dict)
    if has_mail:
        commonHeaders, has_ch = get_or_log("commonHeaders", mail, dict)
    else:
        commonHeaders, has_ch = {}, False
    if has_ch:
        raw_from_addresses, _ = get_or_log("from", commonHeaders, list)
    else:
        raw_from_addresses = []
    from_addresses = [parseaddr(addr)[1] for addr in raw_from_addresses]

    # Only present when set
    feedback_type = complaint.get("complaintFeedbackType", "")
    # Only present when destination is on account suppression list
    subtype = complaint.get("complaintSubType", "")
    # Only present for feedback reports
    user_agent = complaint.get("userAgent", "")

    return RawComplaintData(
        complained_recipients, from_addresses, subtype, user_agent, feedback_type
    )


class Complainer(TypedDict):
    user: User
    found_in: Literal["complained_recipients", "from_header", "all"]
    domain: str
    extra: dict[str, Any] | None
    masks: list[RelayAddress | DomainAddress]


def _gather_complainers(
    complaint_data: RawComplaintData,
) -> tuple[list[Complainer], int]:
    """
    Fetch Relay Users and masks from the complaint data.

    This matches data from an AWS SES Complaint Notification (as extracted by
    _get_complaint_data()) to the Relay database, and returns the Users,
    RelayAddresses, and DomainAddresses, as well as status and extra data.

    If the complaint came from the AWS SES complaint simulator, detect
    developer_mode and move forward with the developer's User data.
    """

    users: dict[int, Complainer] = {}
    unknown_complainer_count = 0
    for email_address, extra_data in complaint_data.complained_recipients:
        local, domain = email_address.split("@", 1)

        # If the complainer is the AWS SES complaint simulation, assume that
        # it was send by a user with the developer_mode flag. Look for
        # a mask that matches the embedded mask metrics_id, and use
        # the related user's email instead of the AWS simulator address.
        # See docs/developer_mode.md
        if domain == "simulator.amazonses.com" and local.startswith("complaint+"):
            mask_metrics_id = local.removeprefix("complaint+")
            mask = _get_mask_by_metrics_id(mask_metrics_id)
            if mask:
                email_address = mask.user.email
                domain = mask.user.email.split("@")[1]

        try:
            user = User.objects.get(email=email_address)
        except User.DoesNotExist:
            logger.error("_gather_complainers: unknown complainedRecipient")
            unknown_complainer_count += 1
            continue

        if user.id in users:
            logger.error("_gather_complainers: complainer appears twice")
            continue

        users[user.id] = {
            "user": user,
            "found_in": "complained_recipients",
            "domain": domain,
            "extra": extra_data or None,
            "masks": [],
        }

    # Collect From: addresses and their users
    unknown_sender_count = 0
    for email_address in complaint_data.from_addresses:
        mask = _get_address_if_exists(email_address)
        if not mask:
            logger.error("_gather_complainers: unknown mask, maybe deleted?")
            unknown_sender_count += 1
            continue

        if mask.user.id not in users:
            # Add mask-only entry to users
            users[mask.user.id] = {
                "user": mask.user,
                "found_in": "from_header",
                "domain": mask.user.email.split("@")[1],
                "extra": None,
                "masks": [mask],
            }
            continue

        user_data = users[mask.user.id]
        if mask in user_data["masks"]:
            logger.error("_gather_complainers: mask appears twice")
            continue

        user_data["masks"].append(mask)
        if user_data["found_in"] in ("all", "complained_recipients"):
            user_data["found_in"] = "all"
        else:
            logger.error("_gather_complainers: no complainer, multi-mask")

    return (list(users.values()), unknown_complainer_count + unknown_sender_count)


def _get_mask_by_metrics_id(metrics_id: str) -> RelayAddress | DomainAddress | None:
    """Look up a mask by metrics ID, or None if not found."""
    if not metrics_id or metrics_id[0] not in ("R", "D"):
        return None
    mask_type_id = metrics_id[0]
    mask_raw_id = metrics_id[1:]
    try:
        mask_id = int(mask_raw_id)
    except ValueError:
        return None  # ID is not an int, do not try to match to Relay mask

    if mask_type_id == "R":
        try:
            return RelayAddress.objects.get(id=mask_id)
        except RelayAddress.DoesNotExist:
            return None
    try:
        return DomainAddress.objects.get(id=mask_id)
    except DomainAddress.DoesNotExist:
        return None


class ComplaintAction(NamedTuple):
    user_match: Literal["found", "no_recipients"]
    relay_action: Literal["no_action", "auto_block_spam", "disable_mask"] = "no_action"
    mask_match: Literal["found", "not_found"] = "not_found"
    mask_id: str | None = None
    found_in: Literal["complained_recipients", "from_header", "all"] | None = None
    fxa_id: str | None = None
    domain: str | None = None
    complaint_extra: str | None = None


def _reduce_future_complaints(complainer: Complainer) -> ComplaintAction:
    """Take action to reduce future complaints from complaining user."""

    user = complainer["user"]
    mask_match: Literal["found", "not_found"] = "not_found"
    relay_action: Literal["no_action", "auto_block_spam", "disable_mask"] = "no_action"
    mask_id = None

    if not user.profile.auto_block_spam:
        relay_action = "auto_block_spam"
        user.profile.auto_block_spam = True
        user.profile.save()

    for mask in complainer["masks"]:
        mask_match = "found"
        mask_id = mask.metrics_id
        if (
            flag_is_active_in_task("disable_mask_on_complaint", user)
            and mask.enabled
            and relay_action != "auto_block_spam"
        ):
            relay_action = "disable_mask"
            mask.enabled = False
            mask.save()
            _send_disabled_mask_for_spam_email(mask)

    return ComplaintAction(
        user_match="found",
        relay_action=relay_action,
        mask_match=mask_match,
        mask_id=mask_id,
        fxa_id=user.profile.metrics_fxa_id,
        domain=complainer["domain"],
        found_in=complainer["found_in"],
        complaint_extra=(
            json.dumps(complainer["extra"]) if complainer["extra"] else None
        ),
    )


_WAFFLE_FLAGS_INITIALIZED = False


def init_waffle_flags() -> None:
    """Initialize waffle flags for email tasks"""
    global _WAFFLE_FLAGS_INITIALIZED
    if _WAFFLE_FLAGS_INITIALIZED:
        return

    flags: list[tuple[str, str]] = [
        (
            "disable_mask_on_complaint",
            "MPP-3119: When a Relay user marks an email as spam, disable the mask.",
        ),
        (
            "developer_mode",
            "MPP-3932: Enable logging and overrides for Relay developers.",
        ),
    ]
    waffle_flag_table = get_waffle_flag_model().objects
    for name, note in flags:
        waffle_flag_table.get_or_create(name=name, defaults={"note": note})
    _WAFFLE_FLAGS_INITIALIZED = True
