in dialogflow-cx/vpc-sc-demo/backend/get_token.py [0:0]
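def get_token_from_auth_server(session_id, auth_service_hostname=AUTH_SERVICE_HOSTNAME):
    """Retrieve a stored token from the VPC-SC Demo Auth Server.

    The auth service is queried for ``session_id`` and is expected to answer
    with a zip archive holding two members: ``key`` (an AES key wrapped with
    RSA-OAEP for this backend's private key) and ``session_data`` (the
    session payload encrypted with that AES key).

    Args:
        session_id: Session identifier sent as the ``session_id`` query
            parameter.
        auth_service_hostname: Host (and optional port) of the auth service.

    Returns:
        ``{"auth_data": <decoded JSON>}`` on success. Otherwise
        ``{"response": flask.Response}`` whose JSON body has
        ``"status": "BLOCKED"`` and a ``reason`` of ``REJECTED_REQUEST``
        (the auth service answered 401) or ``DECRYPTION_ERROR`` (the payload
        could not be unpacked, decrypted or decoded).

    Exceptions from ``requests.get`` and from reading the private key file
    are not handled here and propagate to the caller.
    """

    def blocked(reason):
        # Every failure is reported as HTTP 200 with a BLOCKED body, so the
        # caller can hand the response straight back to the client.
        return {
            "response": flask.Response(
                status=200,
                response=json.dumps({"status": "BLOCKED", "reason": reason}),
            )
        }

    # NOTE(review): the endpoint is plain http, so the payload's
    # confidentiality rests on the RSA-OAEP-wrapped key below. Whether it is
    # also integrity-protected depends on AESCipher, which is not visible
    # here - confirm it uses an authenticated mode.
    auth_service_auth_endpoint = f"http://{auth_service_hostname}/auth"
    params = {
        "session_id": session_id,
    }
    req = requests.get(auth_service_auth_endpoint, params=params, timeout=10)
    if req.status_code == 401:
        logger.error(
            "  auth-service %s rejected request: %s",
            auth_service_auth_endpoint,
            req.text,
        )
        return blocked("REJECTED_REQUEST")

    with open(PRIVATE_PEM_FILENAME, "r", encoding="utf8") as file_handle:
        private_pem = file_handle.read()

    # Only 401 is checked above, so any other error status (or a truncated or
    # tampered body) reaches the archive parser. Fail closed instead of
    # letting BadZipFile / KeyError escape as an unhandled exception.
    # NOTE(review): members are read without a size limit; consider bounding
    # them if the auth service is not fully trusted.
    try:
        with zipfile.ZipFile(io.BytesIO(req.content)) as zip_file:
            with zip_file.open("key") as curr_zip:
                key_bytes_stream = curr_zip.read()
            with zip_file.open("session_data") as curr_zip:
                session_data_bytes_stream = curr_zip.read()
    except (zipfile.BadZipFile, KeyError) as exc:
        # The response body is deliberately not logged: it may hold
        # encrypted session material.
        logger.error(
            "Malformed payload from auth-service %s (status %s): %s",
            auth_service_auth_endpoint,
            req.status_code,
            exc,
        )
        return blocked("DECRYPTION_ERROR")

    try:
        # Unwrap the per-session AES key with the backend's RSA private key,
        # then decrypt and parse the session data.
        decrypt = PKCS1_OAEP.new(key=RSA.import_key(private_pem))
        decrypted_message = decrypt.decrypt(key_bytes_stream)
        aes_cipher = AESCipher(key=decrypted_message)
        return {
            "auth_data": json.loads(
                aes_cipher.decrypt(session_data_bytes_stream).decode()
            )
        }
    except ValueError as exc:
        # ValueError also covers UnicodeDecodeError and json.JSONDecodeError,
        # so a payload that decrypts to garbage is blocked as well.
        logger.error("Decryption Error: %s", exc)
        return blocked("DECRYPTION_ERROR")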