in Solutions/SecurityScorecard Cybersecurity Ratings/Data Connectors/SecurityScorecardIssue/SecurityScorecardIssueSentinelConnector/__init__.py [0:0]
def main(mytimer: func.TimerRequest) -> None:
    """Fetch SecurityScorecard issue data and ingest it into Microsoft Sentinel.

    Issues are written first for the configured domain and then for every
    company of each configured portfolio. Progress is tracked per entity in
    the state manager as a checkpoint string of the form
    ``"<from_date>|<check_date>"``: ``from_date`` is passed to the writer as
    the start of the fetch window, and an entity whose ``check_date`` equals
    today's UTC date is skipped.

    Args:
        mytimer (func.TimerRequest): Timer that triggered this run; only its
            ``past_due`` flag is inspected.

    Raises:
        APIKeyNotProvidedError: If the secret key is empty after stripping.
        DomainNotProvidedError: If the domain is empty after stripping.
        BaseURLNotProvidedError: If the base URL is empty after stripping.
        SSIssueException: Re-raised with the connector prefix when any step
            of the run raises it.
    """
    utc_now_time = datetime.datetime.now(datetime.timezone.utc)
    logging.info(
        "SecurityScorecard Connector: Python timer trigger function ran at %s",
        utc_now_time.isoformat(),
    )
    try:
        # Remove whitespace from environment variables.
        secret_key = SECURITY_SCORECARD_SECRET_KEY.strip()
        base_url = BASE_URL.strip()
        domain = DOMAIN.strip()
        level_issue_change = LEVEL_ISSUE_CHANGE.strip()
        portfolio_ids_str = PORTFOLIO_IDS_STR.strip()

        # Validation of Secret Key, Domain and URL. The handlers at the
        # bottom of this function attach the user-facing messages.
        if not secret_key:
            raise APIKeyNotProvidedError()
        if not domain:
            raise DomainNotProvidedError()
        if not base_url:
            raise BaseURLNotProvidedError()

        # Keep only the portfolios whose id was explicitly requested.
        portfolios = []
        if portfolio_ids_str:
            portfolio_ids = [
                portfolio_id.strip()
                for portfolio_id in portfolio_ids_str.split(",")
                if portfolio_id.strip()
            ]
            available_portfolios = ScoreCardHelperClass(
                base_url, secret_key
            ).get_portfolios()
            portfolios = [
                portfolio
                for portfolio in available_portfolios
                if portfolio.get("id") in portfolio_ids
            ]
            logging.info(
                "SecurityScorecard Connector: Total %s portfolio ids are provided.",
                len(portfolio_ids),
            )
        if not portfolios:
            logging.info(
                "SecurityScorecard Connector: Portfolio ids not provided or portfolio id is not available in the API."
            )

        # Object to save the data of checkpoints.
        state_manager_object = StateManager(
            connection_string=connection_string, file_path="funcstimestamp"
        )
        state_manager_data = state_manager_object.get()
        # NOTE(review): StateManager.get() is not visible here; an empty or
        # missing payload is treated as "no checkpoints yet" instead of
        # letting json.loads fail on it.
        state_manager_json = (
            json.loads(state_manager_data) if state_manager_data else {}
        )

        to_date = utc_now_time.date()
        yesterday = str(to_date - datetime.timedelta(days=1))
        # Entities without a stored checkpoint start from yesterday.
        default_checkpoint = f"{yesterday}|{yesterday}"

        new_statemanager_data = {
            domain: state_manager_json.get(domain) or default_checkpoint
        }

        # Creating statemanager data with latest checkpoints, and grouping
        # the portfolio companies by the date their fetch window starts.
        company_pname_pids = {}
        for portfolio in portfolios:
            portfolio_id = portfolio.get("id")
            portfolio_name = portfolio.get("name")
            companies = ScoreCardHelperClass(
                base_url, secret_key
            ).get_portfolio_data(portfolio_id)
            saved_companies = state_manager_json.get(portfolio_id, {}).get(
                "companies", {}
            )
            portfolio_companies = {
                company.get("domain"): saved_companies.get(company.get("domain"))
                or default_checkpoint
                for company in companies
            }
            new_statemanager_data[portfolio_id] = {
                "portfolio_name": portfolio_name,
                "companies": portfolio_companies,
            }
            for company, checkpoint in portfolio_companies.items():
                checkpoint_parts = checkpoint.split("|")
                company_pname_pids.setdefault(checkpoint_parts[0], []).append(
                    {
                        "company": company,
                        "portfolio_id": portfolio_id,
                        "portfolio_name": portfolio_name,
                        "check_date": checkpoint_parts[1],
                    }
                )
        state_manager_object.post(json.dumps(new_statemanager_data))

        domain_checkpoint_parts = new_statemanager_data[domain].split("|")
        from_date = domain_checkpoint_parts[0]
        check_date = domain_checkpoint_parts[1]
        config = {
            "level_issue_change": level_issue_change,
            "to_date": str(to_date),
            "from_date": from_date,
        }
        if check_date != str(to_date):
            # Fetching data of domain Issue and writing it into the Microsoft Sentinel
            scorecard_obj = ScoreCardHelperClass(base_url, secret_key, domain)
            writer_obj = CompanyWriter(scorecard_obj, state_manager_object)
            logging.info(
                "SecurityScorecard Connector: Writting Issue data into the Microsoft Sentinel!"
            )
            writer_obj.write_issues(**config)
            logging.info(
                "SecurityScorecard Connector: Issue data successfully ingested!"
            )
        else:
            logging.info("SecurityScorecard Connector: skipped domain company.")

        # Fetching data of portfolio companies and writing it into the
        # Microsoft Sentinel, oldest fetch-window start date first.
        for from_date_company, date_wise_company_list in sorted(
            company_pname_pids.items()
        ):
            for company_data in date_wise_company_list:
                company_name = company_data.get("company")
                company_pid = company_data.get("portfolio_id")
                company_pname = company_data.get("portfolio_name")
                if company_data.get("check_date") == str(to_date):
                    logging.info(
                        "SecurityScorecard Connector: skipped the company : %s",
                        company_name,
                    )
                    continue
                # Per-company copy so the shared base config is never mutated.
                company_config = {
                    **config,
                    "from_date": from_date_company,
                    "portfolioId": company_pid,
                    "portfolioName": company_pname,
                }
                logging.info(
                    "SecurityScorecard Connector: configuration for %s company is %s",
                    company_name,
                    company_config,
                )
                company_obj = ScoreCardHelperClass(base_url, secret_key, company_name)
                writer_obj = CompanyWriter(company_obj, state_manager_object)
                logging.info(
                    "SecurityScorecard Connector: Writting Issue data into the Microsoft Sentinel!"
                )
                writer_obj.write_issues(company_pid, **company_config)
                logging.info(
                    "SecurityScorecard Connector: Issue data successfully ingested!"
                )
    except APIKeyNotProvidedError as err:
        raise APIKeyNotProvidedError(
            "SecurityScorecard Connector: Secret Key is not Provided."
        ) from err
    except DomainNotProvidedError as err:
        raise DomainNotProvidedError(
            "SecurityScorecard Connector: Domain is not Provided."
        ) from err
    except BaseURLNotProvidedError as err:
        raise BaseURLNotProvidedError(
            "SecurityScorecard Connector: Base URL is not Provided."
        ) from err
    except SSIssueException as err:
        raise SSIssueException(
            "SecurityScorecard Connector: {}".format(err)
        ) from err

    logging.info(
        "SecurityScorecard Connector: execution completed for alerts at %s.",
        datetime.datetime.now(datetime.timezone.utc).isoformat(),
    )
    if mytimer.past_due:
        logging.info("SecurityScorecard Connector: The timer is past due!")