in fxa/oauth.py [0:0]
def verify_token(self, token, scope=None):
"""Verify an OAuth token, and retrieve user id and scopes.
:param token: the string to verify.
:param scope: optional scope expected to be provided for this token.
:returns: a dict with user id and authorized scopes for this token.
:raises fxa.errors.ClientError: if the provided token is invalid.
:raises fxa.errors.TrustError: if the token scopes do not match.
"""
key = 'fxa.oauth.verify_token:%s:%s' % (
get_hmac(token, TOKEN_HMAC_SECRET), scope)
if self.cache is not None:
resp = self.cache.get(key)
else:
resp = None
if resp is None:
# We want to fetch
# https://oauth.accounts.firefox.com/.well-known/openid-configuration
# and then get the jwks_uri key to get the /jwks url, but we'll
# just hardcodes it like this for now; our /jwks url will never
# change.
# https://github.com/mozilla/PyFxA/issues/81 is an issue about
# getting the jwks url out of the openid-configuration.
keys = []
if self.jwks is not None:
keys.extend(self.jwks)
else:
keys.extend(self.apiclient.get('/jwks').get('keys', []))
resp = None
try:
for k in keys:
try:
resp = self._verify_jwt_token(json.dumps(k), token)
break
except jwt.exceptions.InvalidSignatureError:
# It's only worth trying other keys in the event of
# `InvalidSignature`; if it was invalid for other reasons
# (e.g. it's expired) then using a different key won't
# help.
continue
else:
# It's a well-formed JWT, but not signed by any of the advertized keys.
# We can immediately surface this as an error.
if len(keys) > 0:
raise TrustError({"error": "invalid signature"})
except (jwt.exceptions.DecodeError, jwt.exceptions.InvalidKeyError):
# It wasn't a JWT at all, or it was signed using a key type we
# don't support. Fall back to asking the FxA server to verify.
pass
except jwt.exceptions.PyJWTError as e:
# Any other JWT-related failure (e.g. expired token) can
# immediately surface as a trust error.
raise TrustError({"error": str(e)})
if resp is None:
resp = self.apiclient.post('/verify', {'token': token})
missing_attrs = ", ".join([
k for k in ('user', 'scope', 'client_id')
if resp.get(k) is None
])
if missing_attrs:
error_msg = f'{missing_attrs} missing in OAuth response'
raise OutOfProtocolError(error_msg)
if scope is not None:
authorized_scope = resp['scope']
if not scope_matches(authorized_scope, scope):
raise ScopeMismatchError(authorized_scope, scope)
if self.cache is not None:
self.cache.set(key, json.dumps(resp))
else:
resp = json.loads(resp)
return resp