scripts/generate_terraform.py (327 lines of code) (raw):

# Copyright 2021 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import pathlib import subprocess import typing import jinja2 from ruamel import yaml CURRENT_PATH = pathlib.Path(__file__).resolve().parent PROJECT_ROOT = CURRENT_PATH.parent DATASETS_PATH = PROJECT_ROOT / "datasets" TEMPLATES_PATH = PROJECT_ROOT / "templates" / "terraform" TEMPLATE_PATHS = { "storage_bucket": TEMPLATES_PATH / "google_storage_bucket.tf.jinja2", "bigquery_dataset": TEMPLATES_PATH / "google_bigquery_dataset.tf.jinja2", "bigquery_table": TEMPLATES_PATH / "google_bigquery_table.tf.jinja2", "license": TEMPLATES_PATH / "license_header.tf.jinja2", "tfvars": TEMPLATES_PATH / "terraform.tfvars.jinja2", "variables": TEMPLATES_PATH / "variables.tf.jinja2", "provider": TEMPLATES_PATH / "provider.tf.jinja2", "backend": TEMPLATES_PATH / "backend.tf.jinja2", } yaml = yaml.YAML(typ="safe") def main( dataset_id: str, project_id: str, bucket_name_prefix: str, region: str, impersonating_acct: str, env: str, tf_state_bucket: str, tf_state_prefix: str, tf_apply: bool = False, format_code: bool = True, ): validate_bucket_name(bucket_name_prefix) env_path = PROJECT_ROOT / f".{env}" create_gitignored_env_path(dataset_id, env_path) generate_provider_tf(project_id, dataset_id, region, impersonating_acct, env_path) generate_backend_tf(dataset_id, tf_state_bucket, tf_state_prefix, env_path) dataset_config = yaml.load( open(DATASETS_PATH / dataset_id / "pipelines" / "dataset.yaml") ) infra_vars = load_env_vars(dataset_id, env).get("infra") generate_dataset_tf(dataset_id, project_id, dataset_config, infra_vars, env) generate_all_pipelines_tf(dataset_id, project_id, env_path) generate_variables_tf(dataset_id, env_path) generate_tfvars_file( project_id, bucket_name_prefix, dataset_id, region, impersonating_acct, env_path, infra_vars, ) if format_code and (env_path / "datasets" / dataset_id / "infra").exists(): terraform_fmt(env_path / "datasets" / dataset_id / "infra") if format_code and (DATASETS_PATH / dataset_id / "infra").exists(): terraform_fmt(DATASETS_PATH / dataset_id / "infra") if tf_apply: actuate_terraform_resources(dataset_id, env_path) def load_env_vars(dataset_id: str, env: str) -> dict: env_vars_file = PROJECT_ROOT / "datasets" / dataset_id / f".vars.{env}.yaml" return yaml.load(open(env_vars_file)) if env_vars_file.exists() else {} def generate_provider_tf( project_id: str, dataset_id: str, region: str, impersonating_acct: str, env_path: pathlib.Path, ): contents = apply_substitutions_to_template( TEMPLATE_PATHS["provider"], { "project_id": project_id, "region": region, "impersonating_acct": impersonating_acct, }, ) create_file_in_dir_tree(dataset_id, contents, "provider.tf", env_path) def generate_backend_tf( dataset_id: str, tf_state_bucket: str, tf_state_prefix: str, env_path: pathlib.Path ): if not tf_state_bucket: return contents = apply_substitutions_to_template( TEMPLATE_PATHS["backend"], { "tf_state_bucket": tf_state_bucket, "tf_state_prefix": tf_state_prefix, }, ) create_file_in_dir_tree( dataset_id, contents, "backend.tf", env_path, use_project_dir=False ) def generate_dataset_tf( dataset_id: str, project_id: str, config: dict, infra_vars: typing.Union[dict, None], env: str, ): if infra_vars: subs = infra_vars else: subs = {"project_id": project_id, "dataset_id": dataset_id, "env": env} if not config["resources"]: return contents = "" for resource in config["resources"]: if resource.get("dataset_id"): _subs = {**subs, **{"dataset_id": resource["dataset_id"]}} else: _subs = subs contents += tf_resource_contents(resource, {**resource, **_subs}) create_file_in_dir_tree( dataset_id, contents, f"{dataset_id}_dataset.tf", PROJECT_ROOT / f".{env}" ) def generate_all_pipelines_tf(dataset_id: str, project_id: str, env_path: pathlib.Path): pipeline_paths = list_subdirs(DATASETS_PATH / dataset_id / "pipelines") for pipeline_path in pipeline_paths: pipeline_config = yaml.load(open(pipeline_path / "pipeline.yaml")) generate_pipeline_tf( dataset_id, project_id, pipeline_path.name, pipeline_config, env_path ) def generate_pipeline_tf( dataset_id: str, project_id: str, pipeline_id: str, config: dict, env_path: pathlib.Path, ): subs = {"project_id": project_id, "dataset_id": dataset_id} if not config["resources"]: return contents = "" for resource in config["resources"]: contents += tf_resource_contents(resource, {**subs, **resource}) create_file_in_dir_tree( dataset_id, contents, f"{pipeline_id}_pipeline.tf", env_path ) def generate_variables_tf(dataset_id: str, env_path: pathlib.Path): contents = pathlib.Path(TEMPLATE_PATHS["variables"]).read_text() create_file_in_dir_tree(dataset_id, contents, "variables.tf", env_path) def generate_tfvars_file( project_id: str, bucket_name_prefix: str, dataset_id: str, region: str, impersonating_acct: str, env_path: pathlib.Path, infra_vars: typing.Union[dict, None], ): if infra_vars: tf_vars = infra_vars else: tf_vars = { "project_id": project_id, "bucket_name_prefix": bucket_name_prefix, "impersonating_acct": impersonating_acct, "region": region, "env": env_path.name.replace(".", ""), } contents = apply_substitutions_to_template( TEMPLATE_PATHS["tfvars"], {"tf_vars": tf_vars} ) target_path = env_path / "datasets" / dataset_id / "infra" / "terraform.tfvars" write_to_file(contents + "\n", target_path) print_created_files([target_path]) def tf_resource_contents(resource: dict, subs: dict) -> str: if resource["type"] not in TEMPLATE_PATHS: raise ValueError template_path = TEMPLATE_PATHS[resource["type"]] subs = customize_template_subs(resource, subs) return apply_substitutions_to_template(template_path, subs) def customize_template_subs(resource: dict, subs: dict) -> dict: if resource["type"] == "storage_bucket": subs["name"] = validate_bucket_name(resource["name"]) subs["uniform_bucket_level_access"] = resource.get( "uniform_bucket_level_access" ) elif resource["type"] == "bigquery_table": dataset_table = f"{subs['dataset_id']}_{resource['table_id']}" # Terraform resource names cannot start with digits, but BigQuery allows # table names that start with digits. We prepend `bqt_` to table names # that doesn't comply with Terraform's naming rule. if resource["table_id"][0].isdigit(): subs["tf_resource_name"] = f"bqt_{dataset_table}" else: subs["tf_resource_name"] = dataset_table return subs def validate_bucket_name(name: str): if "." in name: raise ValueError # Bucket names cannot contain "google" or close misspellings, such as # "g00gle" as mentioned in the bucket naming guidelines: # https://cloud.google.com/storage/docs/naming-buckets#requirements mapped_name = name.replace("0", "o").replace("1", "l").replace("3", "e") if "google" in mapped_name.lower(): raise ValueError( "Bucket names cannot contain 'google' or close misspellings, such" " as 'g00gle' as mentioned in the bucket naming guidelines:" " https://cloud.google.com/storage/docs/naming-buckets#requirements" ) # We use the convention where hyphens are used for bucket names instead of # underscores. if "_" in mapped_name: raise ValueError("Use hyphens over underscores for bucket names") return name def uppercase_bq_schema_types(schema: list) -> list: return [{"name": col["name"], "type": col["type"].upper()} for col in schema] def create_gitignored_env_path(dataset_id: str, env_path: pathlib.Path): if not (PROJECT_ROOT / "datasets" / dataset_id).exists(): raise FileNotFoundError( f"Directory {PROJECT_ROOT / 'datasets' / dataset_id} doesn't exist" ) (env_path / "datasets" / dataset_id).mkdir(parents=True, exist_ok=True) def create_file_in_dir_tree( dataset_id: str, contents: str, filename: str, env_path: pathlib.Path, use_env_dir: bool = True, use_project_dir: bool = True, ): filepaths = [] prefixes = [] if use_env_dir: prefixes.append(env_path / "datasets" / dataset_id / "infra") if use_project_dir: prefixes.append(DATASETS_PATH / dataset_id / "infra") for prefix in prefixes: if not prefix.exists(): prefix.mkdir(parents=True, exist_ok=True) target_path = prefix / filename write_to_file(contents + "\n", target_path) filepaths.append(target_path) print_created_files(filepaths) def print_created_files(filepaths: list): print("\nCreated\n") for filepath in sorted(filepaths): print(f" - {filepath.relative_to(PROJECT_ROOT)}") def write_to_file(contents: str, filepath: pathlib.Path): license_header = pathlib.Path(TEMPLATE_PATHS["license"]).read_text() with open(filepath, "w") as file_: file_.write(license_header + contents.replace(license_header, "")) def list_subdirs(path: pathlib.Path) -> typing.List[pathlib.Path]: """Returns a list of subdirectories""" subdirs = [f for f in path.iterdir() if f.is_dir() and not f.name[0] in (".", "_")] return subdirs def terraform_fmt(target_file: pathlib.Path): subprocess.Popen( f"terraform fmt -write=true {target_file}", stdout=subprocess.DEVNULL, shell=True, ).wait() def actuate_terraform_resources(dataset_id: str, env_path: pathlib.Path): cwd = env_path / "datasets" / dataset_id / "infra" subprocess.check_call(["terraform", "init"], cwd=cwd) subprocess.check_call(["terraform", "apply"], cwd=cwd) def apply_substitutions_to_template(template: pathlib.Path, subs: dict) -> str: j2_template = jinja2.Template(pathlib.Path(template).read_text()) return j2_template.render(**subs) if __name__ == "__main__": parser = argparse.ArgumentParser( description="Generate Terraform code for infrastructure required by every dataset" ) parser.add_argument( "-d", "--dataset", required=True, type=str, dest="dataset", help="The directory name of the dataset", ) parser.add_argument( "--gcp-project-id", type=str, default="", dest="project_id", help="The Google Cloud project ID", ) parser.add_argument( "-b", "--bucket-name-prefix", type=str, default="", dest="bucket_name_prefix", help="The prefix to use for GCS bucket names for global uniqueness", ) parser.add_argument( "--tf-state-bucket", type=str, dest="tf_state_bucket", help="The GCS bucket name for the Terraform remote state", ) parser.add_argument( "--tf-state-prefix", type=str, default="terraform/state", dest="tf_state_prefix", help="The GCS bucket prefix for the Terraform remote state", ) parser.add_argument( "-e", "--env", type=str, default="dev", dest="env", help="The stage used for the resources: dev|staging|prod", ) parser.add_argument( "-r", "--region", type=str, dest="region", help="The GCP region for the Cloud Composer environment", ) parser.add_argument( "-i", "--impersonating-acct", dest="impersonating_acct", help="Use service account impersonation for Terraform", ) parser.add_argument("--tf-apply", dest="tf_apply", action="store_true") # Usage: python scripts/generate_terraform.py -d covid19_staging -i sa@projectiam.gserviceaccount.com args = parser.parse_args() main( args.dataset, args.project_id, args.bucket_name_prefix, args.region, args.impersonating_acct, args.env, args.tf_state_bucket, args.tf_state_prefix, args.tf_apply, )