in Solutions/Armis/Data Connectors/ArmisAlerts/ArmisAlertSentinelConnector/__init__.py [0:0]
def _get_alert_data(self, armis_link_suffix, parameter):
"""_get_alert_data is used to get data using api.
Args:
self: Armis object.
armis_link_suffix (String): will be containing the other part of link.
parameter (json): will contain the json data to sends to parameter to get data from REST API.
"""
try:
response = requests.get(
(self._link + armis_link_suffix), params=parameter, headers=self._header
)
if response.status_code == 200:
logging.info("API connected successfully with Armis to fetch the data.")
results = json.loads(response.text)
if(results["data"]["count"] == 0):
raise ArmisDataNotFoundException(
"Armis Alert Connector: Data not found."
)
if (
"data" in results
and "results" in results["data"]
and "total" in results["data"]
and "count" in results["data"]
and "next" in results["data"]
):
total_data_length = results["data"]["total"]
count_per_frame_data = results["data"]["count"]
data = results["data"]["results"]
for i in data:
i["armis_alert_time"] = i["time"]
body = json.dumps(data, indent=2)
logging.info(
"Armis Alert Connector: From %s length 1000",
self._data_alert_from,
)
self._data_alert_from = results["data"]["next"]
current_time = data[-1]["time"][:19]
if len(current_time) != 19:
if len(current_time) == 10:
current_time += "T00:00:00"
logging.info("Armis Alert Connector: 'T:00:00:00' added as only date is available.")
else:
splited_time = current_time.split('T')
if len(splited_time[1]) == 5:
splited_time[1] += ":00"
logging.info("Armis Alert Connector: ':00' added as seconds not available.")
elif len(splited_time[1]) == 2:
splited_time[1] += ":00:00"
logging.info("Armis Alert Connector: ':00:00' added as only hour is available.")
current_time = "T".join(splited_time)
return body, current_time, total_data_length, count_per_frame_data
else:
raise ArmisException(
"Armis Alert Connector: There are no proper keys in data."
)
elif response.status_code == 400:
raise ArmisException(HTTP_ERRORS[400])
elif response.status_code == 401 and self._retry_alert_token <= 3:
logging.info(
"Armis Alert Connector: Retry number: {}".format(
str(self._retry_alert_token)
)
)
self._retry_alert_token += 1
logging.error(HTTP_ERRORS[401])
logging.info("Armis Alert Connector: Generating access token again!")
self._get_access_token_alert("/access_token/")
return self._get_alert_data(armis_link_suffix, parameter)
else:
raise ArmisException(
"Armis Alert Connector: Error while fetching data. status code:{} error message:{}.".format(
response.status_code, response.text
)
)
except requests.exceptions.ConnectionError:
logging.error(ERROR_MESSAGES["HOST_CONNECTION_ERROR"])
raise ArmisException(
"Armis Alert Connector:Connection error while getting data from alert api."
)
except requests.exceptions.RequestException as request_err:
logging.error(request_err)
raise ArmisException(
"Armis Alert Connector:Request error while getting data from alert api."
)
except ArmisException as err:
logging.error(err)
raise ArmisException(
"Armis Alert Connector: Error while getting data from alert api."
)
except ArmisDataNotFoundException as err:
logging.info(err)
raise ArmisDataNotFoundException()