from __future__ import annotations

import logging
import uuid
from collections import namedtuple
from datetime import UTC, datetime, timedelta
from hashlib import sha256
from typing import TYPE_CHECKING, Literal

from django.conf import settings
from django.contrib.auth.models import User
from django.db import models, transaction
from django.utils.translation.trans_real import (
    get_supported_language_variant,
    parse_accept_lang_header,
)

from allauth.socialaccount.models import SocialAccount

from .country_utils import AcceptLanguageError, guess_country_from_accept_lang
from .exceptions import CannotMakeSubdomainException
from .plans import get_premium_countries
from .utils import flag_is_active_in_task
from .validators import valid_available_subdomain

if TYPE_CHECKING:
    from collections.abc import Iterable

    from django.db.models.base import ModelBase
    from django.db.models.query import QuerySet

    from emails.models import DomainAddress, RelayAddress


abuse_logger = logging.getLogger("abusemetrics")
BounceStatus = namedtuple("BounceStatus", "paused type")
PREMIUM_DOMAINS = ["mozilla.com", "getpocket.com", "mozillafoundation.org"]


def hash_subdomain(subdomain: str, domain: str = settings.MOZMAIL_DOMAIN) -> str:
    return sha256(f"{subdomain}.{domain}".encode()).hexdigest()


