def _get_session_token()

in cookbooks/aws-parallelcluster-platform/files/dcv/pcluster_dcv_authenticator.py [0:0]


    def _get_session_token(cls, request_token):
        """
        Obtain the session token to connect to the DCV session.

        Validate the request token and the access file associated with it, then generate a Session token,
        store it in memory and return a json containing the token itself.

        :param request_token: the request token to exchange for a session token
        :return: a JSON string with a single "sessionToken" key holding the new token
        :raises DCVAuthenticator.IncorrectRequestError: if the request token is unknown or older than
            cls.request_token_ttl, or if the access file cannot be accessed, is not owned by the user
            recorded in the request token, or is older than cls.request_token_ttl
        """
        logger.info("New request for Session Token.")
        # NOTE(review): presumably raises when the token does not match TOKEN_REGEX - confirm in _validate_param
        DCVAuthenticator._validate_param(request_token, DCVAuthenticator.TOKEN_REGEX, "requestToken")

        # retrieve request token information to validate it
        logger.info("Validating Request Token..")
        token_info = cls.request_token_manager.get_token_info(request_token)
        if not token_info:
            raise DCVAuthenticator.IncorrectRequestError("The requestToken parameter is not valid")
        user = token_info.user
        session_id = token_info.dcv_session_id
        access_file = token_info.access_file
        logger.info("Request Token is valid.")

        # verify token expiration
        logger.info("Verifying Request Token..")
        if datetime.utcnow() - token_info.creation_time > cls.request_token_ttl:
            raise DCVAuthenticator.IncorrectRequestError("The requestToken is not valid anymore")
        logger.info("Request Token is valid.")

        # verify user by checking if the access_file is created by the user asking the session token
        logger.info("Verifying Access File..")
        access_file_path = f"{AUTHORIZATION_FILE_DIR}/{access_file}"
        try:
            file_details = os.stat(access_file_path)
            try:
                file_owner = getpwuid(file_details.st_uid).pw_name
            except KeyError as err:
                # getpwuid raises KeyError when the owning uid has no passwd entry; such an owner can never
                # match the requesting user, so reject the request instead of leaking a raw KeyError.
                raise DCVAuthenticator.IncorrectRequestError(
                    "The user is not the one that created the access file"
                ) from err
            if file_owner != user:
                raise DCVAuthenticator.IncorrectRequestError("The user is not the one that created the access file")
            if datetime.utcnow() - datetime.utcfromtimestamp(file_details.st_mtime) > cls.request_token_ttl:
                raise DCVAuthenticator.IncorrectRequestError("The access file has expired")
            logger.info("Access File is valid. User identified correctly.")
            # the access file is deleted as soon as it has been validated
            os.remove(access_file_path)
            logger.info("Access File removed correctly.")
        except OSError as err:
            # any failure of os.stat or os.remove is reported to the caller as a missing access file;
            # the original error is chained so the real cause stays visible in the traceback
            raise DCVAuthenticator.IncorrectRequestError("The Access File does not exist") from err

        # create and register internally a session token
        logger.info("Generating new Session Token..")
        # NOTE(review): presumably raises if no DCV session with this id exists for the user - confirm
        DCVAuthenticator._verify_session_existence(user, session_id)
        session_token = generate_random_token(256)
        cls.session_token_manager.add_token(
            session_token, DCVAuthenticator.SessionTokenInfo(user, session_id, datetime.utcnow())
        )
        logger.info("Session Token created successfully.")

        return json.dumps({"sessionToken": session_token})