in cookbooks/aws-parallelcluster-platform/files/dcv/pcluster_dcv_authenticator.py [0:0]
def _get_session_token(cls, request_token):
    """
    Exchange a request token for a session token to connect to the DCV session.

    The request token is validated (format, known to the request token manager, not older than
    ``cls.request_token_ttl``), then the access file associated with it is checked: it must exist under
    AUTHORIZATION_FILE_DIR, be owned by the user recorded with the request token and have been modified
    within ``cls.request_token_ttl``. On success the access file is removed, a new session token is
    generated and registered in ``cls.session_token_manager``.

    :param request_token: the request token received from the client
    :return: a json string of the form {"sessionToken": <token>}
    :raises DCVAuthenticator.IncorrectRequestError: if the request token is unknown or expired, or if the
        access file is missing, cannot be removed, is owned by a different (or unresolvable) user,
        or has expired
    """
    logger.info("New request for Session Token.")
    DCVAuthenticator._validate_param(request_token, DCVAuthenticator.TOKEN_REGEX, "requestToken")

    # retrieve request token information to validate it
    logger.info("Validating Request Token..")
    token_info = cls.request_token_manager.get_token_info(request_token)
    if not token_info:
        raise DCVAuthenticator.IncorrectRequestError("The requestToken parameter is not valid")
    user = token_info.user
    session_id = token_info.dcv_session_id
    access_file = token_info.access_file
    logger.info("Request Token is valid.")

    # verify token expiration
    # NOTE(review): naive UTC arithmetic; assumes creation_time was recorded with datetime.utcnow() - confirm
    logger.info("Verifying Request Token..")
    if datetime.utcnow() - token_info.creation_time > cls.request_token_ttl:
        raise DCVAuthenticator.IncorrectRequestError("The requestToken is not valid anymore")
    logger.info("Request Token is valid.")

    # verify user by checking if the access_file is created by the user asking the session token
    logger.info("Verifying Access File..")
    access_file_path = f"{AUTHORIZATION_FILE_DIR}/{access_file}"
    try:
        file_details = os.stat(access_file_path)
        file_owner = getpwuid(file_details.st_uid).pw_name
    except OSError as err:
        raise DCVAuthenticator.IncorrectRequestError("The Access File does not exist") from err
    except KeyError as err:
        # getpwuid raises KeyError when the owning uid has no passwd entry: the owner cannot be
        # the requesting user, so reject the request instead of failing with an unexpected error.
        raise DCVAuthenticator.IncorrectRequestError(
            "The user is not the one that created the access file"
        ) from err

    if file_owner != user:
        raise DCVAuthenticator.IncorrectRequestError("The user is not the one that created the access file")
    if datetime.utcnow() - datetime.utcfromtimestamp(file_details.st_mtime) > cls.request_token_ttl:
        raise DCVAuthenticator.IncorrectRequestError("The access file has expired")
    logger.info("Access File is valid. User identified correctly.")

    # the access file is removed once it has been verified; a failure to remove it rejects the request
    try:
        os.remove(access_file_path)
    except OSError as err:
        raise DCVAuthenticator.IncorrectRequestError("The Access File does not exist") from err
    logger.info("Access File removed correctly.")

    # create and register internally a session token
    logger.info("Generating new Session Token..")
    DCVAuthenticator._verify_session_existence(user, session_id)
    session_token = generate_random_token(256)
    cls.session_token_manager.add_token(
        session_token, DCVAuthenticator.SessionTokenInfo(user, session_id, datetime.utcnow())
    )
    logger.info("Session Token created successfully.")
    return json.dumps({"sessionToken": session_token})