in aa-integration-backend/ui-connector/auth_options.py [0:0]
def check_salesforce_lwc_token(token):
"""Verifies Salesforce Org Id using Client Credentials OAuth flow.
Uses `token` to call the oauth2/userinfo endpoint, getting
Salesforce Organization Id for the Connected App's authorized user.
If this matches `config.SALESFORCE_ORGANIZATION_ID`, return `True`.
Args:
token: an `access_token` from the SF OAuth2/token endpoint.
Returns:
A Boolean; if `True`, access org id matches `config` module, else `False`.
Raises:
Warning: When the token is not valid.
Warning: When the SF org id doesn't match.
Reference: https://help.salesforce.com/s/articleView?id=sf.remoteaccess_oauth_client_credentials_flow.htm&type=5
"""
access_token = re.sub(r'Bearer ', '', token, re.IGNORECASE)
if config.SALESFORCE_DOMAIN == 'login.salesforce.com':
logging.error('SALESFORCE_DOMAIN is not set. Auth will fail.')
if config.SALESFORCE_ORGANIZATION_ID == 'YOUR_ORGANIZATION_ID':
logging.error('SALESFORCE_ORGANIZATION_ID is not set. Auth will fail.')
user_info_resp = requests.get(
f'https://{config.SALESFORCE_DOMAIN}/services/oauth2/userinfo?' + \
urlencode({
'access_token': access_token
})
)
if user_info_resp.status_code == 200:
user_info = user_info_resp.json()
user_org = user_info['organization_id']
config_org = config.SALESFORCE_ORGANIZATION_ID
min_len = min(len(user_org), len(config_org))
if min_len and user_org[:min_len] == config_org[:min_len]:
return True
else:
logging.warning(
'The Salesforce org of user {} is not allowed.'.format(
user_info['user_id']))
else:
logging.warning(
'Failed to verify Salesforce access_token, {0}, {1}.'.format(
user_info_resp.status_code, user_info_resp.reason))