privaterelay/glean_interface.py (291 lines of code) (raw):
"""Relay interface to EventsServerEventLogger generated by glean_parser."""
from __future__ import annotations
from datetime import datetime
from logging import getLogger
from typing import Any, Literal, NamedTuple
from django.conf import settings
from django.contrib.auth.models import User
from django.http import HttpRequest
from ipware import get_client_ip
from emails.models import DomainAddress, RelayAddress
from .glean.server_events import GLEAN_EVENT_MOZLOG_TYPE, EventsServerEventLogger
from .types import RELAY_CHANNEL_NAME
# The mask setting that caused an email to not be forwarded:
#   "block_all"         - the mask is set to block all mail
#   "block_promotional" - the mask is set to block promotional / list mail
EmailBlockedReason = Literal["block_all", "block_promotional"]
def _opt_dt_to_glean(value: datetime | None) -> int:
"""Convert an optional datetime to an integer timestamp."""
if value == datetime.min:
return -2 # datetime was not checked
if value is None:
return -1 # datetime does not exist
return int(value.timestamp())
def _opt_str_to_glean(value: str | None) -> str:
"""Convert an optional string to a (possibly empty) string."""
return "" if value is None else value
class RequestData(NamedTuple):
    """Values taken from an HTTP request for inclusion in a Glean event.

    Both fields default to ``None`` so that ``RequestData()`` can stand in
    when no request is available.
    """

    # Value of the "user-agent" request header, if any
    user_agent: str | None = None
    # Client IP address, kept only when it is reported as routable
    ip_address: str | None = None

    @classmethod
    def from_request(cls, request: HttpRequest) -> RequestData:
        """Build a RequestData from the headers and client IP of *request*."""
        agent = request.headers.get("user-agent", None)
        address, routable = get_client_ip(request)
        if address and routable:
            usable_address = address
        else:
            # Missing or non-routable addresses are not reported
            usable_address = None
        return cls(user_agent=agent, ip_address=usable_address)
class UserData(NamedTuple):
    """Values taken from a Relay user for inclusion in a Glean event.

    Only ``metrics_enabled`` is required; every other field has a default so
    that a user with metrics disabled can be represented without reading any
    further data.
    """

    metrics_enabled: bool
    fxa_id: str | None = None
    n_random_masks: int = 0
    n_domain_masks: int = 0
    n_deleted_random_masks: int = 0
    n_deleted_domain_masks: int = 0
    date_joined_relay: datetime | None = None
    date_joined_premium: datetime | None = None
    premium_status: str = ""
    has_extension: bool = False
    date_got_extension: datetime | None = None

    @classmethod
    def from_user(cls, user: User) -> UserData:
        """Build a UserData from *user* and the user's profile.

        When the profile has metrics disabled, return a UserData with
        ``metrics_enabled=False`` and defaults for every other field, without
        reading anything else from the user.
        """
        profile = user.profile
        if not profile.metrics_enabled:
            return cls(metrics_enabled=False)

        # A falsy metrics_fxa_id (such as an empty string) is stored as None
        fxa = profile.metrics_fxa_id or None
        random_count = user.relayaddress_set.count()
        domain_count = user.domainaddress_set.count()
        deleted_random_count = profile.num_deleted_relay_addresses
        deleted_domain_count = profile.num_deleted_domain_addresses
        joined_relay = user.date_joined

        # Premium users with phone use the phone subscription date; other
        # premium users use the regular subscription date.
        if not profile.has_premium:
            joined_premium = None
        elif profile.has_phone:
            joined_premium = profile.date_subscribed_phone
        else:
            joined_premium = profile.date_subscribed

        status = profile.metrics_premium_status

        # Until more accurate date_got_extension is calculated (MPP-3765)
        # do not check for when the user got extension.
        # datetime.min is the placeholder for "was not checked".
        return cls(
            metrics_enabled=True,
            fxa_id=fxa,
            n_random_masks=random_count,
            n_domain_masks=domain_count,
            n_deleted_random_masks=deleted_random_count,
            n_deleted_domain_masks=deleted_domain_count,
            date_joined_relay=joined_relay,
            date_joined_premium=joined_premium,
            premium_status=status,
            has_extension=False,
            date_got_extension=datetime.min,
        )
