in Solutions/Infoblox/Data Connectors/InfobloxCloudDataConnector/SharedCode/utils.py [0:0]
def auth_sentinel(self):
"""Authenticate with microsoft sentinel and update header."""
__method_name = inspect.currentframe().f_code.co_name
try:
for i in range(consts.MAX_RETRIES):
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Generating microsoft sentinel access token.",
)
)
azure_auth_url = consts.AZURE_AUTHENTICATION_URL.format(consts.AZURE_TENANT_ID)
applogger.debug(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Calling auth url = {}".format(azure_auth_url),
)
)
body = {
"client_id": consts.AZURE_CLIENT_ID,
"client_secret": consts.AZURE_CLIENT_SECRET,
"grant_type": "client_credentials",
"scope": "https://management.azure.com/.default",
}
try:
response = requests.post(url=azure_auth_url, data=body)
except requests.RequestException as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Request error : Error-{} Index = {}".format(error, i),
)
)
continue
if response.status_code >= 200 and response.status_code <= 299:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Got response with Status code : {}".format(response.status_code),
)
)
response_json = response.json()
bearer_token = self.get_bearer_token_from_response(response_json)
applogger.debug(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Bearer Token Generated: {}".format(bearer_token),
)
)
self.headers = {
"Content-Type": "application/json",
"Authorization": "Bearer {}".format(bearer_token),
}
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"MS authentication complete",
)
)
return
elif response.status_code == 400:
response_json = response.json()
error = response_json.get("error", "Bad request")
error_description = response_json.get("error_description", "")
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Status Code = {}, Error-{}, Error Description = {}".format(
response.status_code,
error,
error_description,
),
)
)
raise InfobloxException()
elif response.status_code == 401:
response_json = response.json()
error = response_json.get("error", "Unauthorized")
error_description = response_json.get("error_description", "")
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Status Code = {}, Error-{}, Error Description = {}".format(
response.status_code,
error,
error_description,
),
)
)
raise InfobloxException()
elif response.status_code == 500:
log_message = "Internal Server Error"
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Status Code = {}, Error-{}".format(response.status_code, log_message),
)
)
raise InfobloxException()
else:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Status Code = {}, Error-{}".format(response.status_code, response.content),
)
)
raise InfobloxException()
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Max retries reached for authentication of sentinel API",
)
)
raise InfobloxException()
except InfobloxException:
raise InfobloxException()
except requests.HTTPError as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.HTTP_ERROR_MSG.format(error),
)
)
raise InfobloxException()
except Exception as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(error),
)
)
raise InfobloxException()