django_airavata/apps/auth/backends.py (279 lines of code) (raw):

"""Django Airavata Auth Backends: KeycloakBackend."""
import logging
import os
import time

import requests
from django.conf import settings
from django.contrib.auth.models import User
from django.views.decorators.debug import sensitive_variables
from oauthlib.oauth2 import InvalidGrantError, LegacyApplicationClient
from requests_oauthlib import OAuth2Session

from django_airavata.apps.auth.utils import get_authz_token

from . import models, utils

logger = logging.getLogger(__name__)


class KeycloakBackend(object):
    """Django authentication backend for Keycloak."""

    # mask all local variables from error emails since they contain the user's
    # password and/or client_secret. Note, we could selectively just hide
    # variables that are sensitive, but this decorator doesn't apply explicitly
    # listed variable masking to library function calls
    @sensitive_variables()
    def authenticate(self,
                     request=None,
                     username=None,
                     password=None,
                     refresh_token=None,
                     idp_alias=None):
        """Authenticate against Keycloak and return the Django user.

        The first matching flow is used, in this order:

        1. ``username`` and ``password`` given: password grant.
        2. ``HTTP_AUTHORIZATION`` header present: the bearer token is
           validated implicitly by using it to fetch the userinfo.
        3. ``request.user`` is authenticated and the session's refresh token
           has not expired: refresh using the session's refresh token.
        4. ``refresh_token`` given: refresh using the supplied token.
        5. Otherwise: complete the OAuth2 redirect (authorization code) flow.

        Returns the user, or None when the password or refresh grant is
        rejected. Any other exception is logged and re-raised.
        """
        try:
            user = None
            access_token = None
            if username and password:
                token, userinfo = self._get_token_and_userinfo_password_flow(
                    username, password)
                if token is None:  # login failed
                    return None
                self._process_token(request, token)
                user = self._process_userinfo(request, userinfo)
                access_token = token['access_token']
            elif 'HTTP_AUTHORIZATION' in request.META:
                bearer, token = request.META.get('HTTP_AUTHORIZATION').split()
                if bearer != "Bearer":
                    raise Exception("Unexpected Authorization header")
                # implicitly validate token by using it to get userinfo
                userinfo = self._get_userinfo_from_token(request, token)
                user = self._process_userinfo(request, userinfo)
                access_token = token
            # user is already logged in and can use refresh token
            elif (request.user.is_authenticated and
                  not utils.is_refresh_token_expired(request)):
                logger.debug("Refreshing token...")
                token, userinfo = \
                    self._get_token_and_userinfo_from_refresh_token(request)
                if token is None:  # refresh failed
                    return None
                self._process_token(request, token)
                # user is already logged in
                user = request.user
                access_token = token['access_token']
            elif refresh_token:
                logger.debug("Refreshing supplied token...")
                token, userinfo = \
                    self._get_token_and_userinfo_from_refresh_token(
                        request, refresh_token=refresh_token)
                if token is None:  # refresh failed
                    return None
                self._process_token(request, token)
                user = self._process_userinfo(request, userinfo)
                access_token = token['access_token']
            else:
                token, userinfo = self._get_token_and_userinfo_redirect_flow(
                    request)
                self._process_token(request, token)
                user = self._process_userinfo(request, userinfo)
                if idp_alias is not None:
                    self._store_idp_userinfo(user, token, idp_alias)
                    # NOTE(review): the source's indentation was lost; this
                    # call is assumed to belong to the idp_alias branch --
                    # confirm against the original file.
                    self._check_username_initialization(request, user)
                access_token = token['access_token']
            # authz_token_middleware has already run, so must manually add
            # the `request.authz_token` attribute
            if user is not None:
                request.authz_token = get_authz_token(
                    request, user=user, access_token=access_token)
            return user
        except Exception as e:
            logger.warning("login failed", exc_info=e)
            raise

    def get_user(self, user_id):
        """Return the User with primary key ``user_id``, or None if absent."""
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None

    @staticmethod
    def _get_verify():
        """Return the ``verify`` argument to use for requests to Keycloak.

        This is ``settings.KEYCLOAK_VERIFY_SSL``, unless verification is
        enabled and ``settings.KEYCLOAK_CA_CERTFILE`` is defined, in which
        case the CA certificate file setting is returned instead.
        """
        verify = settings.KEYCLOAK_VERIFY_SSL
        if verify and hasattr(settings, 'KEYCLOAK_CA_CERTFILE'):
            verify = settings.KEYCLOAK_CA_CERTFILE
        return verify

    def _get_token_and_userinfo_password_flow(self, username, password):
        """Fetch a token and userinfo using the password grant.

        Returns ``(token, userinfo)``, or ``(None, None)`` when the grant is
        rejected with InvalidGrantError.
        """
        try:
            client_id = settings.KEYCLOAK_CLIENT_ID
            client_secret = settings.KEYCLOAK_CLIENT_SECRET
            token_url = settings.KEYCLOAK_TOKEN_URL
            userinfo_url = settings.KEYCLOAK_USERINFO_URL
            scope = ['openid', 'profile', 'email']
            oauth2_session = OAuth2Session(
                client=LegacyApplicationClient(client_id=client_id),
                scope=scope)
            verify = self._get_verify()
            token = oauth2_session.fetch_token(token_url=token_url,
                                               username=username,
                                               password=password,
                                               client_id=client_id,
                                               client_secret=client_secret,
                                               verify=verify,
                                               scope=scope)
            # Use the same TLS verification settings as the token request
            userinfo = oauth2_session.get(
                userinfo_url, verify=verify).json()
            return token, userinfo
        except InvalidGrantError as e:
            # password wasn't valid, just log as a warning
            logger.warning(f"Failed to log in user {username} with "
                           f"password: {e}")
            return None, None

    def _get_token_and_userinfo_redirect_flow(self, request):
        """Complete the redirect flow and return ``(token, userinfo)``.

        The state and redirect URI are read from the session keys
        ``OAUTH2_STATE`` and ``OAUTH2_REDIRECT_URI``; the current absolute
        request URI is used as the authorization response.
        """
        authorization_code_url = request.build_absolute_uri()
        client_id = settings.KEYCLOAK_CLIENT_ID
        client_secret = settings.KEYCLOAK_CLIENT_SECRET
        token_url = settings.KEYCLOAK_TOKEN_URL
        userinfo_url = settings.KEYCLOAK_USERINFO_URL
        state = request.session['OAUTH2_STATE']
        redirect_uri = request.session['OAUTH2_REDIRECT_URI']
        logger.debug("state={}".format(state))
        oauth2_session = OAuth2Session(client_id,
                                       scope='openid profile email',
                                       redirect_uri=redirect_uri,
                                       state=state)
        verify = self._get_verify()
        if (not request.is_secure() and settings.DEBUG and
                not os.environ.get('OAUTHLIB_INSECURE_TRANSPORT')):
            # For local development (DEBUG=True), allow insecure OAuth
            # redirect flow if OAUTHLIB_INSECURE_TRANSPORT isn't already set
            os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = "1"
            logger.info("Adding env var OAUTHLIB_INSECURE_TRANSPORT=1 to allow "
                        "OAuth redirect flow even though request is not secure")
        token = oauth2_session.fetch_token(
            token_url,
            client_secret=client_secret,
            authorization_response=authorization_code_url,
            verify=verify)
        # Use the same TLS verification settings as the token request
        userinfo = oauth2_session.get(userinfo_url, verify=verify).json()
        return token, userinfo

    def _get_token_and_userinfo_from_refresh_token(self,
                                                   request,
                                                   refresh_token=None):
        """Refresh the token and return ``(token, userinfo)``.

        Uses ``refresh_token`` when given, otherwise the session's
        ``REFRESH_TOKEN``. Returns ``(None, None)`` when the refresh is
        rejected with InvalidGrantError.
        """
        client_id = settings.KEYCLOAK_CLIENT_ID
        client_secret = settings.KEYCLOAK_CLIENT_SECRET
        token_url = settings.KEYCLOAK_TOKEN_URL
        userinfo_url = settings.KEYCLOAK_USERINFO_URL
        oauth2_session = OAuth2Session(client_id, scope='openid profile email')
        verify = self._get_verify()
        refresh_token_ = (refresh_token
                          if refresh_token is not None
                          else request.session['REFRESH_TOKEN'])
        # refresh_token doesn't take client_secret kwarg, so create auth
        # explicitly
        auth = requests.auth.HTTPBasicAuth(client_id, client_secret)
        try:
            token = oauth2_session.refresh_token(token_url=token_url,
                                                 refresh_token=refresh_token_,
                                                 auth=auth,
                                                 verify=verify)
            # Use the same TLS verification settings as the token request
            userinfo = oauth2_session.get(
                userinfo_url, verify=verify).json()
            return token, userinfo
        except InvalidGrantError as e:
            # probably session was terminated by admin or by user logging out
            # in another client
            logger.warning(f"Failed to refresh token for user {request.user.username} "
                           f": {e}")
            return None, None

    def _get_userinfo_from_token(self, request, token):
        """Fetch userinfo with an access token; raise if an error is returned.

        Raises Exception carrying the response's ``error_description`` (or a
        message built from ``error``) when the userinfo response contains an
        ``error`` key.
        """
        client_id = settings.KEYCLOAK_CLIENT_ID
        userinfo_url = settings.KEYCLOAK_USERINFO_URL
        oauth2_session = OAuth2Session(
            client_id, token={'access_token': token})
        userinfo = oauth2_session.get(
            userinfo_url, verify=self._get_verify()).json()
        if 'error' in userinfo:
            msg = userinfo.get('error_description')
            if msg is None:
                msg = f"Error fetching userinfo: {userinfo['error']}"
            raise Exception(msg)
        return userinfo

    def _process_token(self, request, token):
        """Store the access/refresh tokens and expiry times in the session."""
        # TODO validate the JWS signature
        # NOTE(review): this writes the full token (including the refresh
        # token) to the debug log -- confirm that is acceptable.
        logger.debug("token: {}".format(token))
        now = time.time()
        # Put access_token into session to be used for authenticating with API
        # server
        sess = request.session
        sess['ACCESS_TOKEN'] = token['access_token']
        sess['ACCESS_TOKEN_EXPIRES_AT'] = now + token['expires_in']
        sess['REFRESH_TOKEN'] = token['refresh_token']
        sess['REFRESH_TOKEN_EXPIRES_AT'] = now + token['refresh_expires_in']

    def _process_userinfo(self, request, userinfo):
        """Save the userinfo claims and sync them onto the Django user.

        Requires the ``sub`` and ``preferred_username`` claims. The optional
        ``email``, ``given_name`` and ``family_name`` claims fall back to an
        empty string when missing or null. Returns the updated user.
        """
        logger.debug("userinfo: {}".format(userinfo))
        sub = userinfo['sub']
        username = userinfo['preferred_username']
        # Normalize missing/null claims to '' rather than None so that
        # user.save() doesn't try to store NULL in the User's text fields.
        email = userinfo.get('email') or ''
        first_name = userinfo.get('given_name') or ''
        last_name = userinfo.get('family_name') or ''

        user = self._get_or_create_user(sub, username)
        user_profile = user.user_profile

        # Save the user info claims
        for (claim, value) in userinfo.items():
            user_profile.userinfo_set.update_or_create(
                claim=claim, defaults={'value': value})

        # Update User model fields
        user = user_profile.user
        user.username = username
        user.email = email
        user.first_name = first_name
        user.last_name = last_name
        user.save()
        return user

    def _get_or_create_user(self, sub, username):
        """Return the user for ``sub``, creating user/profile as needed.

        Lookup is by the stored ``sub`` claim first, then by username. A new
        user and profile are created when neither lookup matches. In every
        case the profile ends up with a ``sub`` claim equal to ``sub``.
        """
        try:
            user_profile = models.UserProfile.objects.get(
                userinfo__claim='sub', userinfo__value=sub)
            return user_profile.user
        except models.UserProfile.DoesNotExist:
            try:
                # For backwards compatibility, lookup by username
                user = User.objects.get(username=username)
                # Make sure there is a user_profile with the sub claim, which
                # will be used to do the lookup next time
                if not hasattr(user, 'user_profile'):
                    user_profile = models.UserProfile(user=user)
                    user_profile.save()
                    user_profile.userinfo_set.create(
                        claim='sub', value=sub)
                else:
                    userinfo_set = user.user_profile.userinfo_set
                    userinfo = userinfo_set.filter(claim='sub').first()
                    if userinfo is None:
                        # Profile exists but has no sub claim yet: record it
                        # instead of failing the login with DoesNotExist.
                        userinfo_set.create(claim='sub', value=sub)
                    else:
                        logger.warning(
                            f"User {username} exists but sub claims don't match: "
                            f"old={userinfo.value}, new={sub}. Updating to new "
                            "sub claim.")
                        userinfo.value = sub
                        userinfo.save()
                return user
            except User.DoesNotExist:
                user = User(username=username)
                user.save()
                user_profile = models.UserProfile(user=user)
                user_profile.save()
                user_profile.userinfo_set.create(claim='sub', value=sub)
                return user

    def _store_idp_userinfo(self, user, token, idp_alias):
        """Fetch and store userinfo claims from the external IDP.

        Best effort: any failure is logged and swallowed. Nothing is fetched
        when the matching entry in ``AUTHENTICATION_OPTIONS['external']``
        lacks ``idp_token_url`` or ``userinfo_url``.
        """
        try:
            idp_token_url = None
            userinfo_url = None
            for auth_option in settings.AUTHENTICATION_OPTIONS['external']:
                if auth_option['idp_alias'] == idp_alias:
                    idp_token_url = auth_option.get('idp_token_url')
                    userinfo_url = auth_option.get('userinfo_url')
                    break
            if idp_token_url is None or userinfo_url is None:
                logger.debug(f"idp_token_url and/or userinfo_url not set for {idp_alias} "
                             "in AUTHENTICATION_OPTIONS, skipping retrieval of external IDP userinfo")
                return
            access_token = token['access_token']
            # NOTE(review): this writes the access token to the debug log --
            # confirm that is acceptable.
            logger.debug(f"access_token={access_token} for idp_alias={idp_alias}")
            # fetch the idp's token
            headers = {'Authorization': f'Bearer {access_token}'}
            # For the following to work, in Keycloak the IDP should have 'Store
            # Tokens' and 'Stored Tokens Readable' enabled and the user needs
            # the broker/read-token role
            r = requests.get(idp_token_url, headers=headers)
            idp_token = r.json()
            idp_headers = {'Authorization': f"Bearer {idp_token['access_token']}"}
            r = requests.get(userinfo_url, headers=idp_headers)
            userinfo = r.json()
            logger.debug(f"userinfo={userinfo}")

            # Save the idp user info claims
            user_profile = user.user_profile
            for (claim, value) in userinfo.items():
                user_profile.idp_userinfo.update_or_create(
                    idp_alias=idp_alias, claim=claim,
                    defaults={'value': value})
        except Exception:
            logger.exception(f"Failed to store IDP userinfo for {user.username} from IDP {idp_alias}")

    def _check_username_initialization(self, request, user):
        """Mark the username initialized, or alert admins if it isn't valid.

        Failures while sending the admin alert are logged and swallowed.
        """
        # Check if the username assigned to the user was based on the user's
        # email address or if it was assigned some random string (Keycloak's
        # sub). If the latter, we'll want to alert the admins so that they can
        # assign a proper username for the user.
        user_profile = user.user_profile
        if (not user_profile.username_initialized and
            user_profile.userinfo_set.filter(claim='email').exists() and
            user_profile.userinfo_set.filter(claim='preferred_username').exists() and
            user_profile.userinfo_set.get(claim='email').value ==
                user_profile.userinfo_set.get(claim='preferred_username').value):
            user_profile.username_initialized = True
            user_profile.save()

        # TODO: also check idp_userinfo.preferred_username if it exists

        if not user_profile.username_initialized and not user_profile.is_username_valid:
            try:
                utils.send_admin_alert_about_uninitialized_username(
                    request, user.username, user.email, user.first_name, user.last_name)
            except Exception:
                logger.exception(f"Failed to send alert about username being uninitialized: {user.username}",
                                 extra={'request': request})