in src/olympia/accounts/views.py [0:0]
def with_user(f):
    """Decorate an FxA authentication callback view method.

    The decorated method must accept ``(self, request)`` from the caller; the
    wrapper validates the callback and, on success, calls
    ``f(self, request, user=..., identity=..., next_path=..., token_data=...)``.

    Before calling ``f`` the wrapper returns a redirect of its own when:

    - the first ``:``-separated part of the ``state`` parameter does not match
      the ``fxa_state`` value popped from the session (redirects to ``/`` with
      ``ERROR_STATE_MISMATCH``);
    - the callback carries an ``error`` parameter (restarts the login with 2FA
      enforced if ``enforce_2fa`` was set in the session, otherwise redirects
      to ``next_path`` with ``ERROR_FXA_ERROR``);
    - no ``code`` parameter is present (``ERROR_NO_CODE``);
    - the request user is already authenticated and 2FA is not being enforced
      for this session (``ERROR_AUTHENTICATED``);
    - ``verify.fxa_identify`` raises ``verify.IdentificationError``
      (``ERROR_NO_PROFILE``);
    - the identified user must use 2FA but the identity has no
      ``twoFactorAuthentication`` (restarts the login with 2FA enforced).

    Parameters are read from ``request.query_params`` for GET requests and
    from ``request.data`` otherwise.
    """

    @functools.wraps(f)
    @use_primary_db
    def inner(self, request):
        # If we get an error, we want a new session without the 2FA enforcement
        # requirement active and with a fresh state for future authentication
        # attempts, so pop them.
        enforce_2fa_for_this_session = request.session.pop('enforce_2fa', False)
        fxa_state_session = request.session.pop('fxa_state', None)
        fxa_config = get_fxa_config(request)
        data = request.query_params if request.method == 'GET' else request.data

        # Only the part of ``state`` before the first ':' is compared with the
        # session; the split parts are handed to parse_next_path() to derive
        # where to send the user afterwards.
        state_parts = data.get('state', '').split(':', 1)
        state = state_parts[0]
        next_path = parse_next_path(state_parts, request)

        if not fxa_state_session or fxa_state_session != state:
            log.info(
                'FxA Auth State mismatch: {state} Session: {fxa_state_session}'.format(
                    state=data.get('state'),
                    fxa_state_session=fxa_state_session,
                )
            )
            # Redirect to / as the state mismatch indicates the next path might
            # not be reliable.
            return safe_redirect(request, '/', ERROR_STATE_MISMATCH)

        if data.get('error'):
            if enforce_2fa_for_this_session:
                # If we're trying to enforce 2FA, we should try again without
                # prompt=none (and a new state), maybe there is a mismatch
                # between what user we currently have logged in and what
                # Mozilla account the browser is logged into.
                return redirect_for_login_with_2fa_enforced(
                    request,
                    config=fxa_config,
                    next_path=next_path,
                )
            return safe_redirect(request, next_path, ERROR_FXA_ERROR)

        if not data.get('code'):
            log.info('No code provided.')
            return safe_redirect(request, next_path, ERROR_NO_CODE)

        if request.user.is_authenticated and not enforce_2fa_for_this_session:
            return safe_redirect(request, next_path, ERROR_AUTHENTICATED)

        try:
            if use_fake_fxa(fxa_config) and 'fake_fxa_email' in data:
                # Bypassing real authentication, we take the email provided
                # and generate a random fxa id.
                identity = {
                    'email': data['fake_fxa_email'],
                    'uid': 'fake_fxa_id-%s'
                    % force_str(binascii.b2a_hex(os.urandom(16))),
                    'twoFactorAuthentication': data.get(
                        'fake_two_factor_authentication'
                    ),
                }
                # There is no real token in this mode: the email stands in for
                # the id_token and the token data is empty.
                id_token, token_data = identity['email'], {}
            else:
                identity, token_data = verify.fxa_identify(
                    data['code'], config=fxa_config
                )
                token_data['config_name'] = get_fxa_config_name(request)
                id_token = token_data.get('id_token')
        except verify.IdentificationError:
            log.info('Profile not found. Code: {}'.format(data['code']))
            return safe_redirect(request, next_path, ERROR_NO_PROFILE)

        # The following log statement is used by foxsec-pipeline.
        log.info(
            'Logging in FxA user %s',
            identity['email'],
            extra={'sensitive': True},
        )
        user = find_user(identity)
        # We can't use waffle.flag_is_active() wrapper, because
        # request.user isn't populated at this point (and we don't want
        # it to be).
        enforce_2fa_flag = waffle.get_waffle_flag_model().get(
            '2fa-enforcement-for-developers-and-special-users'
        )
        # The per-user check is only attempted when the flag object has a pk.
        enforce_2fa_flag_is_active = enforce_2fa_flag.is_active(request) or (
            enforce_2fa_flag.pk and enforce_2fa_flag.is_active_for_user(user)
        )
        requires_2fa = (
            user
            and not identity.get('twoFactorAuthentication')
            and enforce_2fa_flag_is_active
            and (
                user.is_addon_developer
                or user.groups_list
                or enforce_2fa_for_this_session
            )
        )
        if requires_2fa:
            # https://github.com/mozilla/addons/issues/732
            # https://github.com/mozilla/addons-server/issues/20943
            # The user is an add-on developer (with other types of add-ons
            # than just themes) or part of any group (so they are special
            # in some way, may be an admin or a reviewer), or trying to
            # access a page restricted to users with 2FA and they have
            # successfully logged in from FxA, but without a second factor.
            # Immediately redirect them to start the FxA flow again, this
            # time requesting 2FA to be present:
            # They should be automatically logged in FxA with the existing
            # id_token, and should be prompted to create the second factor
            # before coming back to AMO.
            log.info('Redirecting user %s to enforce 2FA', user)
            # There wasn't any error, we can keep the original state.
            request.session['fxa_state'] = fxa_state_session
            # Reuse the config already resolved for this callback (as the
            # error branch above does) so the restarted flow targets the same
            # FxA configuration that was used to identify the user.
            return redirect_for_login_with_2fa_enforced(
                request,
                config=fxa_config,
                next_path=next_path,
                id_token_hint=id_token,
            )

        return f(
            self,
            request,
            user=user,
            identity=identity,
            next_path=next_path,
            token_data=token_data,
        )

    return inner