generate/resources/_network.py (113 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from imports.network import Network from imports.firewall_rules import FirewallRules from imports.firewall_policy_nw import FirewallPolicyNw from imports.firewall_policy_rh import FirewallPolicyRh from imports.network_peering import NetworkPeering from imports.cloud_router import CloudRouter import util def create_network(self, vpc): vpc_name = vpc["network_name"] vpc["project_id"] = self.tf_ref("project", vpc["project_id"]) self.created["network"][vpc_name] = Network( self, f"nw_{vpc_name}", **vpc, ) def create_firewall(self, fw): vpc_name = fw["network_name"] fw["project_id"] = self.tf_ref("project", fw["project_id"]) fw["network_name"] = self.tf_ref("network", vpc_name) FirewallRules( self, f"fw_{vpc_name}", **fw, ) def create_fw_policy_rh(self, fp): node_type = self.which_node(fp["parent_node"]) fp["parent_node"] = self.tf_ref(node_type, fp["parent_node"]) if fp.get("target_org"): fp["target_org"] = self.tf_ref("organization", "/") fp["target_folders"] = [ self.tf_ref("folder_id", fldr) for fldr in fp.get("target_folders", []) ] for rule in fp["rules"]: if rule.get("target_service_accounts"): rule["target_service_accounts"] = self.tf_ref( "service_accounts", rule["target_service_accounts"] ) FirewallPolicyRh( self, f'fprh_{fp["policy_name"]}', **fp, ) def create_fw_policy_nw(self, fp): fp["project_id"] = self.tf_ref("project", fp["project_id"]) fp["target_vpcs"] = [ self.tf_ref("network_id", nw) for nw in fp.get("target_vpcs", []) ] for rule in fp["rules"]: if rule.get("target_service_accounts"): rule["target_service_accounts"] = self.tf_ref( "service_accounts", rule["target_service_accounts"] ) FirewallPolicyNw( self, f'fpnw_{fp["policy_name"]}', **fp, ) def create_peering(self, pair, peer_module_depends_on): local, peer = pair["local_network"], pair["peer_network"] peer_name = f"peer_{local}_{peer}" pair["prefix"] = pair.get("prefix","peer") pair["local_network"] = self.tf_ref("network", local) pair["peer_network"] = self.tf_ref("network", peer) pair["module_depends_on"] = peer_module_depends_on self.created["peering"][peer_name] = NetworkPeering( self, peer_name, **pair, ) return [self.created["peering"][peer_name].complete_output] def create_router(self, cr): vpc_name = cr["network"] region = cr["region"] vpc_re_name = f"{vpc_name}-{util.short_region(region)}" cr["name"] = f"cr-{vpc_re_name}" for nat in cr.get("nats"): nat["name"] = f"nat-{vpc_re_name}" for sub in nat.get("subnetworks", []): sub["name"] = self.tf_ref("subnet", sub["name"]) cr["project"] = self.tf_ref("project", cr["project"]) cr["network"] = self.tf_ref("network", vpc_name) CloudRouter( self, f"cr_{vpc_re_name}", **cr, ) def generate_networks(self, my_resource, resource): for vpc in self.eztf_config.get(my_resource, []): create_network(self, vpc) generate_routers(self, f"router_{my_resource}", "router") def generate_firewalls(self, my_resource, resource): for fw in self.eztf_config.get(my_resource, []): create_firewall(self, fw) def generate_peerings(self, my_resource, resource): peer_module_depends_on = [] for pair in self.eztf_config.get(my_resource, []): peer_module_depends_on = create_peering(self, pair, peer_module_depends_on) def generate_routers(self, my_resource, resource): self.file_seprator_variable(my_resource) for cr in self.eztf_config.get(my_resource, []): create_router(self, cr) def add_subnets(self, my_resource, resource): self.added["subnets"] = self.added.get("subnets", {}) for vpc in self.eztf_config.get(my_resource, []): for sub in vpc.get("subnets", []): region, subnet = sub["subnet_region"], sub["subnet_name"] self.added["subnets"][f"{region}/{subnet}"] = vpc["network_name"] def generate_fw_policy_rh(self, my_resource, resource): for fw in self.eztf_config.get(my_resource, []): create_fw_policy_rh(self, fw) def generate_fw_policy_nw(self, my_resource, resource): for fw in self.eztf_config.get(my_resource, []): create_fw_policy_nw(self, fw)