def _handle_received()

in emails/views.py [0:0]


def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES "Received" notification and forward the email.

    The notification is validated, the Relay recipient address is resolved to
    a mask and its owner's profile, a series of drop/block checks is applied,
    and, if none of them trigger, the incoming email is converted and re-sent
    to the owner's real address through SES.

    For more information on the notification format, see:
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html

    Args:
        message_json: The parsed notification. This function reads the "mail"
            entry (its "commonHeaders", "headers" and "source" keys) and the
            "receipt" entry.

    Returns (may be incomplete):
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
      flagged for abuse, the Relay user is deactivated, the email is under a bounce
      pause, the email was suppressed for spam, the list email was blocked, or the
      noreply address was the recipient.
    * 400 if commonHeaders entry is missing, the "From" header could not be parsed
      into an address, the Relay recipient address is malformed, or the email
      failed DMARC with reject policy.
    * 403 if the email is a reply chain and the reply is not allowed (replies
      require a premium account).
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
      "CC", or "BCC" fields, or the Relay address is not in the database.
    * 503 if a "From" header could not be generated for the forwarded email, the S3
      client returned an error different from "not found", or the SES client fails.

    If the Relay address is not in the database and its local part is "replies",
    the message is handed to _handle_reply() and that response is returned, so
    many other return conditions apply when the email is a reply. The HTTP returns
    are an artifact from an earlier time when emails were sent to a webhook.
    Currently, production instead pulls events from a queue.

    TODO: Return a more appropriate status object
    TODO: Document the metrics emitted
    """
    mail = message_json["mail"]
    # Without commonHeaders there is no "from" or "subject" to work with.
    if "commonHeaders" not in mail:
        logger.error("SNS message without commonHeaders")
        return HttpResponse(
            "Received SNS notification without commonHeaders.", status=400
        )
    common_headers = mail["commonHeaders"]
    receipt = message_json["receipt"]

    # Receipt verdicts are recorded at several stages of processing, each time
    # with a different stage label ("all", "relay_recipient", "valid_user", ...).
    _record_receipt_verdicts(receipt, "all")
    to_address = _get_relay_recipient_from_message_json(message_json)
    if to_address is None:
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
        return HttpResponse("Address does not exist", status=404)

    _record_receipt_verdicts(receipt, "relay_recipient")
    # Only the first "from" entry of commonHeaders is parsed, and only the
    # first parsed result is used as the sender address.
    from_addresses = parse_email_header(common_headers["from"][0])
    if not from_addresses:
        info_logger.error(
            "_handle_received: no from address",
            extra={
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
            },
        )
        return HttpResponse("Unable to parse From address", status=400)
    # NOTE(review): presumably each parsed entry is a (display name, address)
    # pair, so index 1 is the bare email address - confirm in parse_email_header.
    from_address = from_addresses[0][1]

    # The two-element unpacking raises ValueError unless the recipient address
    # contains exactly one "@".
    try:
        [to_local_portion, to_domain_portion] = to_address.split("@")
    except ValueError:
        # TODO: Add metric
        return HttpResponse("Malformed to field.", status=400)

    if to_local_portion.lower() == "noreply":
        incr_if_enabled("email_for_noreply_address", 1)
        return HttpResponse("noreply address is not supported.")
    try:
        # FIXME: this ambiguous return of either
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
        # up a bit.
        address = _get_address(to_address)
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
        user_profile = address.user.profile
    except (
        ObjectDoesNotExist,
        CannotMakeAddressException,
        DeletedAddress.MultipleObjectsReturned,
    ):
        # No usable mask for this recipient. A "replies" local part means this
        # may be a Relay user replying through the service, so hand it to the
        # reply handler; anything else is an unknown address.
        if to_local_portion.lower() == "replies":
            response = _handle_reply(from_address, message_json, to_address)
        else:
            response = HttpResponse("Address does not exist", status=404)
        return response

    _record_receipt_verdicts(receipt, "valid_user")
    # if this is spam and the user is set to auto-block spam, early return
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
        log_email_dropped(reason="auto_block_spam", mask=address)
        return HttpResponse("Address rejects spam.")

    # A DMARC failure only drops the email when the policy is "reject"; a
    # missing policy is treated as "none" and any other policy falls through.
    if _get_verdict(receipt, "dmarc") == "FAIL":
        policy = receipt.get("dmarcPolicy", "none")
        # TODO: determine action on dmarcPolicy "quarantine"
        if policy == "reject":
            log_email_dropped(reason="dmarc_reject_failed", mask=address)
            incr_if_enabled(
                "email_suppressed_for_dmarc_failure",
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
            )
            return HttpResponse("DMARC failure, policy is reject", status=400)

    # if this user is over bounce limits, early return
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
    if bounce_paused:
        _record_receipt_verdicts(receipt, "user_bounce_paused")
        incr_if_enabled(f"email_suppressed_for_{bounce_type}_bounce", 1)
        # Any bounce type other than "soft" is logged as a hard bounce pause.
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
        )
        log_email_dropped(reason=reason, mask=address)
        return HttpResponse("Address is temporarily disabled.")

    # check if this is a reply from an external sender to a Relay user
    try:
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
        # Keep the mask that received this email for logging, then switch to
        # the mask stored on the reply record.
        # NOTE(review): from here on `address` is the reply record's address,
        # while `user_profile` is still the profile loaded from the original
        # recipient address - confirm these always belong to the same user.
        user_address = address
        address = reply_record.address
        # NOTE(review): this message_id is only used by _reply_allowed(); the
        # name is reassigned to the SES MessageId after sending.
        message_id = _get_message_id_from_headers(mail["headers"])
        # make sure the relay user is premium
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
            log_email_dropped(reason="reply_requires_premium", mask=user_address)
            return HttpResponse("Relay replies require a premium account", status=403)
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
        # match a Reply record, continue to treat this as a regular email from
        # an external sender to a relay user
        pass

    # if account flagged for abuse, early return
    if user_profile.is_flagged:
        log_email_dropped(reason="abuse_flag", mask=address)
        return HttpResponse("Address is temporarily disabled.")

    # if the user account is deactivated, early return
    if not user_profile.user.is_active:
        log_email_dropped(reason="user_deactivated", mask=address)
        return HttpResponse("Account is deactivated.")

    # if address is set to block, early return
    if not address.enabled:
        incr_if_enabled("email_for_disabled_address", 1)
        # Count the blocked email on the mask and record user engagement.
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        _record_receipt_verdicts(receipt, "disabled_alias")
        user_profile.last_engagement = datetime.now(UTC)
        user_profile.save()
        glean_logger().log_email_blocked(mask=address, reason="block_all")
        return HttpResponse("Address is temporarily disabled.")

    _record_receipt_verdicts(receipt, "active_alias")
    incr_if_enabled("email_for_active_address", 1)

    # if address is blocking list emails, and email is from list, early return
    # List blocking is only applied when the user has premium.
    # NOTE(review): `address` has already been dereferenced above, so the
    # leading truthiness check looks redundant - confirm before removing.
    if (
        address
        and address.block_list_emails
        and user_profile.has_premium
        and _check_email_from_list(mail["headers"])
    ):
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        user_profile.last_engagement = datetime.now(UTC)
        user_profile.save()
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
        return HttpResponse("Address is not accepting list emails.")

    # Collect new headers
    # A missing subject is forwarded as an empty subject.
    subject = common_headers.get("subject", "")
    destination_address = user_profile.user.email
    reply_address = get_reply_to_address()
    try:
        from_header = generate_from_header(from_address, to_address)
    except InvalidFromHeader:
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
        # Gather every raw "From" header so the log shows what was received.
        header_from = []
        for header in mail["headers"]:
            if header["name"].lower() == "from":
                header_from.append(header)
        info_logger.error(
            "generate_from_header",
            extra={
                "from_address": from_address,
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
                "headers_from": header_from,
            },
        )
        log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
        return HttpResponse("Cannot parse the From address", status=503)

    # Get incoming email
    try:
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        # A missing S3 object is permanent (404); any other client error is
        # logged as retryable and reported as 503.
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(reason="error_storage", mask=address, can_retry=True)
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    # Handle developer overrides, logging
    # A developer-mode action may redirect the forwarded email to another
    # destination address, and is always logged when present.
    dev_action = _get_developer_mode_action(address)
    if dev_action:
        if dev_action.new_destination_address:
            destination_address = dev_action.new_destination_address
        _log_dev_notification(
            "_handle_received: developer_mode", dev_action, message_json
        )

    # Convert to new email
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": from_header,
        "To": destination_address,
        "Reply-To": reply_address,
        "Resent-From": from_address,
    }
    sample_trackers = bool(sample_is_active("tracker_sample"))
    # Level-one trackers are removed only when both the "tracker_removal" flag
    # is active for the mask's user and the profile has opted in.
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
    remove_level_one_trackers = bool(
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
    )
    (
        forwarded_email,
        issues,
        level_one_trackers_removed,
        has_html,
        has_text,
    ) = _convert_to_forwarded_email(
        incoming_email_bytes=incoming_email_bytes,
        headers=headers,
        to_address=to_address,
        from_address=from_address,
        language=user_profile.language,
        has_premium=user_profile.has_premium,
        sample_trackers=sample_trackers,
        remove_level_one_trackers=remove_level_one_trackers,
    )
    if has_html:
        incr_if_enabled("email_with_html_content", 1)
    if has_text:
        incr_if_enabled("email_with_text_content", 1)
    # Conversion issues are logged but do not stop the email from being sent.
    if issues:
        info_logger.info(
            "_handle_received: forwarding issues", extra={"issues": issues}
        )

    # Send new email
    # The reply address is used as the SES source address.
    try:
        ses_response = ses_send_raw_email(
            source_address=reply_address,
            destination_address=destination_address,
            message=forwarded_email,
        )
    except ClientError:
        # 503 service unavailable response to SNS so it can retry
        log_email_dropped(reason="error_sending", mask=address, can_retry=True)
        return HttpResponse("SES client error on Raw Email", status=503)

    # Store a reply record keyed to the SES message ID of the forwarded email.
    # NOTE(review): presumably this is what lets a later reply be matched back
    # to this mask - confirm in _store_reply_record.
    message_id = ses_response["MessageId"]
    _store_reply_record(mail, message_id, address)

    # Update the user's abuse metrics and engagement, then the mask's counters.
    user_profile.update_abuse_metric(
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
    )
    user_profile.last_engagement = datetime.now(UTC)
    user_profile.save()
    address.num_forwarded += 1
    address.last_used_at = datetime.now(UTC)
    if level_one_trackers_removed:
        # The stored counter may be unset (falsy), so treat that as 0.
        address.num_level_one_trackers_blocked = (
            address.num_level_one_trackers_blocked or 0
        ) + level_one_trackers_removed
    # NOTE(review): "block_list_emails" is saved here although this function
    # never assigns it - confirm whether it is changed elsewhere or can be
    # dropped from update_fields.
    address.save(
        update_fields=[
            "num_forwarded",
            "last_used_at",
            "block_list_emails",
            "num_level_one_trackers_blocked",
        ]
    )
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
    return HttpResponse("Sent email to final recipient.", status=200)