aa-integration-backend/ui-connector/auth_options.py (68 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import requests
import logging
import config
from urllib.parse import urlencode
def check_salesforce_token(token):
"""Verifies the access token using Salesforce OpenID Connect.
Reference: https://help.salesforce.com/s/articleView?id=sf.remoteaccess_using_userinfo_endpoint.htm
"""
request_headers = {
'Authorization': 'Bearer ' + token
}
# Get the user info.
user_info_url = f'https://{config.SALESFORCE_DOMAIN}/services/oauth2/userinfo'
user_info_resp = requests.get(user_info_url, headers=request_headers)
# Check response.
if user_info_resp.status_code == 200:
user_info = user_info_resp.json()
logging.info('Verifying user {}.'.format(user_info['user_id']))
# Check the user's organization.
if user_info['organization_id'] == config.SALESFORCE_ORGANIZATION_ID:
return True
else:
logging.warning('The organization of user {} is not allowed.'.format(user_info['user_id']))
else:
logging.warning('Failed to verify the access token, {0}, {1}.'.format(user_info_resp.status_code, user_info_resp.reason))
return False
def check_salesforce_lwc_token(token):
"""Verifies Salesforce Org Id using Client Credentials OAuth flow.
Uses `token` to call the oauth2/userinfo endpoint, getting
Salesforce Organization Id for the Connected App's authorized user.
If this matches `config.SALESFORCE_ORGANIZATION_ID`, return `True`.
Args:
token: an `access_token` from the SF OAuth2/token endpoint.
Returns:
A Boolean; if `True`, access org id matches `config` module, else `False`.
Raises:
Warning: When the token is not valid.
Warning: When the SF org id doesn't match.
Reference: https://help.salesforce.com/s/articleView?id=sf.remoteaccess_oauth_client_credentials_flow.htm&type=5
"""
access_token = re.sub(r'Bearer ', '', token, re.IGNORECASE)
if config.SALESFORCE_DOMAIN == 'login.salesforce.com':
logging.error('SALESFORCE_DOMAIN is not set. Auth will fail.')
if config.SALESFORCE_ORGANIZATION_ID == 'YOUR_ORGANIZATION_ID':
logging.error('SALESFORCE_ORGANIZATION_ID is not set. Auth will fail.')
user_info_resp = requests.get(
f'https://{config.SALESFORCE_DOMAIN}/services/oauth2/userinfo?' + \
urlencode({
'access_token': access_token
})
)
if user_info_resp.status_code == 200:
user_info = user_info_resp.json()
user_org = user_info['organization_id']
config_org = config.SALESFORCE_ORGANIZATION_ID
min_len = min(len(user_org), len(config_org))
if min_len and user_org[:min_len] == config_org[:min_len]:
return True
else:
logging.warning(
'The Salesforce org of user {} is not allowed.'.format(
user_info['user_id']))
else:
logging.warning(
'Failed to verify Salesforce access_token, {0}, {1}.'.format(
user_info_resp.status_code, user_info_resp.reason))
def check_genesyscloud_token(token):
"""Verifies the Genesys cloud token with Users API.
Call the Get method of Users API and use the response status to verify
whether the token is a valid token to call other Genesys cloud endpoint.
From the Gensys cloud connector,
we have to get Dialogflow JWT first then check the platform SDK token,
so the first pass with empty token will always return true to get JWT first
In the second pass, when token has value, we check if the token is valid
Args:
token: An open smalltable.Table instance.
Returns:
A Boolean, if it is True, then the token is valid, otherwise False.
Raises:
Warning: When The token is not valid.
Reference: https://developer.genesys.cloud/devapps/api-explorer#get-api-v2-users-me
"""
# Prepare for GET /api/v2/authorization/roles request.
request_headers = {
'Authorization': 'Bearer ' + token
}
response = requests.get(f'https://api.{config.GENESYS_CLOUD_ENVIRONMENT}/api/v2/users/me', headers=request_headers)
# Check response.
if response.status_code == 200:
response_json = response.json()
logging.info('Verifying user {}.'.format(response_json['id']))
# Genesys cloud response does not have organization to check
return True
else:
logging.warning('Failed to verify the access token, {0}, {1}.'.format(response.status_code, response.reason))
return False
def check_twilio_token(token):
"""Verifies the Twilio token for the flex plugin.
Args:
token: An open smalltable.Table instance.
Returns:
A Boolean, if it is True, then the token is valid, otherwise False.
Raises:
Warning: When The token is not valid.
Reference:
Twilio flex plugin
https://www.twilio.com/en-us/blog/sms-otp-authentication-flex
https://www.twilio.com/docs/flex/developer/ui/add-components-flex-ui
"""
body = {"Token": token}
response = requests.post(f'https://{config.TWILIO_FLEX_ENVIRONMENT}/verify', json=body)
# If the endpoint return 200 then the token was validated
if response.status_code == 200:
return True
else:
logging.warning('Failed to verify the access token, {0}, {1}.'.format(response.status_code, response.reason))