in cli/aws_orbit/remote_files/cdk/team_builders/iam.py [0:0]
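def process_policies(policy_names, account_id) -> Tuple[List[Any], List[Any]]:  # type: ignore
    """Split *policy_names* into AWS-managed and Orbit-approved customer-managed policies.

    Each name is resolved first as an AWS-managed policy
    (``arn:aws:iam::aws:policy/<name>``) and then as a customer-managed policy
    in *account_id*. A customer-managed policy is accepted only when it carries
    an ``orbit-available=true`` tag or has ``orbit`` in its name; anything else
    is logged and left out, so the returned lists are exactly what the caller
    may go on to attach.

    Args:
        policy_names: Iterable of IAM policy names (not ARNs) to check.
        account_id: AWS account id that owns the customer-managed policies.

    Returns:
        ``(aws_managed, orbit_custom)``: two lists of policy names, in input order.

    Raises:
        Exception: when ``ListPolicyTags`` is still throttled after the retries
            are exhausted. Any other IAM error (e.g. access denied) propagates
            unchanged instead of being retried.
    """
    iam_client = boto3_client("iam")
    aws_managed_user_policies: List[Any] = []
    orbit_custom_policies: List[Any] = []

    def _check_policy_exists(arn: str) -> bool:
        """Return True if ``GetPolicy`` resolves *arn*; only NoSuchEntity maps to False."""
        try:
            iam_client.get_policy(PolicyArn=arn)
        except iam_client.exceptions.NoSuchEntityException:
            return False
        return True

    def _is_throttled(ce: Exception) -> bool:
        """True when *ce* is the IAM ``Throttling`` / ``Rate exceeded`` ClientError."""
        error = ce.response["Error"]  # type: ignore[attr-defined]
        return error["Code"] == "Throttling" and error["Message"] == "Rate exceeded"

    def _check_for_orbit_tag(policyArn: str, *, retries: int = 3, backoff_seconds: int = 60) -> bool:
        """Return True if the policy at *policyArn* carries the ``orbit-available=true`` tag.

        ``ListPolicyTags`` is retried with a fixed sleep only when IAM
        throttles the call. Any other ClientError is re-raised immediately:
        previously it fell through without touching the retry counter and
        the loop spun forever.
        """
        _logger.info(f"Fetching policy tags for {policyArn}")
        attempts_left = retries
        while True:
            try:
                response = iam_client.list_policy_tags(PolicyArn=policyArn)
                break
            except iam_client.exceptions.ClientError as ce:
                if not _is_throttled(ce):
                    # Access denied, malformed ARN, etc. do not get better on
                    # retry; surface them to the caller.
                    raise
                _logger.warning(ce)
                attempts_left -= 1
                if attempts_left == 0:
                    # No point sleeping once there is no attempt left.
                    raise Exception(ce) from ce
                _logger.info(f"Retrying. Retry count: {retries - attempts_left}")
                time.sleep(backoff_seconds)
        for tag in response["Tags"]:
            # Exact match on key and (case-insensitive) value: the earlier
            # substring test accepted keys such as "not-orbit-available" and
            # values such as "untrue", which is too loose for a gate that
            # decides which policies get attached.
            if tag["Key"] == "orbit-available" and tag["Value"].strip().lower() == "true":
                return True
        return False

    # NOTE(review): the partition is hard-coded to "aws"; policies living in
    # other partitions would never be found by either lookup.
    for policy_name in policy_names:
        aws_managed_arn = f"arn:aws:iam::aws:policy/{policy_name}"
        customer_arn = f"arn:aws:iam::{account_id}:policy/{policy_name}"
        _logger.info(f"Checking policy name {policy_name}")
        if _check_policy_exists(aws_managed_arn):
            aws_managed_user_policies.append(policy_name)
            _logger.info(f"Found {policy_name} to be AWS-Managed..adding to build")
        elif _check_policy_exists(customer_arn):
            _logger.info(f"Found {policy_name} to be Customer-Managed...checking name or tag")
            # The name check is a trust decision on its own: any customer
            # policy with "orbit" in its name is attachable without the tag.
            # Checking the name first skips the ListPolicyTags call when it
            # is not needed; the outcome is unchanged.
            if "orbit" in policy_name or _check_for_orbit_tag(customer_arn):
                orbit_custom_policies.append(policy_name)
                _logger.info(f"Found {policy_name} to be Customer-Managed and properly tagged/named...adding to build")
            else:
                _logger.info(
                    f"Found {policy_name} to be Customer-Managed BUT not tagged or properly named... NOT added to build"
                )
        else:
            _logger.info(f"Found {policy_name} not to exist...NOT added to build")
    return aws_managed_user_policies, orbit_custom_policies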