src/utils/interactive/apply_config.py
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Resource Configuration execution routines"""
import logging
import typing
from google.api_core import retry
from google.api_core.exceptions import Conflict
from google.api_core.exceptions import Forbidden
from google.api_core.exceptions import Unauthorized
from google import auth as gauth
from google.auth.transport import requests
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.bigquery.enums import EntityTypes
import googleapiclient.discovery
from googleapiclient.errors import HttpError
_RETRY_TIMEOUT_SEC = 60.0 # Timeout for API retries
SOURCE_PROJECT_APIS = [
"cloudresourcemanager", "storage-component", "bigquery", "cloudbuild"
]
TARGET_PROJECT_APIS = ["storage-component", "bigquery"]
PROJECT_ROLES = [
"roles/bigquery.user", "roles/cloudbuild.builds.builder",
"roles/iam.serviceAccountUser"
]


@retry.Retry(predicate=retry.if_exception_type(KeyError, HttpError),
timeout=_RETRY_TIMEOUT_SEC)
def get_cloud_build_account(project_id: str) -> str:
"""
Retrieves GCP project Cloud Build account principal by project name/id.
Since this gets called soon after the Cloud Build API is enabled,
the @retry.Retry dectorator is called to ensure the API is available before
this function proceeds with retrieving the serivce account.
Args:
project_id (str): project id
Returns:
str: Cloud Build account principal
"""
# Get default Cloud Build account
cloudbuild_account_url = (
"https://cloudbuild.googleapis.com/v1/projects/"
f"{project_id}/locations/global/defaultServiceAccount")
credentials, _ = gauth.default(quota_project_id=project_id)
session = requests.AuthorizedSession(credentials)
response_json = session.get(cloudbuild_account_url).json()
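    # If the Cloud Build API is not fully ready yet, the response may not
    # contain "serviceAccountEmail"; the resulting KeyError is retried by
    # the @retry.Retry decorator above.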
sa_email = response_json["serviceAccountEmail"]
return sa_email.split("/")[-1]


def add_bq_roles(client: bigquery.Client, dataset: bigquery.Dataset,
service_account: str, roles: typing.List[str]):
"""Adds role bindings to a BigQuery dataset for a service account.
Args:
client (bigquery.Client): BigQuery client object
dataset (bigquery.Dataset): BigQuery dataset
service_account (str): Service Account principal (email)
roles (typing.List[str]): List of roles as role/<rolename>
"""
logging.info("\tConfiguring roles %s on dataset %s for %s.", str(roles),
dataset.full_dataset_id, service_account)
entries = dataset.access_entries
modified = False
for role in roles:
found = False
all_role_names = [role]
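        # Existing dataset access entries may use the legacy primitive role
        # names (READER/WRITER/OWNER), so accept those as equivalent matches.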
if role == "roles/bigquery.dataViewer":
all_role_names.append("READER")
elif role == "roles/bigquery.dataEditor":
all_role_names.append("WRITER")
elif role == "roles/bigquery.dataOwner":
all_role_names.append("OWNER")
for entry in entries:
if (entry.entity_id in [
service_account, f"serviceAccount:{service_account}"
] and entry.role in all_role_names):
found = True
break
if not found:
modified = True
entries.append(
bigquery.AccessEntry(
role=role,
entity_type=EntityTypes.USER_BY_EMAIL,
entity_id=service_account,
))
if modified:
dataset.access_entries = entries
dataset = client.update_dataset(dataset, ["access_entries"])


def add_project_roles(project_id: str, service_account: str,
roles: typing.List[str]):
"""Adds IAM role bindings to a Service Account in a Project.
Args:
project_id (str): project id
service_account (str): servicce account principal (email)
roles (typing.List[str]): list of roles
"""
logging.info("Configuring roles %s on project %s for %s.", str(roles),
project_id, service_account)
crm = googleapiclient.discovery.build("cloudresourcemanager",
"v1",
cache_discovery=False)
service_account_name = f"serviceAccount:{service_account}"
modified = False
trying = True
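    # Read-modify-write loop: fetch the current policy, add any missing
    # bindings, and retry from a fresh read if the write hits a concurrent
    # modification conflict.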
while modified or trying:
trying = False
policy = (crm.projects().getIamPolicy(
resource=project_id,
body={
"options": {
"requestedPolicyVersion": "1"
}
},
).execute())
for role in roles:
role_binding = None
if not "bindings" in policy:
policy["bindings"] = []
for binding in policy["bindings"]:
if binding["role"] == role:
role_binding = binding
break
if not role_binding:
role_binding = {"role": role, "members": [service_account_name]}
modified = True
policy["bindings"].append(role_binding)
else:
if service_account_name not in role_binding["members"]:
modified = True
role_binding["members"].append(service_account_name)
if modified:
try:
crm.projects().setIamPolicy(resource=project_id,
body={
"policy": policy
}).execute()
break
except (Conflict, HttpError) as ex:
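                # HTTP 409 means the policy changed underneath us;
                # loop again with a freshly read policy.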
if isinstance(ex, HttpError) and ex.status_code != 409:
raise
continue


def create_bq_dataset_with_roles(project_id: str, location: str,
dataset_name: str, service_account: str,
roles: typing.List[str]):
"""Creates a BigQuery dataset, and adds role bindings on it for
a service account.
If datasets already exists, only does role binding.
Args:
project_id (str): project id
location (str): BigQuery location
dataset_name (str): dataset name
service_account (str): service account principal (email)
roles (typing.List[str]): list of roles to bind
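
    Example (illustrative values only):
        create_bq_dataset_with_roles("my-project", "US", "CORTEX_REPORTING",
                                     "build-sa@my-project.iam.gserviceaccount.com",
                                     ["roles/bigquery.dataEditor"])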
"""
client = bigquery.Client(project=project_id, location=location)
try:
logging.info("Creating dataset %s.%s.", project_id, dataset_name)
dataset = client.create_dataset(dataset_name, exists_ok=False)
except Conflict:
logging.info("\tDataset %s.%s already exists.", project_id,
dataset_name)
dataset = client.get_dataset(dataset_name)
add_bq_roles(client, dataset, service_account, roles)


def create_storage_bucket_with_roles(project_id: str, location: str,
bucket_name: str, service_account: str,
roles: typing.List[str]):
"""Creates a Storage Bucket, and adds role bindings on it for
a service account.
If bucket already exists, only does role binding.
Args:
project_id (str): project id
location (str): location
bucket_name (str): bucket name
service_account (str): service account principal (email)
roles (typing.List[str]): list of roles
"""
logging.info("Creating storage bucket %s.", bucket_name)
client = storage.Client(project=project_id)
try:
bucket = client.create_bucket(bucket_name, location=location)
except Conflict:
# Bucket already exists, it's ok.
logging.info("\tBucket %s already exists.", bucket_name)
bucket = client.get_bucket(bucket_name)
add_bucket_roles(client, bucket, service_account, roles)


def add_bucket_roles(client: storage.Client, bucket: storage.Bucket,
                     service_account: str, roles: typing.List[str]):
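    """Adds IAM role bindings to a Storage bucket for a service account.

    Args:
        client (storage.Client): Storage client object
        bucket (storage.Bucket): Storage bucket
        service_account (str): service account principal (email)
        roles (typing.List[str]): list of roles as roles/<rolename>
    """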
logging.info("\tConfiguring roles %s on bucket %s for %s.", str(roles),
bucket.name, service_account)
service_account_name = f"serviceAccount:{service_account}"
modified = False
trying = True
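    # Same read-modify-write pattern as add_project_roles; note that the
    # bucket policy exposes each binding's "members" as a set, hence .add().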
while modified or trying:
trying = False
policy = bucket.get_iam_policy(client=client)
bindings = policy.bindings
for role in roles:
role_binding = None
for binding in bindings:
if binding["role"] == role:
role_binding = binding
break
if not role_binding:
role_binding = {"role": role, "members": [service_account_name]}
modified = True
bindings.append(role_binding)
else:
if service_account_name not in role_binding["members"]:
modified = True
role_binding["members"].add(service_account_name)
if modified:
try:
bucket.set_iam_policy(policy, client=client)
break
except Conflict:
continue


def enable_apis(project_id: str, apis: typing.List[str]):
"""Enables APIs in Google Cloud project
Args:
project_id (str): Google Cloud project id
apis (typing.List[str]): list of APIs to enable
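
    Example (illustrative):
        enable_apis("my-project", ["bigquery", "cloudbuild"])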
"""
client = googleapiclient.discovery.build("serviceusage",
"v1",
cache_discovery=False)
for api in apis:
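        # Accept both short names ("bigquery") and full service names
        # ("bigquery.googleapis.com").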
api_name = (api if api.endswith(".googleapis.com") else
f"{api}.googleapis.com")
response = (client.services().get(
name=f"projects/{project_id}/services/{api_name}").execute())
state = response["state"]
if state != "ENABLED":
logging.info("Enabling %s API in project %s", api_name, project_id)
client.services().enable(
name=f"projects/{project_id}/services/{api_name}").execute()
logging.info("\t%s API is enabled in project %s.", api_name, project_id)


def apply_all(config: typing.Dict[str, typing.Any]) -> bool:
"""Applies Cortex Data Foundation configuration changes:
* enables APIs
* adds necessary role bindings on projects for Cloud Build account
* creates datasets
* adds necessary role bindings on these datasets for Cloud Build account
* creates buckets
* adds necessary role bindings on these buckets for Cloud Build account
Args:
config (typing.Dict[str, typing.Any]): Data Foundation config dictionary
Returns:
bool: True if configuration was successful, False otherwise.
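
    Example (illustrative; values are placeholders and only a subset of the
    keys this function reads is shown):
        config = {
            "projectIdSource": "my-source-project",
            "projectIdTarget": "my-target-project",
            "location": "US",
            "targetBucket": "my-cortex-build-bucket",
            "k9": {"datasets": {"processing": "K9_PROCESSING",
                                "reporting": "K9_REPORTING"}},
        }
        success = apply_all(config)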
"""
source_project = config["projectIdSource"]
target_project = config["projectIdTarget"]
location = config["location"]
try:
logging.info("Enabling APIs in %s.", source_project)
try:
enable_apis(source_project, SOURCE_PROJECT_APIS)
except HttpError as ex:
if ex.status_code == 400 and "billing account" in ex.reason.lower():
logging.critical(("Project %s doesn't have "
"a Billing Account linked to it."),
source_project)
return False
else:
raise
if target_project != source_project:
try:
logging.info("Enabling APIs in %s.", target_project)
enable_apis(target_project, TARGET_PROJECT_APIS)
except HttpError as ex:
if (ex.status_code == 400 and
"billing account" in ex.reason.lower()):
logging.critical(("Project %s doesn't have "
"a Billing Account linked to it."),
source_project)
return False
else:
raise
cloud_build_account = get_cloud_build_account(source_project)
logging.info("Using Cloud Build account %s.", cloud_build_account)
# Add project-wide role binding for Cloud Build account
add_project_roles(source_project, cloud_build_account, PROJECT_ROLES)
if target_project != source_project:
add_project_roles(target_project, cloud_build_account,
PROJECT_ROLES)
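        # Collect dataset name mappings from every enabled workload. Keys
        # named "reporting" go to the target project; everything else stays
        # in the source project.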
dataset_dicts = []
source_datasets = []
reporting_datasets = []
dataset_dicts.append(config["k9"]["datasets"])
if config.get("deploySAP"):
dataset_dicts.append(config["SAP"]["datasets"])
if config.get("deploySFDC"):
dataset_dicts.append(config["SFDC"]["datasets"])
if config.get("deployOracleEBS"):
dataset_dicts.append(config["OracleEBS"]["datasets"])
if config.get("deployMarketing"):
if config["marketing"].get("deployGoogleAds"):
dataset_dicts.append(
config["marketing"]["GoogleAds"]["datasets"])
if config["marketing"].get("deployCM360"):
dataset_dicts.append(config["marketing"]["CM360"]["datasets"])
if config["marketing"].get("deployTikTok"):
dataset_dicts.append(config["marketing"]["TikTok"]["datasets"])
if config["marketing"].get("deployLiveRamp"):
dataset_dicts.append(
config["marketing"]["LiveRamp"]["datasets"])
if config["marketing"].get("deployMeta"):
dataset_dicts.append(config["marketing"]["Meta"]["datasets"])
if config["marketing"].get("deploySFMC"):
dataset_dicts.append(config["marketing"]["SFMC"]["datasets"])
if config["marketing"].get("deployDV360"):
dataset_dicts.append(config["marketing"]["DV360"]["datasets"])
if config["marketing"].get("deployGA4"):
dataset_dicts.append({
"cdc":
config["marketing"]["GA4"]["datasets"]["cdc"][0]
["name"],
"reporting":
config["marketing"]["GA4"]["datasets"]["reporting"]
})
        for dataset_dict in dataset_dicts:
            for ds_type, ds_name in dataset_dict.items():
                add_to = (reporting_datasets
                          if ds_type == "reporting" else source_datasets)
                # Skip empty names and avoid duplicates.
                if ds_name and ds_name not in add_to:
                    add_to.append(ds_name)
# Create datasets (if needed),
# and add "roles/bigquery.dataEditor" binding on them
# for the source project's Cloud Build account.
logging.info("Creating datasets in %s.", source_project)
for ds in source_datasets:
create_bq_dataset_with_roles(source_project, location, ds,
cloud_build_account,
["roles/bigquery.dataEditor"])
# If Cross Media is enabled, create VertexAI processing dataset.
# It cannot be in a multi-location.
if config["k9"].get("deployCrossMedia"):
ds = config["VertexAI"]["processingDataset"]
vertexai_region = location.lower()
if vertexai_region == "us":
vertexai_region = "us-central1"
elif vertexai_region == "eu":
vertexai_region = "europe-west4"
create_bq_dataset_with_roles(source_project, vertexai_region, ds,
cloud_build_account,
["roles/bigquery.dataEditor"])
if target_project != source_project:
            # This check only guards the log message; reporting datasets are
            # created in the target project regardless.
logging.info("Creating datasets in %s.", target_project)
for ds in reporting_datasets:
create_bq_dataset_with_roles(target_project, location, ds,
cloud_build_account,
["roles/bigquery.dataEditor"])
# Create target storage bucket (if needed),
# and add "roles/storage.admin" binding on it for Cloud Build account.
create_storage_bucket_with_roles(source_project, location,
config["targetBucket"],
cloud_build_account,
["roles/storage.admin"])
if config.get("deployMarketing"):
marketing = config["marketing"]
if marketing.get("deployCM360"):
create_storage_bucket_with_roles(
source_project, location,
marketing["CM360"]["dataTransferBucket"],
cloud_build_account, ["roles/storage.admin"])
if marketing.get("deploySFMC"):
create_storage_bucket_with_roles(
source_project, location,
marketing["SFMC"]["fileTransferBucket"],
cloud_build_account, ["roles/storage.admin"])
except (HttpError, Forbidden, Unauthorized) as ex:
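        # Report permission problems (401/403) as a friendly message and
        # fail gracefully; re-raise anything else.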
if isinstance(ex, HttpError):
message = ex.reason
if ex.status_code not in (401, 403):
raise
else:
message = ex.message
logging.critical("You do not have sufficient permissions: %s", message)
return False
return True