in gcpdiag/queries/iam.py [0:0]
def _expand_policy(self, resource_data: Dict[str, Any]) -> Dict[str, Any]:
  """Re-index a `getIamPolicy` response so that it is keyed by member.

  The API response is role-centric, i.e. a list of bindings of one role to
  several members:

    {
      "bindings": [
        {
          "role": "roles/resourcemanager.organizationAdmin",
          "members": [
            "user:mike@example.com",
            "serviceAccount:my-project-id@appspot.gserviceaccount.com"
          ]
        },
        ...
      ]
    }

  The returned mapping is member-centric:

    {
      "user:mike@example.com": {
        "roles": { "roles/resourcemanager.organizationAdmin" },
      },
      "serviceAccount:my-project-id@appspot.gserviceaccount.com": {
        "roles": { "roles/resourcemanager.organizationAdmin" },
      },
    }

  Besides building the mapping, this method calls `_get_iam_role` for every
  role found in the policy and hands the service account e-mails found among
  the members to `_batch_fetch_service_accounts`, so that both are fetched
  up front.

  Args:
    resource_data: decoded `getIamPolicy` response. The 'bindings' key may be
      absent; every binding present must have 'role' and 'members' keys.

  Returns:
    A `defaultdict(dict)` mapping each member string to a dict whose 'roles'
    entry is the set of role names bound to that member.

  Raises:
    utils.GcpApiError: when `_get_iam_role` fails with `utils.GcpApiError`;
      the failure is logged and re-raised wrapped in a new
      `utils.GcpApiError`.
    KeyError: when a binding lacks 'role' or 'members'.
  """
  members_index: Dict[str, Any] = defaultdict(dict)
  seen_roles = set()

  # GCP API responses omit empty lists, hence the default for 'bindings'.
  bindings = resource_data.get('bindings', [])
  for entry in bindings:
    if 'condition' in entry:
      # The condition itself is not evaluated; the binding is still recorded.
      logging.warning(
          'IAM binding contains a condition, which would be ignored: %s',
          entry)
    # Every IAM binding is expected to have a role and at least one member.
    role_name = entry['role']
    seen_roles.add(role_name)
    for principal in entry['members']:
      members_index[principal].setdefault('roles', set()).add(role_name)

  # Populate the cache for IAM roles used in the policy.
  # Unlike `has_role_permissions` this part will be executed inside
  # `prefetch_rule` and will benefit from multi-threading execution.
  for role_name in seen_roles:
    try:
      _get_iam_role(role_name, self.project_id)
    except utils.GcpApiError as api_err:
      # API failures (for example lack of permissions) are not swallowed:
      # they are logged and propagated to the caller.
      logging.error('API failure getting IAM roles: %s', api_err)
      raise utils.GcpApiError(api_err) from api_err
    except RoleNotFoundError as missing_err:
      # A role that cannot be found is skipped - there could be no rules
      # involving this role.
      logging.warning("Unable to get IAM role '%s', ignoring: %s", role_name,
                      missing_err)

  # Populate the cache for service accounts used in the policy.
  # Note: not matching / makes sure that we don't match for example fleet
  # workload identities:
  # https://cloud.google.com/anthos/multicluster-management/fleets/workload-identity
  sa_pattern = re.compile(r'serviceAccount:([^/]+)$')
  # Two steps (generator, then set comprehension) keep this readable without
  # assignment expressions, which need Python >= 3.8.
  sa_matches = (sa_pattern.match(principal) for principal in members_index)
  sa_emails = {found.group(1) for found in sa_matches if found}
  _batch_fetch_service_accounts(list(sa_emails), self.project_id)

  return members_index