generate/resources/_logging.py (115 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from imports.log_export import LogExport
from imports.logbucket import Logbucket
from imports.logpubsub import Logpubsub
from imports.logstorage import Logstorage
from imports.logproject import Logproject
from imports.logbigquery import Logbigquery
# Dispatch table: destination-type key -> class (imported from the `imports`
# package) that is instantiated for that destination.  Pub/Sub, Storage,
# BigQuery and project destinations are registered under both a short key and
# a "log"-prefixed key; the log bucket is registered under "logbucket" only.
log_destination_fn = dict(
    pubsub=Logpubsub,
    storage=Logstorage,
    logbucket=Logbucket,
    bigquery=Logbigquery,
    project=Logproject,
    logpubsub=Logpubsub,
    logstorage=Logstorage,
    logbigquery=Logbigquery,
    logproject=Logproject,
)
def create_centralized_logging(self, logconfig):
    """Create one log destination together with the log sink exporting to it.

    ``logconfig`` must hold a ``"logsink"`` entry plus exactly one other entry
    whose key is a destination type registered in ``log_destination_fn`` and
    whose value is the settings dict for that destination.

    Both nested dicts are updated in place: ``parent_resource_id`` and
    ``project_id`` are replaced by the result of ``self.tf_ref`` and a
    ``log_sink_writer_identity`` value is written into the destination
    settings.

    Raises:
        KeyError: if ``"logsink"``, the destination entry, or another required
            key is missing, or the destination type is not registered.
        ValueError: if more than one destination entry is present.
    """
    logsink = logconfig["logsink"]

    # Exactly one key besides "logsink" names the destination.  Popping from a
    # set would silently pick an arbitrary key when several are present, so
    # the ambiguity is rejected explicitly instead.
    destination_types = [key for key in logconfig if key != "logsink"]
    if not destination_types:
        raise KeyError("log config has no destination entry besides 'logsink'")
    if len(destination_types) > 1:
        raise ValueError(
            "log config must contain exactly one destination entry, "
            f"found: {destination_types}"
        )
    log_destination_type = destination_types[0]
    log_destination = logconfig[log_destination_type]

    logsink["parent_resource_id"] = self.tf_ref(
        logsink["parent_resource_type"],
        logsink.get("parent_resource_id", ""),
    )

    # Placeholder only: the field is required, and the value that is actually
    # used is supplied through the override added after construction.
    log_destination["log_sink_writer_identity"] = (
        "serviceAccount:cloud-logs@system.gserviceaccount.com"
    )

    sink_name = logsink["log_sink_name"]
    sink_id = f"logsink-{sink_name}"

    log_destination["project_id"] = self.tf_ref(
        "project", log_destination["project_id"]
    )

    created_log_destination = log_destination_fn[log_destination_type](
        self,
        f"{log_destination_type}-{sink_name}",
        **log_destination,
    )
    # Point the destination at the writer identity exposed by the sink module.
    created_log_destination.add_override(
        "log_sink_writer_identity", f"${{module.{sink_id}.writer_identity}}"
    )

    LogExport(
        self,
        sink_id,
        destination_uri=created_log_destination.destination_uri_output,
        **logsink,
    )
def generate_logging(self, my_resource, resource):
    """Create centralized logging for each entry configured under ``my_resource``.

    Entries are read from ``self.eztf_config[my_resource]``; a missing key is
    treated as no entries.  ``resource`` is accepted but not used.
    """
    configured = self.eztf_config.get(my_resource, [])
    for logconfig in configured:
        create_centralized_logging(self, logconfig)
# For each destination type, the settings key whose value is used as the
# destination's name when building its "<type>-<name>" identifier.
log_destination_name_map = dict(
    logpubsub="topic_name",
    logstorage="storage_bucket_name",
    logbucket="name",
    logbigquery="dataset_name",
    logproject="project_id",
)
def create_log_destination(self, log_destination, log_dest_type):
    """Create a log destination and register it in ``self.created``.

    The destination id is ``"<log_dest_type>-<name>"``, where the name is read
    from ``log_destination`` using the key listed for ``log_dest_type`` in
    ``log_destination_name_map``.  The created object is stored under
    ``self.created["log_destination"][dest_id]``, so that mapping must already
    exist.

    ``log_destination`` is updated in place: ``project_id`` is replaced by the
    result of ``self.tf_ref`` and, when no ``log_sink_writer_identity`` was
    given, a placeholder value is written.

    Raises:
        KeyError: if ``log_dest_type`` is not a supported destination type or
            a required key is missing from ``log_destination``.
    """
    writer_identity = log_destination.get("log_sink_writer_identity", "")

    # Look the type up directly: with ``.get`` an unsupported type produced
    # ``None`` and the name lookup then failed with a misleading
    # ``KeyError: None``.
    try:
        name_key = log_destination_name_map[log_dest_type]
    except KeyError:
        raise KeyError(
            f"unsupported log destination type: {log_dest_type!r}"
        ) from None

    dest_name = log_destination[name_key]
    dest_id = f"{log_dest_type}-{dest_name}"

    # Name of the sink recorded for this destination in ``self.added``, if any.
    sink_name = self.added.get("log_destination", {}).get(dest_id)
    sink_id = f"logsink-{sink_name}" if sink_name else None

    log_destination["project_id"] = self.tf_ref(
        "project", log_destination["project_id"]
    )

    if not writer_identity:
        # Placeholder for the required field when the config supplies none.
        log_destination["log_sink_writer_identity"] = (
            "serviceAccount:cloud-logs@system.gserviceaccount.com"
        )

    self.created["log_destination"][dest_id] = log_destination_fn[log_dest_type](
        self,
        dest_id,
        **log_destination,
    )

    # Only when the config gave no identity and a sink is known: point the
    # destination at the writer identity exposed by that sink's module.
    if not writer_identity and sink_id:
        self.created["log_destination"][dest_id].add_override(
            "log_sink_writer_identity", f"${{module.{sink_id}.writer_identity}}"
        )
def create_logsink(self, logsink):
    """Create a log sink from one sink config entry.

    ``logsink`` is modified in place: ``log_destination_type`` is removed, and
    ``parent_resource_id`` and ``destination_uri`` are replaced by the results
    of ``self.tf_ref``.  The remaining entries are passed to ``LogExport`` as
    keyword arguments.

    When the destination type is ``"logproject"`` a project destination is
    created first, using the sink's ``destination_uri`` as its ``project_id``.
    """
    destination_type, name, uri = (
        logsink[key]
        for key in ("log_destination_type", "log_sink_name", "destination_uri")
    )
    # The destination type is not passed on to LogExport.
    logsink.pop("log_destination_type")

    if destination_type == "logproject":
        create_log_destination(self, {"project_id": uri}, "logproject")

    parent_ref = self.tf_ref(
        logsink["parent_resource_type"],
        logsink.get("parent_resource_id", ""),
    )
    logsink["parent_resource_id"] = parent_ref
    logsink["destination_uri"] = self.tf_ref(
        "log_destination", f"{destination_type}-{uri}", uri
    )

    LogExport(self, f"logsink-{name}", **logsink)
def generate_logsink(self, my_resource, resource):
    """Create a log sink for each entry configured under ``my_resource``.

    Makes sure ``self.created["log_destination"]`` exists before any sink is
    created.  ``resource`` is accepted but not used.
    """
    self.created.setdefault("log_destination", {})
    sink_configs = self.eztf_config.get(my_resource, [])
    for sink_config in sink_configs:
        create_logsink(self, sink_config)
def generate_log_destination(self, my_resource, resource):
    """Create a log destination for each entry configured under ``my_resource``.

    Makes sure ``self.created["log_destination"]`` exists, then creates every
    configured destination with ``resource`` as its destination type.
    """
    self.created.setdefault("log_destination", {})
    destination_configs = self.eztf_config.get(my_resource, [])
    for destination_config in destination_configs:
        create_log_destination(self, destination_config, resource)
def add_dest_sink_map(self, my_resource, resource):
    """Record which sink exports to each configured destination.

    For every sink entry under ``my_resource`` the sink name is stored in
    ``self.added["log_destination"]`` under the key
    ``"<log_destination_type>-<destination_uri>"``.  A later entry with the
    same key overwrites an earlier one.  ``resource`` is accepted but not used.
    """
    sink_by_destination = self.added.setdefault("log_destination", {})
    for sink_config in self.eztf_config.get(my_resource, []):
        destination_type, name, uri = (
            sink_config[key]
            for key in ("log_destination_type", "log_sink_name", "destination_uri")
        )
        sink_by_destination[f"{destination_type}-{uri}"] = name