in Solutions/CofenseTriage/Data Connectors/CofenseTriageDataConnector/NonCofenseBasedIndicatorCreatorToCofense/sentinel.py [0:0]
def post_indicators(self, sentinel_json_indicator_list, cofense_object):
"""To post and update the indicators into Cofense Triage."""
try:
__method_name = inspect.currentframe().f_code.co_name
applogger.debug(
"{}(method={}) : {} : "
"Posting and updating indicators into Cofense Triage.".format(
LOGS_STARTS_WITH,
__method_name,
SENTINEL_TO_COFENSE,
)
)
# posting and updating indicators
for indicator in sentinel_json_indicator_list:
current_indicator_updated_date = self.convert_sentinel_datetime_format(
indicator=indicator
)
if self.new_execution_flag == "True":
if (
current_indicator_updated_date
<= self.current_checkpoint_indicator_date
):
# exit the class. execution is completed.
return False
self.sentinel_checkpoint_json_data[
"next_checkpoint_indicator_date"
] = indicator.get("properties", {}).get("lastUpdatedTimeUtc")
self.sentinel_checkpoint_json_data["new_execution_flag"] = "False"
try:
self.sentinel_checkpoint_state.post(
json.dumps(self.sentinel_checkpoint_json_data)
)
except Exception as error:
applogger.error(
"{}(method={}) : {} Error: {} : "
"Error occurred while posting sentinel checkpoint data to "
"sentinel checkpoint state manager.".format(
LOGS_STARTS_WITH,
__method_name,
SENTINEL_TO_COFENSE,
error,
)
)
raise error
self.next_checkpoint_indicator_date = (
self.sentinel_checkpoint_json_data.get(
"next_checkpoint_indicator_date"
)
)
applogger.debug(
"{}(method={}) : {}: "
"Next execution checkpoint date stored : {}".format(
LOGS_STARTS_WITH,
__method_name,
SENTINEL_TO_COFENSE,
self.next_checkpoint_indicator_date,
)
)
self.new_execution_flag = self.sentinel_checkpoint_json_data.get(
"new_execution_flag"
)
# Completed current execution.
if (
current_indicator_updated_date
<= self.current_checkpoint_indicator_date
):
self.complete_current_execution(__method_name)
return True
# getting indicator source and mapping sentinel source to cofense.
sentinel_indicator_source = indicator.get("properties", {}).get(
"source", None
)
indicator_source = (
self.sentinel_cofense_mapping_obj.get_indicator_source(
sentinel_indicator_source
)
)
# convert sentinel indicator type to cofense indicator type.
indicator_threat_type = (
self.sentinel_cofense_mapping_obj.get_cofense_threat_type(indicator)
)
# Convert confidence integer to Level of threat (Malicious, Suspicious, or Benign).
confidence = indicator.get("properties", {}).get("confidence", "")
threat_level = sentinel_to_cofense_threat_level_mapping(
confidence=confidence, azure_function_name=SENTINEL_TO_COFENSE
)
# get the threat value from the sentinel indicator data.
sentinel_pattern = indicator.get("properties", {}).get("pattern", None)
threat_value = self.sentinel_cofense_mapping_obj.get_threat_value(
sentinel_pattern
)
# if indicator source and sentinel indicator pattern type is not as need.
# To do: remove "is not None".
if (
indicator_source is not None
and indicator_threat_type is not None
and threat_level is not None
and threat_value is not None
):
# if the sentinel indicator name is connected with a cofense id then update,
# the indicator data into cofense triage.
sentinel_indicator_name = indicator.get("name", "")
if self.sentinel_cofense_id_table.get(sentinel_indicator_name):
# get the cofense id from the id table.
cofense_id = self.sentinel_cofense_id_table.get(
indicator.get("name", "")
)
# Update the indicator.
self.update_indicator(
cofense_object=cofense_object,
cofense_id=cofense_id,
threat_level=threat_level,
)
# or else create a new indicator into cofense triage and relate the sentinel and cofense id,
# into id table.
else:
# Create indicator into cofense.
self.create_indicator(
cofense_object=cofense_object,
indicator_threat_type=indicator_threat_type,
indicator_source=indicator_source,
threat_level=threat_level,
threat_value=threat_value,
sentinel_indicator_name=indicator.get("name"),
)
# Update the total executed indicators count.
self.indicator_count += 1
# Completed current execution.
return True
except CofenseException as error:
applogger.error(
"{}(method={}) : {} : Error occurred while posting indicator in cofense : {}".format(
LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error
)
)
raise CofenseException()