in gcpdiag/queries/iam.py [0:0]
def _batch_fetch_service_accounts(emails: List[str], billing_project_id: str):
    """Retrieve a list of service accounts and record them in the module caches.

    This function is used when inspecting a project, to retrieve all service
    accounts that are used in the IAM policy up front. The goal is to be able
    to call is_service_account_enabled() without triggering another API call.

    Every address in `emails` is marked in `_service_account_cache_fetched`,
    so that a later call does not request it again. Successfully retrieved
    accounts are stored in `_service_account_cache`; failures are classified
    by `_handle_service_account_fetch_error`.

    Args:
      emails: service account e-mail addresses to retrieve. Duplicates are
        requested only once. Service accounts that belong to projects other
        than `billing_project_id` are retrieved as well.
      billing_project_id: project used to obtain the IAM API client (i.e. the
        default billing project); it is also treated as the project being
        inspected when deciding whether a failure is fatal.

    Raises:
      utils.GcpApiError: if a service account that belongs to
        `billing_project_id` cannot be retrieved (other than with a 404).
    """
    iam_api = apis.get_api('iam', 'v1', billing_project_id)
    service_accounts_api = iam_api.projects().serviceAccounts()

    # dict.fromkeys() drops duplicates while keeping the caller's order, so
    # that an address listed twice does not cost two API calls.
    unique_emails = list(dict.fromkeys(emails))
    requests = [
        service_accounts_api.get(
            name=f'projects/{_extract_project_id(email)}/serviceAccounts/{email}')
        for email in unique_emails
        if email not in _service_account_cache_fetched
    ]
    # Mark everything as fetched only after all requests were built, and
    # before executing them: a failed lookup must not be retried later.
    for email in unique_emails:
        _service_account_cache_fetched[email] = True

    for request in requests:
        try:
            response = request.execute(num_retries=config.API_RETRIES)
            sa = ServiceAccount(response['projectId'], response)
            _service_account_cache[sa.email] = sa
        except googleapiclient.errors.HttpError as err:
            _handle_service_account_fetch_error(request, err, billing_project_id)


def _handle_service_account_fetch_error(request, err,
                                        billing_project_id: str) -> None:
    """Decide what to do with a failed serviceAccounts.get request.

    Args:
      request: the request object whose execute() call failed; only its `uri`
        attribute is read.
      err: the googleapiclient HttpError raised by execute().
      billing_project_id: the project being inspected.

    Raises:
      utils.GcpApiError: if the service account belongs to
        `billing_project_id` and the failure cannot be ignored.
    """
    exception = utils.GcpApiError(err)

    # Extract the requested service account and its associated project ID
    # from the URI. This is especially useful when dealing with scenarios
    # involving cross-project service accounts within a project.
    m = re.search(r'/projects/([^/]+)/[^/]+/([^?]+@[^?]+)', request.uri)
    if not m:
        logging.warning("BUG: can't determine SA email from request URI: %s",
                        request.uri)
        return
    sa_project_id = m.group(1)
    email = m.group(2)

    # 403 or 404 is expected for Google-managed service agents. These are
    # ignored silently: logging them is too noisy even at debug level.
    domain = email.partition('@')[2]
    if domain in SERVICE_AGENT_DOMAINS or domain.startswith('gcp-sa-'):
        return

    if exception.status == 404:
        _service_account_cache_is_not_found[email] = True
        return

    # Determine if the failing service account belongs to a different project.
    # Retrieving service account details may fail due to various conditions.
    if sa_project_id != billing_project_id:
        logging.warning(
            "can't retrieve service account %s belonging to project %s but used in project: %s",
            email, sa_project_id, billing_project_id)
        _service_account_cache_is_not_found[email] = True
        return

    # From here on the service account is known to live in the project being
    # inspected (sa_project_id == billing_project_id).
    project_nr = crm.get_project(sa_project_id).number
    if (re.match(rf'{project_nr}-\w+@', email) or
            email.endswith(f'@{billing_project_id}.iam.gserviceaccount.com')):
        # if retrieving service accounts from the project being inspected
        # fails, we need to fail hard because many rules won't work correctly.
        raise exception from err
    logging.warning("can't get service account %s: %s", email, exception)