def with_user()

in src/olympia/accounts/views.py [0:0]


def with_user(f):
    @functools.wraps(f)
    @use_primary_db
    def inner(self, request):
        # If we get an error, we want a new session without the 2FA enforcement
        # requirement active and with a fresh state for future authentication
        # attempts, so pop them.
        enforce_2fa_for_this_session = request.session.pop('enforce_2fa', False)
        fxa_state_session = request.session.pop('fxa_state', None)

        fxa_config = get_fxa_config(request)
        if request.method == 'GET':
            data = request.query_params
        else:
            data = request.data
        state_parts = data.get('state', '').split(':', 1)
        state = state_parts[0]
        next_path = parse_next_path(state_parts, request)

        if not fxa_state_session or fxa_state_session != state:
            log.info(
                'FxA Auth State mismatch: {state} Session: {fxa_state_session}'.format(
                    state=data.get('state'),
                    fxa_state_session=fxa_state_session,
                )
            )
            # Redirect to / as the state mismatch indicates the next path might
            # not be reliable.
            return safe_redirect(request, '/', ERROR_STATE_MISMATCH)
        elif data.get('error'):
            if enforce_2fa_for_this_session:
                # If we're trying to enforce 2FA, we should try again without
                # prompt=none (and a new state), maybe there is a mismatch
                # between what user we currently have logged in and what
                # Mozilla account the browser is logged into.
                return redirect_for_login_with_2fa_enforced(
                    request,
                    config=fxa_config,
                    next_path=next_path,
                )
            else:
                return safe_redirect(request, next_path, ERROR_FXA_ERROR)
        elif not data.get('code'):
            log.info('No code provided.')
            return safe_redirect(request, next_path, ERROR_NO_CODE)
        elif request.user.is_authenticated and not enforce_2fa_for_this_session:
            return safe_redirect(request, next_path, ERROR_AUTHENTICATED)
        try:
            if use_fake_fxa(fxa_config) and 'fake_fxa_email' in data:
                # Bypassing real authentication, we take the email provided
                # and generate a random fxa id.
                identity = {
                    'email': data['fake_fxa_email'],
                    'uid': 'fake_fxa_id-%s'
                    % force_str(binascii.b2a_hex(os.urandom(16))),
                    'twoFactorAuthentication': data.get(
                        'fake_two_factor_authentication'
                    ),
                }
                id_token, token_data = identity['email'], {}
            else:
                identity, token_data = verify.fxa_identify(
                    data['code'], config=fxa_config
                )
                token_data['config_name'] = get_fxa_config_name(request)
                id_token = token_data.get('id_token')
        except verify.IdentificationError:
            log.info('Profile not found. Code: {}'.format(data['code']))
            return safe_redirect(request, next_path, ERROR_NO_PROFILE)
        else:
            # The following log statement is used by foxsec-pipeline.
            log.info(
                'Logging in FxA user %s',
                identity['email'],
                extra={'sensitive': True},
            )
            user = find_user(identity)
            # We can't use waffle.flag_is_active() wrapper, because
            # request.user isn't populated at this point (and we don't want
            # it to be).
            enforce_2fa_flag = waffle.get_waffle_flag_model().get(
                '2fa-enforcement-for-developers-and-special-users'
            )
            enforce_2fa_flag_is_active = enforce_2fa_flag.is_active(request) or (
                enforce_2fa_flag.pk and enforce_2fa_flag.is_active_for_user(user)
            )
            if (
                user
                and not identity.get('twoFactorAuthentication')
                and enforce_2fa_flag_is_active
                and (
                    user.is_addon_developer
                    or user.groups_list
                    or enforce_2fa_for_this_session
                )
            ):
                # https://github.com/mozilla/addons/issues/732
                # https://github.com/mozilla/addons-server/issues/20943
                # The user is an add-on developer (with other types of add-ons
                # than just themes) or part of any group (so they are special
                # in some way, may be an admin or a reviewer), or trying to
                # access a page restricted to users with 2FA and they have
                # successfully logged in from FxA, but without a second factor.
                # Immediately redirect them to start the FxA flow again, this
                # time requesting 2FA to be present:
                # They should be automatically logged in FxA with the existing
                # id_token, and should be prompted to create the second factor
                # before coming back to AMO.
                log.info('Redirecting user %s to enforce 2FA', user)
                # There wasn't any error, we can keep the original state.
                request.session['fxa_state'] = fxa_state_session
                return redirect_for_login_with_2fa_enforced(
                    request,
                    next_path=next_path,
                    id_token_hint=id_token,
                )
            return f(
                self,
                request,
                user=user,
                identity=identity,
                next_path=next_path,
                token_data=token_data,
            )

    return inner