in Solutions/CofenseTriage/Data Connectors/CofenseTriageDataConnector/IndicatorCreatorToDefender/sentinel.py [0:0]
def post_indicators(self, sentinel_json_indicator_list, defender_object):
"""To post and update the indicators into MS Defender."""
try:
__method_name = inspect.currentframe().f_code.co_name
applogger.debug(
"{}(method={}) : {} : "
"Posting and updating indicators into MS Defender.".format(
consts.LOGS_STARTS_WITH,
__method_name,
consts.SENTINEL_TO_DEFENDER,
)
)
# posting and updating indicators
for indicator in sentinel_json_indicator_list:
current_indicator_updated_date = self.convert_sentinel_datetime_format(
indicator=indicator
)
if self.new_execution_flag == "True":
if (
current_indicator_updated_date
<= self.current_checkpoint_indicator_date
):
# exit the class. execution is completed.
return False
self.sentinel_checkpoint_json_data[
"next_checkpoint_indicator_date"
] = indicator.get("properties", {}).get("lastUpdatedTimeUtc")
self.sentinel_checkpoint_json_data["new_execution_flag"] = "False"
try:
self.sentinel_checkpoint_state.post(
json.dumps(self.sentinel_checkpoint_json_data)
)
except Exception as error:
applogger.error(
"{}(method={}) : {} Error: {} : "
"Error occurred while posting checkpoint data to "
"sentinel defender checkpoint state manager.".format(
consts.LOGS_STARTS_WITH,
__method_name,
consts.SENTINEL_TO_DEFENDER,
error,
)
)
raise error
self.next_checkpoint_indicator_date = (
self.sentinel_checkpoint_json_data.get(
"next_checkpoint_indicator_date"
)
)
applogger.debug(
"{}(method={}) : {}: "
"Next execution checkpoint date stored : {}".format(
consts.LOGS_STARTS_WITH,
__method_name,
consts.SENTINEL_TO_DEFENDER,
self.next_checkpoint_indicator_date,
)
)
self.new_execution_flag = self.sentinel_checkpoint_json_data.get(
"new_execution_flag"
)
# Completed current execution.
if (
current_indicator_updated_date
<= self.current_checkpoint_indicator_date
):
self.complete_current_execution(__method_name)
return True
# Creating indicator into defender.
try:
# bool to find cofense indicator.
cofense_indicator = self.check_cofense_indicator(indicator)
# bool to check threat level value and indicator confidence.
threat_level_filter = self.check_threat_level_filter(indicator)
# Create indicator data for defender.
indicator_data = self.create_indicator_data(indicator)
if (
cofense_indicator
and threat_level_filter
and indicator_data.get("indicatorValue", None)
and indicator_data.get("indicatorType", None)
and indicator_data.get("title", None)
and indicator_data.get("action", None)
and indicator_data.get("description", None)
):
self.create_indicator(
defender_object=defender_object,
indicator_data=indicator_data,
)
# Update the total executed indicators count.
self.indicator_count += 1
except CofenseException as error:
applogger.error(
"{}(method={}) : {} : Error occurred while posting indicator into MS Defender. "
"Sentinel indicator name : {}. Error : {}".format(
consts.LOGS_STARTS_WITH,
__method_name,
consts.SENTINEL_TO_DEFENDER,
indicator_data.get("title"),
error,
)
)
self.failed_indicator_count += 1
self.failed_indicator_list.append(indicator_data.get("title"))
# Completed current execution.
return True
except CofenseException as error:
applogger.error(
"{}(method={}) : {} : Error occurred while posting indicator into MS Defender : {}".format(
consts.LOGS_STARTS_WITH,
__method_name,
consts.SENTINEL_TO_DEFENDER,
error,
)
)
raise CofenseException()