privaterelay/glean_interface.py (291 lines of code) (raw):

"""Relay interface to EventsServerEventLogger generated by glean_parser.""" from __future__ import annotations from datetime import datetime from logging import getLogger from typing import Any, Literal, NamedTuple from django.conf import settings from django.contrib.auth.models import User from django.http import HttpRequest from ipware import get_client_ip from emails.models import DomainAddress, RelayAddress from .glean.server_events import GLEAN_EVENT_MOZLOG_TYPE, EventsServerEventLogger from .types import RELAY_CHANNEL_NAME # Enumerate the mask setting that caused an email to not be forwarded. EmailBlockedReason = Literal[ "block_all", # The mask is set to block all mail "block_promotional", # The mask is set to block promotional / list mail ] def _opt_dt_to_glean(value: datetime | None) -> int: """Convert an optional datetime to an integer timestamp.""" if value == datetime.min: return -2 # datetime was not checked if value is None: return -1 # datetime does not exist return int(value.timestamp()) def _opt_str_to_glean(value: str | None) -> str: """Convert an optional string to a (possibly empty) string.""" return "" if value is None else value class RequestData(NamedTuple): """Extract and store data from the request.""" user_agent: str | None = None ip_address: str | None = None @classmethod def from_request(cls, request: HttpRequest) -> RequestData: user_agent = request.headers.get("user-agent", None) client_ip, is_routable = get_client_ip(request) ip_address = client_ip if (client_ip and is_routable) else None return cls(user_agent=user_agent, ip_address=ip_address) class UserData(NamedTuple): """Extract and store data from a Relay user.""" metrics_enabled: bool fxa_id: str | None = None n_random_masks: int = 0 n_domain_masks: int = 0 n_deleted_random_masks: int = 0 n_deleted_domain_masks: int = 0 date_joined_relay: datetime | None = None date_joined_premium: datetime | None = None premium_status: str = "" has_extension: bool = False date_got_extension: datetime | None = None @classmethod def from_user(cls, user: User) -> UserData: metrics_enabled = user.profile.metrics_enabled if not metrics_enabled: return cls(metrics_enabled=False) fxa_id = user.profile.metrics_fxa_id or None n_random_masks = user.relayaddress_set.count() n_domain_masks = user.domainaddress_set.count() n_deleted_random_masks = user.profile.num_deleted_relay_addresses n_deleted_domain_masks = user.profile.num_deleted_domain_addresses date_joined_relay = user.date_joined if user.profile.has_premium: if user.profile.has_phone: date_joined_premium = user.profile.date_subscribed_phone else: date_joined_premium = user.profile.date_subscribed else: date_joined_premium = None premium_status = user.profile.metrics_premium_status # Until more accurate date_got_extension is calculated (MPP-3765) # do not check for when the user got extension has_extension = False date_got_extension = datetime.min return cls( metrics_enabled=True, fxa_id=fxa_id, n_random_masks=n_random_masks, n_domain_masks=n_domain_masks, n_deleted_random_masks=n_deleted_random_masks, n_deleted_domain_masks=n_deleted_domain_masks, date_joined_relay=date_joined_relay, date_joined_premium=date_joined_premium, premium_status=premium_status, has_extension=has_extension, date_got_extension=date_got_extension, ) class EmailMaskData(NamedTuple): """Extract and store data from a Relay email mask.""" is_random_mask: bool has_website: bool @classmethod def from_mask(cls, mask: RelayAddress | DomainAddress) -> EmailMaskData: if isinstance(mask, RelayAddress): is_random_mask = True has_website = bool(mask.generated_for) else: is_random_mask = False has_website = False return EmailMaskData(is_random_mask=is_random_mask, has_website=has_website) class RelayGleanLogger(EventsServerEventLogger): """Extend the generated EventsServerEventLogger for Relay usage.""" def __init__( self, application_id: str, app_display_version: str, channel: RELAY_CHANNEL_NAME, ): if not settings.GLEAN_EVENT_MOZLOG_TYPE == GLEAN_EVENT_MOZLOG_TYPE: raise ValueError( "settings.GLEAN_EVENT_MOZLOG_TYPE must equal GLEAN_EVENT_MOZLOG_TYPE" ) self._logger = getLogger(GLEAN_EVENT_MOZLOG_TYPE) super().__init__( application_id=application_id, app_display_version=app_display_version, channel=channel, ) def emit_record(self, now: datetime, ping: dict[str, Any]) -> None: """Emit record as a log instead of a print()""" self._logger.info(GLEAN_EVENT_MOZLOG_TYPE, extra=ping) def log_email_mask_created( self, *, request: HttpRequest | None = None, mask: RelayAddress | DomainAddress, created_by_api: bool, ) -> None: """Log that a Relay email mask was created.""" user_data = UserData.from_user(mask.user) if not user_data.metrics_enabled: return request_data = RequestData.from_request(request) if request else RequestData() mask_data = EmailMaskData.from_mask(mask) self.record_email_mask_created( user_agent=_opt_str_to_glean(request_data.user_agent), ip_address=_opt_str_to_glean(request_data.ip_address), fxa_id=_opt_str_to_glean(user_data.fxa_id), platform="", n_random_masks=user_data.n_random_masks, n_domain_masks=user_data.n_domain_masks, n_deleted_random_masks=user_data.n_deleted_random_masks, n_deleted_domain_masks=user_data.n_deleted_domain_masks, date_joined_relay=_opt_dt_to_glean(user_data.date_joined_relay), premium_status=user_data.premium_status, date_joined_premium=_opt_dt_to_glean(user_data.date_joined_premium), has_extension=user_data.has_extension, date_got_extension=_opt_dt_to_glean(user_data.date_got_extension), is_random_mask=mask_data.is_random_mask, created_by_api=created_by_api, has_website=mask_data.has_website, ) def log_email_mask_label_updated( self, *, request: HttpRequest, mask: RelayAddress | DomainAddress, ) -> None: """Log that a Relay email mask's label was changed.""" user_data = UserData.from_user(mask.user) if not user_data.metrics_enabled: return request_data = RequestData.from_request(request) mask_data = EmailMaskData.from_mask(mask) self.record_email_mask_label_updated( user_agent=_opt_str_to_glean(request_data.user_agent), ip_address=_opt_str_to_glean(request_data.ip_address), fxa_id=_opt_str_to_glean(user_data.fxa_id), platform="", n_random_masks=user_data.n_random_masks, n_domain_masks=user_data.n_domain_masks, n_deleted_random_masks=user_data.n_deleted_random_masks, n_deleted_domain_masks=user_data.n_deleted_domain_masks, date_joined_relay=_opt_dt_to_glean(user_data.date_joined_relay), premium_status=user_data.premium_status, date_joined_premium=_opt_dt_to_glean(user_data.date_joined_premium), has_extension=user_data.has_extension, date_got_extension=_opt_dt_to_glean(user_data.date_got_extension), is_random_mask=mask_data.is_random_mask, ) def log_email_mask_deleted( self, *, request: HttpRequest, user: User, is_random_mask: bool, ) -> None: """Log that a Relay email mask was deleted.""" user_data = UserData.from_user(user) if not user_data.metrics_enabled: return request_data = RequestData.from_request(request) self.record_email_mask_deleted( user_agent=_opt_str_to_glean(request_data.user_agent), ip_address=_opt_str_to_glean(request_data.ip_address), fxa_id=_opt_str_to_glean(user_data.fxa_id), platform="", n_random_masks=user_data.n_random_masks, n_domain_masks=user_data.n_domain_masks, n_deleted_random_masks=user_data.n_deleted_random_masks, n_deleted_domain_masks=user_data.n_deleted_domain_masks, date_joined_relay=_opt_dt_to_glean(user_data.date_joined_relay), premium_status=user_data.premium_status, date_joined_premium=_opt_dt_to_glean(user_data.date_joined_premium), has_extension=user_data.has_extension, date_got_extension=_opt_dt_to_glean(user_data.date_got_extension), is_random_mask=is_random_mask, ) def log_email_forwarded( self, *, mask: RelayAddress | DomainAddress, is_reply: bool = False, ) -> None: """Log that an email was forwarded.""" user_data = UserData.from_user(mask.user) if not user_data.metrics_enabled: return request_data = RequestData() mask_data = EmailMaskData.from_mask(mask) self.record_email_forwarded( user_agent=_opt_str_to_glean(request_data.user_agent), ip_address=_opt_str_to_glean(request_data.ip_address), fxa_id=_opt_str_to_glean(user_data.fxa_id), platform="", n_random_masks=user_data.n_random_masks, n_domain_masks=user_data.n_domain_masks, n_deleted_random_masks=user_data.n_deleted_random_masks, n_deleted_domain_masks=user_data.n_deleted_domain_masks, date_joined_relay=_opt_dt_to_glean(user_data.date_joined_relay), premium_status=user_data.premium_status, date_joined_premium=_opt_dt_to_glean(user_data.date_joined_premium), has_extension=user_data.has_extension, date_got_extension=_opt_dt_to_glean(user_data.date_got_extension), is_random_mask=mask_data.is_random_mask, is_reply=is_reply, ) def log_email_blocked( self, *, mask: RelayAddress | DomainAddress, reason: EmailBlockedReason, is_reply: bool = False, ) -> None: """Log that an email was not forwarded.""" user_data = UserData.from_user(mask.user) if not user_data.metrics_enabled: return request_data = RequestData() mask_data = EmailMaskData.from_mask(mask) self.record_email_blocked( user_agent=_opt_str_to_glean(request_data.user_agent), ip_address=_opt_str_to_glean(request_data.ip_address), fxa_id=_opt_str_to_glean(user_data.fxa_id), platform="", n_random_masks=user_data.n_random_masks, n_domain_masks=user_data.n_domain_masks, n_deleted_random_masks=user_data.n_deleted_random_masks, n_deleted_domain_masks=user_data.n_deleted_domain_masks, date_joined_relay=_opt_dt_to_glean(user_data.date_joined_relay), premium_status=user_data.premium_status, date_joined_premium=_opt_dt_to_glean(user_data.date_joined_premium), has_extension=user_data.has_extension, date_got_extension=_opt_dt_to_glean(user_data.date_got_extension), is_random_mask=mask_data.is_random_mask, is_reply=is_reply, reason=reason, ) def log_api_accessed(self, request: HttpRequest) -> None: """Log that any Relay API endpoint was accessed.""" if not request.user or not request.user.is_authenticated: return request_data = RequestData.from_request(request) user_data = UserData.from_user(request.user) self.record_api_accessed( user_agent=_opt_str_to_glean(request_data.user_agent), ip_address=_opt_str_to_glean(request_data.ip_address), endpoint=request.path, method=_opt_str_to_glean(request.method), fxa_id=_opt_str_to_glean(user_data.fxa_id), ) def log_text_received( self, *, user: User, ) -> None: """Log that a text message was received.""" user_data = UserData.from_user(user) if not user_data.metrics_enabled: return request_data = RequestData() self.record_phone_text_received( user_agent=_opt_str_to_glean(request_data.user_agent), ip_address=_opt_str_to_glean(request_data.ip_address), fxa_id=_opt_str_to_glean(user_data.fxa_id), ) def log_call_received( self, *, user: User, ) -> None: """Log that a phone call was received.""" user_data = UserData.from_user(user) if not user_data.metrics_enabled: return request_data = RequestData() self.record_phone_call_received( user_agent=_opt_str_to_glean(request_data.user_agent), ip_address=_opt_str_to_glean(request_data.ip_address), fxa_id=_opt_str_to_glean(user_data.fxa_id), )