in Solutions/CofenseTriage/Data Connectors/CofenseTriageDataConnector/NonCofenseBasedIndicatorCreatorToCofense/sentinel.py [0:0]
def get_sentinel_checkpoint_data(self):
    """Load the sentinel checkpoint from the state manager, creating it on first run.

    A ``StateManager`` for the ``"sentinel_checkpoint"`` file is stored on
    ``self.sentinel_checkpoint_state`` and queried for the saved checkpoint.

    * Checkpoint found: the stored JSON is parsed into
      ``self.sentinel_checkpoint_json_data`` and its fields are copied to
      ``self.new_execution_flag``, ``self.next_checkpoint_indicator_date``
      and ``self.current_checkpoint_indicator_date`` (the latter passed
      through ``self.convert_datetime_format``). The stored skip token is
      restored into ``self.sentinel_skiptoken`` only when the flag is the
      string ``"False"``; otherwise the skip token is set to ``""``.
    * Checkpoint missing: a fresh checkpoint is built (flag ``"True"``,
      empty skip token, start date 15 days before ``datetime.utcnow()``),
      posted to the state manager, and the same attributes are populated
      from it.

    Raises:
        Exception: whatever the state manager raises from ``get`` or
            ``post``; it is logged and then re-raised. Errors from
            ``json.loads`` or ``convert_datetime_format`` propagate
            without being logged here.
    """
    method_name = inspect.currentframe().f_code.co_name
    # Leading format arguments shared by every log message in this method.
    log_args = (LOGS_STARTS_WITH, method_name, SENTINEL_TO_COFENSE)

    # State manager backing the sentinel checkpoint file.
    self.sentinel_checkpoint_state = StateManager(
        connection_string=CONNECTION_STRING,
        file_path="sentinel_checkpoint",
    )

    try:
        raw_checkpoint = self.sentinel_checkpoint_state.get(SENTINEL_TO_COFENSE)
    except Exception as err:
        applogger.error(
            "{}(method={}) : {} Error: {} : "
            "Error occurred while getting sentinel checkpoint data from sentinel checkpoint state manager.".format(
                *log_args, err
            )
        )
        raise err

    if raw_checkpoint is None:
        # Nothing stored yet, so this is the first run: start a new
        # execution that looks back 15 days for indicators.
        lookback_start = datetime.utcnow() - timedelta(days=15)
        checkpoint = {
            "new_execution_flag": "True",
            "last_execution_skip_token": "",
            "current_checkpoint_indicator_date": lookback_start.isoformat(),
            "next_checkpoint_indicator_date": "",
        }
        self.sentinel_checkpoint_json_data = checkpoint

        # Persist the freshly built checkpoint before exposing its fields.
        try:
            self.sentinel_checkpoint_state.post(json.dumps(checkpoint))
        except Exception as err:
            applogger.error(
                "{}(method={}) : {} Error: {} : "
                "Error occurred while posting checkpoint data to sentinel checkpoint state manager.".format(
                    *log_args, err
                )
            )
            raise err

        self.new_execution_flag = checkpoint["new_execution_flag"]
        self.sentinel_skiptoken = checkpoint["last_execution_skip_token"]
        self.next_checkpoint_indicator_date = checkpoint[
            "next_checkpoint_indicator_date"
        ]
        applogger.info(
            "{}(method={}) : {} : "
            "Sentinel checkpoint state manager data not found. Creating it.".format(
                *log_args
            )
        )

        # The date was stored as an ISO string; hand it to the converter.
        self.current_checkpoint_indicator_date = self.convert_datetime_format(
            checkpoint["current_checkpoint_indicator_date"]
        )
        applogger.info(
            "{}(method={}) : {} : "
            "Getting indicators data of last 15 days from : {}".format(
                *log_args, self.current_checkpoint_indicator_date
            )
        )
        return

    # A checkpoint exists: parse it and restore the saved fields.
    checkpoint = json.loads(raw_checkpoint)
    self.sentinel_checkpoint_json_data = checkpoint
    self.new_execution_flag = checkpoint.get("new_execution_flag")
    stored_date = checkpoint.get("current_checkpoint_indicator_date")
    # Holds the raw stored value until it is converted further down.
    self.current_checkpoint_indicator_date = stored_date
    self.next_checkpoint_indicator_date = checkpoint.get(
        "next_checkpoint_indicator_date"
    )

    if self.new_execution_flag != "False":
        # Any flag other than the string "False" starts without a skip token.
        self.sentinel_skiptoken = ""
    else:
        # Flag "False": pick up the skip token saved by the last execution.
        self.sentinel_skiptoken = checkpoint.get("last_execution_skip_token")
        applogger.debug(
            "{}(method={}) : {}: "
            "Last checkpoint skip token is : {}".format(
                *log_args, self.sentinel_skiptoken
            )
        )

    applogger.debug(
        "{}(method={}) : {} : "
        "Sentinel checkpoint state manager data fetch successfully.".format(
            *log_args
        )
    )

    # Replace the raw stored value with the converter's result.
    self.current_checkpoint_indicator_date = self.convert_datetime_format(
        stored_date
    )
    applogger.info(
        "{}(method={}) : {} : "
        "Last checkpoint date is : {}".format(
            *log_args, self.current_checkpoint_indicator_date
        )
    )