5-app-infra/3-artifact-publish/docker/cdmc/report_engine/AssetsScope.py (43 lines of code) (raw):
# Copyright 2023 Google, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pubsub_handler import publishPubSubAvro,publishPubSubAvroBatch
from DataCatalogAPI import searchCatalogAssets
from BigQueryAPI import getBQAssets,extractTableId
import configparser
class AssetsScope:
    """Report which BigQuery assets are in scope and whether each is sensitive.

    The asset list comes from ``getBQAssets``; an asset is flagged as
    sensitive when its id also appears among the results returned by
    ``searchCatalogAssets`` for the configured filter. One record per asset is
    then published through ``publishPubSubAvroBatch``.
    """

    def __init__(self, org_id, project_id, report_metadata, config_file) -> None:
        """Store the identifiers and configuration used by ``publishAssets``.

        Args:
            org_id: Organization id forwarded to ``searchCatalogAssets``.
            project_id: Project id forwarded to ``getBQAssets`` and
                ``searchCatalogAssets``.
            report_metadata: Mapping that provides the ``"uuid"`` stamped on
                every published record.
            config_file: INI-formatted configuration *text* (it is parsed with
                ``ConfigParser.read_string``, so this is the content, not a
                path). Must contain an ``ASSETS_SCOPE`` section.
        """
        self.org_id = org_id
        self.project_id = project_id
        self.report_metadata = report_metadata
        self.config_file = config_file

    def publishAssets(self, execution_timestamp):
        """Build one record per asset and publish them as a single batch.

        Args:
            execution_timestamp: Value copied verbatim into the
                ``"event_timestamp"`` field of every record.

        Raises:
            KeyError: If the ``ASSETS_SCOPE`` section or one of the options
                read from it is missing, or if ``report_metadata`` has no
                ``"uuid"`` entry while there is at least one asset to report.
        """
        config = configparser.ConfigParser()
        config.read_string(self.config_file)
        # ConfigParser option values are already strings, so no str() casts
        # are needed when forwarding them.
        scope = config["ASSETS_SCOPE"]

        full_list = getBQAssets(self.project_id, scope["region"])
        dc_results = searchCatalogAssets(
            self.org_id, self.project_id, scope["filter"]
        )

        # A set gives O(1) membership tests instead of rescanning a list for
        # every asset. Assumes extractTableId returns a hashable id (e.g. a
        # string) comparable with the entries of full_list -- TODO confirm.
        sensitive_ids = {
            extractTableId(result.linked_resource) for result in dc_results
        }

        result_list = [
            {
                "event_uuid": self.report_metadata["uuid"],
                "asset_name": asset,
                "sensitive": asset in sensitive_ids,
                "event_timestamp": execution_timestamp,
            }
            for asset in full_list
        ]

        publishPubSubAvroBatch(
            scope["project_pubsub"],
            scope["topic"],
            scope["avsc"],
            scope["batch_max_size"],
            scope["batch_max_bytes"],
            scope["batch_max_latency"],
            result_list,
        )