def get_sentinel_checkpoint_data()

in Solutions/CofenseTriage/Data Connectors/CofenseTriageDataConnector/IndicatorCreatorToDefender/sentinel.py [0:0]


    def get_sentinel_checkpoint_data(self):
        """To get the sentinel defender checkpoint data from state manager."""
        __method_name = inspect.currentframe().f_code.co_name
        # Initializing state manager for sentinel and defender indicator id table.
        self.sentinel_checkpoint_state = StateManager(
            connection_string=consts.CONNECTION_STRING,
            file_path="sentinel_defender_checkpoint",
        )
        try:
            sentinel_checkpoint_data = self.sentinel_checkpoint_state.get(
                consts.SENTINEL_TO_DEFENDER
            )
        except Exception as error:
            applogger.error(
                "{}(method={}) : {} Error: {} : "
                "Error occurred while getting sentinel defender checkpoint data from "
                "sentinel defender checkpoint state manager.".format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    consts.SENTINEL_TO_DEFENDER,
                    error,
                )
            )
            raise error

        # If checkpoint data is not none, then fetch the checkpoint fields.
        if sentinel_checkpoint_data is not None:
            self.sentinel_checkpoint_json_data = json.loads(sentinel_checkpoint_data)
            self.new_execution_flag = self.sentinel_checkpoint_json_data.get(
                "new_execution_flag"
            )
            self.current_checkpoint_indicator_date = (
                self.sentinel_checkpoint_json_data.get(
                    "current_checkpoint_indicator_date"
                )
            )
            self.next_checkpoint_indicator_date = (
                self.sentinel_checkpoint_json_data.get("next_checkpoint_indicator_date")
            )
            if self.new_execution_flag == "False":
                self.sentinel_skiptoken = self.sentinel_checkpoint_json_data.get(
                    "last_execution_skip_token"
                )
                applogger.debug(
                    "{}(method={}) : {}: "
                    "Last checkpoint skip token is : {}".format(
                        consts.LOGS_STARTS_WITH,
                        __method_name,
                        consts.SENTINEL_TO_DEFENDER,
                        self.sentinel_skiptoken,
                    )
                )
            else:
                self.sentinel_skiptoken = ""
            applogger.debug(
                "{}(method={}) : {} : "
                "Sentinel defender checkpoint state manager data fetch successfully.".format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    consts.SENTINEL_TO_DEFENDER,
                )
            )

            # Convert string date to datetime object.
            self.current_checkpoint_indicator_date = self.convert_datetime_format(
                self.sentinel_checkpoint_json_data.get(
                    "current_checkpoint_indicator_date"
                )
            )
            applogger.info(
                "{}(method={}) : {} : "
                "Last checkpoint date is : {}".format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    consts.SENTINEL_TO_DEFENDER,
                    self.current_checkpoint_indicator_date,
                )
            )

        else:
            # If checkpoint is none then initialize from starting.
            # If checkpoint file is not found then, it is first run.
            self.sentinel_checkpoint_json_data = {}
            self.sentinel_checkpoint_json_data["new_execution_flag"] = "True"
            self.sentinel_checkpoint_json_data["last_execution_skip_token"] = ""
            # In first run we need to fetch last 15 days of indicators from sentinel TI.
            self.sentinel_checkpoint_json_data["current_checkpoint_indicator_date"] = (
                datetime.utcnow() - timedelta(days=15)
            ).isoformat()
            self.sentinel_checkpoint_json_data["next_checkpoint_indicator_date"] = ""
            try:
                self.sentinel_checkpoint_state.post(
                    json.dumps(self.sentinel_checkpoint_json_data)
                )
            except Exception as error:
                applogger.error(
                    "{}(method={}) : {} Error: {} : "
                    "Error occurred while posting checkpoint data to "
                    "sentinel defender checkpoint state manager.".format(
                        consts.LOGS_STARTS_WITH,
                        __method_name,
                        consts.SENTINEL_TO_DEFENDER,
                        error,
                    )
                )
                raise error
            self.new_execution_flag = self.sentinel_checkpoint_json_data.get(
                "new_execution_flag"
            )
            self.sentinel_skiptoken = self.sentinel_checkpoint_json_data.get(
                "last_execution_skip_token"
            )
            self.next_checkpoint_indicator_date = (
                self.sentinel_checkpoint_json_data.get("next_checkpoint_indicator_date")
            )

            applogger.info(
                "{}(method={}) : {} : "
                "Sentinel defender checkpoint state manager data not found. Creating it.".format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    consts.SENTINEL_TO_DEFENDER,
                )
            )

            # Convert string date to datetime object.
            self.current_checkpoint_indicator_date = self.convert_datetime_format(
                self.sentinel_checkpoint_json_data.get(
                    "current_checkpoint_indicator_date"
                )
            )
            applogger.info(
                "{}(method={}) : {} : "
                "Getting indicators data of last 15 days from : {}".format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    consts.SENTINEL_TO_DEFENDER,
                    self.current_checkpoint_indicator_date,
                )
            )