fxa/core.py (500 lines of code) (raw):
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from binascii import unhexlify, hexlify
from secrets import token_bytes
from urllib.parse import quote as urlquote
from fxa.errors import ClientError
from fxa._utils import (
APIClient,
HawkTokenAuth,
exactly_one_of,
hexstr
)
from fxa.constants import PRODUCTION_URLS
from fxa.crypto import (
create_salt,
quick_stretch_password,
stretch_password,
unwrap_keys,
derive_auth_pw,
derive_wrap_kb,
)
DEFAULT_SERVER_URL = PRODUCTION_URLS['authentication']
VERSION_SUFFIXES = ("/v1",)
class Client:
"""Client for talking to the Firefox Accounts auth server."""
def __init__(self, server_url=None, key_stretch_version=1):
if server_url is None:
server_url = DEFAULT_SERVER_URL
if not isinstance(server_url, str):
self.apiclient = server_url
self.server_url = self.apiclient.server_url
else:
server_url = server_url.rstrip("/")
if not server_url.endswith(VERSION_SUFFIXES):
server_url += VERSION_SUFFIXES[0]
self.server_url = server_url
self.apiclient = APIClient(server_url)
if key_stretch_version not in [1, 2]:
raise ValueError("Invalid key_stretch_version! Options are: 1,2")
else:
self.key_stretch_version = key_stretch_version
def create_account(self, email, password=None, stretchpwd=None, **kwds):
"""creates an account with email and password.
Note, the stretched password can also be provided. When doing this, and
using key_stretch_version=2, the format changes from a string to StrechedPassword
object
"""
keys = kwds.pop("keys", False)
if self.key_stretch_version == 2:
spwd = StretchedPassword(2, email, create_salt(2, hexlify(token_bytes(16))),
password, stretchpwd)
kb = token_bytes(32)
body = {
"email": email,
"authPW": spwd.get_auth_pw_v1(),
"wrapKb": spwd.get_wrapkb_v1(kb),
"authPWVersion2": spwd.get_auth_pw_v2(),
"wrapKbVersion2": spwd.get_wrapkb_v2(kb),
"clientSalt": spwd.v2_salt,
}
else:
spwd = StretchedPassword(1, email, None, password, stretchpwd)
body = {
"email": email,
"authPW": spwd.get_auth_pw_v1(),
}
EXTRA_KEYS = ("service", "redirectTo", "resume", "preVerifyToken",
"preVerified")
for extra in kwds:
if extra in EXTRA_KEYS:
body[extra] = kwds[extra]
else:
msg = f"Unexpected keyword argument: {extra}"
raise TypeError(msg)
url = "/account/create"
if keys:
url += "?keys=true"
resp = self.apiclient.post(url, body)
if self.key_stretch_version == 2:
stretchpwd_final = spwd
key_fetch_token = resp.get('keyFetchTokenVersion2')
else:
stretchpwd_final = spwd.v1
key_fetch_token = resp.get('keyFetchToken')
# XXX TODO: somehow sanity-check the schema on this endpoint
return Session(
client=self,
email=email,
stretchpwd=stretchpwd_final,
uid=resp["uid"],
token=resp["sessionToken"],
key_fetch_token=key_fetch_token,
verified=False,
auth_timestamp=resp["authAt"],
)
def login(self, email, password=None, stretchpwd=None, keys=False, unblock_code=None,
verification_method=None, reason="login"):
exactly_one_of(password, "password", stretchpwd, "stretchpwd")
if self.key_stretch_version == 2:
version, salt = self.get_key_stretch_version(email)
salt = salt if version == 2 else create_salt(2, hexlify(token_bytes(16)))
spwd = StretchedPassword(2, email, salt, password, stretchpwd)
try:
resp = self.start_password_change(email, spwd.v1)
key_fetch_token = resp["keyFetchToken"]
password_change_token = resp["passwordChangeToken"]
kb = self.fetch_keys(resp["keyFetchToken"], spwd.v1)[1]
resp = self.finish_password_change_v2(
password_change_token,
spwd,
kb
)
body = {
"email": email,
"authPW": spwd.get_auth_pw_v2(),
"reason": reason,
}
except Exception as inst:
# If something goes wrong fallback to v1 logins!
print("Warning! v2 key stretch auto upgrade failed! Falling back to v1 login. " +
f"Reason: {inst}")
body = {
"email": email,
"authPW": spwd.get_auth_pw_v1(),
"reason": reason,
}
else:
spwd = StretchedPassword(1, email, None, password, stretchpwd)
body = {
"email": email,
"authPW": spwd.get_auth_pw_v1(),
"reason": reason,
}
url = "/account/login"
if keys:
url += "?keys=true"
if unblock_code:
body["unblockCode"] = unblock_code
if verification_method:
body["verificationMethod"] = verification_method
resp = self.apiclient.post(url, body)
# Repackage stretchpwd based on version
if self.key_stretch_version == 2:
stretchpwd_final = spwd
key_fetch_token = resp.get("keyFetchTokenVersion2")
else:
stretchpwd_final = spwd.v1
key_fetch_token = resp.get("keyFetchToken")
# XXX TODO: somehow sanity-check the schema on this endpoint
return Session(
client=self,
email=email,
stretchpwd=stretchpwd_final,
uid=resp["uid"],
token=resp["sessionToken"],
key_fetch_token=key_fetch_token,
verified=resp["verified"],
verificationMethod=resp.get("verificationMethod"),
auth_timestamp=resp["authAt"],
)
def _get_stretched_password(self, email, password=None, stretchpwd=None):
if password is not None:
if stretchpwd is not None:
raise ValueError("must specify exactly one of 'password' or 'stretchpwd'")
stretchpwd = quick_stretch_password(email, password)
elif stretchpwd is None:
raise ValueError("must specify one of 'password' or 'stretchpwd'")
return stretchpwd
def get_account_status(self, uid):
return self.apiclient.get("/account/status?uid=" + uid)
def destroy_account(self, email, password=None, stretchpwd=None):
exactly_one_of(password, "password", stretchpwd, "stretchpwd")
# create a session and get pack teh stretched password
session = self.login(email, password, stretchpwd, keys=True)
# grab the stretched pwd
if isinstance(session.stretchpwd, bytes):
stretchpwd = session.stretchpwd
elif isinstance(session.stretchpwd, StretchedPassword) and session.stretchpwd.v2:
stretchpwd = session.stretchpwd.v2
elif isinstance(session.stretchpwd, StretchedPassword) and session.stretchpwd.v1:
stretchpwd = session.stretchpwd.v1
else:
raise ValueError("Unknown session.stretchpwd state!")
# destroy account
url = "/account/destroy"
body = {
"email": email,
"authPW": hexstr(derive_auth_pw(stretchpwd))
}
self.apiclient.post(url, body, auth=session._auth)
def get_random_bytes(self):
# XXX TODO: sanity-check the schema of the returned response
return unhexlify(self.apiclient.post("/get_random_bytes")["data"])
def fetch_keys(self, key_fetch_token, stretchpwd):
url = "/account/keys"
auth = HawkTokenAuth(key_fetch_token, "keyFetchToken", self.apiclient)
resp = self.apiclient.get(url, auth=auth)
bundle = unhexlify(resp["bundle"])
keys = auth.unbundle("account/keys", bundle)
return unwrap_keys(keys, stretchpwd)
def change_password(self, email, oldpwd=None, newpwd=None,
oldstretchpwd=None, newstretchpwd=None):
exactly_one_of(oldpwd, "oldpwd", oldstretchpwd, "oldstretchpwd")
exactly_one_of(newpwd, "newpwd", newstretchpwd, "newstretchpwd")
if self.key_stretch_version == 2:
version, salt = self.get_key_stretch_version(email)
old_spwd = StretchedPassword(version, email, salt, oldpwd, oldstretchpwd)
new_spwd = StretchedPassword(2, email, salt, newpwd, newstretchpwd)
if version == 2:
resp = self.start_password_change(email, old_spwd.v2)
kb = self.fetch_keys(resp["keyFetchToken2"], old_spwd.v2)[1]
else:
resp = self.start_password_change(email, old_spwd.v1)["passwordChangeToken"]
kb = self.fetch_keys(resp["keyFetchToken"], old_spwd.v1)[1]
self.finish_password_change_v2(
resp["passwordChangeToken"],
new_spwd,
kb)
else:
if oldpwd:
oldstretchpwd = quick_stretch_password(email, oldpwd)
if newpwd:
newstretchpwd = quick_stretch_password(email, newpwd)
resp = self.start_password_change(email, oldstretchpwd)
kb = self.fetch_keys(resp["keyFetchToken"], oldstretchpwd)[1]
new_wrapkb = derive_wrap_kb(kb, newstretchpwd)
self.finish_password_change(resp["passwordChangeToken"], newstretchpwd, new_wrapkb)
def start_password_change(self, email, stretchpwd):
body = {
"email": email,
"oldAuthPW": hexstr(derive_auth_pw(stretchpwd)),
}
return self.apiclient.post("/password/change/start", body)
def finish_password_change(self, token, stretchpwd, wrapkb):
body = {
"authPW": hexstr(derive_auth_pw(stretchpwd)),
"wrapKb": hexstr(wrapkb),
}
auth = HawkTokenAuth(token, "passwordChangeToken", self.apiclient)
self.apiclient.post("/password/change/finish", body, auth=auth)
def finish_password_change_v2(self, token, spwd, kb):
body = {
"authPW": spwd.get_auth_pw_v1(),
"wrapKb": spwd.get_wrapkb_v1(kb),
"authPWVersion2": spwd.get_auth_pw_v2(),
"wrapKbVersion2": spwd.get_wrapkb_v2(kb),
"clientSalt": spwd.v2_salt,
}
auth = HawkTokenAuth(token, "passwordChangeToken", self.apiclient)
return self.apiclient.post("/password/change/finish", body, auth=auth)
def reset_account(self, email, token, password=None, stretchpwd=None):
# TODO: Add support for recovery key!
exactly_one_of(password, "password", stretchpwd, "stretchpwd")
body = None
if self.key_stretch_version == 2:
version, salt = self.get_key_stretch_version(email)
if version == 2:
spwd = StretchedPassword(2, email, salt, password, stretchpwd)
# Note, without recovery key, we must generate new kb
kb = token_bytes(32)
body = {
"email": email,
"authPW": spwd.get_auth_pw_v1(),
"wrapKb": spwd.get_wrapkb_v1(kb),
"authPWVersion2": spwd.get_auth_pw_v2(),
"wrapKbVersion2": spwd.get_wrapkb_v2(kb),
"clientSalt": salt,
}
if body is None:
spwd = StretchedPassword(1, email, None, password, stretchpwd)
body = {
"authPW": spwd.get_auth_pw_v1(),
}
url = "/account/reset"
auth = HawkTokenAuth(token, "accountResetToken", self.apiclient)
self.apiclient.post(url, body, auth=auth)
def send_reset_code(self, email, **kwds):
body = {
"email": email,
}
for extra in kwds:
if extra in ("service", "redirectTo", "resume"):
body[extra] = kwds[extra]
else:
msg = f"Unexpected keyword argument: {extra}"
raise TypeError(msg)
url = "/password/forgot/send_code"
resp = self.apiclient.post(url, body)
return PasswordForgotToken(
self, email,
resp["passwordForgotToken"],
resp["ttl"],
resp["codeLength"],
resp["tries"],
)
def resend_reset_code(self, email, token, **kwds):
body = {
"email": email,
}
for extra in kwds:
if extra in ("service", "redirectTo", "resume"):
body[extra] = kwds[extra]
else:
msg = f"Unexpected keyword argument: {extra}"
raise TypeError(msg)
url = "/password/forgot/resend_code"
auth = HawkTokenAuth(token, "passwordForgotToken", self.apiclient)
return self.apiclient.post(url, body, auth=auth)
def verify_reset_code(self, token, code):
body = {
"code": code,
}
url = "/password/forgot/verify_code"
auth = HawkTokenAuth(token, "passwordForgotToken", self.apiclient)
return self.apiclient.post(url, body, auth=auth)
def get_reset_code_status(self, token):
url = "/password/forgot/status"
auth = HawkTokenAuth(token, "passwordForgotToken", self.apiclient)
return self.apiclient.get(url, auth=auth)
def verify_email_code(self, uid, code):
body = {
"uid": uid,
"code": code,
}
url = "/recovery_email/verify_code"
return self.apiclient.post(url, body)
def send_unblock_code(self, email, **kwds):
body = {
"email": email
}
url = "/account/login/send_unblock_code"
return self.apiclient.post(url, body)
def reject_unblock_code(self, uid, unblockCode):
body = {
"uid": uid,
"unblockCode": unblockCode
}
url = "/account/login/reject_unblock_code"
return self.apiclient.post(url, body)
def get_key_stretch_version(self, email):
# Fall back to v1 stretching if an error occurs here, which happens when
# the account does not exist at all.
try:
body = {
"email": email
}
resp = self.apiclient.post("/account/credentials/status", body)
except ClientError:
return 1, email
version = resp["currentVersion"]
if version == "v1":
return 1, create_salt(1, email)
if version == "v2":
return 2, resp["clientSalt"]
raise ValueError("Unknown version provided by api! Aborting...")
class Session:
def __init__(self, client, email, stretchpwd, uid, token,
key_fetch_token=None, verified=False, verificationMethod=None,
auth_timestamp=0):
self.client = client
self.email = email
self.uid = uid
self.token = token
self.verified = verified
self.verificationMethod = verificationMethod
self.auth_timestamp = auth_timestamp
self.keys = None
self._auth = HawkTokenAuth(token, "sessionToken", self.apiclient)
self._key_fetch_token = key_fetch_token
# Quick validation on stretchpwd
if not isinstance(stretchpwd, StretchedPassword) and not isinstance(stretchpwd, bytes):
raise ValueError("stretchpwd must be a bytes or a StretchedPassword instance, " +
f"but was {stretchpwd}")
self.stretchpwd = stretchpwd
@property
def apiclient(self):
return self.client.apiclient
@property
def server_url(self):
return self.client.server_url
def fetch_keys(self, key_fetch_token=None, stretchpwd=None):
# Use values from session construction, if not overridden.
if key_fetch_token is None:
key_fetch_token = self._key_fetch_token
if key_fetch_token is None:
# XXX TODO: what error?
raise RuntimeError("missing key_fetch_token")
if stretchpwd is None:
if isinstance(self.stretchpwd, StretchedPassword):
stretchpwd = self.stretchpwd.v2
else:
stretchpwd = self.stretchpwd
elif isinstance(stretchpwd, StretchedPassword):
stretchpwd = stretchpwd.v2
if stretchpwd is None:
# XXX TODO: what error?
raise RuntimeError("missing stretchpwd")
self.keys = self.client.fetch_keys(key_fetch_token, stretchpwd)
self._key_fetch_token = None
self.stretchpwd = None
return self.keys
def check_session_status(self):
url = "/session/status"
# Raises an error if the session has expired etc.
try:
uid = self.apiclient.get(url, auth=self._auth)["uid"]
except KeyError:
pass
else:
# XXX TODO: what error?
assert uid == self.uid
def destroy_session(self):
url = "/session/destroy"
self.apiclient.post(url, {}, auth=self._auth)
def get_email_status(self):
url = "/recovery_email/status"
resp = self.apiclient.get(url, auth=self._auth)
self.verified = resp["verified"]
return resp
def verify_email_code(self, code):
return self.client.verify_email_code(self.uid, code) # note: not authenticated
def resend_email_code(self, **kwds):
body = {}
for extra in kwds:
if extra in ("service", "redirectTo", "resume"):
body[extra] = kwds[extra]
else:
msg = f"Unexpected keyword argument: {extra}"
raise TypeError(msg)
url = "/recovery_email/resend_code"
self.apiclient.post(url, body, auth=self._auth)
def totp_create(self):
url = "/totp/create"
return self.apiclient.post(url, {}, auth=self._auth)
def totp_exists(self):
url = "/totp/exists"
resp = self.apiclient.get(url, auth=self._auth)
return resp["exists"]
def totp_delete(self):
url = "/totp/destroy"
return self.apiclient.post(url, {}, auth=self._auth)
def totp_verify(self, code):
url = "/session/verify/totp"
body = {
"code": code,
}
resp = self.apiclient.post(url, body, auth=self._auth)
if resp["success"]:
self.verified = True
return resp["success"]
def change_password(self, oldpwd, newpwd,
oldstretchpwd=None, newstretchpwd=None):
return self.client.change_password(self.email, oldpwd, newpwd,
oldstretchpwd, newstretchpwd)
def start_password_change(self, stretchpwd):
return self.client.start_password_change(self.email, stretchpwd)
def finish_password_change(self, token, stretchpwd, wrapkb):
return self.client.finish_password_change(token, stretchpwd, wrapkb)
def get_random_bytes(self):
# XXX TODO: sanity-check the schema of the returned response
return self.client.get_random_bytes()
class PasswordForgotToken:
def __init__(self, client, email, token, ttl=0, code_length=16,
tries_remaining=1):
self.client = client
self.email = email
self.token = token
self.ttl = ttl
self.code_length = code_length
self.tries_remaining = tries_remaining
def verify_code(self, code):
resp = self.client.verify_reset_code(self.token, code)
return resp["accountResetToken"]
def resend_code(self, **kwds):
resp = self.client.resend_reset_code(self.email, self.token, **kwds)
self.ttl = resp["ttl"]
self.code_length = resp["codeLength"]
self.tries_remaining = resp["tries"]
def get_status(self):
resp = self.client.get_reset_code_status(self.token)
self.ttl = resp["ttl"]
self.tries_remaining = resp["tries"]
return resp
class StretchedPassword:
def __init__(self, version, email, salt=None, password=None, stretchpwd=None):
self.version = version
if version == 2:
if not salt:
salt = create_salt(2, hexlify(token_bytes(16)))
if stretchpwd and not isinstance(stretchpwd, StretchedPassword):
raise ValueError("invalid stretchpwd type")
if stretchpwd:
if not isinstance(stretchpwd, StretchedPassword):
raise ValueError(f"invalid stretchpwd type: {type(stretchpwd)}")
self.v1 = stretchpwd.v1
self.v2_salt = stretchpwd.v2_salt
self.v2 = stretchpwd.v2
else:
if not isinstance(password, str):
raise ValueError(f"invalid password type: {type(stretchpwd)}")
self.v1 = quick_stretch_password(email, password)
self.v2_salt = salt
self.v2 = stretch_password(self.v2_salt, password)
else:
if stretchpwd:
if not isinstance(stretchpwd, bytes):
raise ValueError(f"invalid stretchpwd type: {type(stretchpwd)}")
self.v1 = stretchpwd
else:
if not isinstance(password, str):
raise ValueError(f"invalid password type: {type(password)}")
self.v1 = quick_stretch_password(email, password)
def get_auth_pw(self):
if self.v2:
return self.get_auth_pw_v2()
elif self.v1:
return self.get_auth_pw_v1()
else:
return None
def get_auth_pw_v1(self):
return hexstr(derive_auth_pw(self.v1))
def get_auth_pw_v2(self):
return hexstr(derive_auth_pw(self.v2))
def get_wrapkb_v1(self, kb):
return hexstr(derive_wrap_kb(kb, self.v1))
def get_wrapkb_v2(self, kb):
return hexstr(derive_wrap_kb(kb, self.v2))