class Profile(models.Model):
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    api_token = models.UUIDField(default=uuid.uuid4)
    num_address_deleted = models.PositiveIntegerField(default=0)
    date_subscribed = models.DateTimeField(blank=True, null=True)
    date_subscribed_phone = models.DateTimeField(blank=True, null=True)
    # TODO MPP-2972: delete date_phone_subscription_checked in favor of
    # date_phone_subscription_next_reset
    date_phone_subscription_checked = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_start = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_reset = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_end = models.DateTimeField(blank=True, null=True)
    address_last_deleted = models.DateTimeField(blank=True, null=True, db_index=True)
    last_soft_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
    last_hard_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
    last_account_flagged = models.DateTimeField(blank=True, null=True, db_index=True)
    num_deleted_relay_addresses = models.PositiveIntegerField(default=0)
    num_deleted_domain_addresses = models.PositiveIntegerField(default=0)
    num_email_forwarded_in_deleted_address = models.PositiveIntegerField(default=0)
    num_email_blocked_in_deleted_address = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked_in_deleted_address = models.PositiveIntegerField(
        default=0, null=True
    )
    num_email_replied_in_deleted_address = models.PositiveIntegerField(default=0)
    num_email_spam_in_deleted_address = models.PositiveIntegerField(default=0)
    subdomain = models.CharField(
        blank=True,
        null=True,
        unique=True,
        max_length=63,
        db_index=True,
        validators=[valid_available_subdomain],
    )
    # Whether we store the user's alias labels in the server
    server_storage = models.BooleanField(default=True)
    # Whether we store the caller/sender log for the user's relay number
    store_phone_log = models.BooleanField(default=True)
    # TODO: Data migration to set null to false
    # TODO: Schema migration to remove null=True
    remove_level_one_email_trackers = models.BooleanField(null=True, default=False)
    onboarding_state = models.PositiveIntegerField(default=0)
    onboarding_free_state = models.PositiveIntegerField(default=0)
    auto_block_spam = models.BooleanField(default=False)
    forwarded_first_reply = models.BooleanField(default=False)
    # Empty string means the profile was created through relying party flow
    created_by = models.CharField(blank=True, null=True, max_length=63)
    sent_welcome_email = models.BooleanField(default=False)
    last_engagement = models.DateTimeField(blank=True, null=True, db_index=True)

    class Meta:
        # Moved from emails to privaterelay, but old table name retained. See:
        # privaterelay/migrations/0010_move_profile_and_registered_subdomain_models.py
        # emails/migrations/0062_move_profile_and_registered_subdomain_models.py
        db_table = "emails_profile"

    def __str__(self):
        return f"{self.user} Profile"

    def save(
        self,
        force_insert: bool | tuple[ModelBase, ...] = False,
        force_update: bool = False,
        using: str | None = None,
        update_fields: Iterable[str] | None = None,
    ) -> None:
        from emails.models import DomainAddress, RelayAddress

        # always lower-case the subdomain before saving it
        # TODO: change subdomain field as a custom field inheriting from
        # CharField to validate constraints on the field update too
        if self.subdomain and not self.subdomain.islower():
            self.subdomain = self.subdomain.lower()
            if update_fields is not None:
                update_fields = {"subdomain"}.union(update_fields)
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )
        # any time a profile is saved with server_storage False, delete the
        # appropriate server-stored Relay address data.
        if not self.server_storage:
            relay_addresses = RelayAddress.objects.filter(user=self.user)
            relay_addresses.update(description="", generated_for="", used_on="")
            domain_addresses = DomainAddress.objects.filter(user=self.user)
            domain_addresses.update(description="", used_on="")
        if settings.PHONES_ENABLED:
            # any time a profile is saved with store_phone_log False, delete the
            # appropriate server-stored InboundContact records
            from phones.models import InboundContact, RelayNumber

            if not self.store_phone_log:
                try:
                    relay_number = RelayNumber.objects.get(user=self.user)
                    InboundContact.objects.filter(relay_number=relay_number).delete()
                except RelayNumber.DoesNotExist:
                    pass

    @property
    def language(self):
        if self.fxa and self.fxa.extra_data.get("locale"):
            for accept_lang, _ in parse_accept_lang_header(
                self.fxa.extra_data.get("locale")
            ):
                try:
                    return get_supported_language_variant(accept_lang)
                except LookupError:
                    continue
        return "en"

    # This method returns whether the locale associated with the user's Mozilla account
    # includes a country code from a Premium country. This is less accurate than using
    # get_countries_info_from_request_and_mapping(), which can use a GeoIP lookup, so
    # prefer using that if a request context is available. In other contexts, for
    # example when sending an email, this method can be useful.
    @property
    def fxa_locale_in_premium_country(self) -> bool:
        if self.fxa and self.fxa.extra_data.get("locale"):
            try:
                country = guess_country_from_accept_lang(self.fxa.extra_data["locale"])
            except AcceptLanguageError:
                return False
            premium_countries = get_premium_countries()
            if country in premium_countries:
                return True
        return False

    @property
    def avatar(self) -> str | None:
        if fxa := self.fxa:
            return str(fxa.extra_data.get("avatar"))
        return None

    @property
    def relay_addresses(self) -> QuerySet[RelayAddress]:
        from emails.models import RelayAddress

        return RelayAddress.objects.filter(user=self.user)

    @property
    def domain_addresses(self) -> QuerySet[DomainAddress]:
        from emails.models import DomainAddress

        return DomainAddress.objects.filter(user=self.user)

    @property
    def total_masks(self) -> int:
        ra_count: int = self.relay_addresses.count()
        da_count: int = self.domain_addresses.count()
        return ra_count + da_count

    @property
    def at_mask_limit(self) -> bool:
        if self.has_premium:
            return False
        ra_count: int = self.relay_addresses.count()
        return ra_count >= settings.MAX_NUM_FREE_ALIASES

    def check_bounce_pause(self) -> BounceStatus:
        if self.last_hard_bounce:
            last_hard_bounce_allowed = datetime.now(UTC) - timedelta(
                days=settings.HARD_BOUNCE_ALLOWED_DAYS
            )
            if self.last_hard_bounce > last_hard_bounce_allowed:
                return BounceStatus(True, "hard")
            self.last_hard_bounce = None
            self.save()
        if self.last_soft_bounce:
            last_soft_bounce_allowed = datetime.now(UTC) - timedelta(
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
            )
            if self.last_soft_bounce > last_soft_bounce_allowed:
                return BounceStatus(True, "soft")
            self.last_soft_bounce = None
            self.save()
        return BounceStatus(False, "")

    @property
    def bounce_status(self) -> BounceStatus:
        return self.check_bounce_pause()

    @property
    def next_email_try(self) -> datetime:
        bounce_pause, bounce_type = self.check_bounce_pause()

        if not bounce_pause:
            return datetime.now(UTC)

        if bounce_type == "soft":
            if not self.last_soft_bounce:
                raise ValueError("self.last_soft_bounce must be truthy value.")
            return self.last_soft_bounce + timedelta(
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
            )

        if bounce_type != "hard":
            raise ValueError("bounce_type must be either 'soft' or 'hard'")
        if not self.last_hard_bounce:
            raise ValueError("self.last_hard_bounce must be truthy value.")
        return self.last_hard_bounce + timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)

    @property
    def last_bounce_date(self):
        if self.last_hard_bounce:
            return self.last_hard_bounce
        if self.last_soft_bounce:
            return self.last_soft_bounce
        return None

    @property
    def at_max_free_aliases(self) -> bool:
        relay_addresses_count: int = self.relay_addresses.count()
        return relay_addresses_count >= settings.MAX_NUM_FREE_ALIASES

    @property
    def fxa(self) -> SocialAccount | None:
        # Note: we are NOT using .filter() here because it invalidates
        # any profile instances that were queried with prefetch_related, which
        # we use in at least the profile view to minimize queries
        if not hasattr(self.user, "socialaccount_set"):
            raise AttributeError("self.user must have socialaccount_set attribute")
        for sa in self.user.socialaccount_set.all():
            if sa.provider == "fxa":
                return sa
        return None

    @property
    def display_name(self) -> str | None:
        # if display name is not set on FxA the
        # displayName key will not exist on the extra_data
        if fxa := self.fxa:
            name = fxa.extra_data.get("displayName")
            return name if name is None else str(name)
        return None

    @property
    def custom_domain(self) -> str:
        if not self.subdomain:
            raise ValueError("self.subdomain must be truthy value.")
        return f"@{self.subdomain}.{settings.MOZMAIL_DOMAIN}"

    @property
    def has_premium(self) -> bool:
        if not self.user.is_active:
            return False

        # FIXME: as we don't have all the tiers defined we are over-defining
        # this to mark the user as a premium user as well
        if not self.fxa:
            return False
        for premium_domain in PREMIUM_DOMAINS:
            if self.user.email.endswith(f"@{premium_domain}"):
                return True
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
        for sub in settings.SUBSCRIPTIONS_WITH_UNLIMITED:
            if sub in user_subscriptions:
                return True
        return False

    @property
    def has_phone(self) -> bool:
        if not self.fxa:
            return False
        if settings.RELAY_CHANNEL != "prod" and not settings.IN_PYTEST:
            if not flag_is_active_in_task("phones", self.user):
                return False
        if flag_is_active_in_task("free_phones", self.user):
            return True
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
        for sub in settings.SUBSCRIPTIONS_WITH_PHONE:
            if sub in user_subscriptions:
                return True
        return False

    @property
    def has_vpn(self) -> bool:
        if not self.fxa:
            return False
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
        for sub in settings.SUBSCRIPTIONS_WITH_VPN:
            if sub in user_subscriptions:
                return True
        return False

    @property
    def emails_forwarded(self) -> int:
        return (
            sum(ra.num_forwarded for ra in self.relay_addresses)
            + sum(da.num_forwarded for da in self.domain_addresses)
            + self.num_email_forwarded_in_deleted_address
        )

    @property
    def emails_blocked(self) -> int:
        return (
            sum(ra.num_blocked for ra in self.relay_addresses)
            + sum(da.num_blocked for da in self.domain_addresses)
            + self.num_email_blocked_in_deleted_address
        )

    @property
    def emails_replied(self) -> int:
        ra_sum = self.relay_addresses.aggregate(models.Sum("num_replied", default=0))
        da_sum = self.domain_addresses.aggregate(models.Sum("num_replied", default=0))
        return (
            int(ra_sum["num_replied__sum"])
            + int(da_sum["num_replied__sum"])
            + self.num_email_replied_in_deleted_address
        )

    @property
    def level_one_trackers_blocked(self) -> int:
        return (
            sum(ra.num_level_one_trackers_blocked or 0 for ra in self.relay_addresses)
            + sum(
                da.num_level_one_trackers_blocked or 0 for da in self.domain_addresses
            )
            + (self.num_level_one_trackers_blocked_in_deleted_address or 0)
        )

    @property
    def joined_before_premium_release(self):
        date_created = self.user.date_joined
        return date_created < datetime.fromisoformat("2021-10-22 17:00:00+00:00")

    @property
    def date_phone_registered(self) -> datetime | None:
        if not settings.PHONES_ENABLED:
            return None

        from phones.models import RealPhone, RelayNumber

        try:
            real_phone = RealPhone.verified_objects.get_for_user(self.user)
            relay_number = RelayNumber.objects.get(user=self.user)
        except RealPhone.DoesNotExist:
            return None
        except RelayNumber.DoesNotExist:
            return real_phone.verified_date
        return relay_number.created_at or real_phone.verified_date

    def add_subdomain(self, subdomain):
        # Handles if the subdomain is "" or None
        if not subdomain:
            raise CannotMakeSubdomainException(
                "error-subdomain-cannot-be-empty-or-null"
            )

        # subdomain must be all lowercase
        subdomain = subdomain.lower()

        if not self.has_premium:
            raise CannotMakeSubdomainException("error-premium-set-subdomain")
        if self.subdomain is not None:
            raise CannotMakeSubdomainException("error-premium-cannot-change-subdomain")
        self.subdomain = subdomain
        # The validator defined in the subdomain field does not get run in full_clean()
        # when self.subdomain is "" or None, so we need to run the validator again to
        # catch these cases.
        valid_available_subdomain(subdomain)
        self.full_clean()
        self.save()

        RegisteredSubdomain.objects.create(subdomain_hash=hash_subdomain(subdomain))
        return subdomain

    def update_abuse_metric(
        self,
        address_created: bool = False,
        replied: bool = False,
        email_forwarded: bool = False,
        forwarded_email_size: int = 0,
    ) -> datetime | None:
        if self.user.email in settings.ALLOWED_ACCOUNTS:
            return None

        with transaction.atomic():
            # look for abuse metrics created on the same UTC date, regardless of time.
            midnight_utc_today = datetime.combine(
                datetime.now(UTC).date(), datetime.min.time()
            ).astimezone(UTC)
            midnight_utc_tomorrow = midnight_utc_today + timedelta(days=1)
            abuse_metric = (
                self.user.abusemetrics_set.select_for_update()
                .filter(
                    first_recorded__gte=midnight_utc_today,
                    first_recorded__lt=midnight_utc_tomorrow,
                )
                .first()
            )
            if not abuse_metric:
                from emails.models import AbuseMetrics

                abuse_metric = AbuseMetrics.objects.create(user=self.user)
                AbuseMetrics.objects.filter(
                    first_recorded__lt=midnight_utc_today
                ).delete()

            # increment the abuse metric
            if address_created:
                abuse_metric.num_address_created_per_day += 1
            if replied:
                abuse_metric.num_replies_per_day += 1
            if email_forwarded:
                abuse_metric.num_email_forwarded_per_day += 1
            if forwarded_email_size > 0:
                abuse_metric.forwarded_email_size_per_day += forwarded_email_size
            abuse_metric.last_recorded = datetime.now(UTC)
            abuse_metric.save()

            # check user should be flagged for abuse
            hit_max_create = False
            hit_max_replies = False
            hit_max_forwarded = False
            hit_max_forwarded_email_size = False

            hit_max_create = (
                abuse_metric.num_address_created_per_day
                >= settings.MAX_ADDRESS_CREATION_PER_DAY
            )
            hit_max_replies = (
                abuse_metric.num_replies_per_day >= settings.MAX_REPLIES_PER_DAY
            )
            hit_max_forwarded = (
                abuse_metric.num_email_forwarded_per_day
                >= settings.MAX_FORWARDED_PER_DAY
            )
            hit_max_forwarded_email_size = (
                abuse_metric.forwarded_email_size_per_day
                >= settings.MAX_FORWARDED_EMAIL_SIZE_PER_DAY
            )
            if (
                hit_max_create
                or hit_max_replies
                or hit_max_forwarded
                or hit_max_forwarded_email_size
            ):
                self.last_account_flagged = datetime.now(UTC)
                self.save()
                data = {
                    "uid": self.fxa.uid if self.fxa else None,
                    "flagged": self.last_account_flagged.timestamp(),
                    "replies": abuse_metric.num_replies_per_day,
                    "addresses": abuse_metric.num_address_created_per_day,
                    "forwarded": abuse_metric.num_email_forwarded_per_day,
                    "forwarded_size_in_bytes": (
                        abuse_metric.forwarded_email_size_per_day
                    ),
                }
                # log for further secops review
                abuse_logger.info("Abuse flagged", extra=data)

        return self.last_account_flagged

    @property
    def is_flagged(self):
        if not self.last_account_flagged:
            return False
        account_premium_feature_resumed = self.last_account_flagged + timedelta(
            days=settings.PREMIUM_FEATURE_PAUSED_DAYS
        )
        if datetime.now(UTC) > account_premium_feature_resumed:
            # premium feature has been resumed
            return False
        # user was flagged and the premium feature pause period is not yet over
        return True

    @property
    def metrics_enabled(self) -> bool:
        """
        Does the user allow us to record technical and interaction data?

        This is based on the Mozilla accounts opt-out option, added around 2022. A user
        can go to their Mozilla account profile settings, Data Collection and Use, and
        deselect "Help improve Mozilla Account". This setting defaults to On, and is
        sent as "metricsEnabled". Some older Relay accounts do not have
        "metricsEnabled", and we default to On.
        """
        if self.fxa:
            return bool(self.fxa.extra_data.get("metricsEnabled", True))
        return True

    @property
    def plan(self) -> Literal["free", "email", "phone", "bundle"]:
        """The user's Relay plan as a string."""
        if self.has_premium:
            if self.has_phone:
                return "bundle" if self.has_vpn else "phone"
            else:
                return "email"
        else:
            return "free"

    @property
    def plan_term(self) -> Literal[None, "unknown", "1_month", "1_year"]:
        """The user's Relay plan term as a string."""
        plan = self.plan
        if plan == "free":
            return None
        if plan == "phone":
            start_date = self.date_phone_subscription_start
            end_date = self.date_phone_subscription_end
            if start_date and end_date:
                span = end_date - start_date
                return "1_year" if span.days > 32 else "1_month"
        return "unknown"

    @property
    def metrics_premium_status(self) -> str:
        plan = self.plan
        if plan == "free":
            return "free"
        return f"{plan}_{self.plan_term}"

    @property
    def metrics_fxa_id(self) -> str:
        """Return Mozilla Accounts ID if user has metrics enabled, else empty string"""
        if (fxa := self.fxa) and self.metrics_enabled and isinstance(fxa.uid, str):
            return fxa.uid
        return ""


class RegisteredSubdomain(models.Model):
    subdomain_hash = models.CharField(max_length=64, db_index=True, unique=True)
    registered_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.subdomain_hash

    class Meta:
        # Moved from emails to privaterelay, but old table name retained. See:
        # privaterelay/migrations/0010_move_profile_and_registered_subdomain_models.py
        # emails/migrations/0062_move_profile_and_registered_subdomain_models.py
        db_table = "emails_registeredsubdomain"

    @classmethod
    def is_taken(cls, subdomain: str) -> bool:
        return cls.objects.filter(subdomain_hash=hash_subdomain(subdomain)).exists()
