in Solutions/CofenseTriage/Data Connectors/CofenseTriageDataConnector/NonCofenseBasedIndicatorCreatorToCofense/cofense.py [0:0]
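def update_indicators(self, threat_level, id=""):
    """Update the threat level of an existing indicator in Cofense Triage.

    Sends a PUT request to ``COFENSE_UPDATE_INDICATOR_URL + str(id)`` with a
    ``threat_indicators`` payload carrying the new ``threat_level``. A 401
    response triggers re-authentication through ``auth_cofense`` followed by
    a retry; a 429 response triggers a sleep of ``COFENSE_429_SLEEP`` seconds
    followed by a retry. The loop stops once either retry counter exceeds 1.

    Args:
        threat_level (String): (Required) Level of threat (Malicious, Suspicious, or Benign).
        id (str, optional): Cofense Indicator ID for updating indicator. Defaults to "".

    Raises:
        CofenseException: On any status other than 2xx, 401 or 429, or when
            the retry budget is exhausted.

    Returns:
        update_indicator_json_response: Decoded JSON body of the 2xx response,
        used by callers to fetch the cofense indicator id.
    """
    # Resolved before the try block so the except handler can always log it.
    method_name = inspect.currentframe().f_code.co_name
    try:
        # NOTE(review): id is appended to the URL path as-is, with no encoding
        # or format check, and the default "" yields the bare base URL. The
        # request carries the bearer token, so confirm callers only pass
        # indicator ids obtained from Cofense.
        cofense_update_indicator_url = COFENSE_UPDATE_INDICATOR_URL + str(id)
        body = json.dumps(
            {
                "data": {
                    "id": str(id),
                    "type": "threat_indicators",
                    "attributes": {
                        "threat_level": threat_level,
                    },
                }
            }
        )
        retry_count_429 = 0
        retry_count_401 = 0
        while retry_count_429 <= 1 and retry_count_401 <= 1:
            # Headers are rebuilt on every attempt. Building them once before
            # the loop made the retry after a 401 resend the expired token,
            # so the refresh below never took effect within this call.
            headers = {
                "Accept": self.accept_content_type,
                "Content-Type": self.accept_content_type,
                "Authorization": "Bearer " + self.access_token,
            }
            update_indicator_response = make_rest_call(
                url=cofense_update_indicator_url,
                method="PUT",
                azure_function_name=SENTINEL_TO_COFENSE,
                payload=body,
                headers=headers,
                proxies=self.proxy,
            )
            update_indicator_status_code = update_indicator_response.status_code
            if 200 <= update_indicator_status_code <= 299:
                # return the json response to fetch the indicator id.
                return json.loads(update_indicator_response.text)
            elif update_indicator_status_code == 401:
                retry_count_401 += 1
                applogger.error(
                    "{}(method={}) : {} : url: {}, Status Code : {}: Error Reason: {}: "
                    " Cofense access token expired, generating new access token. Retry count: {}.".format(
                        LOGS_STARTS_WITH,
                        method_name,
                        SENTINEL_TO_COFENSE,
                        cofense_update_indicator_url,
                        update_indicator_response.status_code,
                        update_indicator_response.reason,
                        retry_count_401,
                    )
                )
                # Debug records additionally carry the full response body.
                applogger.debug(
                    "{}(method={}) : {} : url: {}, Status Code : {}, Error Reason: {}, Response: {} :"
                    " Cofense access token expired, generating new access token. Retry count: {}.".format(
                        LOGS_STARTS_WITH,
                        method_name,
                        SENTINEL_TO_COFENSE,
                        cofense_update_indicator_url,
                        update_indicator_response.status_code,
                        update_indicator_response.reason,
                        update_indicator_response.text,
                        retry_count_401,
                    )
                )
                # Stored on the instance; picked up by the next iteration's headers.
                self.access_token = auth_cofense(SENTINEL_TO_COFENSE)
            elif update_indicator_status_code == 429:
                applogger.error(
                    "{}(method={}) : {}: url: {}, Status Code : {} : "
                    "Getting 429 from cofense api call. Retrying again after {} seconds.".format(
                        LOGS_STARTS_WITH,
                        method_name,
                        SENTINEL_TO_COFENSE,
                        cofense_update_indicator_url,
                        update_indicator_response.status_code,
                        COFENSE_429_SLEEP,
                    )
                )
                applogger.debug(
                    "{}(method={}) : {}: url: {}, Status Code : {}, Response reason: {}, Response: {} : "
                    "Getting 429 from cofense api call. Retry count: {}.".format(
                        LOGS_STARTS_WITH,
                        method_name,
                        SENTINEL_TO_COFENSE,
                        cofense_update_indicator_url,
                        update_indicator_response.status_code,
                        update_indicator_response.reason,
                        update_indicator_response.text,
                        retry_count_429,
                    )
                )
                retry_count_429 += 1
                time.sleep(COFENSE_429_SLEEP)
            else:
                applogger.error(
                    "{}(method={}) : {} : url: {}, Status Code : {}: Error while updating indicators"
                    " into cofense triage. Error Reason: {}".format(
                        LOGS_STARTS_WITH,
                        method_name,
                        SENTINEL_TO_COFENSE,
                        cofense_update_indicator_url,
                        update_indicator_response.status_code,
                        update_indicator_response.reason,
                    )
                )
                # The message says "updating" to match the error record above;
                # it previously said "creating", which does not describe this call.
                applogger.debug(
                    "{}(method={}) : {}: url: {}, Status Code : {}, Response reason: {}, Response: {} : "
                    "Error while updating indicators into cofense triage.".format(
                        LOGS_STARTS_WITH,
                        method_name,
                        SENTINEL_TO_COFENSE,
                        cofense_update_indicator_url,
                        update_indicator_response.status_code,
                        update_indicator_response.reason,
                        update_indicator_response.text,
                    )
                )
                raise CofenseException()
        # Reached only when a retry counter has exceeded its limit.
        applogger.error(
            "{}(method={}) : {} : Max retries exceeded for updating indicators into cofense.".format(
                LOGS_STARTS_WITH, method_name, SENTINEL_TO_COFENSE
            )
        )
        raise CofenseException()
    except CofenseException as error:
        applogger.error(
            "{}(method={}) : {} : Error occurred while updating indicator in cofense : {}".format(
                LOGS_STARTS_WITH, method_name, SENTINEL_TO_COFENSE, error
            )
        )
        # Same exception type as before; the cause is chained for tracebacks.
        raise CofenseException() from error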