5-app-infra/3-artifact-publish/docker/cdmc/report_engine/Control9.py (76 lines of code) (raw):

# Copyright 2023 Google, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pubsub_handler import publishPubSubAvro
from DataCatalogAPI import searchCatalogAssets, getColumnTagDict, getTableTagValue
from BigQueryAPI import queryTable, getTableLocation
import configparser
import time
from collections import defaultdict


def _upper_if_str(value):
    """Return ``value`` upper-cased when it is a string, otherwise unchanged.

    Policy values are upper-cased when the lookup tables are built, so the
    tag values compared against them must be normalised the same way.
    Non-string values (e.g. a missing tag) are passed through untouched so
    that they still fail the membership test and are reported as findings.
    """
    return value.upper() if isinstance(value, str) else value


def _index_policy(rows, method_field, default_field):
    """Group the allowed methods of a policy query by category and geography.

    Args:
        rows: Iterable of row mappings exposing ``sensitive_category``,
            ``pm_geo`` and the two method fields named below.
        method_field: Name of the field holding the allowed method.
        default_field: Name of the field holding the default method.

    Returns:
        A mapping from ``"<SENSITIVE_CATEGORY>-<PM_GEO>"`` (upper-cased) to
        the list of upper-cased methods accepted for that combination.
    """
    index = defaultdict(list)
    for row in rows:
        policy_key = row["sensitive_category"].upper() + "-" + row["pm_geo"].upper()
        index[policy_key].append(row[method_field].upper())
        index[policy_key].append(row[default_field].upper())
    return index


class Control9:
    """Report generator for CDMC control 9.

    Compares the encryption and de-identification methods tagged on catalog
    assets with the methods allowed by the policy tables queried from
    BigQuery, and publishes one Pub/Sub message per mismatch.
    """

    def __init__(self, org_id, project_id, topicProjectId, topic, avsc_file,
                 report_metadata, config_file) -> None:
        """Store the report settings.

        Args:
            org_id: Organization id forwarded to the catalog search.
            project_id: Project id forwarded to the catalog search.
            topicProjectId: Project that owns the Pub/Sub topic.
            topic: Pub/Sub topic the findings are published to.
            avsc_file: Avro schema passed to the publisher.
            report_metadata: Value embedded as ``reportMetadata`` in every
                finding message.
            config_file: Configuration *contents* in INI format (it is
                parsed with ``ConfigParser.read_string``, not opened as a
                path).
        """
        self.org_id = org_id
        self.project_id = project_id
        self.topic_project_id = topicProjectId
        self.topic = topic
        self.avsc_file = avsc_file
        self.report_metadata = report_metadata
        self.config_file = config_file

    def _publish_finding(self, config, finding_id, linked_resource, field=None):
        """Build and publish a single finding message.

        Args:
            config: Parsed configuration providing the ``FINDINGS`` and
                ``RECOMMENDATIONS`` sections.
            finding_id: Option name of the finding, e.g. ``"Control9_1"``.
            linked_resource: Linked resource of the offending asset.
            field: Optional column name appended to the recommendation.
        """
        suffix = "" if field is None else "for the field: " + field
        message = {
            "reportMetadata": self.report_metadata,
            "CdmcControlNumber": 9,
            "Findings": str(config["FINDINGS"][finding_id]),
            "DataAsset": str(linked_resource),
            "RecommendedAdjustment": str(config["RECOMMENDATIONS"][finding_id]) + suffix,
            "ExecutionTimestamp": str(time.time()),
        }
        print("|---- Finding in asset:" + linked_resource)
        publishPubSubAvro(self.topic_project_id, self.topic, self.avsc_file, message)

    def generateReport(self):
        """Check every matching catalog asset and publish the findings.

        Findings published:
            * ``Control9_1`` - the asset's encryption tag is not among the
              methods allowed for its sensitivity and location.
            * ``Control9_4`` - a column carrying the sensitivity tag has no
              de-identification tag.
            * ``Control9_2`` - a column's de-identification tag is not among
              the methods allowed for the table's sensitivity and location.

        Tag values are compared with the policy values case-insensitively.
        """
        config = configparser.ConfigParser()
        config.read_string(self.config_file)
        print("Verifying Control 9" )

        sql = config["SQL"]
        tags = config["TAGS"]

        # Data asset is sensitive - table level tag
        dc_results = searchCatalogAssets(
            self.org_id, self.project_id, str(config["DC_FILTERS"]["Control9"]))

        encrypt_rows = queryTable(
            str(sql["project_id_9"]), str(sql["dataset_9"]), str(sql["sql_file_9_encrypt"]))
        allowed_encrypt_by_policy = _index_policy(
            encrypt_rows, "encrypt_method", "default_encrypt_method")

        deid_rows = queryTable(
            str(sql["project_id_9"]), str(sql["dataset_9"]), str(sql["sql_file_9_deid"]))
        allowed_deid_by_policy = _index_policy(
            deid_rows, "deid_method", "default_deid_method")

        for result in dc_results:
            print("Retrieving information for: ", result.fully_qualified_name)
            resource_name = result.relative_resource_name

            table_location = getTableLocation(result.linked_resource)
            table_sensitivity = getTableTagValue(
                resource_name,
                str(tags["Control9_tag_table_sensitivity"]),
                str(tags["Control9_display_table_sensivity"]),
                "stringValue")
            asset_encrypt = getTableTagValue(
                resource_name,
                str(tags["Control9_tag_encrypt"]),
                str(tags["Control9_display_encrypt"]),
                "stringValue")
            # NOTE(review): the column lookup reuses the *table* display-name
            # option ("Control9_display_table_sensivity"); kept as-is,
            # confirm this is intended.
            columns_sensitivity_dict = getColumnTagDict(
                resource_name,
                str(tags["Control9_tag_column_sensitivity"]),
                str(tags["Control9_display_table_sensivity"]),
                "boolValue")
            columns_security_deid = getColumnTagDict(
                resource_name,
                str(tags["Control9_tag_column_deid"]),
                str(tags["Control9_display_column_deid"]),
                "stringValue")

            # Same key for the table and all of its columns: compute it once.
            policy_key = table_sensitivity.upper() + "-" + table_location.upper()
            # .get() keeps unknown keys from being inserted into the indexes.
            allowed_encrypt = allowed_encrypt_by_policy.get(policy_key, [])
            allowed_deid = allowed_deid_by_policy.get(policy_key, [])

            # ASSET WITH INCORRECT ENCRYPTION
            if _upper_if_str(asset_encrypt) not in allowed_encrypt:
                self._publish_finding(config, "Control9_1", result.linked_resource)

            for column in columns_sensitivity_dict:
                if column not in columns_security_deid:
                    # SENSITIVE COLUMN WITHOUT A DEID TAG
                    self._publish_finding(config, "Control9_4", result.linked_resource)
                elif _upper_if_str(columns_security_deid[column]) not in allowed_deid:
                    # COLUMN ASSIGNED DEID IS NOT IN TABLE CONTROL IN BQ
                    self._publish_finding(
                        config, "Control9_2", result.linked_resource, field=column)