solutions_builder/copier_extensions/sb_helpers.py (154 lines of code) (raw):

""" Copyright 2023 Google LLC Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import re import os import yaml import subprocess from jinja2.ext import Extension # Read YAML file and convert to a dict. def read_yaml(filepath): with open(filepath) as f: data = yaml.safe_load(f) return data def print_indent(text, offest=6): text = " " * offest + text print(text) # Execute shell commands def exec_output(command, working_dir=".", stop_when_error=False): try: output = subprocess.check_output(command, stderr=subprocess.STDOUT, cwd=working_dir, shell=True, text=True) except subprocess.CalledProcessError as e: # print("Status : FAIL", e.returncode, e.output) raise e else: return output def exec_gcloud_output(command, working_dir="."): output = "" output = exec_output(command) output = output.strip() return output def check_project_exist(project_id): """ Check if a GCP project exists. """ print_indent(f"(Retrieving project info for {project_id}...)") command = f"gcloud projects describe {project_id} --format='value(projectId)'" try: project_id = exec_gcloud_output(command).strip() if not project_id: return False return True except subprocess.CalledProcessError: print_indent( f"Unable to retrieve '{project_id}'. GCP project may not exist yet.\n") return False def get_project_number(project_id): """ Get GCP project number based on project_id using gcloud command. """ print_indent(f"(Retrieving project number for {project_id}...)") command = f"gcloud projects describe {project_id} --format='value(projectNumber)'" try: project_number = exec_gcloud_output(command).strip() if not project_number.isnumeric(): print_indent(f"project_number is not numeric: {project_number}") return "" return project_number except subprocess.CalledProcessError: print_indent( f"Unable to retrieve project_number for '{project_id}'. GCP project may not exist yet.\n") return "" def get_current_gcloud_auth_account(): command = "gcloud config list account --format 'value(core.account)'" return exec_gcloud_output(command).strip() def get_current_billing_account(project_id): command = "gcloud alpha billing accounts list --format='value(ACCOUNT_ID)' --quiet" return exec_gcloud_output(command).strip() def create_gcp_project(project_id): """ Get GCP project number based on project_id using gcloud command. """ print_indent(f"(Creating GCP project '{project_id}'...)") try: # Create account. command = f"gcloud projects create {project_id}" exec_gcloud_output(command) # Link billing account. billing_account = get_current_billing_account() command = f"gcloud beta billing projects link {project_id} --billing-account={billing_account}" exec_gcloud_output(command) except subprocess.CalledProcessError as e: print_indent(f"{e.output}") print_indent( f"Unable to create GCP project '{project_id}'.\n") def ask_to_create_gcp_project(project_id): # raw_input returns the empty string for "enter" yes = {"yes", "y", "ye"} no = {"no", "n"} choice = input(f"🎤 Create GCP project '{project_id}'? (y/n) ").lower() if choice.lower() in yes: create_gcp_project(project_id) return True elif choice.lower() in no: return False def get_existing_firestore(project_id): """ Get boolean whether to initialize Firestore. """ print_indent(f"(Retrieving Firestore databases list...)") command = f"gcloud alpha firestore databases list --format='value(databases[0].name)' --project='{project_id}' --quiet" try: database_name = exec_gcloud_output(command) return database_name except subprocess.CalledProcessError as e: print_indent( f"Unable to retrieve default Firestore database name for '{project_id}'. GCP project '{project_id}' may not exist on GCP yet.\n") return "" def get_current_user(project_id): """ Get current authenticated gcloud user. """ print(f" (Retrieving current authenticated gcloud user...)") command = f"gcloud config list account --format 'value(core.account)' | head -n 1" try: email = exec_gcloud_output(command) return email except subprocess.CalledProcessError as e: print_indent(f"{e.output}") print_indent(f"Unable to retrieve current authenticated gcloud user.") return "" def get_cloud_run_services(project_id): """ Get all deployed Cloud Run services. """ print(f" (Retrieving existing Cloud Run services for {project_id}...)") command = f"gcloud run services list --format='value(name)'" service_names = exec_gcloud_output(command) service_names = re.sub(r"\n", ",", service_names) return service_names def get_cluster_value(arguments): """ Get a specific GKE cluster value from describe. """ key, cluster_name, region = arguments print(f" (Retrieving {key} from cluster {cluster_name}...)") command = f"gcloud container clusters describe {cluster_name} --region={region} --format='value({key})'" return exec_gcloud_output(command) def get_services_from_yaml(solution_path): """ Get the service list from root yaml. """ st_yaml = read_yaml(f"{solution_path}/sb.yaml") services = [] components = st_yaml.get("components", {}) for component_name, properties in components.items(): if properties.get("service_path"): services.append(properties["resource_name"]) return ",".join(services) def get_default_gke_version(arguments): """ Get the default GKE verions in a specific channel. """ region, channel = arguments print(f"(Retrieving the default GKE version in channel {channel}...)") command = f"gcloud container get-server-config --region={region} --format='value(channels.defaultVersion)'" default_versions = exec_gcloud_output(command).split(";") channel_index = { "RAPID": 0, "REGULAR": 1, "STABLE": 2, } return default_versions[channel_index[channel]] def convert_resource_name(resource_name): """ Convert to valid resource_name: lower case, alpha-numeric, dash. """ resource_name = re.sub(r"\W+", "", resource_name) resource_name = resource_name.replace(" ", "").replace("_", "-") resource_name = resource_name.lower() if len(resource_name) > 63: resource_name = resource_name[:62] return resource_name def assert_non_empty(value): assert value, "Got empty value in Copier variable." return value class SolutionsTemplateHelpersExtension(Extension): def __init__(self, environment): super().__init__(environment) environment.filters["get_project_number"] = get_project_number environment.filters["check_project_exist"] = check_project_exist environment.filters["get_current_billing_account"] = get_current_billing_account environment.filters["get_existing_firestore"] = get_existing_firestore environment.filters["get_current_user"] = get_current_user environment.filters["get_cloud_run_services"] = get_cloud_run_services environment.filters["get_cluster_value"] = get_cluster_value environment.filters["get_default_gke_version"] = get_default_gke_version environment.filters["get_services_from_yaml"] = get_services_from_yaml environment.filters["convert_resource_name"] = convert_resource_name environment.filters["assert_non_empty"] = assert_non_empty