privaterelay/models.py (455 lines of code) (raw):

from __future__ import annotations

import logging
import uuid
from collections import namedtuple
from datetime import UTC, datetime, timedelta
from hashlib import sha256
from typing import TYPE_CHECKING, Literal

from django.conf import settings
from django.contrib.auth.models import User
from django.db import models, transaction
from django.utils.translation.trans_real import (
    get_supported_language_variant,
    parse_accept_lang_header,
)

from allauth.socialaccount.models import SocialAccount

from .country_utils import AcceptLanguageError, guess_country_from_accept_lang
from .exceptions import CannotMakeSubdomainException
from .plans import get_premium_countries
from .utils import flag_is_active_in_task
from .validators import valid_available_subdomain

if TYPE_CHECKING:
    from collections.abc import Iterable

    from django.db.models.base import ModelBase
    from django.db.models.query import QuerySet

    from emails.models import DomainAddress, RelayAddress

abuse_logger = logging.getLogger("abusemetrics")
# (paused, type) pair returned by Profile.check_bounce_pause()
BounceStatus = namedtuple("BounceStatus", "paused type")
# Users whose email address is at one of these domains are treated as premium
# by Profile.has_premium
PREMIUM_DOMAINS = ["mozilla.com", "getpocket.com", "mozillafoundation.org"]


def hash_subdomain(subdomain: str, domain: str = settings.MOZMAIL_DOMAIN) -> str:
    """Return the hex SHA-256 digest of the string "<subdomain>.<domain>"."""
    return sha256(f"{subdomain}.{domain}".encode()).hexdigest()


