in fxa/core.py [0:0]
def login(self, email, password=None, stretchpwd=None, keys=False, unblock_code=None,
verification_method=None, reason="login"):
exactly_one_of(password, "password", stretchpwd, "stretchpwd")
if self.key_stretch_version == 2:
version, salt = self.get_key_stretch_version(email)
salt = salt if version == 2 else create_salt(2, hexlify(token_bytes(16)))
spwd = StretchedPassword(2, email, salt, password, stretchpwd)
try:
resp = self.start_password_change(email, spwd.v1)
key_fetch_token = resp["keyFetchToken"]
password_change_token = resp["passwordChangeToken"]
kb = self.fetch_keys(resp["keyFetchToken"], spwd.v1)[1]
resp = self.finish_password_change_v2(
password_change_token,
spwd,
kb
)
body = {
"email": email,
"authPW": spwd.get_auth_pw_v2(),
"reason": reason,
}
except Exception as inst:
# If something goes wrong fallback to v1 logins!
print("Warning! v2 key stretch auto upgrade failed! Falling back to v1 login. " +
f"Reason: {inst}")
body = {
"email": email,
"authPW": spwd.get_auth_pw_v1(),
"reason": reason,
}
else:
spwd = StretchedPassword(1, email, None, password, stretchpwd)
body = {
"email": email,
"authPW": spwd.get_auth_pw_v1(),
"reason": reason,
}
url = "/account/login"
if keys:
url += "?keys=true"
if unblock_code:
body["unblockCode"] = unblock_code
if verification_method:
body["verificationMethod"] = verification_method
resp = self.apiclient.post(url, body)
# Repackage stretchpwd based on version
if self.key_stretch_version == 2:
stretchpwd_final = spwd
key_fetch_token = resp.get("keyFetchTokenVersion2")
else:
stretchpwd_final = spwd.v1
key_fetch_token = resp.get("keyFetchToken")
# XXX TODO: somehow sanity-check the schema on this endpoint
return Session(
client=self,
email=email,
stretchpwd=stretchpwd_final,
uid=resp["uid"],
token=resp["sessionToken"],
key_fetch_token=key_fetch_token,
verified=resp["verified"],
verificationMethod=resp.get("verificationMethod"),
auth_timestamp=resp["authAt"],
)