backend/services/users/authentication_service.py (112 lines of code) (raw):

import base64
import urllib.parse

from flask import current_app, request, session
from flask_httpauth import HTTPTokenAuth
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
from werkzeug import url_quote

from backend import osm
from backend.api.utils import TMAPIDecorators
from backend.services.messaging.message_service import MessageService
from backend.services.users.user_service import UserService, NotFound

token_auth = HTTPTokenAuth(scheme="Token")
tm = TMAPIDecorators()

# Maximum token ages, in seconds, passed to the timed serializer.
_SESSION_TOKEN_MAX_AGE = 604800  # 7 days
_EMAIL_TOKEN_MAX_AGE = 86400  # 24 hours


@token_auth.verify_token
def verify_token(token):
    """
    Verify the supplied session token and record the authenticated user.

    The token is expected to be the base64 encoding of a value produced by
    AuthenticationService.generate_session_token_for_user.

    :param token: Base64-encoded token taken from the request
    :return: The user id embedded in the token if it is valid, otherwise False
    """
    # Always reset first so a failed check never leaves a stale user id behind
    tm.authenticated_user_id = None
    if not token:
        return False

    try:
        decoded_token = base64.b64decode(token).decode("utf-8")
    except ValueError:
        # ValueError covers both binascii.Error (malformed base64 / bad padding)
        # and UnicodeDecodeError (decoded bytes are not valid UTF-8)
        current_app.logger.debug(f"Unable to decode token {request.base_url}")
        return False  # Can't decode token, so fail login

    valid_token, user_id = AuthenticationService.is_valid_token(
        decoded_token, _SESSION_TOKEN_MAX_AGE
    )
    if not valid_token:
        current_app.logger.debug(f"Token not valid {request.base_url}")
        return False

    # Set the user ID on the decorator as a convenience
    tm.authenticated_user_id = user_id
    return user_id  # All tests passed token is good for the requested resource


class AuthServiceError(Exception):
    """ Custom Exception to notify callers an error occurred when authenticating """

    def __init__(self, message):
        # Pass the message to Exception so str(exc) and exc.args carry it
        super().__init__(message)
        if current_app:
            current_app.logger.debug(message)


class AuthenticationService:
    """ Session/email token handling and OSM login helpers """

    @staticmethod
    def login_user(osm_user_details, email, user_element="user") -> dict:
        """
        Generates authentication details for user, creating in DB if user is unknown to us
        :param osm_user_details: XML response from OSM
        :param email: Email address passed to UserService.register_user when a new user is created
        :param user_element: Exists for unit testing
        :raises AuthServiceError
        :returns A dictionary containing the keys "username", "session_token"
        and "picture."
        """
        osm_user = osm_user_details.find(user_element)

        if osm_user is None:
            raise AuthServiceError("User element not found in OSM response")

        osm_id = int(osm_user.attrib["id"])
        username = osm_user.attrib["display_name"]
        try:
            # get gravatar profile picture file name
            user_picture = osm_user.find("img").attrib["href"]
        except (AttributeError, IndexError, KeyError):
            # AttributeError: no <img> element; KeyError: <img> without href
            user_picture = None

        try:
            UserService.get_user_by_id(osm_id)
            UserService.update_user(osm_id, username, user_picture)
        except NotFound:
            # User not found, so must be new user
            changesets = osm_user.find("changesets")
            changeset_count = int(changesets.attrib["count"])
            new_user = UserService.register_user(
                osm_id, username, changeset_count, user_picture, email
            )
            MessageService.send_welcome_message(new_user)

        session_token = AuthenticationService.generate_session_token_for_user(osm_id)
        return {
            "username": username,
            "session_token": session_token,
            "picture": user_picture,
        }

    @staticmethod
    def authenticate_email_token(username: str, token: str):
        """
        Validate that the email token is valid
        :param username: Username of the user whose email is being verified
        :param token: Token that should contain the user's email address
        :raises AuthServiceError: if the user is unknown, the token is invalid or
        expired, or the tokenised email does not match the user's email address
        :return: URL to redirect to after successful verification
        """
        try:
            user = UserService.get_user_by_username(username)
        except NotFound as e:
            raise AuthServiceError("Unable to authenticate") from e

        is_valid, tokenised_email = AuthenticationService.is_valid_token(
            token, _EMAIL_TOKEN_MAX_AGE
        )

        if not is_valid:
            raise AuthServiceError("Unable to authenticate")

        if user.email_address != tokenised_email:
            raise AuthServiceError("Unable to authenticate")

        # Token is valid so update DB and return
        user.set_email_verified_status(is_verified=True)
        return AuthenticationService._get_email_validated_url(True)

    @staticmethod
    def _get_email_validated_url(is_valid: bool) -> str:
        """ Helper function to generate redirect url for email verification """
        base_url = current_app.config["APP_BASE_URL"]
        query_string = urllib.parse.urlencode({"is_valid": is_valid})
        return f"{base_url}/validate-email?{query_string}"

    @staticmethod
    def get_authentication_failed_url():
        """ Generates the auth-failed URL for the running app """
        base_url = current_app.config["APP_BASE_URL"]
        return f"{base_url}/auth-failed"

    @staticmethod
    def _get_serializer():
        """ Builds the timed serializer used to both create and validate tokens """
        # Fall back to a fixed key when the app has no secret key configured
        entropy = current_app.secret_key or "un1testingmode"
        return URLSafeTimedSerializer(entropy)

    @staticmethod
    def generate_session_token_for_user(osm_id: int):
        """
        Generates a unique token with the osm_id and current time embedded within it
        :param osm_id: OSM ID of the user authenticating
        :return: Token
        """
        return AuthenticationService._get_serializer().dumps(osm_id)

    @staticmethod
    def generate_authorize_url(callback):
        """
        Requests an OAuth request token from the osm client and builds the authorize URL
        :param callback: Callback passed through to osm.generate_request_token
        :return: A dictionary containing the keys "auth_url", "oauth_token"
        and "oauth_token_secret"
        """
        token, secret = osm.generate_request_token(callback)
        url = f"{osm.expand_url(osm.authorize_url)}?oauth_token={url_quote(token)}"

        # Remove tokens from session. The library creates it.
        # A default is supplied so a missing key does not raise KeyError.
        session.pop("osm_oauthtok", None)

        return {"auth_url": url, "oauth_token": token, "oauth_token_secret": secret}

    @staticmethod
    def is_valid_token(token, token_expiry):
        """
        Validates if the supplied token is valid, and hasn't expired.
        :param token: Token to check
        :param token_expiry: When the token expires in seconds
        :return: Tuple of (True, value contained in token) if the token is valid,
        otherwise (False, None)
        """
        serializer = AuthenticationService._get_serializer()

        try:
            # SignatureExpired is handled before the more general BadSignature
            tokenised_user_id = serializer.loads(token, max_age=token_expiry)
        except SignatureExpired:
            current_app.logger.debug("Token has expired")
            return False, None
        except BadSignature:
            current_app.logger.debug("Bad Token Signature")
            return False, None

        return True, tokenised_user_id