in Solutions/BitSight/Data Connectors/BitSightDataConnector/FindingsDetails/bitsight_findings.py [0:0]
def get_findings_details(self, company_name, company_guid):
"""Post the data of findings details.
Args:
company_name (str): company name
company_guid (str): company guid
"""
try:
data_to_post = None
risk_categories = [
{"risk_category": "Diligence"},
{"risk_category": "Compromised Systems"},
{"risk_category": "User Behavior"},
]
last_data = self.checkpoint_obj.get_last_data(self.findings_state, table_name=FINDINGS_TABLE_NAME, checkpoint_query=FINDING_DETAILS_QUERY)
findings_url = self.base_url + self.findings_endpoint_path.format(
company_guid
)
for params in risk_categories:
if int(time.time()) >= self.start_time + 540:
applogger.info(
"BitSight: 9:00 mins executed hence breaking. In next iteration, start fetching after {}".format(
company_name
)
)
break
risk = params["risk_category"]
checkpoint_key = "{}_{}".format(risk, company_guid)
last_date = self.checkpoint_obj.get_endpoint_last_data(
last_data, "findings_details'", checkpoint_key
)
params["sort"] = "last_seen"
params["limit"] = self.limit
params["expand"] = "attributed_companies"
params["offset"] = 0
params["last_seen_gte"] = last_date if last_date is not None else ""
results = self.get_bitsight_data(findings_url, params)
if not results or len(results.get("results")) == 0:
applogger.info(
'BitSight: No new findings found for "{}" ({})'.format(
risk, company_name
)
)
continue
results["id"] = risk
results["Company_name"] = company_name
next_link = results.get("links").get("next")
index = len(results.get("results")) - 1
data_to_post = results.get("results")[index].get("last_seen")
formatted_data = self.prepare_data_to_post(results, company_name)
data_to_post = datetime.datetime.strptime(data_to_post, "%Y-%m-%d")
data_to_post += datetime.timedelta(days=1)
self.send_data_to_sentinel(
formatted_data,
FINDINGS_TABLE_NAME,
company_name,
"findings details",
)
self.checkpoint_obj.save_checkpoint(
self.findings_state,
last_data,
"findings_details",
"{}_{}".format(FINDINGS_TABLE_NAME, "Checkpoint"),
checkpoint_key,
str(data_to_post.date()),
)
c_data = {}
params["offset"] += self.limit
page = 0
while next_link:
if int(time.time()) >= self.start_time + 540:
applogger.info(
"BitSight: 9:00 mins executed hence breaking. In next iteration, start fetching after {}".format(
company_name
)
)
break
page += 1
applogger.info(
"BitSight: Findings: Page {} of {} ({})".format(
page, company_name, risk
)
)
c_data["next1"] = self.get_bitsight_data(findings_url, params)
next_link = c_data["next1"].get("links").get("next")
length_results = len(c_data.get("next1").get("results"))
if length_results == 0:
applogger.info(
'BitSight: No new findings found for {} on page {} ({})'.format(
company_name, page, risk
)
)
break
applogger.info(
"BitSight: Got {} findings for {} on page {}".format(
length_results, company_name, page
)
)
c_data["next1"]["id"] = risk
c_data["next1"]["Company_name"] = company_name
index = length_results - 1
data_to_post = (
c_data["next1"].get("results")[index].get("last_seen")
)
data_to_post = datetime.datetime.strptime(data_to_post, "%Y-%m-%d")
data_to_post += datetime.timedelta(days=1)
formatted_data = self.prepare_data_to_post(
c_data["next1"], company_name
)
self.send_data_to_sentinel(
formatted_data,
FINDINGS_TABLE_NAME,
company_name,
"findings details",
)
self.checkpoint_obj.save_checkpoint(
self.findings_state,
last_data,
"findings_details",
"{}_{}".format(FINDINGS_TABLE_NAME, "Checkpoint"),
checkpoint_key,
str(data_to_post.date()),
)
params["offset"] += self.limit
except BitSightException:
raise BitSightException()
except Exception as err:
applogger.error("BitSight: GET FINDINGS DETAILS: {}".format(err))
raise BitSightException()