generate/resources/_vpcsc.py (96 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from cdktf_cdktf_provider_google.access_context_manager_access_policy import ( AccessContextManagerAccessPolicy, ) from imports.sc_access_level import ScAccessLevel from imports.sc_perimeter import ScPerimeter from imports.sc_perimeter_bridge import ScPerimeterBridge from cdktf_cdktf_provider_null.resource import Resource from cdktf_cdktf_provider_null.provider import NullProvider from cdktf import LocalExecProvisioner def create_null_resource(self, depends_on): """creates null resource""" NullProvider(self, "null") self.created["null"]["wait_for_members"] = Resource( self, "wait_for_members", provisioners=[LocalExecProvisioner(type="local-exec", command="sleep 60")], ) self.created["null"]["wait_for_members"].add_override( "depends_on", [name for name in depends_on] ) def create_sc_policy(self, data): """creates sc policy""" name = data["title"] data["parent"] = f'organizations/{self.tf_ref("organization", "")}' scopes = data.get("scopes", []) new_scopes = [] for scope in scopes: tf_scope = self.tf_ref("projects/number", scope) if tf_scope == scope: tf_scope = self.tf_ref("folder", scope) new_scopes.append(tf_scope) if new_scopes: data["scopes"] = new_scopes self.created["sc_policy"][name] = AccessContextManagerAccessPolicy( self, f"sc_policy_{name}", **data ) def create_sc_access_level(self, data): """creates sc access level""" name = data["name"] data["policy"] = self.tf_ref("sc_policy", data["policy"]) self.created["sc_access_level"][name] = ScAccessLevel( self, f"sc_access_level_{name}", **data ) def create_sc_perimeter(self, data): """creates sc perimeter""" name = data["perimeter_name"] data["policy"] = self.tf_ref("sc_policy", data["policy"]) data["description"] = ( f'regular perimeter {name} {self.created["null"]["wait_for_members"].id}' ) perimeter_al = [] for al in data.get("access_levels", []): tf_al = self.tf_ref("sc_access_level_name", al) perimeter_al.append(tf_al) if perimeter_al: data["access_levels"] = perimeter_al for policy in data.get("ingress_policies", []): ingress_from_resources = [] ingress_from_al = [] for resources in policy.get("from", {}).get("sources", {}).get("resources", []): tf_resources = self.tf_ref("projects/number", resources) ingress_from_resources.append(tf_resources) if ingress_from_resources: policy["from"]["sources"]["resources"] = ingress_from_resources for al in policy.get("from", {}).get("sources", {}).get("access_levels", []): tf_al = self.tf_ref("sc_access_level_name", al) ingress_from_al.append(tf_al) if ingress_from_al: policy["from"]["sources"]["access_levels"] = ingress_from_al for policy_name in ["ingress_policies", "egress_policies"]: for policy in data.get(policy_name, []): to_resources = [] for resources in policy.get("to", {}).get("resources", []): tf_resources = self.tf_ref("projects/number", resources) to_resources.append(tf_resources) if to_resources: policy["to"]["resources"] = to_resources ScPerimeter(self, f"sc_perimiter_{name}", **data) def create_sc_perimeter_bridge(self, data): """creates sc perimeter bridge""" name = data["perimeter_name"] data["description"] = ( f'regular perimeter {name} {self.created["null"]["wait_for_members"].id}' ) data["policy"] = self.tf_ref("sc_policy", data["policy"]) ScPerimeterBridge(self, f"sc_bridge_{name}", **data) def generate_sc_policy(self, my_resource, resource): """creates sc policy""" for data in self.eztf_config.get(my_resource, []): create_sc_policy(self, data) def generate_sc_access_level(self, my_resource, resource): """creates sc access level""" depends_on = [] for data in self.eztf_config.get(my_resource, []): create_sc_access_level(self, data) depends_on.append(f'${{module.sc_access_level_{data["name"]}}}') create_null_resource(self, depends_on) def generate_sc_perimeter(self, my_resource, resource): """creates sc perimeter""" for data in self.eztf_config.get(my_resource, []): create_sc_perimeter(self, data) def generate_sc_perimeter_bridge(self, my_resource, resource): """creates sc perimeter bridge""" for data in self.eztf_config.get(my_resource, []): create_sc_perimeter_bridge(self, data)