phones/models.py (409 lines of code) (raw):

from __future__ import annotations import logging import secrets import string from collections.abc import Iterator from datetime import UTC, datetime, timedelta from math import floor from django.conf import settings from django.contrib.auth.models import User from django.core.cache import cache from django.core.exceptions import BadRequest, ValidationError from django.db import models from django.db.migrations.recorder import MigrationRecorder from django.db.models.signals import post_save from django.dispatch.dispatcher import receiver from django.urls import reverse import phonenumbers from twilio.base.exceptions import TwilioRestException from twilio.rest import Client from emails.utils import incr_if_enabled from .apps import phones_config, twilio_client from .iq_utils import send_iq_sms logger = logging.getLogger("eventsinfo") events_logger = logging.getLogger("events") MAX_MINUTES_TO_VERIFY_REAL_PHONE = 5 LAST_CONTACT_TYPE_CHOICES = [ ("call", "call"), ("text", "text"), ] DEFAULT_REGION = "US" def verification_code_default(): return str(secrets.randbelow(1000000)).zfill(6) def verification_sent_date_default(): return datetime.now(UTC) def get_last_text_sender(relay_number: RelayNumber) -> InboundContact | None: """ Get the last text sender. MPP-2581 introduces a last_text_date column for determining the last sender. Before MPP-2581, the last_inbound_date with last_inbound_type=text was used. During the transition, look at both methods. """ try: latest = InboundContact.objects.filter( relay_number=relay_number, last_text_date__isnull=False ).latest("last_text_date") except InboundContact.DoesNotExist: latest = None try: latest_by_old_method = InboundContact.objects.filter( relay_number=relay_number, last_inbound_type="text" ).latest("last_inbound_date") except InboundContact.DoesNotExist: latest_by_old_method = None if (latest is None and latest_by_old_method is not None) or ( latest and latest_by_old_method and latest != latest_by_old_method and latest.last_text_date and latest_by_old_method.last_inbound_date > latest.last_text_date ): # Pre-MPP-2581 server handled the latest text message return latest_by_old_method return latest def iq_fmt(e164_number: str) -> str: return "1" + str(phonenumbers.parse(e164_number, "E164").national_number) class VerifiedRealPhoneManager(models.Manager["RealPhone"]): """Return verified RealPhone records.""" def get_queryset(self) -> models.query.QuerySet[RealPhone]: return super().get_queryset().filter(verified=True) def get_for_user(self, user: User) -> RealPhone: """Get the one verified RealPhone for the user, or raise DoesNotExist.""" return self.get(user=user) def exists_for_number(self, number: str) -> bool: """Return True if a verified RealPhone exists for this number.""" return self.filter(number=number).exists() def country_code_for_user(self, user: User) -> str: """Return the RealPhone country code for this user.""" return self.values_list("country_code", flat=True).get(user=user) class ExpiredRealPhoneManager(models.Manager["RealPhone"]): """Return RealPhone records where the sent verification is no longer valid.""" def get_queryset(self) -> models.query.QuerySet[RealPhone]: return ( super() .get_queryset() .filter( verified=False, verification_sent_date__lt=RealPhone.verification_expiration(), ) ) def delete_for_number(self, number: str) -> tuple[int, dict[str, int]]: return self.filter(number=number).delete() class RecentRealPhoneManager(models.Manager["RealPhone"]): """Return RealPhone records where the sent verification is still valid.""" def get_queryset(self) -> models.query.QuerySet[RealPhone]: return ( super() .get_queryset() .filter( verified=False, verification_sent_date__gte=RealPhone.verification_expiration(), ) ) def get_for_user_number_and_verification_code( self, user: User, number: str, verification_code: str ) -> RealPhone: """Get the RealPhone with this user, number, and recently sent code, or raise""" return self.get(user=user, number=number, verification_code=verification_code) class PendingRealPhoneManager(RecentRealPhoneManager): """Return unverified RealPhone records where verification is still valid.""" def get_queryset(self) -> models.query.QuerySet[RealPhone]: return super().get_queryset().filter(verified=False) def exists_for_number(self, number: str) -> bool: """Return True if a verified RealPhone exists for this number.""" return self.filter(number=number).exists() class RealPhone(models.Model): user = models.ForeignKey(User, on_delete=models.CASCADE) number = models.CharField(max_length=15) verification_code = models.CharField( max_length=8, default=verification_code_default ) verification_sent_date = models.DateTimeField( blank=True, null=True, default=verification_sent_date_default ) verified = models.BooleanField(default=False) verified_date = models.DateTimeField(blank=True, null=True) country_code = models.CharField(max_length=2, default=DEFAULT_REGION) objects = models.Manager() verified_objects = VerifiedRealPhoneManager() expired_objects = ExpiredRealPhoneManager() recent_objects = RecentRealPhoneManager() pending_objects = PendingRealPhoneManager() class Meta: constraints = [ models.UniqueConstraint( fields=["number", "verified"], condition=models.Q(verified=True), name="unique_verified_number", ) ] @classmethod def verification_expiration(self) -> datetime: return datetime.now(UTC) - timedelta( 0, 60 * settings.MAX_MINUTES_TO_VERIFY_REAL_PHONE ) def save(self, *args, **kwargs): # delete any expired unverified RealPhone records for this number # note: it doesn't matter which user is trying to create a new # RealPhone record - any expired unverified record for the number # should be deleted RealPhone.expired_objects.delete_for_number(self.number) # We are not ready to support multiple real phone numbers per user, # so raise an exception if this save() would create a second # RealPhone record for the user try: verified_number = RealPhone.verified_objects.get_for_user(self.user) if not ( verified_number.number == self.number and verified_number.verification_code == self.verification_code ): raise BadRequest("User already has a verified number.") except RealPhone.DoesNotExist: pass # call super save to save into the DB # See also: realphone_post_save receiver below return super().save(*args, **kwargs) def mark_verified(self): incr_if_enabled("phones_RealPhone.mark_verified") self.verified = True self.verified_date = datetime.now(UTC) self.save(force_update=True) return self @receiver(post_save, sender=RealPhone, dispatch_uid="realphone_post_save") def realphone_post_save(sender, instance, created, **kwargs): # don't do anything if running migrations if isinstance(instance, MigrationRecorder.Migration): return if created: # only send verification_code when creating new record incr_if_enabled("phones_RealPhone.post_save_created_send_verification") text_body = ( f"Your Firefox Relay verification code is {instance.verification_code}" ) if settings.PHONES_NO_CLIENT_CALLS_IN_TEST: return if settings.IQ_FOR_VERIFICATION: send_iq_sms(instance.number, settings.IQ_MAIN_NUMBER, text_body) return client = twilio_client() client.messages.create( body=text_body, from_=settings.TWILIO_MAIN_NUMBER, to=instance.number, ) def vcard_lookup_key_default(): return "".join( secrets.choice(string.ascii_letters + string.digits) for i in range(6) ) class RelayNumber(models.Model): user = models.ForeignKey(User, on_delete=models.CASCADE) number = models.CharField(max_length=15, db_index=True, unique=True) vendor = models.CharField(max_length=15, default="twilio") location = models.CharField(max_length=255) country_code = models.CharField(max_length=2, default=DEFAULT_REGION) vcard_lookup_key = models.CharField( max_length=6, default=vcard_lookup_key_default, unique=True ) enabled = models.BooleanField(default=True) remaining_seconds = models.IntegerField( default=settings.MAX_MINUTES_PER_BILLING_CYCLE * 60 ) remaining_texts = models.IntegerField(default=settings.MAX_TEXTS_PER_BILLING_CYCLE) calls_forwarded = models.IntegerField(default=0) calls_blocked = models.IntegerField(default=0) texts_forwarded = models.IntegerField(default=0) texts_blocked = models.IntegerField(default=0) created_at = models.DateTimeField(null=True, auto_now_add=True) @property def remaining_minutes(self) -> int: # return a 0 or positive int for remaining minutes return floor(max(self.remaining_seconds, 0) / 60) @property def calls_and_texts_forwarded(self) -> int: return self.calls_forwarded + self.texts_forwarded @property def calls_and_texts_blocked(self) -> int: return self.calls_blocked + self.texts_blocked @property def storing_phone_log(self) -> bool: return bool(self.user.profile.store_phone_log) def save(self, *args, **kwargs): try: realphone = RealPhone.verified_objects.get(user=self.user) except RealPhone.DoesNotExist: raise ValidationError("User does not have a verified real phone.") # if this number exists for this user, this is an update call existing_numbers = RelayNumber.objects.filter(user=self.user) this_number = existing_numbers.filter(number=self.number).first() update_user_profile_last_engagement = False if this_number and this_number.id == self.id: update_user_profile_last_engagement = any( [ self.enabled != this_number.enabled, self.calls_forwarded != this_number.calls_forwarded, self.calls_blocked != this_number.calls_blocked, self.texts_forwarded != this_number.texts_forwarded, self.texts_blocked != this_number.texts_blocked, ] ) if update_user_profile_last_engagement: self.user.profile.last_engagement = datetime.now(UTC) self.user.profile.save() return super().save(*args, **kwargs) elif existing_numbers.exists(): raise ValidationError("User can have only one relay number.") if RelayNumber.objects.filter(number=self.number).exists(): raise ValidationError("This number is already claimed.") use_twilio = ( self.vendor == "twilio" and not settings.PHONES_NO_CLIENT_CALLS_IN_TEST ) if use_twilio: # Before saving into DB provision the number in Twilio client = twilio_client() # Since this will charge the Twilio account, first see if this # is running with TEST creds to avoid charges. if settings.TWILIO_TEST_ACCOUNT_SID: client = phones_config().twilio_test_client twilio_incoming_number = client.incoming_phone_numbers.create( phone_number=self.number, sms_application_sid=settings.TWILIO_SMS_APPLICATION_SID, voice_application_sid=settings.TWILIO_SMS_APPLICATION_SID, ) # Assume number was selected through suggested_numbers, so same country # as realphone self.country_code = realphone.country_code.upper() # Add numbers to the Relay messaging service, so it goes into our # A2P 10DLC campaigns if use_twilio and self.country_code in settings.TWILIO_NEEDS_10DLC_CAMPAIGN: if settings.TWILIO_MESSAGING_SERVICE_SID: register_with_messaging_service(client, twilio_incoming_number.sid) else: events_logger.warning( "Skipping Twilio Messaging Service registration, since" " TWILIO_MESSAGING_SERVICE_SID is empty.", extra={"number_sid": twilio_incoming_number.sid}, ) return super().save(*args, **kwargs) class CachedList: """A list that is stored in a cache.""" def __init__(self, cache_key: str) -> None: self.cache_key = cache_key cache_value = cache.get(self.cache_key, "") if cache_value: self.data = cache_value.split(",") else: self.data = [] def __iter__(self) -> Iterator[str]: return (item for item in self.data) def append(self, item: str) -> None: self.data.append(item) self.data.sort() cache.set(self.cache_key, ",".join(self.data)) def register_with_messaging_service(client: Client, number_sid: str) -> None: """Register a Twilio US phone number with a Messaging Service.""" if not settings.TWILIO_MESSAGING_SERVICE_SID: raise ValueError( "settings.TWILIO_MESSAGING_SERVICE_SID must contain a value when calling " "register_with_messaging_service" ) closed_sids = CachedList("twilio_messaging_service_closed") for service_sid in settings.TWILIO_MESSAGING_SERVICE_SID: if service_sid in closed_sids: continue try: client.messaging.v1.services(service_sid).phone_numbers.create( phone_number_sid=number_sid ) except TwilioRestException as err: log_extra = { "err_msg": err.msg, "status": err.status, "code": err.code, "service_sid": service_sid, "number_sid": number_sid, } if err.status == 409 and err.code == 21710: # Log "Phone Number is already in the Messaging Service" # https://www.twilio.com/docs/api/errors/21710 events_logger.warning("twilio_messaging_service", extra=log_extra) return elif err.status == 412 and err.code == 21714: # Log "Number Pool size limit reached", continue to next service # https://www.twilio.com/docs/api/errors/21714 closed_sids.append(service_sid) events_logger.warning("twilio_messaging_service", extra=log_extra) else: # Log and re-raise other Twilio errors events_logger.error("twilio_messaging_service", extra=log_extra) raise else: return # Successfully registered with service raise Exception("All services in TWILIO_MESSAGING_SERVICE_SID are full") @receiver(post_save, sender=RelayNumber) def relaynumber_post_save(sender, instance, created, **kwargs): # don't do anything if running migrations if isinstance(instance, MigrationRecorder.Migration): return # TODO: if IQ_FOR_NEW_NUMBERS, send welcome message via IQ if not instance.vendor == "twilio": return if created: incr_if_enabled("phones_RelayNumber.post_save_created_send_welcome") if not settings.PHONES_NO_CLIENT_CALLS_IN_TEST: # only send welcome vCard when creating new record send_welcome_message(instance.user, instance) def send_welcome_message(user, relay_number): real_phone = RealPhone.verified_objects.get(user=user) if not settings.SITE_ORIGIN: raise ValueError( "settings.SITE_ORIGIN must contain a value when calling " "send_welcome_message" ) media_url = settings.SITE_ORIGIN + reverse( "vCard", kwargs={"lookup_key": relay_number.vcard_lookup_key} ) client = twilio_client() client.messages.create( body=( "Welcome to Relay phone masking!" " 🎉 Please add your number to your contacts." " This will help you identify your Relay messages and calls." ), from_=settings.TWILIO_MAIN_NUMBER, to=real_phone.number, media_url=[media_url], ) def last_inbound_date_default(): return datetime.now(UTC) class InboundContact(models.Model): relay_number = models.ForeignKey(RelayNumber, on_delete=models.CASCADE) inbound_number = models.CharField(max_length=15) last_inbound_date = models.DateTimeField(default=last_inbound_date_default) last_inbound_type = models.CharField( max_length=4, choices=LAST_CONTACT_TYPE_CHOICES, default="text" ) num_calls = models.PositiveIntegerField(default=0) num_calls_blocked = models.PositiveIntegerField(default=0) last_call_date = models.DateTimeField(null=True) num_texts = models.PositiveIntegerField(default=0) num_texts_blocked = models.PositiveIntegerField(default=0) last_text_date = models.DateTimeField(null=True) blocked = models.BooleanField(default=False) class Meta: indexes = [models.Index(fields=["relay_number", "inbound_number"])] def suggested_numbers(user): try: real_phone = RealPhone.verified_objects.get_for_user(user) except RealPhone.DoesNotExist: raise BadRequest( "available_numbers: This user hasn't verified a RealPhone yet." ) existing_number = RelayNumber.objects.filter(user=user) if existing_number: raise BadRequest( "available_numbers: Another RelayNumber already exists for this user." ) real_num = real_phone.number client = twilio_client() avail_nums = client.available_phone_numbers(real_phone.country_code) # TODO: can we make multiple pattern searches in a single Twilio API request same_prefix_options = [] # look for numbers with same area code and 3-number prefix contains = f"{real_num[:8]}****" if real_num else "" twilio_nums = avail_nums.local.list(contains=contains, limit=10) same_prefix_options.extend(convert_twilio_numbers_to_dict(twilio_nums)) # look for numbers with same area code, 2-number prefix and suffix contains = f"{real_num[:7]}***{real_num[10:]}" if real_num else "" twilio_nums = avail_nums.local.list(contains=contains, limit=10) same_prefix_options.extend(convert_twilio_numbers_to_dict(twilio_nums)) # look for numbers with same area code and 1-number prefix contains = f"{real_num[:6]}******" if real_num else "" twilio_nums = avail_nums.local.list(contains=contains, limit=10) same_prefix_options.extend(convert_twilio_numbers_to_dict(twilio_nums)) # look for same number in other area codes contains = f"+1***{real_num[5:]}" if real_num else "" twilio_nums = avail_nums.local.list(contains=contains, limit=10) other_areas_options = convert_twilio_numbers_to_dict(twilio_nums) # look for any numbers in the area code contains = f"{real_num[:5]}*******" if real_num else "" twilio_nums = avail_nums.local.list(contains=contains, limit=10) same_area_options = convert_twilio_numbers_to_dict(twilio_nums) # look for any available numbers twilio_nums = avail_nums.local.list(limit=10) random_options = convert_twilio_numbers_to_dict(twilio_nums) return { "real_num": real_num, "same_prefix_options": same_prefix_options, "other_areas_options": other_areas_options, "same_area_options": same_area_options, "random_options": random_options, } def location_numbers(location, country_code=DEFAULT_REGION): client = twilio_client() avail_nums = client.available_phone_numbers(country_code) twilio_nums = avail_nums.local.list(in_locality=location, limit=10) return convert_twilio_numbers_to_dict(twilio_nums) def area_code_numbers(area_code, country_code=DEFAULT_REGION): client = twilio_client() avail_nums = client.available_phone_numbers(country_code) twilio_nums = avail_nums.local.list(area_code=area_code, limit=10) return convert_twilio_numbers_to_dict(twilio_nums) def convert_twilio_numbers_to_dict(twilio_numbers): """ To serialize twilio numbers to JSON for the API, we need to convert them into dictionaries. """ numbers_as_dicts = [] for twilio_number in twilio_numbers: number = {} number["friendly_name"] = twilio_number.friendly_name number["iso_country"] = twilio_number.iso_country number["locality"] = twilio_number.locality number["phone_number"] = twilio_number.phone_number number["postal_code"] = twilio_number.postal_code number["region"] = twilio_number.region numbers_as_dicts.append(number) return numbers_as_dicts