phones/management/commands/delete_phone_data.py (102 lines of code) (raw):

from argparse import ArgumentParser from dataclasses import dataclass from typing import Any from django.core.management.base import BaseCommand, CommandError from allauth.socialaccount.models import SocialAccount from phones.models import InboundContact, RealPhone, RelayNumber class Command(BaseCommand): help = "Deletes phone data, so a user can re-enroll." def add_arguments(self, parser: ArgumentParser) -> None: parser.add_argument("fxa_id", help="The user's FxA ID") parser.add_argument( "-f", "--force", action="store_true", help="Skip confirmation and delete any found data", ) def handle(self, *args: Any, **kwargs: Any) -> None | str: fxa_id: str = kwargs["fxa_id"] skip_confirmation: bool = kwargs["force"] try: data = _PhoneData.from_fxa(fxa_id) except SocialAccount.DoesNotExist: raise CommandError(f"No user with FxA ID '{fxa_id}'.") report = f"Found a matching user:\n\n{data.bullet_report()}\n" self.stdout.write(report) if not data.has_data: return "User has NO PHONE DATA to delete." confirmed = skip_confirmation or self.confirm() if confirmed: data.reset() return "Deleted user's phone data." return "User still has their phone data... FOR NOW!" def confirm(self) -> bool: answer = "" first_time = True while answer not in ("Y", "N"): if first_time: first_time = False else: self.stdout.write("Please answer 'Y' or 'N'") raw_answer = input("Delete this user's phone data? (Y/N) ") answer = raw_answer.strip().upper() return answer == "Y" @dataclass class _PhoneData: """Helper class to hold phone data for a user.""" fxa: SocialAccount real_phones: list[RealPhone] | None = None relay_phone: RelayNumber | None = None inbound_contact_count: int = 0 @classmethod def from_fxa(cls, fxa_id: str) -> "_PhoneData": """Initialize from an FxA ID.""" fxa = SocialAccount.objects.get(provider="fxa", uid=fxa_id) real_phones = RealPhone.objects.filter(user=fxa.user) if not real_phones.exists(): return cls(fxa=fxa) try: relay_phone = RelayNumber.objects.get(user=fxa.user) inbound_contact_count = InboundContact.objects.filter( relay_number=relay_phone ).count() except RelayNumber.DoesNotExist: return cls(fxa=fxa, real_phones=list(real_phones)) return cls( fxa=fxa, real_phones=list(real_phones), relay_phone=relay_phone, inbound_contact_count=inbound_contact_count, ) @property def has_data(self) -> bool: """Return True if the user has phone data to reset.""" return self.real_phones is not None and len(self.real_phones) > 0 @property def real_numbers(self) -> list[str] | None: """Get user's real phone number, if it exists.""" if self.real_phones: return [real_phone.number for real_phone in self.real_phones] return None @property def relay_number(self) -> str | None: """Get user's Relay phone mask number, if it exists.""" if self.relay_phone: return self.relay_phone.number return None def bullet_report(self) -> str: """Return a bulleted list of the user's data.""" return ( f"* FxA ID: {self.fxa.uid}\n" f"* User ID: {self.fxa.user_id}\n" f"* Email: {self.fxa.user.email}\n" f"* Real Phone: " + ( "\n* Real Phone: ".join(number for number in self.real_numbers) if self.real_numbers else "<NO REAL PHONE>" ) + f"\n* Relay Phone: {self.relay_number or '<NO RELAY PHONE>'}\n" f"* Inbound Contacts: {self.inbound_contact_count}\n" ) def reset(self) -> None: """Reset the user's phone data, so they can re-enroll with new numbers.""" if self.relay_phone: if self.inbound_contact_count: InboundContact.objects.filter(relay_number=self.relay_phone).delete() self.relay_phone.delete() for real_phone in self.real_phones or []: real_phone.delete()