api/authentication.py (115 lines of code) (raw):

import logging import shlex from datetime import UTC, datetime from typing import Any from django.conf import settings from django.core.cache import cache import requests from allauth.socialaccount.models import SocialAccount from rest_framework.authentication import BaseAuthentication, get_authorization_header from rest_framework.exceptions import ( APIException, AuthenticationFailed, NotFound, ParseError, PermissionDenied, ) logger = logging.getLogger("events") INTROSPECT_TOKEN_URL = "{}/introspect".format( settings.SOCIALACCOUNT_PROVIDERS["fxa"]["OAUTH_ENDPOINT"] ) def get_cache_key(token): return hash(token) def introspect_token(token: str) -> dict[str, Any]: try: fxa_resp = requests.post( INTROSPECT_TOKEN_URL, json={"token": token}, timeout=settings.FXA_REQUESTS_TIMEOUT_SECONDS, ) except Exception as exc: logger.error( "Could not introspect token with FXA.", extra={"error_cls": type(exc), "error": shlex.quote(str(exc))}, ) raise AuthenticationFailed("Could not introspect token with FXA.") fxa_resp_data = {"status_code": fxa_resp.status_code, "json": {}} try: fxa_resp_data["json"] = fxa_resp.json() except requests.exceptions.JSONDecodeError: logger.error( "JSONDecodeError from FXA introspect response.", extra={"fxa_response": shlex.quote(fxa_resp.text)}, ) raise AuthenticationFailed("JSONDecodeError from FXA introspect response") return fxa_resp_data def get_fxa_uid_from_oauth_token(token: str, use_cache: bool = True) -> str: # set a default cache_timeout, but this will be overriden to match # the 'exp' time in the JWT returned by FxA cache_timeout = 60 cache_key = get_cache_key(token) if not use_cache: fxa_resp_data = introspect_token(token) else: # set a default fxa_resp_data, so any error during introspection # will still cache for at least cache_timeout to prevent an outage # from causing useless run-away repetitive introspection requests fxa_resp_data = {"status_code": None, "json": {}} try: cached_fxa_resp_data = cache.get(cache_key) if cached_fxa_resp_data: fxa_resp_data = cached_fxa_resp_data else: # no cached data, get new fxa_resp_data = introspect_token(token) except AuthenticationFailed: raise finally: # Store potential valid response, errors, inactive users, etc. from FxA # for at least 60 seconds. Valid access_token cache extended after checking. cache.set(cache_key, fxa_resp_data, cache_timeout) if fxa_resp_data["status_code"] is None: raise APIException("Previous FXA call failed, wait to retry.") if not fxa_resp_data["status_code"] == 200: raise APIException("Did not receive a 200 response from FXA.") if not fxa_resp_data["json"].get("active"): raise AuthenticationFailed("FXA returned active: False for token.") # FxA user is active, check for the associated Relay account if (raw_fxa_uid := fxa_resp_data.get("json", {}).get("sub")) is None: raise NotFound("FXA did not return an FXA UID.") fxa_uid = str(raw_fxa_uid) # cache valid access_token and fxa_resp_data until access_token expiration # TODO: revisit this since the token can expire before its time if isinstance(fxa_resp_data.get("json", {}).get("exp"), int): # Note: FXA iat and exp are timestamps in *milliseconds* fxa_token_exp_time = int(fxa_resp_data["json"]["exp"] / 1000) now_time = int(datetime.now(UTC).timestamp()) fxa_token_exp_cache_timeout = fxa_token_exp_time - now_time if fxa_token_exp_cache_timeout > cache_timeout: # cache until access_token expires (matched Relay user) # this handles cases where the token already expired cache_timeout = fxa_token_exp_cache_timeout cache.set(cache_key, fxa_resp_data, cache_timeout) return fxa_uid class FxaTokenAuthentication(BaseAuthentication): def authenticate_header(self, request): # Note: we need to implement this function to make DRF return a 401 status code # when we raise AuthenticationFailed, rather than a 403. See: # https://www.django-rest-framework.org/api-guide/authentication/#custom-authentication return "Bearer" def authenticate(self, request): authorization = get_authorization_header(request).decode() if not authorization or not authorization.startswith("Bearer "): # If the request has no Bearer token, return None to attempt the next # auth scheme in the REST_FRAMEWORK AUTHENTICATION_CLASSES list return None token = authorization.split(" ")[1] if token == "": raise ParseError("Missing FXA Token after 'Bearer'.") use_cache = True method = request.method if method in ["POST", "DELETE", "PUT"]: use_cache = False if method == "POST" and request.path == "/api/v1/relayaddresses/": use_cache = True fxa_uid = get_fxa_uid_from_oauth_token(token, use_cache) try: # MPP-3021: select_related user object to save DB query sa = SocialAccount.objects.filter( uid=fxa_uid, provider="fxa" ).select_related("user")[0] except IndexError: raise PermissionDenied( "Authenticated user does not have a Relay account." " Have they accepted the terms?" ) user = sa.user if not user.is_active: raise PermissionDenied( "Authenticated user does not have an active Relay account." " Have they been deactivated?" ) if user: return (user, token) else: raise NotFound()