gcpdiag/queries/kms.py (44 lines of code) (raw):

# Copyright 2021 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # Lint as: python3 """Queries related to GCP Cloud Key Management.""" import logging import googleapiclient.errors from gcpdiag import caching, config, models, utils from gcpdiag.queries import apis, iam from gcpdiag.utils import GcpApiError class CryptoKey(models.Resource): """Represents a KMS Crypto Key. See also the API documentation: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys """ @property def full_path(self) -> str: return self._resource_data['name'] @property def name(self) -> str: return self._resource_data['name'] def is_destroyed(self) -> bool: return self._resource_data['primary'].get('state') == 'DESTROYED' def is_enabled(self) -> bool: return self._resource_data['primary'].get('state') == 'ENABLED' def __init__(self, project_id, resource_data): super().__init__(project_id=project_id) self._resource_data = resource_data class KMSCryptoKeyIAMPolicy(iam.BaseIAMPolicy): def _is_resource_permission(self, permission): return True @caching.cached_api_call def get_crypto_key(key_name: str) -> CryptoKey: """Get a Crypto Key object by its resource name, caching the result.""" project_id = utils.get_project_by_res_name(key_name) kms_api = apis.get_api('cloudkms', 'v1', project_id) query = kms_api.projects().locations().keyRings().cryptoKeys().get( name=key_name) logging.debug('fetching KMS Key %s in project %s', utils.extract_value_from_res_name(key_name, 'cryptoKeys'), project_id) try: resource_data = query.execute(num_retries=config.API_RETRIES) except googleapiclient.errors.HttpError as err: raise GcpApiError(err) from err return CryptoKey(project_id, resource_data) @caching.cached_api_call def get_crypto_key_iam_policy(key_name: str) -> KMSCryptoKeyIAMPolicy: project_id = utils.get_project_by_res_name(key_name) kms_api = apis.get_api('cloudkms', 'v1', project_id) query = kms_api.projects().locations().keyRings().cryptoKeys().getIamPolicy( resource=key_name) return iam.fetch_iam_policy(query, KMSCryptoKeyIAMPolicy, project_id, key_name)