class EmailMaskData(NamedTuple):
    """Values taken from a Relay email mask for inclusion in a Glean event."""

    # True for a RelayAddress, False for any other mask passed in
    is_random_mask: bool
    # True only for a RelayAddress with a truthy generated_for value
    has_website: bool

    @classmethod
    def from_mask(cls, mask: RelayAddress | DomainAddress) -> EmailMaskData:
        """Build an EmailMaskData describing *mask*."""
        random_mask = isinstance(mask, RelayAddress)
        # generated_for is only read for random masks; other masks never
        # report a website.
        website = random_mask and bool(mask.generated_for)
        return EmailMaskData(is_random_mask=random_mask, has_website=website)
class RelayGleanLogger(EventsServerEventLogger):
    """Extend the generated EventsServerEventLogger for Relay usage.

    Records are emitted through the ``logging`` module (see ``emit_record``).
    Each ``log_*`` method gathers request, user, and mask data, converts
    optional values with ``_opt_str_to_glean`` / ``_opt_dt_to_glean``, and
    calls the matching generated ``record_*`` method. Nothing is recorded
    for a user whose profile has metrics disabled.
    """

    def __init__(
        self,
        application_id: str,
        app_display_version: str,
        channel: RELAY_CHANNEL_NAME,
    ):
        """Validate the mozlog type setting and initialize the base logger.

        Raises:
            ValueError: if ``settings.GLEAN_EVENT_MOZLOG_TYPE`` differs from
                the ``GLEAN_EVENT_MOZLOG_TYPE`` of the generated module.
        """
        if settings.GLEAN_EVENT_MOZLOG_TYPE != GLEAN_EVENT_MOZLOG_TYPE:
            raise ValueError(
                "settings.GLEAN_EVENT_MOZLOG_TYPE must equal GLEAN_EVENT_MOZLOG_TYPE"
            )
        self._logger = getLogger(GLEAN_EVENT_MOZLOG_TYPE)
        super().__init__(
            application_id=application_id,
            app_display_version=app_display_version,
            channel=channel,
        )

    def emit_record(self, now: datetime, ping: dict[str, Any]) -> None:
        """Emit record as a log instead of a print()

        The ping is attached to an INFO-level log message as ``extra``.
        ``now`` is accepted as part of the signature but not used here.
        """
        self._logger.info(GLEAN_EVENT_MOZLOG_TYPE, extra=ping)

    def log_email_mask_created(
        self,
        *,
        request: HttpRequest | None = None,
        mask: RelayAddress | DomainAddress,
        created_by_api: bool,
    ) -> None:
        """Log that a Relay email mask was created.

        ``request`` is optional; without it the user agent and IP address
        are recorded as empty strings.
        """
        user_data = UserData.from_user(mask.user)
        if not user_data.metrics_enabled:
            return
        request_data = RequestData.from_request(request) if request else RequestData()
        mask_data = EmailMaskData.from_mask(mask)
        self.record_email_mask_created(
            user_agent=_opt_str_to_glean(request_data.user_agent),
            ip_address=_opt_str_to_glean(request_data.ip_address),
            fxa_id=_opt_str_to_glean(user_data.fxa_id),
            platform="",
            n_random_masks=user_data.n_random_masks,
            n_domain_masks=user_data.n_domain_masks,
            n_deleted_random_masks=user_data.n_deleted_random_masks,
            n_deleted_domain_masks=user_data.n_deleted_domain_masks,
            date_joined_relay=_opt_dt_to_glean(user_data.date_joined_relay),
            premium_status=user_data.premium_status,
            date_joined_premium=_opt_dt_to_glean(user_data.date_joined_premium),
            has_extension=user_data.has_extension,
            date_got_extension=_opt_dt_to_glean(user_data.date_got_extension),
            is_random_mask=mask_data.is_random_mask,
            created_by_api=created_by_api,
            has_website=mask_data.has_website,
        )

    def log_email_mask_label_updated(
        self,
        *,
        request: HttpRequest,
        mask: RelayAddress | DomainAddress,
    ) -> None:
        """Log that a Relay email mask's label was changed."""
        user_data = UserData.from_user(mask.user)
        if not user_data.metrics_enabled:
            return
        request_data = RequestData.from_request(request)
        mask_data = EmailMaskData.from_mask(mask)
        self.record_email_mask_label_updated(
            user_agent=_opt_str_to_glean(request_data.user_agent),
            ip_address=_opt_str_to_glean(request_data.ip_address),
            fxa_id=_opt_str_to_glean(user_data.fxa_id),
            platform="",
            n_random_masks=user_data.n_random_masks,
            n_domain_masks=user_data.n_domain_masks,
            n_deleted_random_masks=user_data.n_deleted_random_masks,
            n_deleted_domain_masks=user_data.n_deleted_domain_masks,
            date_joined_relay=_opt_dt_to_glean(user_data.date_joined_relay),
            premium_status=user_data.premium_status,
            date_joined_premium=_opt_dt_to_glean(user_data.date_joined_premium),
            has_extension=user_data.has_extension,
            date_got_extension=_opt_dt_to_glean(user_data.date_got_extension),
            is_random_mask=mask_data.is_random_mask,
        )

    def log_email_mask_deleted(
        self,
        *,
        request: HttpRequest,
        user: User,
        is_random_mask: bool,
    ) -> None:
        """Log that a Relay email mask was deleted.

        The caller passes the owning user and the mask type directly instead
        of the mask object.
        """
        user_data = UserData.from_user(user)
        if not user_data.metrics_enabled:
            return
        request_data = RequestData.from_request(request)
        self.record_email_mask_deleted(
            user_agent=_opt_str_to_glean(request_data.user_agent),
            ip_address=_opt_str_to_glean(request_data.ip_address),
            fxa_id=_opt_str_to_glean(user_data.fxa_id),
            platform="",
            n_random_masks=user_data.n_random_masks,
            n_domain_masks=user_data.n_domain_masks,
            n_deleted_random_masks=user_data.n_deleted_random_masks,
            n_deleted_domain_masks=user_data.n_deleted_domain_masks,
            date_joined_relay=_opt_dt_to_glean(user_data.date_joined_relay),
            premium_status=user_data.premium_status,
            date_joined_premium=_opt_dt_to_glean(user_data.date_joined_premium),
            has_extension=user_data.has_extension,
            date_got_extension=_opt_dt_to_glean(user_data.date_got_extension),
            is_random_mask=is_random_mask,
        )

    def log_email_forwarded(
        self,
        *,
        mask: RelayAddress | DomainAddress,
        is_reply: bool = False,
    ) -> None:
        """Log that an email was forwarded.

        No request is involved, so the user agent and IP address are
        recorded as empty strings.
        """
        user_data = UserData.from_user(mask.user)
        if not user_data.metrics_enabled:
            return
        request_data = RequestData()
        mask_data = EmailMaskData.from_mask(mask)
        self.record_email_forwarded(
            user_agent=_opt_str_to_glean(request_data.user_agent),
            ip_address=_opt_str_to_glean(request_data.ip_address),
            fxa_id=_opt_str_to_glean(user_data.fxa_id),
            platform="",
            n_random_masks=user_data.n_random_masks,
            n_domain_masks=user_data.n_domain_masks,
            n_deleted_random_masks=user_data.n_deleted_random_masks,
            n_deleted_domain_masks=user_data.n_deleted_domain_masks,
            date_joined_relay=_opt_dt_to_glean(user_data.date_joined_relay),
            premium_status=user_data.premium_status,
            date_joined_premium=_opt_dt_to_glean(user_data.date_joined_premium),
            has_extension=user_data.has_extension,
            date_got_extension=_opt_dt_to_glean(user_data.date_got_extension),
            is_random_mask=mask_data.is_random_mask,
            is_reply=is_reply,
        )

    def log_email_blocked(
        self,
        *,
        mask: RelayAddress | DomainAddress,
        reason: EmailBlockedReason,
        is_reply: bool = False,
    ) -> None:
        """Log that an email was not forwarded.

        ``reason`` names the mask setting that blocked the email. No request
        is involved, so the user agent and IP address are recorded as empty
        strings.
        """
        user_data = UserData.from_user(mask.user)
        if not user_data.metrics_enabled:
            return
        request_data = RequestData()
        mask_data = EmailMaskData.from_mask(mask)
        self.record_email_blocked(
            user_agent=_opt_str_to_glean(request_data.user_agent),
            ip_address=_opt_str_to_glean(request_data.ip_address),
            fxa_id=_opt_str_to_glean(user_data.fxa_id),
            platform="",
            n_random_masks=user_data.n_random_masks,
            n_domain_masks=user_data.n_domain_masks,
            n_deleted_random_masks=user_data.n_deleted_random_masks,
            n_deleted_domain_masks=user_data.n_deleted_domain_masks,
            date_joined_relay=_opt_dt_to_glean(user_data.date_joined_relay),
            premium_status=user_data.premium_status,
            date_joined_premium=_opt_dt_to_glean(user_data.date_joined_premium),
            has_extension=user_data.has_extension,
            date_got_extension=_opt_dt_to_glean(user_data.date_got_extension),
            is_random_mask=mask_data.is_random_mask,
            is_reply=is_reply,
            reason=reason,
        )

    def log_api_accessed(self, request: HttpRequest) -> None:
        """Log that any Relay API endpoint was accessed.

        Nothing is recorded for unauthenticated requests or for users whose
        profile has metrics disabled.
        """
        if not request.user or not request.user.is_authenticated:
            return
        user_data = UserData.from_user(request.user)
        # Respect the metrics opt-out, as every other log_* method does.
        # Without this check the user agent and IP address of an opted-out
        # user would still be recorded.
        if not user_data.metrics_enabled:
            return
        request_data = RequestData.from_request(request)
        self.record_api_accessed(
            user_agent=_opt_str_to_glean(request_data.user_agent),
            ip_address=_opt_str_to_glean(request_data.ip_address),
            endpoint=request.path,
            method=_opt_str_to_glean(request.method),
            fxa_id=_opt_str_to_glean(user_data.fxa_id),
        )

    def log_text_received(
        self,
        *,
        user: User,
    ) -> None:
        """Log that a text message was received.

        No request is involved, so the user agent and IP address are
        recorded as empty strings.
        """
        user_data = UserData.from_user(user)
        if not user_data.metrics_enabled:
            return
        request_data = RequestData()
        self.record_phone_text_received(
            user_agent=_opt_str_to_glean(request_data.user_agent),
            ip_address=_opt_str_to_glean(request_data.ip_address),
            fxa_id=_opt_str_to_glean(user_data.fxa_id),
        )

    def log_call_received(
        self,
        *,
        user: User,
    ) -> None:
        """Log that a phone call was received.

        No request is involved, so the user agent and IP address are
        recorded as empty strings.
        """
        user_data = UserData.from_user(user)
        if not user_data.metrics_enabled:
            return
        request_data = RequestData()
        self.record_phone_call_received(
            user_agent=_opt_str_to_glean(request_data.user_agent),
            ip_address=_opt_str_to_glean(request_data.ip_address),
            fxa_id=_opt_str_to_glean(user_data.fxa_id),
        )