django_airavata/apps/auth/utils.py (160 lines of code) (raw):

"""Auth utilities.""" import time from airavata.model.security.ttypes import AuthzToken from django.conf import settings from django.contrib.auth import authenticate from django.core.mail import EmailMessage from django.http.request import split_domain_port from django.template import Context, Template from oauthlib.oauth2 import BackendApplicationClient from requests_oauthlib import OAuth2Session from . import models def get_authz_token(request, user=None, access_token=None): """Construct AuthzToken instance from session; refresh token if needed.""" if access_token is not None: return _create_authz_token(request, user=user, access_token=access_token) elif is_request_access_token(request): return _create_authz_token(request, user=user) elif is_session_access_token(request) and not is_session_access_token_expired(request, user=user): return _create_authz_token(request, user=user, access_token=access_token) elif not is_refresh_token_expired(request): # Have backend reauthenticate the user with the refresh token user = authenticate(request) if user: return _create_authz_token(request, user=user) return None def get_service_account_authz_token(): client_id = settings.KEYCLOAK_CLIENT_ID client_secret = settings.KEYCLOAK_CLIENT_SECRET token_url = settings.KEYCLOAK_TOKEN_URL verify_ssl = settings.KEYCLOAK_VERIFY_SSL client = BackendApplicationClient(client_id=client_id) oauth = OAuth2Session(client=client) verify = verify_ssl if verify_ssl and hasattr(settings, 'KEYCLOAK_CA_CERTFILE'): verify = settings.KEYCLOAK_CA_CERTFILE token = oauth.fetch_token( token_url=token_url, client_id=client_id, client_secret=client_secret, verify=verify) access_token = token.get('access_token') return AuthzToken( accessToken=access_token, # This is a service account, so leaving out userName for now claimsMap={'gatewayID': settings.GATEWAY_ID}) def _create_authz_token(request, user=None, access_token=None): if access_token is None: access_token = _get_access_token(request) if user is None: user = request.user username = user.username gateway_id = settings.GATEWAY_ID return AuthzToken(accessToken=access_token, claimsMap={'gatewayID': gateway_id, 'userName': username}) def _get_access_token_source(request): if hasattr(request, 'auth') and request.auth is not None: return 'request' elif 'ACCESS_TOKEN' in request.session: return 'session' else: return None def _get_access_token(request): source = _get_access_token_source(request) if source == 'request': return request.auth elif source == 'session': return request.session['ACCESS_TOKEN'] else: return None def is_session_access_token(request): """Return True if access token is stored in the user's session.""" return _get_access_token_source(request) == 'session' def is_request_access_token(request): """Return True if access token passed in request, e.g., a Bearer token.""" return _get_access_token_source(request) == 'request' def is_session_access_token_expired(request, user=None): """Return True if session access_token is not available or is expired.""" user = user if user is not None else request.user now = time.time() return not user.is_authenticated \ or 'ACCESS_TOKEN' not in request.session \ or 'ACCESS_TOKEN_EXPIRES_AT' not in request.session \ or request.session['ACCESS_TOKEN_EXPIRES_AT'] < now def is_refresh_token_expired(request): """Return True if refresh_token is not available or is expired.""" now = time.time() return 'REFRESH_TOKEN' not in request.session \ or 'REFRESH_TOKEN_EXPIRES_AT' not in request.session \ or request.session['REFRESH_TOKEN_EXPIRES_AT'] < now def send_new_user_email(request, username, email, first_name, last_name): """Send new user email notification to portal admins.""" new_user_email_template = models.EmailTemplate.objects.get( pk=models.NEW_USER_EMAIL_TEMPLATE) domain, port = split_domain_port(request.get_host()) context = Context({ "username": username, "email": email, "first_name": first_name, "last_name": last_name, "portal_title": settings.PORTAL_TITLE, "gateway_id": settings.GATEWAY_ID, "http_host": domain, }) subject = Template(new_user_email_template.subject).render(context) body = Template(new_user_email_template.body).render(context) send_email_to_admins(subject, body) def send_admin_alert_about_uninitialized_username(request, username, email, first_name, last_name): domain, port = split_domain_port(request.get_host()) context = Context({ "username": username, "email": email, "first_name": first_name, "last_name": last_name, "portal_title": settings.PORTAL_TITLE, "gateway_id": settings.GATEWAY_ID, "http_host": domain, }) subject = Template("Please fix username: a user of {{portal_title}} ({{http_host}}) has been assigned an auto-generated username ({{username}})").render(context) body = Template(""" <p> Dear Admin, </p> <p> The following user has an auto-generated username because the system could not determine a proper username: </p> <p>Username: {{username}}</p> <p>Name: {{first_name}} {{last_name}}</p> <p>Email: {{email}}</p> <p> This likely happened because there was no appropriate user attribute (typically email address) to use for the user's username when the user logged in through an external identity provider. Please update the username to the user's email address or some other appropriate value in the <a href="https://{{http_host}}/admin/users/">Manage Users</a> view in the portal. </p> """.strip()).render(context) send_email_to_admins(subject, body) def send_admin_user_completed_profile(request, user_profile): domain, port = split_domain_port(request.get_host()) user = user_profile.user extended_profile_values = user_profile.extended_profile_values.filter( ext_user_profile_field__deleted=False).order_by("ext_user_profile_field__order").all() context = Context({ "username": user.username, "email": user.email, "first_name": user.first_name, "last_name": user.last_name, "portal_title": settings.PORTAL_TITLE, "gateway_id": settings.GATEWAY_ID, "http_host": domain, "extended_profile_values": extended_profile_values }) user_profile_completed_template = models.EmailTemplate.objects.get( pk=models.USER_PROFILE_COMPLETED_TEMPLATE) subject = Template(user_profile_completed_template.subject).render(context) body = Template(user_profile_completed_template.body).render(context) send_email_to_admins(subject, body) def send_email_to_admins(subject, body): msg = EmailMessage(subject=subject, body=body, from_email=f'"{settings.PORTAL_TITLE}" <{settings.SERVER_EMAIL}>', to=[f'"{a[0]}" <{a[1]}>' for a in getattr(settings, 'PORTAL_ADMINS', settings.ADMINS)]) msg.content_subtype = 'html' msg.send() def send_email_to_user(template_id, context): email_template = models.EmailTemplate.objects.get(pk=template_id) subject = Template(email_template.subject).render(context) body = Template(email_template.body).render(context) msg = EmailMessage( subject=subject, body=body, from_email="\"{}\" <{}>".format(settings.PORTAL_TITLE, settings.SERVER_EMAIL), to=["\"{} {}\" <{}>".format(context['first_name'], context['last_name'], context['email'])], reply_to=[f"\"{a[0]}\" <{a[1]}>" for a in getattr(settings, 'PORTAL_ADMINS', settings.ADMINS)] ) msg.content_subtype = 'html' msg.send()