generate/resources/__init__.py (269 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from typing import Any
from constructs import Construct
from cdktf import (
TerraformStack,
TerraformVariable,
GcsBackend,
)
from cdktf_cdktf_provider_google.provider import GoogleProvider
import util
from ._users import generate_users
from ._group import generate_groups
from ._sa import generate_sa
from ._myfolders import generate_folders
from ._projects import generate_projects, generate_svc_projects
from ._iam import generate_iam
from ._api import generate_project_services
from ._network import (
generate_networks,
generate_peerings,
generate_firewalls,
generate_routers,
add_subnets,
generate_fw_policy_nw,
generate_fw_policy_rh,
)
from ._logging import (
generate_logging,
generate_log_destination,
generate_logsink,
add_dest_sink_map,
)
from ._monitoring import generate_monitoring
from ._org_policy import generate_org_policies, generate_custom_org_policies
from ._vpn import generate_vpn, generate_vpn_ha, generate_external_vpn_gateways
from ._gke import generate_gke
from ._mydata import data_google_org
from ._kms import generate_kms
from ._gcs import generate_gcs
from ._vm import (
generate_compute_disk,
generate_compute_instances,
generate_instance_from_template,
generate_instance_template,
generate_mig,
generate_umig,
)
from ._database import generate_cloudsql
from ._bigquery import (
generate_bigquery_dataset,
generate_bigquery_table,
generate_bigquery_routine,
)
from ._vpcsc import (
generate_sc_policy,
generate_sc_access_level,
generate_sc_perimeter,
generate_sc_perimeter_bridge,
)
from ._any_module import generate_any_module
# Registry mapping each supported resource type name to its generator function.
# MyStack.__init__ looks the resource type up here and calls the generator as
# ``generator(stack, my_resource, resource)``; resource types that are absent
# from this table are skipped. Several type names intentionally share a single
# generator (e.g. every "log*" destination, every "gke*" flavour, every SQL
# engine).
creation = {
"users": generate_users,
"groups": generate_groups,
"folders": generate_folders,
"projects": generate_projects,
"iam": generate_iam,
"service_account": generate_sa,
"project_api": generate_project_services,
"network": generate_networks,
"firewall": generate_firewalls,
"firewall_policy_nw": generate_fw_policy_nw,
"firewall_policy_rh": generate_fw_policy_rh,
"_svc_projects": generate_svc_projects,
"peering": generate_peerings,
"router": generate_routers,
"logging": generate_logging,
"logsink": generate_logsink,
"logpubsub": generate_log_destination,
"logstorage": generate_log_destination,
"logbucket": generate_log_destination,
"logbigquery": generate_log_destination,
"logproject": generate_log_destination,
"monitoring": generate_monitoring,
"org_policy": generate_org_policies,
"org_node_policy": generate_org_policies,
"custom_org_policy": generate_custom_org_policies,
"external_vpn_gateway": generate_external_vpn_gateways,
"vpn": generate_vpn,
"vpn_ha": generate_vpn_ha,
"gke": generate_gke,
"gke_private": generate_gke,
"gke_autopilot": generate_gke,
"gke_autopilot_private": generate_gke,
"kms": generate_kms,
"sc_policy": generate_sc_policy,
"sc_access_level": generate_sc_access_level,
"sc_perimeter": generate_sc_perimeter,
"sc_perimeter_bridge": generate_sc_perimeter_bridge,
"gcs": generate_gcs,
"cloudsql": generate_cloudsql,
"pgsql": generate_cloudsql,
"mysql": generate_cloudsql,
"mssql": generate_cloudsql,
"vm": generate_compute_instances,
"vm_template": generate_instance_template,
"vm_from_template": generate_instance_from_template,
"mig": generate_mig,
"umig": generate_umig,
"disk": generate_compute_disk,
"bq_dataset": generate_bigquery_dataset,
"bq_table": generate_bigquery_table,
"bq_routine": generate_bigquery_routine,
"any_module": generate_any_module,
}
# Data-source factories keyed by name. MyStack.ensure_data calls
# ``data_creation[name](stack)`` for every requested name that has no truthy
# entry in ``stack.created["data"]`` yet.
data_creation = {"google_org": data_google_org}
# Terraform input variables required by a given resource type. During the first
# pass of MyStack.__init__ the listed names are handed to
# MyStack.ensure_variables, so they are declared before any generator runs.
variable_creation = {
"users": ["organization_id", "setup_service_account"],
"iam": ["organization_id"],
"folders": ["organization_id"],
"projects": ["organization_id", "billing_id", "project_suffix"],
"firewall_policy_nw": ["organization_id"],
"firewall_policy_rh": ["organization_id"],
"org_policy": ["organization_id"],
"custom_org_policy": ["organization_id"],
"logging": ["organization_id"],
"logsink": ["organization_id"],
"sc_policy": ["organization_id"],
}
# Pre-generation hooks per resource type. Each hook is called as
# ``hook(stack, my_resource, resource)`` during the first pass of
# MyStack.__init__, i.e. before any generator from ``creation`` is invoked.
added_ref = {"network": [add_subnets], "logsink": [add_dest_sink_map]}
# Unique marker used as the default of MyStack.tf_ref's ``default`` parameter so
# that "no default supplied" can be told apart from any explicitly passed value.
sentinel = object()
class MyStack(TerraformStack):
    """Terraform stack that generates GCP resources from an eztf configuration.

    Construction configures the Google provider and the GCS backend, then walks
    ``eztf_range_resources`` twice: a first pass prepares bookkeeping (per-type
    buckets in ``created``, ``added_ref`` hooks, required variables) and a
    second pass calls the generator registered in ``creation`` for each entry.

    Attributes:
        eztf_config: The configuration object passed to the constructor.
        created: Objects created so far, keyed by resource type and then by
            name. Also holds the ``vars``, ``locals``, ``data`` and ``null``
            buckets.
        added: Auxiliary lookup tables read by ``tf_ref`` (``subnets`` and
            ``vpn_ha``). Initialised empty here and filled by code outside
            this class.
    """

    # Resource types whose reference is simply one attribute of the object
    # stored in ``self.created[bucket][name]``.
    # Maps ``res_type -> (bucket in self.created, attribute name)``.
    _REF_ATTRIBUTES = {
        "user": ("users", "primary_email"),
        "group": ("groups", "id_output"),
        "group_name": ("groups", "name_output"),
        "service_account": ("service_account", "email"),
        "network": ("network", "network_self_link_output"),
        "network_name": ("network", "network_name_output"),
        "network_id": ("network", "network_id_output"),
        "project": ("projects", "project_id_output"),
        "project_number": ("projects", "project_number_output"),
        "folder": ("folders", "name"),
        "folder_id": ("folders", "folder_id"),
        "external_vpn_gateway": ("external_vpn_gateway", "self_link"),
        "sc_policy": ("sc_policy", "name"),
        "sc_access_level_name": ("sc_access_level", "name_output"),
        "custom_org_policy": ("custom_org_policy", "name"),
        "log_destination": ("log_destination", "destination_uri_output"),
        "vm_template": ("vm_template", "self_link_unique_output"),
        "disk": ("disk", "self_link"),
        "bq_dataset": ("bq_dataset", "dataset_id"),
        "bq_table": ("bq_table", "table_id"),
        "bq_routine": ("bq_routine", "routine_id"),
    }

    def __init__(
        self,
        scope: Construct,
        id: str,
        eztf_config: Any,
        sub_stack_name: str,
        eztf_range_resources: Any,
    ):
        """Build the stack and generate every configured resource.

        Args:
            scope: Parent construct, forwarded to ``TerraformStack``.
            id: Stack identifier, forwarded to ``TerraformStack``.
            eztf_config: Mapping-like configuration (read through ``.get``).
            sub_stack_name: Name embedded in the GCS backend state prefix.
            eztf_range_resources: Iterable of mappings from a configuration
                entry name to its resource type. It is iterated twice, so it
                must be re-iterable.
        """
        super().__init__(scope, id)
        self.eztf_config = eztf_config
        self.created = {"vars": {}, "locals": {}, "data": {}, "null": {}}
        self.added = {}
        self._create_backend(sub_stack_name)
        self.file_seprator_variable("variables", True)

        # Pass 1: prepare buckets, run pre-generation hooks and declare the
        # variables each resource type needs, before any generator runs.
        for range_resource in eztf_range_resources:
            for my_resource, resource in range_resource.items():
                if not creation.get(resource):
                    continue
                if not self.created.get(resource):
                    self.created[resource] = {}
                for add_ref in added_ref.get(resource, ()):
                    add_ref(self, my_resource, resource)
                if required := variable_creation.get(resource):
                    self.ensure_variables(required)

        # Pass 2: run the generators in configuration order.
        for range_resource in eztf_range_resources:
            for my_resource, resource in range_resource.items():
                if generator := creation.get(resource):
                    self.file_seprator_variable(my_resource)
                    generator(self, my_resource, resource)

    def file_seprator_variable(self, name, force=False):
        """Declare the marker variable ``<util.RANDOM_WORD>file_<name>``.

        The variable is created only when ``eztf_config`` has a truthy entry
        for ``name`` or when ``force`` is true. The method name's spelling is
        kept as-is because callers outside this class may rely on it.

        NOTE(review): presumably this marker lets later tooling split the
        generated output into separate files -- confirm against the consumer.
        """
        if self.eztf_config.get(name) or force:
            TerraformVariable(self, f"{util.RANDOM_WORD}file_{name}")

    def _create_backend(self, config_sub_type):
        """Register the Google provider and the GCS state backend.

        ``setup_project_id`` and ``gcs_bucket`` are read from the ``variable``
        section of the configuration and default to an empty string.
        """
        var = self.eztf_config.get("variable", {})
        GoogleProvider(
            self,
            id="google",
            project=var.get("setup_project_id", ""),
        )
        GcsBackend(
            self,
            bucket=var.get("gcs_bucket", ""),
            prefix=f"terraform-{config_sub_type}-state",
        )

    def ensure_data(self, data_li):
        """Invoke the ``data_creation`` factory for each name not yet created.

        A name counts as created when ``self.created["data"]`` holds a truthy
        value for it. Recording that value is left to the factory.
        """
        for name in data_li:
            if not self.created["data"].get(name):
                data_creation[name](self)

    def ensure_variables(self, variables):
        """Declare a Terraform variable for every name in ``variables``."""
        self._create_variables({variable: "" for variable in variables})

    def _create_variables(self, variables):
        """Declare each not-yet-declared variable named by ``variables``.

        Only the names (keys, when a mapping is passed) are used. The
        description is the variable name with underscores replaced by spaces.
        """
        if not self.created.get("vars"):
            self.created["vars"] = {}
        declared = self.created["vars"]
        for variable in variables:
            if not declared.get(variable):
                declared[variable] = TerraformVariable(
                    self,
                    variable,
                    description=variable.replace("_", " "),
                )

    def ref_principal(self, name):
        """Resolve the identifier part of a ``type:identifier`` principal.

        The text after the first colon is passed through ``tf_ref`` using the
        text before it as the resource type. A principal without any colon
        (for example ``allUsers``) is returned unchanged; it used to come back
        with a stray trailing colon, which is not a valid member string.
        """
        p_type, separator, p_id = name.partition(":")
        if not separator:
            return name
        return f"{p_type}:{self.tf_ref(p_type, p_id)}"

    def _re_region_subnet(self, subnet_link):
        """Reduce a subnet link to ``<region>/<subnet>``.

        Input that does not contain ``/regions/<region>/subnetworks/<subnet>``
        is returned unchanged.
        """
        pattern = r"/regions/(.+)/subnetworks/(.+)"
        if match := re.search(pattern, subnet_link):
            return f"{match.group(1)}/{match.group(2)}"
        return subnet_link

    def which_node(self, node):
        """Classify a hierarchy node as organization, folder or project.

        Returns:
            ``"organization"`` for an empty value, ``"/"`` or anything starting
            with ``organizations/``; ``"folder"`` for anything else starting
            with ``/`` or ``folders/``; ``"project"`` otherwise.
        """
        if not node or node == "/" or node.startswith("organizations/"):
            return "organization"
        if node.startswith(("/", "folders/")):
            return "folder"
        return "project"

    def tf_param_list(self, data, key, attribute_object_func):
        """Replace ``data[key]`` in place with constructed objects.

        Each item of ``data[key]`` is expanded as keyword arguments to
        ``attribute_object_func``. Nothing happens when ``data`` or
        ``data[key]`` is empty or missing.
        """
        if data and data.get(key):
            data[key] = [attribute_object_func(**item) for item in data[key]]

    def tf_ref(self, res_type, name, default=sentinel):
        """Return a reference to a resource managed by this stack.

        Args:
            res_type: Kind of reference wanted (for example ``"project"``,
                ``"network_id"``, ``"subnet"``).
            name: Name under which the resource was registered.
            default: Value returned when nothing matches. When omitted,
                ``name`` itself is returned.

        Returns:
            The matching attribute or interpolation string, or the fallback
            when ``res_type`` is unknown or ``name`` is not registered.
        """
        fallback = name if default is sentinel else default

        if res_type == "organization":
            # Always resolves: the variable is declared on demand.
            self.ensure_variables(["organization_id"])
            return self.created["vars"]["organization_id"].string_value

        if res_type == "subnet":
            region_subnet = self._re_region_subnet(name)
            if vpc_name := self.added.get("subnets", {}).get(region_subnet):
                return (
                    f"${{module.nw_{vpc_name}"
                    f'.subnets["{region_subnet}"].self_link}}'
                )
            return fallback

        if res_type == "vpn_ha":
            if name in self.added.get("vpn_ha", set()):
                return f"${{module.vpn_ha_{name}.self_link}}"
            return fallback

        if res_type == "projects/number":
            if project := self.created.get("projects", {}).get(name):
                return f"projects/{project.project_number_output}"
            return fallback

        spec = self._REF_ATTRIBUTES.get(res_type)
        if spec is not None:
            bucket, attribute = spec
            if resource := self.created.get(bucket, {}).get(name):
                return getattr(resource, attribute)
        return fallback