in src/get_org_dependent_resources.py [0:0]
def get_org_level_resources(region: str, account: dict, session, _org_id, target_org_id) -> dict:
status = Constant.StateMachineStates.COMPLETED
analyzer_client = session.client('accessanalyzer', region_name=region)
logger.debug(f"analyzer for region: {region}")
analysers = analyzer_client.list_analyzers()['analyzers']
if not analysers:
msg = f"No Analyzer found in region {region}"
log_error(logger=logger, account_id=account['AccountId'], company_name=account['CompanyName'],
error_type=Constant.ErrorType.OLPE, msg=msg,
notify=True, slack_handle=account['SlackHandle'])
update_item(Constant.DB_TABLE, account)
return Constant.StateMachineStates.WAIT
arn = analysers[0]['arn']
# check for PrincipalOrgID
org_level_resource_list = []
org_issues = [resource['resource'] for resource in analyzer_client.list_findings(
analyzerArn=arn,
filter={
'condition.aws:PrincipalOrgID': {
'contains': [
target_org_id]}
})['findings'] if resource['status'] == 'ACTIVE']
if org_issues:
org_issues_resolved = [resource['resource'] for resource in analyzer_client.list_findings(
analyzerArn=arn,
filter={
'condition.aws:PrincipalOrgID': {
'contains': [_org_id]}
})['findings'] if resource['status'] == 'ACTIVE']
org_level_resource_list = list(set(org_issues).symmetric_difference(set(org_issues_resolved)))
# check for PrincipalOrgID
org_path_level_resource_list = []
org_path_issue = [resource['resource'] for resource in analyzer_client.list_findings(
analyzerArn=arn,
filter=
{
'condition.aws:PrincipalOrgPaths': {
'contains': [
target_org_id]}
})['findings'] if resource['status'] == 'ACTIVE']
if org_path_issue:
org_path_resolved = [resource['resource'] for resource in analyzer_client.list_findings(
analyzerArn=arn,
filter={
'condition.aws:PrincipalOrgPaths': {
'contains': [_org_id]}
})['findings'] if resource['status'] == 'ACTIVE']
org_path_level_resource_list = list(set(org_path_issue).symmetric_difference(set(org_path_resolved)))
org_level_permissions = org_level_resource_list + org_path_level_resource_list
if org_level_permissions:
status = Constant.StateMachineStates.WAIT
for resource in org_level_permissions:
msg = f"Resource {resource} is using organization level permission to access resource"
log_error(logger=logger, account_id=account['AccountId'], company_name=account['CompanyName'],
error_type=Constant.ErrorType.OLPE, msg=msg,
notify=True, slack_handle=account['SlackHandle'])
account["OrgLevelPermissions"] = org_level_permissions
update_item(Constant.DB_TABLE, account)
return status