def _expand_policy()

in gcpdiag/queries/iam.py [0:0]


  def _expand_policy(self, resource_data: Dict[str, Any]) -> Dict[str, Any]:
    """Re-indexes a `getIamPolicy` response so it is keyed by member.

    The API returns role-centric bindings, each listing the members that
    hold the role:
    {
      "bindings": [
        {
          "role": "roles/resourcemanager.organizationAdmin",
          "members": [
            "user:mike@example.com",
            "serviceAccount:my-project-id@appspot.gserviceaccount.com"
          ]
        },
        ...
      ]
    }

    The result is member-centric, each member mapping to the set of roles
    granted to it:
    {
      "user:mike@example.com": {
        "roles": { "roles/resourcemanager.organizationAdmin" },
      },
      "serviceAccount:my-project-id@appspot.gserviceaccount.com": {
        "roles": { "roles/resourcemanager.organizationAdmin" },
      },
    }

    As a side effect, every role referenced by the policy is looked up via
    `_get_iam_role` and every plain service account member is passed to
    `_batch_fetch_service_accounts`, so that their caches are populated.

    Args:
      resource_data: the `getIamPolicy` response. The 'bindings' key may be
        absent; each binding present must carry 'role' and 'members'.

    Returns:
      A mapping (backed by a `defaultdict`) from member to a dict holding
      a 'roles' set.

    Raises:
      utils.GcpApiError: if looking up one of the roles fails with an API
        error. A `RoleNotFoundError` is only logged.
    """

    roles_in_policy = set()
    policy_by_member: Dict[str, Any] = defaultdict(dict)

    # GCP API responses leave out empty lists, so 'bindings' may be missing
    bindings = resource_data.get('bindings', [])
    for binding in bindings:
      if 'condition' in binding:
        # The binding is still processed below, only its condition is not
        # taken into account
        logging.warning(
            'IAM binding contains a condition, which would be ignored: %s',
            binding)

      # A binding is expected to always carry a role and at least one member
      role = binding['role']
      roles_in_policy.add(role)
      for member in binding['members']:
        policy_by_member[member].setdefault('roles', set()).add(role)

    # Warm up the IAM role cache for every role seen in the policy.
    # In contrast to `has_role_permissions`, this code runs as part of
    # `prefetch_rule` and therefore benefits from multi-threaded execution.
    for role in roles_in_policy:
      try:
        _get_iam_role(role, self.project_id)
      except utils.GcpApiError as err:
        # API failures are logged and propagated to the caller
        logging.error('API failure getting IAM roles: %s', err)
        raise utils.GcpApiError(err) from err
      except RoleNotFoundError as err:
        # A role that cannot be retrieved is skipped - there might be no
        # rules involving it at all
        logging.warning("Unable to get IAM role '%s', ignoring: %s", role,
                        err)

    # Warm up the service account cache for the accounts seen in the policy.
    # Excluding '/' from the match makes sure that, for example, fleet
    # workload identities are not picked up:
    # https://cloud.google.com/anthos/multicluster-management/fleets/workload-identity
    sa_pattern = re.compile(r'serviceAccount:([^/]+)$')
    matches = (sa_pattern.match(member) for member in policy_by_member)
    sa_emails = {found.group(1) for found in matches if found}
    _batch_fetch_service_accounts(list(sa_emails), self.project_id)

    return policy_by_member