aa-integration-backend/ui-connector/auth.py (66 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import jwt
import requests
from flask import request, jsonify
from functools import wraps
import config, auth_options
jwt_secret_key = '' # To be loaded from config.JWT_SECRET_KEY_PATH
def load_jwt_secret_key():
with open(config.JWT_SECRET_KEY_PATH, 'r') as key_file:
jwt_secret_key = key_file.read()
def check_auth(token):
if (config.AUTH_OPTION == 'SalesforceLWC'):
return auth_options.check_salesforce_lwc_token(token)
elif (config.AUTH_OPTION == 'Salesforce'):
return auth_options.check_salesforce_token(token)
elif (config.AUTH_OPTION == 'GenesysCloud'):
return auth_options.check_genesyscloud_token(token)
elif (config.AUTH_OPTION == 'Twilio'):
return auth_options.check_twilio_token(token)
elif (config.AUTH_OPTION == 'Skip'):
return True
# Customize your authentication method here.
# elif (config.AUTH_OPTION == 'Your Auth Option'):
# return auth_options.check_my_token(token)
return False
def check_jwt(token):
try:
# Decode the payload to fetch the stored details.
data = jwt.decode(token, jwt_secret_key, algorithms=['HS256'])
if 'gcp_agent_assist_project' not in data:
return False, 'The target project in your token is missing.'
if data['gcp_agent_assist_project'] != config.GCP_PROJECT_ID:
return False, 'The target project in your token is invalid.'
if 'exp' not in data:
return False, 'The expiration time in your token is missing.'
if data['exp'] < datetime.datetime.now().timestamp():
return False, 'Your token has expired.'
return True, 'Your token is valid.'
except:
return False, 'Failed to parse your token.'
def generate_jwt(user_info=None):
gcp_agent_assist_user = ''
if user_info:
gcp_agent_assist_user = user_info.get('gcp_agent_assist_user')
return jwt.encode({'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=config.JWT_TOKEN_LIFETIME),
'gcp_agent_assist_project': config.GCP_PROJECT_ID,
'gcp_agent_assist_user': gcp_agent_assist_user},
jwt_secret_key,
'HS256')
def check_app_auth(auth):
if config.APP_AUTH_OPTION == 'Twilio':
response = requests.get(
config.TWILIO_ACCOUNTS_API_URL, auth=(auth.get('accountSid'), auth.get('authToken')))
if response.status_code == 200:
data = response.json()
if data.get('sid') == config.TWILIO_ACCOUNT_SID:
return True
return False
def token_required(f):
"""Verifies JWT as a function decorator."""
@wraps(f)
def decorator(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'message': 'Token is missing.'}), 401
result, message = check_jwt(token)
if result:
return f(*args, **kwargs)
else:
return jsonify({'message': message}), 401
return decorator