generate/util.py (231 lines of code) (raw):

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for naming, file output, GCS transfer, tf splitting and git push."""

import json
import os
import posixpath
import random
import re
import string
from datetime import datetime

import google.auth
import google.auth.credentials
import google.auth.transport.requests
import google.oauth2.credentials
import requests
import yaml
from git import Repo
from google.cloud import storage

RANDOM_WORD = "gcp-cdk-tf-id_"
TF_SINGLE_OUT = "cdktf.out/stacks/{stack_name}/cdk.tf"
continent_short_name = {
    "africa": "af",
    "asia": "az",
    "australia": "au",
    "europe": "eu",
    "northamerica": "na",
    "southamerica": "sa",
    "us": "us",
    "me": "me",
}
DIR_REGEX = r"(n)orth|(s)outh|(e)ast|(w)est|(c)entral"
DIR_SUBS = "\\1\\2\\3\\4\\5"


def cdktf_output(stack_name, output_folder="cdktf.out"):
    """Return the path of the cdk.tf file of a stack inside output_folder."""
    return f"{output_folder}/stacks/{stack_name}/cdk.tf"


def clean_principal_id(name: str) -> str:
    """Return a cleaned principal id.

    For "principalSet:" / "principal:" names the last "/"-separated segment
    is returned; otherwise ":" becomes "_" and "." and "@" are removed.
    """
    if name.startswith(("principalSet:", "principal:")):
        return name.split("/")[-1]
    return name.replace(":", "_").replace(".", "").replace("@", "")


def clean_res_id(name):
    """Return the name lowercased with "." and "_" replaced by hyphens."""
    return name.lower().replace(".", "-").replace("_", "-")


def lower(name):
    """Return the stripped, lowercased name with whitespace runs as "_"."""
    name = name.strip().lower()
    return re.sub(r"\s+", "_", name)


def clean_tf_folder(name):
    """Return a folder id string: "org" when empty, "/" mapped to "__"."""
    name = str(name)
    if not name:
        return "org"
    if name[0] == "/":
        # drop a single leading slash so the result has no "__" prefix
        name = name[1:]
    return name.replace("/", "__")


def time_str():
    """Return the current local time formatted as YYYY-MM-DD_HH-MM."""
    return datetime.now().strftime("%Y-%m-%d_%H-%M")


def random_str(n=8):
    """Return a random string of n ASCII letters and digits (default 8)."""
    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    return "".join(random.choices(alphabet, k=n))


def short_region(region):
    """Return a short region or zone name, e.g. "us-central1-a" -> "usc1a".

    Raises:
        KeyError: if the first "-"-separated segment is not a key of
            continent_short_name.
    """
    parts = region.split("-")
    parts[0] = continent_short_name[parts[0]]
    # Guard so an input without "-" does not fail with IndexError.
    if len(parts) > 1:
        parts[1] = re.sub(DIR_REGEX, DIR_SUBS, parts[1])
    return "".join(parts)


def _ensure_parent_dir(filename):
    """Create the parent directory of filename when the path has one."""
    parent = os.path.dirname(filename)
    # os.makedirs("") raises FileNotFoundError, so skip bare file names.
    if parent:
        os.makedirs(parent, exist_ok=True)


def get_file_yaml(filename):
    """Return the content of a yaml file as loaded by yaml.safe_load."""
    with open(filename, "r", encoding="utf-8") as fp:
        return yaml.safe_load(fp)


def write_file_yaml(filename, dict_data):
    """Write dict_data to filename as yaml, creating parent directories."""
    _ensure_parent_dir(filename)
    with open(filename, "w", encoding="utf-8") as fp:
        yaml.safe_dump(dict_data, fp, sort_keys=False, default_flow_style=False)


def write_file_json(filename, dict_data):
    """Write dict_data to filename as json, creating parent directories."""
    _ensure_parent_dir(filename)
    with open(filename, "w", encoding="utf-8") as fp:
        json.dump(dict_data, fp, indent=4, sort_keys=False)


def write_file_any(filename, content):
    """Write the text content to filename, creating parent directories."""
    _ensure_parent_dir(filename)
    with open(filename, "w", encoding="utf-8") as fp:
        fp.write(content)


def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")


def download_from_gcs(bucket_name, source_blob_name):
    """Download a blob to a local file of the same name and return its text.

    Args:
        bucket_name: Name of the bucket holding the blob.
        source_blob_name: Blob name; also used as the local file path.

    Returns:
        The blob content decoded as UTF-8.
    """
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    _ensure_parent_dir(source_blob_name)
    blob.download_to_filename(source_blob_name)
    print(f"Blob {source_blob_name} downloaded to {source_blob_name}.")
    # Read back the file just written instead of downloading a second time.
    with open(source_blob_name, "rb") as fp:
        return fp.read().decode("utf-8")


def upload_folder_to_gcs(bucket_name, local_folder, gcs_prefix=""):
    """Upload a folder to the bucket recursively, skipping hidden entries.

    Args:
        bucket_name: Name of the destination bucket.
        local_folder: Local folder to walk.
        gcs_prefix: Prefix prepended to every blob name.
    """
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    uploaded = []
    for path, dirs, files in os.walk(local_folder):
        # prune hidden directories in place so os.walk does not descend
        dirs[:] = [d for d in dirs if not d.startswith(".")]
        for name in files:
            if name.startswith("."):
                continue
            local_file = os.path.join(path, name)
            rel_parts = os.path.relpath(local_file, local_folder).split(os.sep)
            # object names use "/" whatever the local path separator is
            blob_name = posixpath.join(gcs_prefix, *rel_parts)
            bucket.blob(blob_name).upload_from_filename(local_file)
            uploaded.append("/".join(rel_parts))
    print(f"Files {','.join(uploaded)} uploaded to gs://{bucket_name}/{gcs_prefix}")


def _tfvars_value(value):
    """Render one variable value for a tfvars assignment."""
    if isinstance(value, str):
        return f'"{value}"'
    try:
        # JSON literals (true/false/null, double-quoted lists and maps)
        # instead of Python reprs such as True, None or ['a'].
        return json.dumps(value, ensure_ascii=False)
    except (TypeError, ValueError):
        # not JSON-serializable: fall back to the plain string form
        return f"{value}"


def tf_vars_file(variables, output_folder):
    """
    Creates a tfvars file from a dictionary of variables.

    Args:
        variables: A dictionary of variables, where the key is the variable
            name and the value is the variable value. Strings are written
            inside double quotes unchanged; other values as JSON literals.
        output_folder: The path to the folder where the tfvars file should be
            saved. The file is named "terraform.tfvars".
    """
    os.makedirs(output_folder, exist_ok=True)
    output_file = os.path.join(output_folder, "terraform.tfvars")
    with open(output_file, "w", encoding="utf-8") as f:
        f.writelines(
            f"{key} = {_tfvars_value(value)}\n" for key, value in variables.items()
        )


def split_tf_file(input_file, output_folder):
    """Splits a .tf file into multiple files based on the variable pattern.

    Empty `variable "<RANDOM_WORD>file_<name>" {}` blocks act as separators:
    the text following one is written to "<name>.tf". Text before the first
    separator is written to "backend.tf".

    Args:
        input_file: Path to the input .tf file.
        output_folder: Path to the folder where output files will be saved.
    """
    os.makedirs(output_folder, exist_ok=True)
    with open(input_file, "r", encoding="utf-8") as f:
        content = f.read()
    # unwrap values that consist of a quoted "${...}" interpolation
    tf_content = re.sub(r"\"\$\{(.+)\}\"", "\\1", content)
    pattern = rf"variable\s+\"{RANDOM_WORD}(file_.*?)\"\s+\{{\s*\}}"
    # the capture group makes re.split keep each "file_<name>" marker
    chunks = re.split(pattern, tf_content)
    filename = "backend"
    written = 0
    output_files = []
    for chunk in chunks:
        if match := re.search(r"^file_(\S+)$", chunk):
            # a marker chunk names the file for the chunk that follows
            filename = match.group(1)
            continue
        written += 1
        filename = filename or f"file_{written}"
        filename = f"{filename}.tf"
        output_file = os.path.join(output_folder, filename)
        output_files.append(filename)
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(chunk)
        filename = ""
    print(f"Created files: {','.join(output_files)} in {output_folder}")


def ssm_url_extract(url):
    """Extract instance_id, project_number & location from a url of the form
    {instance_id}-{project_number}-api.{location}.sourcemanager.dev

    Returns:
        Tuple (instance_id, project_number, location).
    """
    ssm_host = url.replace("https://", "").replace("http://", "")
    labels = ssm_host.split(".")
    location = labels[1]
    # the instance id may itself contain "-"; the project number is last
    sub_parts = labels[0].removesuffix("-api").split("-")
    instance_id, project_number = "-".join(sub_parts[:-1]), sub_parts[-1]
    return instance_id, project_number, location


def _response_payload(response):
    """Return the parsed JSON body of a response, or its text if not JSON."""
    try:
        return response.json()
    except ValueError:
        return response.text


def ssm_repository(repo_name, ssm_url):
    """Return the git https url of a repository, creating it when missing.

    Pages through the repositories of the instance described by ssm_url; if
    one named repo_name is found its "gitHttps" uri is returned, otherwise a
    create request is sent.

    Args:
        repo_name: Repository id to look up or create.
        ssm_url: Instance url, parsed by ssm_url_extract.

    Returns:
        The "gitHttps" uri, or "" when the create response does not hold one.

    Raises:
        requests.HTTPError: if the list or create request is not successful.
    """
    instance_id, project_number, location = ssm_url_extract(ssm_url)
    access_token = get_access_token()
    repo_uri = f"projects/{project_number}/locations/{location}/repositories"
    # no trailing slash here, so the joined url has no "//" in its path
    base_url = (
        f"https://{instance_id}-{project_number}-api.{location}.sourcemanager.dev/v1"
    )
    repo_id = f"{repo_uri}/{repo_name}"
    url = f"{base_url}/{repo_uri}"
    headers = {"Authorization": f"Bearer {access_token}"}

    # list repositories
    params = {}
    while True:
        response = requests.get(url, params=params, headers=headers, timeout=120)
        if not response.ok:
            # check status before parsing: an error body may not be JSON
            print("list repositories failed", _response_payload(response))
            response.raise_for_status()
        rep = response.json()
        for repo in rep.get("repositories", []):
            if repo.get("name") == repo_id:
                print(f"git repo present, skipping creation {repo['uris']['gitHttps']}")
                return repo["uris"]["gitHttps"]
        next_page = rep.get("nextPageToken")
        if not next_page:
            break
        params = {"page_token": next_page}

    # create repository
    response = requests.post(
        url, headers=headers, params={"repository_id": repo_name}, data={}, timeout=120
    )
    if not response.ok:
        print("create repository failed", _response_payload(response))
        response.raise_for_status()
    rep = response.json()
    try:
        created_repo = rep["response"]["uris"]["gitHttps"]
    except KeyError:
        # keep returning "" as before, but do not report a creation
        print("create repository failed", rep)
        return ""
    print(f"ssm git repo created{created_repo}")
    return created_repo


def push_folder_to_git(repo_path, remote_url, branch_name="main"):
    """Commits the contents of a folder and pushes it with a new auto tag.

    Args:
        repo_path: Local path to the folder; Repo.init is called on it.
        remote_url: The URL of the remote git repository
            (e.g., "https://github.com/your-username/your-repo.git"). Only
            used to create the "origin" remote when none exists yet.
        branch_name: The branch to push to (defaults to "main").
    """
    repo = Repo.init(repo_path)
    if branch_name not in repo.heads:
        repo.git.checkout("-b", branch_name)
    else:
        repo.heads[branch_name].checkout()

    # Add, commit, and push changes
    repo.git.add(all=True, force=True)
    repo.index.commit("eztf auto commit")

    # Add remote
    if "origin" not in repo.remotes:
        repo.create_remote("origin", remote_url)
    origin = repo.remotes.origin

    tag_ref = re.compile(r"refs/tags/0\.(\d+)-auto")
    max_tag = 0
    # below command returns
    # 0206504b3460ac4e63e28461c525a3708f20a960 refs/tags/0.1-auto
    rem_tag = repo.git.ls_remote("--tags", "origin", "0.*-auto").strip()
    for tagline in rem_tag.split("\n"):
        fields = tagline.strip().split()
        if len(fields) < 2:
            continue
        if match := tag_ref.match(fields[1].strip()):
            max_tag = max(int(match.group(1)), max_tag)
    # next tag is one above the highest 0.<n>-auto tag on the remote
    max_tag += 1
    tag = f"0.{max_tag}-auto"
    repo.create_tag(tag)
    print(f"pushing branch {branch_name} & tag {tag} to {remote_url}")
    origin.push(
        refspec=[f"{branch_name}:{branch_name}", f"{tag}:{tag}"],
        force=True,
        atomic=True,
    )
    print("Successfully pushed to Remote Git")


def get_env_token():
    """Return the EZTF_GCLOUD_ACCESS environment variable, or None if unset."""
    return os.environ.get("EZTF_GCLOUD_ACCESS")


def get_access_token():
    """Refresh a UserAccessTokenCredentials object and return its token."""
    auth_req = google.auth.transport.requests.Request()
    gcp_token = google.oauth2.credentials.UserAccessTokenCredentials()
    gcp_token.refresh(request=auth_req)
    return gcp_token.token