in Solutions/Armis/Data Connectors/ArmisActivities/ArmisActivitySentinelConnector/__init__.py [0:0]
def _get_activity_data(self, armis_link_suffix, parameter):
    """Fetch one page of activity data from the Armis REST API.

    On a 401 response the access token is regenerated through
    ``self._get_access_token_activity`` and the request is retried
    recursively while ``self._retry_activity_token`` is at most 3.
    On success ``self._data_activity_from`` is updated to the ``next``
    value returned by the API.

    Args:
        self: Armis object.
        armis_link_suffix (String): will be containing the other part of link.
        parameter (json): query parameters sent to the REST API.

    Returns:
        tuple: ``(body, current_time, total_data_length, count_per_frame_data)``.
        ``body`` is the JSON-encoded ``results`` list (every record gets an
        ``armis_activity_time`` copy of its ``time``). ``current_time`` is the
        last record's ``time`` cut to 19 characters and padded with zeroed
        time components when only a date, an hour or hour:minute is present.
        The last two items are the ``total`` and ``count`` values of the
        response.

    Raises:
        ArmisDataNotFoundException: when the API reports a ``count`` of 0.
        ArmisException: on connection/request errors, unexpected HTTP status
            codes, or a response that cannot be parsed as expected.
    """
    try:
        # NOTE(review): no timeout is passed, so a stalled connection can block
        # indefinitely; consider adding one once an appropriate value is agreed.
        response = requests.get(
            (self._link + armis_link_suffix), params=parameter, headers=self._header
        )
        if response.status_code == 200:
            logging.info("API connected successfully with Armis to fetch the data.")
            try:
                results = json.loads(response.text)
            except ValueError as json_err:
                # json.JSONDecodeError is a ValueError subclass.
                raise ArmisException(
                    "Armis Activity Connector: Response is not valid JSON."
                ) from json_err

            payload = results.get("data") if isinstance(results, dict) else None
            payload_is_dict = isinstance(payload, dict)

            # Check "count" defensively: the key validation has not run yet.
            if payload_is_dict and payload.get("count") == 0:
                raise ArmisDataNotFoundException(
                    "Armis Activity Connector: Data not found."
                )

            required_keys = ("results", "total", "count", "next")
            if not payload_is_dict or any(key not in payload for key in required_keys):
                raise ArmisException(
                    "Armis Activity Connector: There are no proper keys in data."
                )

            total_data_length = payload["total"]
            count_per_frame_data = payload["count"]
            data = payload["results"]
            if not data:
                # A non-zero count with no records is an inconsistent response.
                raise ArmisException(
                    "Armis Activity Connector: Response contains no activity records."
                )

            try:
                for record in data:
                    record["armis_activity_time"] = record["time"]
                current_time = data[-1]["time"][:19]
            except (KeyError, TypeError) as record_err:
                raise ArmisException(
                    "Armis Activity Connector: Activity record has no usable 'time' field."
                ) from record_err

            body = json.dumps(data, indent=2)
            logging.info(
                "Armis Activity Connector: From %s length 1000",
                self._data_activity_from,
            )
            self._data_activity_from = payload["next"]

            if len(current_time) != 19:
                if len(current_time) == 10:
                    current_time += "T00:00:00"
                    logging.info(
                        "Armis Activity Connector: 'T00:00:00' added as only date is available."
                    )
                else:
                    time_parts = current_time.split("T")
                    # Only pad when a time component actually exists.
                    if len(time_parts) > 1:
                        if len(time_parts[1]) == 5:
                            time_parts[1] += ":00"
                            logging.info(
                                "Armis Activity Connector: ':00' added as seconds not available."
                            )
                        elif len(time_parts[1]) == 2:
                            time_parts[1] += ":00:00"
                            logging.info(
                                "Armis Activity Connector: ':00:00' added as only hour is available."
                            )
                        current_time = "T".join(time_parts)
            return body, current_time, total_data_length, count_per_frame_data
        elif response.status_code == 400:
            raise ArmisException(HTTP_ERRORS[400])
        elif response.status_code == 401 and self._retry_activity_token <= 3:
            logging.info(
                "Armis Activity Connector: Retry number: %s",
                self._retry_activity_token,
            )
            self._retry_activity_token += 1
            logging.error(HTTP_ERRORS[401])
            logging.info("Armis Activity Connector: Generating access token again.")
            self._get_access_token_activity("/access_token/")
            return self._get_activity_data(armis_link_suffix, parameter)
        else:
            raise ArmisException(
                "Armis Activity Connector: Error while fetching data. status Code:{} error message:{}.".format(
                    response.status_code, response.text
                )
            )
    except requests.exceptions.ConnectionError as connection_err:
        logging.error(ERROR_MESSAGES["HOST_CONNECTION_ERROR"])
        raise ArmisException(
            "Armis Activity Connector:Connection error while getting data from activity api."
        ) from connection_err
    except requests.exceptions.RequestException as request_err:
        logging.error(request_err)
        raise ArmisException(
            "Armis Activity Connector:Request error while getting data from activity api."
        ) from request_err
    # Handled before ArmisException so the "no data" signal stays reachable
    # even if ArmisDataNotFoundException derives from ArmisException.
    except ArmisDataNotFoundException as err:
        logging.info(err)
        raise ArmisDataNotFoundException() from err
    except ArmisException as err:
        logging.error(err)
        raise ArmisException(
            "Armis Activity Connector: Error while getting data from activity api."
        ) from err