in Solutions/CofenseTriage/Data Connectors/CofenseTriageDataConnector/SharedCode/utils.py [0:0]
def auth_cofense(azure_function_name):
"""To authenticate with cofense."""
__method_name = inspect.currentframe().f_code.co_name
try:
applogger.info(
"{}(method={}) : {}: generating cofense access token.".format(
LOGS_STARTS_WITH, __method_name, azure_function_name
)
)
cofense_auth_url = "{}{}".format(COFENSE_BASE_URL, ENDPOINTS["authentication"])
COFENSE_429_SLEEP = 60
body = {
"client_id": COFENSE_CLIENT_ID,
"client_secret": COFENSE_CLIENT_SECRET,
"grant_type": "client_credentials",
}
proxies = create_proxy()
retry_count_429 = 0
while retry_count_429 <= 1:
response = make_rest_call(
url=cofense_auth_url,
method="POST",
azure_function_name=azure_function_name,
payload=body,
proxies=proxies,
)
response_status_code = response.status_code
if response_status_code >= 200 and response_status_code <= 299:
json_response = response.json()
if "access_token" not in json_response:
applogger.error(
"{}(method={}) : {}: url: {}, Status Code : {} : "
"Access token field not found in cofense authentication api call response.".format(
LOGS_STARTS_WITH,
__method_name,
azure_function_name,
cofense_auth_url,
response.status_code,
)
)
applogger.debug(
"{}(method={}) : {}: url: {}, Status Code : {}, Response reason: {}, Response: {} : "
"Access token not found in cofense authentication api call.".format(
LOGS_STARTS_WITH,
__method_name,
azure_function_name,
cofense_auth_url,
response.status_code,
response.reason,
response.text,
)
)
raise CofenseException()
else:
access_token = json_response.get("access_token")
applogger.info(
"{}(method={}) : {}: cofense access token generated successfully.".format(
LOGS_STARTS_WITH, __method_name, azure_function_name
)
)
applogger.debug(
"{}(method={}) : {}: url: {}, Status Code : {}: Cofense Triage access token generated.".format(
LOGS_STARTS_WITH,
__method_name,
azure_function_name,
cofense_auth_url,
response.status_code,
)
)
return access_token
elif response_status_code == 429:
applogger.error(
"{}(method={}) : {}: url: {}, Status Code : {} : "
"Getting 429 from cofense api call. Retrying again after {} seconds.".format(
LOGS_STARTS_WITH,
__method_name,
azure_function_name,
cofense_auth_url,
response.status_code,
COFENSE_429_SLEEP,
)
)
applogger.debug(
"{}(method={}) : {}: url: {}, Status Code : {}, Response reason: {}, Response: {} : "
"Getting 429 from cofense api call. Retry count: {}.".format(
LOGS_STARTS_WITH,
__method_name,
azure_function_name,
cofense_auth_url,
response.status_code,
response.reason,
response.text,
retry_count_429,
)
)
retry_count_429 += 1
time.sleep(COFENSE_429_SLEEP)
else:
applogger.error(
"{}(method={}) : {}: url: {}, Status Code : {}: Error while creating cofense triage access token."
" Error Reason: {}".format(
LOGS_STARTS_WITH,
__method_name,
azure_function_name,
cofense_auth_url,
response.status_code,
response.reason,
)
)
applogger.debug(
"{}(method={}) : {}: url: {}, Status Code : {}, Response: {} :"
" Error while creating cofense triage access token. Error Reason: {}".format(
LOGS_STARTS_WITH,
__method_name,
azure_function_name,
cofense_auth_url,
response.status_code,
response.text,
response.reason,
)
)
raise CofenseException()
applogger.error(
"{}(method={}) : {} : Max retries exceeded for getting cofense access token.".format(
azure_function_name, LOGS_STARTS_WITH, __method_name
)
)
raise CofenseException()
except CofenseException as error:
applogger.error(
"{}(method={}) : {}: Error generated while getting cofense access token :{}".format(
LOGS_STARTS_WITH,
__method_name,
azure_function_name,
error,
)
)
raise CofenseException()