fxa/oauth.py (211 lines of code) (raw):

# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this file, # You can obtain one at http://mozilla.org/MPL/2.0/. import json import os import base64 import hashlib from urllib.parse import urlparse, urlunparse, urlencode, parse_qs import jwt from fxa.cache import MemoryCache, DEFAULT_CACHE_EXPIRY from fxa.constants import PRODUCTION_URLS from fxa.errors import OutOfProtocolError, ScopeMismatchError, TrustError from fxa._utils import APIClient, scope_matches, get_hmac, HawkTokenAuth DEFAULT_SERVER_URL = PRODUCTION_URLS['oauth'] VERSION_SUFFIXES = ("/v1",) TOKEN_HMAC_SECRET = 'PyFxA Token Cache Hmac Secret' class Client: """Client for talking to the Firefox Accounts OAuth server""" def __init__(self, client_id=None, client_secret=None, server_url=None, cache=True, ttl=DEFAULT_CACHE_EXPIRY, jwks=None): self.client_id = client_id self.client_secret = client_secret if server_url is None: server_url = DEFAULT_SERVER_URL server_url = server_url.rstrip('/') if not server_url.endswith(VERSION_SUFFIXES): server_url += VERSION_SUFFIXES[0] if isinstance(server_url, str): self.apiclient = APIClient(server_url) else: self.apiclient = server_url self.cache = cache if self.cache is True: self.cache = MemoryCache(ttl) if jwks is not None: # Fail early if bad JWKs were provided. for key in jwks: jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(key)) self.jwks = jwks @property def server_url(self): return self.apiclient.server_url def get_client_metadata(self, client_id=None): """Get the OAuth client metadata for a given client_id.""" if client_id is None: client_id = self.client_id return self.apiclient.get(f"/client/{client_id}") def get_redirect_url(self, state="", redirect_uri=None, scope=None, action=None, email=None, client_id=None, code_challenge=None, code_challenge_method=None, access_type=None, keys_jwk=None): """Get the URL to redirect to to initiate the oauth flow.""" if client_id is None: client_id = self.client_id params = { "client_id": client_id, "state": state, } if redirect_uri is not None: params["redirect_uri"] = redirect_uri if scope is not None: params["scope"] = scope if action is not None: params["action"] = action if email is not None: params["email"] = email if code_challenge is not None: params["code_challenge"] = code_challenge if code_challenge_method is not None: params["code_challenge_method"] = code_challenge_method if keys_jwk is not None: params["keys_jwk"] = keys_jwk if access_type is not None: params["access_type"] = access_type query_str = urlencode(params) authorization_url = urlparse(self.server_url + "/authorization") return urlunparse(authorization_url._replace(query=query_str)) def trade_code(self, code, client_id=None, client_secret=None, code_verifier=None, ttl=None): """Trade the authentication code for a longer lived token. :param code: the authentication code from the oauth redirect dance. :param client_id: the string generated during FxA client registration. :param client_secret: the related secret string. :param code_verifier: optional PKCE code verifier. :param ttl: optional ttl in seconds, the access token is valid for. :returns: a dict with user id and authorized scopes for this token. """ if client_id is None: client_id = self.client_id if client_secret is None: client_secret = self.client_secret url = '/token' body = { 'code': code, 'client_id': client_id, } if client_secret is not None: body["client_secret"] = client_secret if code_verifier is not None: body["code_verifier"] = code_verifier if ttl is not None: body["ttl"] = ttl resp = self.apiclient.post(url, body) if 'access_token' not in resp: error_msg = 'access_token missing in OAuth response' raise OutOfProtocolError(error_msg) return resp def authorize_code(self, session, scope=None, client_id=None, code_challenge=None, code_challenge_method=None): """Trade a session for an oauth authorization code. This method takes a session for a user and uses it to generate an oauth authentication code. This code can in turn be traded for a full-blown oauth token. :param session: an auth session to use. :param scope: optional scope to be provided by the token. :param client_id: the string generated during FxA client registration. :param code_challenge: optional PKCE code challenge. :param code_challenge_method: optional PKCE code challenge method. """ auth = HawkTokenAuth(session.token, "sessionToken", self.apiclient) if client_id is None: client_id = self.client_id url = "/oauth/authorization" # Although not relevant in this scenario from a security perspective, # we generate a random 'state' and check the returned redirect URL # for completeness. state = base64.urlsafe_b64encode(os.urandom(24)).decode('utf-8') body = { "client_id": client_id, "state": state } if scope is not None: body["scope"] = scope if code_challenge is not None: body["code_challenge"] = code_challenge body["code_challenge_method"] = code_challenge_method or "S256" resp = self.apiclient.post(url, body, auth=auth) if "redirect" not in resp: error_msg = "redirect missing in OAuth response" raise OutOfProtocolError(error_msg) # This flow is designed for web-based redirects. # In order to get the code we must parse it from the redirect url. query_params = parse_qs(urlparse(resp["redirect"]).query) # Check that the 'state' parameter is present and the same we provided if "state" not in query_params: error_msg = "state missing in OAuth response" raise OutOfProtocolError(error_msg) if state != query_params["state"][0]: error_msg = f"state mismatch in OAuth response (wanted: '{state}', got: '{query_params['state'][0]}')" raise OutOfProtocolError(error_msg) try: return query_params["code"][0] except (KeyError, IndexError, ValueError): error_msg = "code missing in OAuth redirect url" raise OutOfProtocolError(error_msg) def authorize_token(self, session, scope=None, client_id=None): """Trade session token for an oauth token. :param session: an auth session. :param scope: optional scope to be provided by the token. :param client_id: the string generated during FxA client registration. """ (challenge, verifier) = self.generate_pkce_challenge() code = self.authorize_code(session, scope, client_id, **challenge) resp = self.trade_code(code, **verifier) if 'access_token' not in resp: error_msg = 'access_token missing in OAuth response' raise OutOfProtocolError(error_msg) return resp['access_token'] def _verify_jwt_token(self, key, token): pubkey = jwt.algorithms.RSAAlgorithm.from_jwk(key) # The FxA OAuth ecosystem currently doesn't make good use of aud, and # instead relies on scope for restricting which services can accept # which tokens. So there's no value in checking it here, and in fact if # we check it here, it fails because the right audience isn't being # requested. decoded = jwt.decode( token, pubkey, algorithms=['RS256'], options={'verify_aud': False} ) # Ref https://tools.ietf.org/html/rfc7515#section-4.1.9 the `typ` header # is lowercase and has an implicit default `application/` prefix. typ = jwt.get_unverified_header(token).get('typ', '') if '/' not in typ: typ = 'application/' + typ if typ.lower() != 'application/at+jwt': raise TrustError return { 'user': decoded.get('sub'), 'client_id': decoded.get('client_id'), 'scope': decoded.get('scope', '').split(), 'generation': decoded.get('fxa-generation'), 'profile_changed_at': decoded.get('fxa-profileChangedAt') } def verify_token(self, token, scope=None): """Verify an OAuth token, and retrieve user id and scopes. :param token: the string to verify. :param scope: optional scope expected to be provided for this token. :returns: a dict with user id and authorized scopes for this token. :raises fxa.errors.ClientError: if the provided token is invalid. :raises fxa.errors.TrustError: if the token scopes do not match. """ key = 'fxa.oauth.verify_token:%s:%s' % ( get_hmac(token, TOKEN_HMAC_SECRET), scope) if self.cache is not None: resp = self.cache.get(key) else: resp = None if resp is None: # We want to fetch # https://oauth.accounts.firefox.com/.well-known/openid-configuration # and then get the jwks_uri key to get the /jwks url, but we'll # just hardcodes it like this for now; our /jwks url will never # change. # https://github.com/mozilla/PyFxA/issues/81 is an issue about # getting the jwks url out of the openid-configuration. keys = [] if self.jwks is not None: keys.extend(self.jwks) else: keys.extend(self.apiclient.get('/jwks').get('keys', [])) resp = None try: for k in keys: try: resp = self._verify_jwt_token(json.dumps(k), token) break except jwt.exceptions.InvalidSignatureError: # It's only worth trying other keys in the event of # `InvalidSignature`; if it was invalid for other reasons # (e.g. it's expired) then using a different key won't # help. continue else: # It's a well-formed JWT, but not signed by any of the advertized keys. # We can immediately surface this as an error. if len(keys) > 0: raise TrustError({"error": "invalid signature"}) except (jwt.exceptions.DecodeError, jwt.exceptions.InvalidKeyError): # It wasn't a JWT at all, or it was signed using a key type we # don't support. Fall back to asking the FxA server to verify. pass except jwt.exceptions.PyJWTError as e: # Any other JWT-related failure (e.g. expired token) can # immediately surface as a trust error. raise TrustError({"error": str(e)}) if resp is None: resp = self.apiclient.post('/verify', {'token': token}) missing_attrs = ", ".join([ k for k in ('user', 'scope', 'client_id') if resp.get(k) is None ]) if missing_attrs: error_msg = f'{missing_attrs} missing in OAuth response' raise OutOfProtocolError(error_msg) if scope is not None: authorized_scope = resp['scope'] if not scope_matches(authorized_scope, scope): raise ScopeMismatchError(authorized_scope, scope) if self.cache is not None: self.cache.set(key, json.dumps(resp)) else: resp = json.loads(resp) return resp def destroy_token(self, token): """Destroy an OAuth token :param token: the token to destroy. :raises fxa.errors.ClientError: if the provided token is invalid. """ url = '/destroy' body = { 'token': token } self.apiclient.post(url, body) def generate_pkce_challenge(self): """Ramdomly generate parameters for a PKCE challenge. This method returns a two-tuple (challenge, response) where the first item contains request parameters for a PKCE challenge, and the second item contains the corresponding parameters for a verification. """ code_verifier = base64.urlsafe_b64encode(os.urandom(32)).decode('utf-8').rstrip("=") raw_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest() code_challenge = base64.urlsafe_b64encode(raw_challenge).decode('utf-8').rstrip("=") return ({ "code_challenge": code_challenge, "code_challenge_method": "S256", }, { "code_verifier": code_verifier, })