def get_token_from_auth_server()

in dialogflow-cx/vpc-sc-demo/backend/get_token.py [0:0]


def _blocked_token_response(reason):
    """Build the ``{"response": ...}`` dict that reports a BLOCKED status."""
    return {
        "response": flask.Response(
            status=200,
            response=json.dumps({"status": "BLOCKED", "reason": reason}),
        )
    }


def get_token_from_auth_server(session_id, auth_service_hostname=AUTH_SERVICE_HOSTNAME):
    """Retrieve a stored token from the VPC-SC Demo Auth Server.

    Issues a GET to ``http://<auth_service_hostname>/auth`` with the session
    id as a query parameter. The reply body is read as a zip archive with two
    members: ``key``, which is decrypted with RSA (PKCS#1 OAEP) using the
    private key stored in ``PRIVATE_PEM_FILENAME``, and ``session_data``,
    which is decrypted by ``AESCipher`` using the decrypted ``key`` and then
    parsed as JSON.

    Args:
        session_id: Value sent as the ``session_id`` query parameter.
        auth_service_hostname: Host (optionally with port) of the auth
            service; defaults to ``AUTH_SERVICE_HOSTNAME``.

    Returns:
        A dict with exactly one of these keys:

        * ``"auth_data"``: the JSON-decoded, decrypted session data.
        * ``"response"``: a ``flask.Response`` (HTTP status 200) whose JSON
          body is ``{"status": "BLOCKED", "reason": ...}``. The reason is
          ``"REJECTED_REQUEST"`` when the auth service answers 401, and
          ``"DECRYPTION_ERROR"`` when the payload cannot be unpacked,
          decrypted, or decoded.

    Raises:
        Exceptions raised by ``requests.get`` (e.g. timeouts) and by opening
        the private key file are not caught here and propagate to the caller.
    """
    auth_service_auth_endpoint = f"http://{auth_service_hostname}/auth"

    auth_response = requests.get(
        auth_service_auth_endpoint,
        params={"session_id": session_id},
        timeout=10,
    )
    if auth_response.status_code == 401:
        logger.error(
            "  auth-service %s rejected request: %s",
            auth_service_auth_endpoint,
            auth_response.text,
        )
        return _blocked_token_response("REJECTED_REQUEST")

    with open(PRIVATE_PEM_FILENAME, "r", encoding="utf8") as file_handle:
        private_pem = file_handle.read()

    # Any reply other than a well-formed archive (for example an error page
    # sent with a non-401 failure status, or an archive missing a member)
    # would otherwise escape as BadZipFile / KeyError. Report it through the
    # same BLOCKED contract instead, reusing the existing DECRYPTION_ERROR
    # reason so callers do not have to learn a new reason code.
    try:
        with zipfile.ZipFile(io.BytesIO(auth_response.content)) as zip_file:
            with zip_file.open("key") as curr_zip:
                key_bytes_stream = curr_zip.read()
            with zip_file.open("session_data") as curr_zip:
                session_data_bytes_stream = curr_zip.read()
    except (zipfile.BadZipFile, KeyError) as exc:
        logger.error(
            "  auth-service %s returned an unreadable payload (HTTP %s): %s",
            auth_service_auth_endpoint,
            auth_response.status_code,
            exc,
        )
        return _blocked_token_response("DECRYPTION_ERROR")

    try:
        decrypt = PKCS1_OAEP.new(key=RSA.import_key(private_pem))
        decrypted_message = decrypt.decrypt(key_bytes_stream)
        aes_cipher = AESCipher(key=decrypted_message)
        return {
            "auth_data": json.loads(
                aes_cipher.decrypt(session_data_bytes_stream).decode()
            )
        }
    except ValueError as exc:
        # ValueError also covers UnicodeDecodeError from .decode() and
        # json.JSONDecodeError from json.loads(), both of which subclass it.
        logger.error("Decryption Error: %s", exc)
        return _blocked_token_response("DECRYPTION_ERROR")