in emails/views.py [0:0]
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
"""
Handle an AWS SES received notification.
For more information, see:
https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html
Returns (may be incomplete):
* 200 if the email was sent, the Relay address is disabled, the Relay user is
flagged for abuse, the email is under a bounce pause, the email was suppressed
for spam, the list email was blocked, or the noreply address was the recipient.
* 400 if commonHeaders entry is missing, the Relay recipient address is malformed,
the email failed DMARC with reject policy, or the email is a reply chain to a
non-premium user.
* 404 if an S3-stored email was not found, no Relay address was found in the "To",
"CC", or "BCC" fields, or the Relay address is not in the database.
* 503 if the "From" address is malformed, the S3 client returned an error different
from "not found", or the SES client fails
And many other returns conditions if the email is a reply. The HTTP returns are an
artifact from an earlier time when emails were sent to a webhook. Currently,
production instead pulls events from a queue.
TODO: Return a more appropriate status object
TODO: Document the metrics emitted
"""
mail = message_json["mail"]
if "commonHeaders" not in mail:
logger.error("SNS message without commonHeaders")
return HttpResponse(
"Received SNS notification without commonHeaders.", status=400
)
common_headers = mail["commonHeaders"]
receipt = message_json["receipt"]
_record_receipt_verdicts(receipt, "all")
to_address = _get_relay_recipient_from_message_json(message_json)
if to_address is None:
incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
return HttpResponse("Address does not exist", status=404)
_record_receipt_verdicts(receipt, "relay_recipient")
from_addresses = parse_email_header(common_headers["from"][0])
if not from_addresses:
info_logger.error(
"_handle_received: no from address",
extra={
"source": mail["source"],
"common_headers_from": common_headers["from"],
},
)
return HttpResponse("Unable to parse From address", status=400)
from_address = from_addresses[0][1]
try:
[to_local_portion, to_domain_portion] = to_address.split("@")
except ValueError:
# TODO: Add metric
return HttpResponse("Malformed to field.", status=400)
if to_local_portion.lower() == "noreply":
incr_if_enabled("email_for_noreply_address", 1)
return HttpResponse("noreply address is not supported.")
try:
# FIXME: this ambiguous return of either
# RelayAddress or DomainAddress types makes the Rustacean in me throw
# up a bit.
address = _get_address(to_address)
prefetch_related_objects([address.user], "socialaccount_set", "profile")
user_profile = address.user.profile
except (
ObjectDoesNotExist,
CannotMakeAddressException,
DeletedAddress.MultipleObjectsReturned,
):
if to_local_portion.lower() == "replies":
response = _handle_reply(from_address, message_json, to_address)
else:
response = HttpResponse("Address does not exist", status=404)
return response
_record_receipt_verdicts(receipt, "valid_user")
# if this is spam and the user is set to auto-block spam, early return
if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
incr_if_enabled("email_auto_suppressed_for_spam", 1)
log_email_dropped(reason="auto_block_spam", mask=address)
return HttpResponse("Address rejects spam.")
if _get_verdict(receipt, "dmarc") == "FAIL":
policy = receipt.get("dmarcPolicy", "none")
# TODO: determine action on dmarcPolicy "quarantine"
if policy == "reject":
log_email_dropped(reason="dmarc_reject_failed", mask=address)
incr_if_enabled(
"email_suppressed_for_dmarc_failure",
tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
)
return HttpResponse("DMARC failure, policy is reject", status=400)
# if this user is over bounce limits, early return
bounce_paused, bounce_type = user_profile.check_bounce_pause()
if bounce_paused:
_record_receipt_verdicts(receipt, "user_bounce_paused")
incr_if_enabled(f"email_suppressed_for_{bounce_type}_bounce", 1)
reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
"soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
)
log_email_dropped(reason=reason, mask=address)
return HttpResponse("Address is temporarily disabled.")
# check if this is a reply from an external sender to a Relay user
try:
(lookup_key, _) = _get_keys_from_headers(mail["headers"])
reply_record = _get_reply_record_from_lookup_key(lookup_key)
user_address = address
address = reply_record.address
message_id = _get_message_id_from_headers(mail["headers"])
# make sure the relay user is premium
if not _reply_allowed(from_address, to_address, reply_record, message_id):
log_email_dropped(reason="reply_requires_premium", mask=user_address)
return HttpResponse("Relay replies require a premium account", status=403)
except (ReplyHeadersNotFound, Reply.DoesNotExist):
# if there's no In-Reply-To header, or the In-Reply-To value doesn't
# match a Reply record, continue to treat this as a regular email from
# an external sender to a relay user
pass
# if account flagged for abuse, early return
if user_profile.is_flagged:
log_email_dropped(reason="abuse_flag", mask=address)
return HttpResponse("Address is temporarily disabled.")
if not user_profile.user.is_active:
log_email_dropped(reason="user_deactivated", mask=address)
return HttpResponse("Account is deactivated.")
# if address is set to block, early return
if not address.enabled:
incr_if_enabled("email_for_disabled_address", 1)
address.num_blocked += 1
address.save(update_fields=["num_blocked"])
_record_receipt_verdicts(receipt, "disabled_alias")
user_profile.last_engagement = datetime.now(UTC)
user_profile.save()
glean_logger().log_email_blocked(mask=address, reason="block_all")
return HttpResponse("Address is temporarily disabled.")
_record_receipt_verdicts(receipt, "active_alias")
incr_if_enabled("email_for_active_address", 1)
# if address is blocking list emails, and email is from list, early return
if (
address
and address.block_list_emails
and user_profile.has_premium
and _check_email_from_list(mail["headers"])
):
incr_if_enabled("list_email_for_address_blocking_lists", 1)
address.num_blocked += 1
address.save(update_fields=["num_blocked"])
user_profile.last_engagement = datetime.now(UTC)
user_profile.save()
glean_logger().log_email_blocked(mask=address, reason="block_promotional")
return HttpResponse("Address is not accepting list emails.")
# Collect new headers
subject = common_headers.get("subject", "")
destination_address = user_profile.user.email
reply_address = get_reply_to_address()
try:
from_header = generate_from_header(from_address, to_address)
except InvalidFromHeader:
# TODO: MPP-3407, MPP-3417 - Determine how to handle these
header_from = []
for header in mail["headers"]:
if header["name"].lower() == "from":
header_from.append(header)
info_logger.error(
"generate_from_header",
extra={
"from_address": from_address,
"source": mail["source"],
"common_headers_from": common_headers["from"],
"headers_from": header_from,
},
)
log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
return HttpResponse("Cannot parse the From address", status=503)
# Get incoming email
try:
(incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
except ClientError as e:
if e.response["Error"].get("Code", "") == "NoSuchKey":
logger.error("s3_object_does_not_exist", extra=e.response["Error"])
log_email_dropped(reason="content_missing", mask=address)
return HttpResponse("Email not in S3", status=404)
logger.error("s3_client_error_get_email", extra=e.response["Error"])
log_email_dropped(reason="error_storage", mask=address, can_retry=True)
# we are returning a 503 so that SNS can retry the email processing
return HttpResponse("Cannot fetch the message content from S3", status=503)
# Handle developer overrides, logging
dev_action = _get_developer_mode_action(address)
if dev_action:
if dev_action.new_destination_address:
destination_address = dev_action.new_destination_address
_log_dev_notification(
"_handle_received: developer_mode", dev_action, message_json
)
# Convert to new email
headers: OutgoingHeaders = {
"Subject": subject,
"From": from_header,
"To": destination_address,
"Reply-To": reply_address,
"Resent-From": from_address,
}
sample_trackers = bool(sample_is_active("tracker_sample"))
tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
remove_level_one_trackers = bool(
tracker_removal_flag and user_profile.remove_level_one_email_trackers
)
(
forwarded_email,
issues,
level_one_trackers_removed,
has_html,
has_text,
) = _convert_to_forwarded_email(
incoming_email_bytes=incoming_email_bytes,
headers=headers,
to_address=to_address,
from_address=from_address,
language=user_profile.language,
has_premium=user_profile.has_premium,
sample_trackers=sample_trackers,
remove_level_one_trackers=remove_level_one_trackers,
)
if has_html:
incr_if_enabled("email_with_html_content", 1)
if has_text:
incr_if_enabled("email_with_text_content", 1)
if issues:
info_logger.info(
"_handle_received: forwarding issues", extra={"issues": issues}
)
# Send new email
try:
ses_response = ses_send_raw_email(
source_address=reply_address,
destination_address=destination_address,
message=forwarded_email,
)
except ClientError:
# 503 service unavailable response to SNS so it can retry
log_email_dropped(reason="error_sending", mask=address, can_retry=True)
return HttpResponse("SES client error on Raw Email", status=503)
message_id = ses_response["MessageId"]
_store_reply_record(mail, message_id, address)
user_profile.update_abuse_metric(
email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
)
user_profile.last_engagement = datetime.now(UTC)
user_profile.save()
address.num_forwarded += 1
address.last_used_at = datetime.now(UTC)
if level_one_trackers_removed:
address.num_level_one_trackers_blocked = (
address.num_level_one_trackers_blocked or 0
) + level_one_trackers_removed
address.save(
update_fields=[
"num_forwarded",
"last_used_at",
"block_list_emails",
"num_level_one_trackers_blocked",
]
)
glean_logger().log_email_forwarded(mask=address, is_reply=False)
return HttpResponse("Sent email to final recipient.", status=200)