def _batch_fetch_service_accounts()

in gcpdiag/queries/iam.py [0:0]


def _batch_fetch_service_accounts(emails: List[str], billing_project_id: str):
  """Retrieve a list of service accounts.

  This function is used when inspecting a project, to retrieve all service accounts
  that are used in the IAM policy, so that we can do this in a single batch request.
  The goal is to be able to call is_service_account_enabled() without triggering
  another API call.

  `project_id` is used primarily as the default billing project. Service
  accounts from other projects in `emails` will be also retrieved.
  """

  iam_api = apis.get_api('iam', 'v1', billing_project_id)
  service_accounts_api = iam_api.projects().serviceAccounts()

  requests = [
      service_accounts_api.get(
          name=f'projects/{_extract_project_id(email)}/serviceAccounts/{email}')
      for email in emails
      if email not in _service_account_cache_fetched
  ]
  for email in emails:
    _service_account_cache_fetched[email] = True

  for request in requests:
    try:
      response = request.execute(num_retries=config.API_RETRIES)
      sa = ServiceAccount(response['projectId'], response)
      _service_account_cache[sa.email] = sa
    except googleapiclient.errors.HttpError as err:
      exception = utils.GcpApiError(err)
      if exception:
        # Extract the requested service account and its associated project ID
        # from the URI. This is especially useful when dealing with scenarios
        #  involving cross-project service accounts within a project.
        m = re.search(r'/projects/([^/]+)/[^/]+/([^?]+@[^?]+)', request.uri)
        if not m:
          logging.warning("BUG: can't determine SA email from request URI: %s",
                          request.uri)
          continue
        sa_project_id = m.group(1)
        email = m.group(2)

        # 403 or 404 is expected for Google-managed service agents.
        if email.partition('@')[2] in SERVICE_AGENT_DOMAINS or \
          email.partition('@')[2].startswith('gcp-sa-'):
          # Too noisy even for debug-level
          # logging.debug(
          #     'ignoring error retrieving google-managed service agent %s: %s', email, exception)
          pass
        elif isinstance(exception,
                        utils.GcpApiError) and exception.status == 404:
          _service_account_cache_is_not_found[email] = True
        else:
          # Determine if the failing service account belongs to a different project.
          # Retrieving service account details may fail due to various conditions.
          if sa_project_id != billing_project_id:
            logging.warning(
                "can't retrieve service account %s belonging to project %s but used in project: %s",
                email, sa_project_id, billing_project_id)
            _service_account_cache_is_not_found[email] = True
            continue

          project_nr = crm.get_project(sa_project_id).number
          if ((sa_project_id == billing_project_id) and re.match(rf'{project_nr}-\w+@', email) \
              or email.endswith(f'@{billing_project_id}.iam.gserviceaccount.com')):
            # if retrieving service accounts from the project being inspected fails,
            # we need to fail hard because many rules won't work correctly.
            raise utils.GcpApiError(err) from err

          else:
            logging.warning("can't get service account %s: %s", email,
                            exception)