class Profile(models.Model):
    """Per-user Relay settings, counters, and derived account state."""

    user = models.OneToOneField(User, on_delete=models.CASCADE)
    api_token = models.UUIDField(default=uuid.uuid4)
    num_address_deleted = models.PositiveIntegerField(default=0)
    date_subscribed = models.DateTimeField(blank=True, null=True)
    date_subscribed_phone = models.DateTimeField(blank=True, null=True)
    # TODO MPP-2972: delete date_phone_subscription_checked in favor of
    # date_phone_subscription_next_reset
    date_phone_subscription_checked = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_start = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_reset = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_end = models.DateTimeField(blank=True, null=True)
    address_last_deleted = models.DateTimeField(blank=True, null=True, db_index=True)
    last_soft_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
    last_hard_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
    last_account_flagged = models.DateTimeField(blank=True, null=True, db_index=True)
    num_deleted_relay_addresses = models.PositiveIntegerField(default=0)
    num_deleted_domain_addresses = models.PositiveIntegerField(default=0)
    num_email_forwarded_in_deleted_address = models.PositiveIntegerField(default=0)
    num_email_blocked_in_deleted_address = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked_in_deleted_address = models.PositiveIntegerField(
        default=0, null=True
    )
    num_email_replied_in_deleted_address = models.PositiveIntegerField(default=0)
    num_email_spam_in_deleted_address = models.PositiveIntegerField(default=0)
    subdomain = models.CharField(
        blank=True,
        null=True,
        unique=True,
        max_length=63,
        db_index=True,
        validators=[valid_available_subdomain],
    )
    # Whether we store the user's alias labels in the server
    server_storage = models.BooleanField(default=True)
    # Whether we store the caller/sender log for the user's relay number
    store_phone_log = models.BooleanField(default=True)
    # TODO: Data migration to set null to false
    # TODO: Schema migration to remove null=True
    remove_level_one_email_trackers = models.BooleanField(null=True, default=False)
    onboarding_state = models.PositiveIntegerField(default=0)
    onboarding_free_state = models.PositiveIntegerField(default=0)
    auto_block_spam = models.BooleanField(default=False)
    forwarded_first_reply = models.BooleanField(default=False)
    # Empty string means the profile was created through relying party flow
    created_by = models.CharField(blank=True, null=True, max_length=63)
    sent_welcome_email = models.BooleanField(default=False)
    last_engagement = models.DateTimeField(blank=True, null=True, db_index=True)

    class Meta:
        # Moved from emails to privaterelay, but old table name retained. See:
        # privaterelay/migrations/0010_move_profile_and_registered_subdomain_models.py
        # emails/migrations/0062_move_profile_and_registered_subdomain_models.py
        db_table = "emails_profile"

    def __str__(self):
        return f"{self.user} Profile"

    def save(
        self,
        force_insert: bool | tuple[ModelBase, ...] = False,
        force_update: bool = False,
        using: str | None = None,
        update_fields: Iterable[str] | None = None,
    ) -> None:
        """
        Save the profile, then clear server-stored data the user opted out of.

        The subdomain is lower-cased before saving. After saving, if
        server_storage is False the description / generated_for / used_on
        fields of the user's addresses are blanked, and (when phones are
        enabled) if store_phone_log is False the InboundContact rows for the
        user's relay number are deleted.
        """
        from emails.models import DomainAddress, RelayAddress

        # always lower-case the subdomain before saving it
        # TODO: change subdomain field as a custom field inheriting from
        # CharField to validate constraints on the field update too
        if self.subdomain and not self.subdomain.islower():
            self.subdomain = self.subdomain.lower()
            if update_fields is not None:
                # make sure the lower-cased value is written on partial saves
                update_fields = {"subdomain"}.union(update_fields)
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )
        # any time a profile is saved with server_storage False, delete the
        # appropriate server-stored Relay address data.
        if not self.server_storage:
            relay_addresses = RelayAddress.objects.filter(user=self.user)
            relay_addresses.update(description="", generated_for="", used_on="")
            domain_addresses = DomainAddress.objects.filter(user=self.user)
            domain_addresses.update(description="", used_on="")
        if settings.PHONES_ENABLED:
            # any time a profile is saved with store_phone_log False, delete the
            # appropriate server-stored InboundContact records
            from phones.models import InboundContact, RelayNumber

            if not self.store_phone_log:
                try:
                    relay_number = RelayNumber.objects.get(user=self.user)
                    InboundContact.objects.filter(relay_number=relay_number).delete()
                except RelayNumber.DoesNotExist:
                    # no relay number means there is no phone log to delete
                    pass

    @property
    def language(self):
        """
        Return the first supported language variant from the FxA locale.

        Falls back to "en" when there is no FxA account, no "locale" in its
        extra_data, or none of the listed languages is supported.
        """
        if self.fxa and self.fxa.extra_data.get("locale"):
            for accept_lang, _ in parse_accept_lang_header(
                self.fxa.extra_data.get("locale")
            ):
                try:
                    return get_supported_language_variant(accept_lang)
                except LookupError:
                    continue
        return "en"

    # This method returns whether the locale associated with the user's Mozilla account
    # includes a country code from a Premium country. This is less accurate than using
    # get_countries_info_from_request_and_mapping(), which can use a GeoIP lookup, so
    # prefer using that if a request context is available. In other contexts, for
    # example when sending an email, this method can be useful.
    @property
    def fxa_locale_in_premium_country(self) -> bool:
        if self.fxa and self.fxa.extra_data.get("locale"):
            try:
                country = guess_country_from_accept_lang(self.fxa.extra_data["locale"])
            except AcceptLanguageError:
                return False
            premium_countries = get_premium_countries()
            if country in premium_countries:
                return True
        return False

    @property
    def avatar(self) -> str | None:
        """Return the FxA "avatar" value as a string, or None if unavailable."""
        if fxa := self.fxa:
            avatar = fxa.extra_data.get("avatar")
            # A missing avatar must stay None; str(None) would yield the
            # literal text "None". This mirrors display_name below.
            return avatar if avatar is None else str(avatar)
        return None

    @property
    def relay_addresses(self) -> QuerySet[RelayAddress]:
        """Return a queryset of the user's RelayAddress rows."""
        from emails.models import RelayAddress

        return RelayAddress.objects.filter(user=self.user)

    @property
    def domain_addresses(self) -> QuerySet[DomainAddress]:
        """Return a queryset of the user's DomainAddress rows."""
        from emails.models import DomainAddress

        return DomainAddress.objects.filter(user=self.user)

    @property
    def total_masks(self) -> int:
        """Return the number of relay addresses plus domain addresses."""
        ra_count: int = self.relay_addresses.count()
        da_count: int = self.domain_addresses.count()
        return ra_count + da_count

    @property
    def at_mask_limit(self) -> bool:
        """
        Return True if a non-premium user has reached MAX_NUM_FREE_ALIASES.

        Only relay addresses are counted. Premium users are never at the limit.
        """
        if self.has_premium:
            return False
        ra_count: int = self.relay_addresses.count()
        return ra_count >= settings.MAX_NUM_FREE_ALIASES

    def check_bounce_pause(self) -> BounceStatus:
        """
        Return the bounce pause status, clearing bounce dates that have expired.

        A hard bounce within HARD_BOUNCE_ALLOWED_DAYS returns (True, "hard");
        otherwise a soft bounce within SOFT_BOUNCE_ALLOWED_DAYS returns
        (True, "soft"). A bounce date older than its window is reset to None
        and the profile is saved. With no active bounce, returns (False, "").
        """
        if self.last_hard_bounce:
            last_hard_bounce_allowed = datetime.now(UTC) - timedelta(
                days=settings.HARD_BOUNCE_ALLOWED_DAYS
            )
            if self.last_hard_bounce > last_hard_bounce_allowed:
                return BounceStatus(True, "hard")
            self.last_hard_bounce = None
            self.save()
        if self.last_soft_bounce:
            last_soft_bounce_allowed = datetime.now(UTC) - timedelta(
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
            )
            if self.last_soft_bounce > last_soft_bounce_allowed:
                return BounceStatus(True, "soft")
            self.last_soft_bounce = None
            self.save()
        return BounceStatus(False, "")

    @property
    def bounce_status(self) -> BounceStatus:
        """Property form of check_bounce_pause(); may save the profile."""
        return self.check_bounce_pause()

    @property
    def next_email_try(self) -> datetime:
        """
        Return when email can next be attempted for this profile.

        This is now (UTC) when not paused, otherwise the relevant bounce date
        plus its allowed-days window.

        Raises ValueError if the bounce status is inconsistent with the stored
        bounce dates.
        """
        bounce_pause, bounce_type = self.check_bounce_pause()
        if not bounce_pause:
            return datetime.now(UTC)
        if bounce_type == "soft":
            if not self.last_soft_bounce:
                raise ValueError("self.last_soft_bounce must be truthy value.")
            return self.last_soft_bounce + timedelta(
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
            )
        if bounce_type != "hard":
            raise ValueError("bounce_type must be either 'soft' or 'hard'")
        if not self.last_hard_bounce:
            raise ValueError("self.last_hard_bounce must be truthy value.")
        return self.last_hard_bounce + timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)

    @property
    def last_bounce_date(self):
        """Return last_hard_bounce if set, else last_soft_bounce, else None."""
        if self.last_hard_bounce:
            return self.last_hard_bounce
        if self.last_soft_bounce:
            return self.last_soft_bounce
        return None

    @property
    def at_max_free_aliases(self) -> bool:
        """
        Return True if the relay address count is at least MAX_NUM_FREE_ALIASES.

        Unlike at_mask_limit, this does not check has_premium.
        """
        relay_addresses_count: int = self.relay_addresses.count()
        return relay_addresses_count >= settings.MAX_NUM_FREE_ALIASES

    @property
    def fxa(self) -> SocialAccount | None:
        """
        Return the user's SocialAccount whose provider is "fxa", or None.

        Raises AttributeError if self.user has no socialaccount_set attribute.
        """
        # Note: we are NOT using .filter() here because it invalidates
        # any profile instances that were queried with prefetch_related, which
        # we use in at least the profile view to minimize queries
        if not hasattr(self.user, "socialaccount_set"):
            raise AttributeError("self.user must have socialaccount_set attribute")
        for sa in self.user.socialaccount_set.all():
            if sa.provider == "fxa":
                return sa
        return None

    @property
    def display_name(self) -> str | None:
        """Return the FxA "displayName" as a string, or None if unavailable."""
        # if display name is not set on FxA the
        # displayName key will not exist on the extra_data
        if fxa := self.fxa:
            name = fxa.extra_data.get("displayName")
            return name if name is None else str(name)
        return None

    @property
    def custom_domain(self) -> str:
        """
        Return "@<subdomain>.<MOZMAIL_DOMAIN>" for this profile.

        Raises ValueError if the profile has no subdomain.
        """
        if not self.subdomain:
            raise ValueError("self.subdomain must be truthy value.")
        return f"@{self.subdomain}.{settings.MOZMAIL_DOMAIN}"

    @property
    def has_premium(self) -> bool:
        """
        Return True if the user counts as a premium user.

        Requires an active user with an FxA account, and either an email
        address at one of PREMIUM_DOMAINS or a subscription listed in
        settings.SUBSCRIPTIONS_WITH_UNLIMITED.
        """
        if not self.user.is_active:
            return False

        # FIXME: as we don't have all the tiers defined we are over-defining
        # this to mark the user as a premium user as well
        if not self.fxa:
            return False
        for premium_domain in PREMIUM_DOMAINS:
            if self.user.email.endswith(f"@{premium_domain}"):
                return True
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
        for sub in settings.SUBSCRIPTIONS_WITH_UNLIMITED:
            if sub in user_subscriptions:
                return True
        return False

    @property
    def has_phone(self) -> bool:
        """
        Return True if the user has access to the phone feature.

        Requires an FxA account. Outside the "prod" channel (and outside
        pytest) the "phones" flag must be active. The "free_phones" flag grants
        access directly; otherwise a subscription listed in
        settings.SUBSCRIPTIONS_WITH_PHONE is required.
        """
        if not self.fxa:
            return False
        if settings.RELAY_CHANNEL != "prod" and not settings.IN_PYTEST:
            if not flag_is_active_in_task("phones", self.user):
                return False
        if flag_is_active_in_task("free_phones", self.user):
            return True
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
        for sub in settings.SUBSCRIPTIONS_WITH_PHONE:
            if sub in user_subscriptions:
                return True
        return False

    @property
    def has_vpn(self) -> bool:
        """
        Return True if the FxA account has a subscription listed in
        settings.SUBSCRIPTIONS_WITH_VPN.
        """
        if not self.fxa:
            return False
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
        for sub in settings.SUBSCRIPTIONS_WITH_VPN:
            if sub in user_subscriptions:
                return True
        return False

    @property
    def emails_forwarded(self) -> int:
        """Return num_forwarded summed over current addresses plus the
        deleted-address counter."""
        return (
            sum(ra.num_forwarded for ra in self.relay_addresses)
            + sum(da.num_forwarded for da in self.domain_addresses)
            + self.num_email_forwarded_in_deleted_address
        )

    @property
    def emails_blocked(self) -> int:
        """Return num_blocked summed over current addresses plus the
        deleted-address counter."""
        return (
            sum(ra.num_blocked for ra in self.relay_addresses)
            + sum(da.num_blocked for da in self.domain_addresses)
            + self.num_email_blocked_in_deleted_address
        )

    @property
    def emails_replied(self) -> int:
        """Return num_replied summed (in the database) over current addresses
        plus the deleted-address counter."""
        ra_sum = self.relay_addresses.aggregate(models.Sum("num_replied", default=0))
        da_sum = self.domain_addresses.aggregate(models.Sum("num_replied", default=0))
        return (
            int(ra_sum["num_replied__sum"])
            + int(da_sum["num_replied__sum"])
            + self.num_email_replied_in_deleted_address
        )

    @property
    def level_one_trackers_blocked(self) -> int:
        """Return num_level_one_trackers_blocked summed over current addresses
        plus the deleted-address counter, treating None as 0."""
        return (
            sum(ra.num_level_one_trackers_blocked or 0 for ra in self.relay_addresses)
            + sum(
                da.num_level_one_trackers_blocked or 0 for da in self.domain_addresses
            )
            + (self.num_level_one_trackers_blocked_in_deleted_address or 0)
        )

    @property
    def joined_before_premium_release(self):
        """Return True if the user joined before 2021-10-22 17:00:00 UTC."""
        date_created = self.user.date_joined
        return date_created < datetime.fromisoformat("2021-10-22 17:00:00+00:00")

    @property
    def date_phone_registered(self) -> datetime | None:
        """
        Return the date the user's phone was registered, if any.

        Returns None when phones are disabled or RealPhone.verified_objects
        has no entry for the user. Returns the real phone's verified_date when
        the user has no RelayNumber, otherwise the relay number's created_at
        (falling back to verified_date when created_at is empty).
        """
        if not settings.PHONES_ENABLED:
            return None

        from phones.models import RealPhone, RelayNumber

        try:
            real_phone = RealPhone.verified_objects.get_for_user(self.user)
            relay_number = RelayNumber.objects.get(user=self.user)
        except RealPhone.DoesNotExist:
            return None
        except RelayNumber.DoesNotExist:
            # real_phone was assigned before the RelayNumber lookup failed
            return real_phone.verified_date

        return relay_number.created_at or real_phone.verified_date

    def add_subdomain(self, subdomain):
        """
        Set the profile's subdomain and record its hash as registered.

        Returns the lower-cased subdomain.

        Raises CannotMakeSubdomainException if the subdomain is empty, the
        user is not premium, or a subdomain is already set. Errors raised by
        valid_available_subdomain() or full_clean() propagate to the caller.
        """
        # Handles if the subdomain is "" or None
        if not subdomain:
            raise CannotMakeSubdomainException(
                "error-subdomain-cannot-be-empty-or-null"
            )

        # subdomain must be all lowercase
        subdomain = subdomain.lower()

        if not self.has_premium:
            raise CannotMakeSubdomainException("error-premium-set-subdomain")
        if self.subdomain is not None:
            raise CannotMakeSubdomainException("error-premium-cannot-change-subdomain")
        self.subdomain = subdomain
        # The validator defined in the subdomain field does not get run in full_clean()
        # when self.subdomain is "" or None, so we need to run the validator again to
        # catch these cases.
        valid_available_subdomain(subdomain)
        self.full_clean()
        self.save()

        RegisteredSubdomain.objects.create(subdomain_hash=hash_subdomain(subdomain))
        return subdomain

    def update_abuse_metric(
        self,
        address_created: bool = False,
        replied: bool = False,
        email_forwarded: bool = False,
        forwarded_email_size: int = 0,
    ) -> datetime | None:
        """
        Update today's abuse metric for the user and flag the account if a
        daily limit is reached.

        Each truthy argument increments the matching per-day counter; a
        positive forwarded_email_size is added to the per-day size total.
        When any counter reaches its settings.MAX_* limit, last_account_flagged
        is set to now, the profile is saved, and the event is logged.

        Returns None for accounts in settings.ALLOWED_ACCOUNTS, otherwise the
        profile's last_account_flagged (which may be None).
        """
        if self.user.email in settings.ALLOWED_ACCOUNTS:
            return None

        with transaction.atomic():
            # look for abuse metrics created on the same UTC date, regardless of time.
            # Build the aware datetime directly: calling .astimezone() on a
            # naive datetime treats it as system-local time, which shifts the
            # window whenever the process timezone is not UTC.
            midnight_utc_today = datetime.combine(
                datetime.now(UTC).date(), datetime.min.time(), tzinfo=UTC
            )
            midnight_utc_tomorrow = midnight_utc_today + timedelta(days=1)
            abuse_metric = (
                self.user.abusemetrics_set.select_for_update()
                .filter(
                    first_recorded__gte=midnight_utc_today,
                    first_recorded__lt=midnight_utc_tomorrow,
                )
                .first()
            )
            if not abuse_metric:
                from emails.models import AbuseMetrics

                abuse_metric = AbuseMetrics.objects.create(user=self.user)
                # remove metrics first recorded before today (for all users)
                AbuseMetrics.objects.filter(
                    first_recorded__lt=midnight_utc_today
                ).delete()

            # increment the abuse metric
            if address_created:
                abuse_metric.num_address_created_per_day += 1
            if replied:
                abuse_metric.num_replies_per_day += 1
            if email_forwarded:
                abuse_metric.num_email_forwarded_per_day += 1
            if forwarded_email_size > 0:
                abuse_metric.forwarded_email_size_per_day += forwarded_email_size
            abuse_metric.last_recorded = datetime.now(UTC)
            abuse_metric.save()

            # check user should be flagged for abuse
            hit_max_create = (
                abuse_metric.num_address_created_per_day
                >= settings.MAX_ADDRESS_CREATION_PER_DAY
            )
            hit_max_replies = (
                abuse_metric.num_replies_per_day >= settings.MAX_REPLIES_PER_DAY
            )
            hit_max_forwarded = (
                abuse_metric.num_email_forwarded_per_day
                >= settings.MAX_FORWARDED_PER_DAY
            )
            hit_max_forwarded_email_size = (
                abuse_metric.forwarded_email_size_per_day
                >= settings.MAX_FORWARDED_EMAIL_SIZE_PER_DAY
            )
            if (
                hit_max_create
                or hit_max_replies
                or hit_max_forwarded
                or hit_max_forwarded_email_size
            ):
                self.last_account_flagged = datetime.now(UTC)
                self.save()
                data = {
                    "uid": self.fxa.uid if self.fxa else None,
                    "flagged": self.last_account_flagged.timestamp(),
                    "replies": abuse_metric.num_replies_per_day,
                    "addresses": abuse_metric.num_address_created_per_day,
                    "forwarded": abuse_metric.num_email_forwarded_per_day,
                    "forwarded_size_in_bytes": (
                        abuse_metric.forwarded_email_size_per_day
                    ),
                }
                # log for further secops review
                abuse_logger.info("Abuse flagged", extra=data)

        return self.last_account_flagged

    @property
    def is_flagged(self):
        """Return True while the account is within PREMIUM_FEATURE_PAUSED_DAYS
        of its last flag date."""
        if not self.last_account_flagged:
            return False
        account_premium_feature_resumed = self.last_account_flagged + timedelta(
            days=settings.PREMIUM_FEATURE_PAUSED_DAYS
        )
        if datetime.now(UTC) > account_premium_feature_resumed:
            # premium feature has been resumed
            return False
        # user was flagged and the premium feature pause period is not yet over
        return True

    @property
    def metrics_enabled(self) -> bool:
        """
        Does the user allow us to record technical and interaction data?

        This is based on the Mozilla accounts opt-out option, added around 2022. A user
        can go to their Mozilla account profile settings, Data Collection and Use, and
        deselect "Help improve Mozilla Account". This setting defaults to On, and is
        sent as "metricsEnabled". Some older Relay accounts do not have
        "metricsEnabled", and we default to On.
        """
        if self.fxa:
            return bool(self.fxa.extra_data.get("metricsEnabled", True))
        return True

    @property
    def plan(self) -> Literal["free", "email", "phone", "bundle"]:
        """The user's Relay plan as a string."""
        if self.has_premium:
            if self.has_phone:
                return "bundle" if self.has_vpn else "phone"
            else:
                return "email"
        else:
            return "free"

    @property
    def plan_term(self) -> Literal[None, "unknown", "1_month", "1_year"]:
        """The user's Relay plan term as a string."""
        plan = self.plan
        if plan == "free":
            return None
        if plan == "phone":
            start_date = self.date_phone_subscription_start
            end_date = self.date_phone_subscription_end
            if start_date and end_date:
                span = end_date - start_date
                # a span longer than 32 days is reported as a yearly term
                return "1_year" if span.days > 32 else "1_month"
        return "unknown"

    @property
    def metrics_premium_status(self) -> str:
        """Return "free", or "<plan>_<plan_term>" for any other plan."""
        plan = self.plan
        if plan == "free":
            return "free"
        return f"{plan}_{self.plan_term}"

    @property
    def metrics_fxa_id(self) -> str:
        """Return Mozilla Accounts ID if user has metrics enabled, else empty string"""
        if (fxa := self.fxa) and self.metrics_enabled and isinstance(fxa.uid, str):
            return fxa.uid
        return ""


class RegisteredSubdomain(models.Model):
    """Record of a registered subdomain, stored as a hash (see hash_subdomain)."""

    subdomain_hash = models.CharField(max_length=64, db_index=True, unique=True)
    registered_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.subdomain_hash

    class Meta:
        # Moved from emails to privaterelay, but old table name retained. See:
        # privaterelay/migrations/0010_move_profile_and_registered_subdomain_models.py
        # emails/migrations/0062_move_profile_and_registered_subdomain_models.py
        db_table = "emails_registeredsubdomain"

    @classmethod
    def is_taken(cls, subdomain: str) -> bool:
        """Return True if a row exists for the hash of the given subdomain."""
        return cls.objects.filter(subdomain_hash=hash_subdomain(subdomain)).exists()