DataCatalogController.py (1,493 lines of code) (raw):

# Copyright 2020-2024 Google, LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import requests, configparser, time from datetime import datetime, date from datetime import time as dtime import pytz from operator import itemgetter import pandas as pd from pyarrow import parquet import json import os from google.api_core.gapic_v1.client_info import ClientInfo from google.protobuf.timestamp_pb2 import Timestamp from google.cloud import datacatalog from google.cloud.datacatalog_v1 import types from google.cloud.datacatalog import DataCatalogClient from google.cloud import bigquery from google.cloud import storage import Resources as res import BigQueryUtils as bq import constants from common import log_error, log_error_tag_dict, log_info, log_info_tag_dict config = configparser.ConfigParser() config.read("tagengine.ini") BIGQUERY_REGION = config['DEFAULT']['BIGQUERY_REGION'] USER_AGENT = 'cloud-solutions/datacatalog-tag-engine-v2' class DataCatalogController: def __init__(self, credentials, tag_creator_account=None, tag_invoker_account=None, \ template_id=None, template_project=None, template_region=None): self.credentials = credentials self.tag_creator_account = tag_creator_account self.tag_invoker_account = tag_invoker_account self.template_id = template_id self.template_project = template_project self.template_region = template_region self.client = DataCatalogClient(credentials=self.credentials, client_info=ClientInfo(user_agent=USER_AGENT)) if template_id != None and template_project != None and template_region != None: self.template_path = self.client.tag_template_path(template_project, template_region, template_id) else: self.template_path = None self.bq_client = bigquery.Client(credentials=self.credentials, location=BIGQUERY_REGION, client_info=ClientInfo(user_agent=USER_AGENT)) self.gcs_client = storage.Client(credentials=self.credentials, client_info=ClientInfo(user_agent=USER_AGENT)) self.ptm_client = datacatalog.PolicyTagManagerClient(credentials=self.credentials, client_info=ClientInfo(user_agent=USER_AGENT)) def get_template(self, included_fields=None): fields = [] try: tag_template = self.client.get_tag_template(name=self.template_path) except Exception as e: msg = 'Error retrieving tag template {}'.format(self.template_path) log_error(msg, e) return fields for field_id, field_value in tag_template.fields.items(): field_id = str(field_id) if included_fields: match_found = False for included_field in included_fields: if included_field['field_id'] == field_id: match_found = True if 'field_value' in included_field: assigned_value = included_field['field_value'] else: assigned_value = None if 'query_expression' in included_field: query_expression = included_field['query_expression'] else: query_expression = None break if match_found == False: continue display_name = field_value.display_name is_required = field_value.is_required order = field_value.order enum_values = [] field_type = None if field_value.type_.primitive_type == datacatalog.FieldType.PrimitiveType.DOUBLE: field_type = "double" if field_value.type_.primitive_type == datacatalog.FieldType.PrimitiveType.STRING: field_type = "string" if field_value.type_.primitive_type == datacatalog.FieldType.PrimitiveType.BOOL: field_type = "bool" if field_value.type_.primitive_type == datacatalog.FieldType.PrimitiveType.TIMESTAMP: field_type = "datetime" if field_value.type_.primitive_type == datacatalog.FieldType.PrimitiveType.RICHTEXT: field_type = "richtext" if field_value.type_.primitive_type == datacatalog.FieldType.PrimitiveType.PRIMITIVE_TYPE_UNSPECIFIED: field_type = "enum" index = 0 enum_values_long = str(field_value.type_).split(":") for long_value in enum_values_long: if index > 0: enum_value = long_value.split('"')[1] #print("enum value: " + enum_value) enum_values.append(enum_value) index = index + 1 # populate dict field = {} field['field_id'] = field_id field['display_name'] = display_name field['field_type'] = field_type field['is_required'] = is_required field['order'] = order if field_type == "enum": field['enum_values'] = enum_values if included_fields: if assigned_value: field['field_value'] = assigned_value if query_expression: field['query_expression'] = query_expression fields.append(field) return sorted(fields, key=itemgetter('order'), reverse=True) def check_if_tag_exists(self, parent, column=None): print(f'enter check_if_tag_exists, parent: {parent}') tag_exists = False tag_id = "" tag_list = self.client.list_tags(parent=parent, timeout=120) for tag_instance in tag_list: tagged_column = tag_instance.column tagged_template_project = tag_instance.template.split('/')[1] tagged_template_location = tag_instance.template.split('/')[3] tagged_template_id = tag_instance.template.split('/')[5] if column == '' or column == None: # looking for a table-level tag if tagged_template_id == self.template_id and tagged_template_project == self.template_project and \ tagged_template_location == self.template_region and tagged_column == "": tag_exists = True tag_id = tag_instance.name break else: # looking for a column-level tag if column.lower() == tagged_column and tagged_template_id == self.template_id and tagged_template_project == self.template_project and \ tagged_template_location == self.template_region: tag_exists = True tag_id = tag_instance.name break return tag_exists, tag_id def apply_static_asset_config(self, fields, uri, job_uuid, config_uuid, template_uuid, tag_history, overwrite=False): # uri is either a BQ table/view path or GCS file path op_status = constants.SUCCESS column = '' is_gcs = False is_bq = False # look up the entry based on the resource type if isinstance(uri, list): is_gcs = True bucket = uri[0].replace('-', '_') filename = uri[1].split('.')[0].replace('/', '_') # extract the filename without extension, replace '/' with '_' gcs_resource = '//datacatalog.googleapis.com/projects/' + self.template_project + '/locations/' + self.template_region + '/entryGroups/' + bucket + '/entries/' + filename print('gcs_resource: ', gcs_resource) request = datacatalog.LookupEntryRequest() request.linked_resource=gcs_resource uri = '/'.join(uri) #print('uri:', uri) try: entry = self.client.lookup_entry(request) print('GCS entry:', entry.name) except Exception as e: msg = 'Unable to find the entry in the catalog. Entry {} does not exist'.format(gcs_resource) log_error(msg, e) op_status = constants.ERROR return op_status elif isinstance(uri, str): is_bq = True bigquery_resource = '//bigquery.googleapis.com/projects/' + uri print("bigquery_resource: " + bigquery_resource) request = datacatalog.LookupEntryRequest() request.linked_resource=bigquery_resource entry = self.client.lookup_entry(request) print('entry: ', entry.name) try: tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name) print('tag exists: ', tag_exists) except Exception as e: msg = 'Error during check_if_tag_exists {}'.format(entry.name) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status if tag_exists and overwrite == False: msg = 'Tag already exists and overwrite is False' log_info(msg) op_status = constants.SUCCESS return op_status op_status = self.create_update_delete_tag(fields, tag_exists, tag_id, job_uuid, config_uuid, 'STATIC_ASSET_TAG', tag_history, entry, uri) return op_status def apply_dynamic_table_config(self, fields, uri, job_uuid, config_uuid, template_uuid, tag_history, batch_mode=False): print('*** apply_dynamic_table_config ***') op_status = constants.SUCCESS error_exists = False bigquery_resource = '//bigquery.googleapis.com/projects/' + uri #print('bigquery_resource: ', bigquery_resource) request = datacatalog.LookupEntryRequest() request.linked_resource=bigquery_resource entry = self.client.lookup_entry(request) tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name) print("tag_exists: " + str(tag_exists)) # create new tag tag = datacatalog.Tag() tag.template = self.template_path verified_field_count = 0 for field in fields: field_id = field['field_id'] field_type = field['field_type'] query_expression = field['query_expression'] # parse and run query in BQ query_str = self.parse_query_expression(uri, query_expression) print('returned query_str: ' + query_str) # note: field_values is of type list field_values, error_exists = self.run_query(query_str, field_type, batch_mode, job_uuid) print('field_values: ', field_values) print('error_exists: ', error_exists) if error_exists or field_values == []: continue tag, error_exists = self.populate_tag_field(tag, field_id, field_type, field_values, job_uuid) if error_exists: continue verified_field_count = verified_field_count + 1 #print('verified_field_count: ' + str(verified_field_count)) # store the value back in the dict, so that it can be accessed by the exporter #print('field_value: ' + str(field_value)) if field_type == 'richtext': formatted_value = ', '.join(str(v) for v in field_values) else: formatted_value = field_values[0] field['field_value'] = formatted_value # for loop ends here if error_exists: # error was encountered while running SQL expression # proceed with tag creation / update, but return error to user op_status = constants.ERROR if verified_field_count == 0: # tag is empty due to errors, skip tag creation op_status = constants.ERROR return op_status if tag_exists == True: tag.name = tag_id op_status = self.do_create_update_delete_action(job_uuid, 'update', tag) else: op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry) if op_status == constants.SUCCESS and tag_history: bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION) template_fields = self.get_template() bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, uri, None, fields) return op_status def column_exists_in_table(self, target_column, entry_columns): column_exists = False for catalog_column in entry_columns: #print('column:', catalog_column.column) #print('subcolumns:', catalog_column.subcolumns) is_nested_column = False # figure out if column is nested if len(target_column.split('.')) > 1: is_nested_column = True parent_column = target_column.split('.')[0] nested_column = target_column.split('.')[1] if is_nested_column == True: if catalog_column.column == parent_column: for subcolumn in catalog_column.subcolumns: if nested_column == subcolumn.column: column_exists = True break else: if catalog_column.column == target_column: column_exists = True break return column_exists def apply_dynamic_column_config(self, fields, columns_query, uri, job_uuid, config_uuid, template_uuid, tag_history, batch_mode=False): print('*** apply_dynamic_column_config ***') tag_work_queue = [] # collection of Tag objects that will be passed to the API to be created or updated op_status = constants.SUCCESS error_exists = False target_columns = [] # columns in the table which need to be tagged columns_query = self.parse_query_expression(uri, columns_query) print('columns_query:', columns_query) rows = self.bq_client.query(columns_query).result() num_columns = 0 for row in rows: for column in row: print('column:', column) target_columns.append(column) num_columns += 1 if num_columns == 0: # no columns to tag msg = f"Error could not find columns to tag. Please check column_query parameter in your config. Current value: {columns_query}" log_error(msg, None, job_uuid) op_status = constants.ERROR return op_status #print('columns to be tagged:', target_columns) bigquery_resource = '//bigquery.googleapis.com/projects/' + uri #print('bigquery_resource: ', bigquery_resource) request = datacatalog.LookupEntryRequest() request.linked_resource=bigquery_resource entry = self.client.lookup_entry(request) column_fields_list = [] # list<dictionaries> where dict = {column, fields} for target_column in target_columns: #print('target_column:', target_column) # fail quickly if a column is not found in the entry's schema column_exists = self.column_exists_in_table(target_column, entry.schema.columns) if column_exists != True: msg = f"Error could not find column {target_column} in {resource}" log_error(msg, None, job_uuid) op_status = constants.ERROR return op_status # initialize the new column-level tag tag = datacatalog.Tag() tag.template = self.template_path tag.column = target_column verified_field_count = 0 query_strings = [] for field in fields: query_expression = field['query_expression'] query_str = self.parse_query_expression(uri, query_expression, target_column) query_strings.append(query_str) # combine query expressions combined_query = self.combine_queries(query_strings) # run combined query, adding the results to the field_values for each field # Note: field_values is of type list fields, error_exists = self.run_combined_query(combined_query, target_column, fields, job_uuid) if error_exists: op_status = constants.ERROR continue # populate tag fields tag, error_exists = self.populate_tag_fields(tag, fields, job_uuid) if error_exists: op_status = constants.ERROR continue column_fields_list.append({"column": target_column, "fields": fields}) tag_work_queue.append(tag) # outer loop ends here if len(tag_work_queue) == 0: op_status = constants.ERROR return op_status # ready to create or update all the tags in work queue rec_request = datacatalog.ReconcileTagsRequest( parent=entry.name, tag_template=self.template_path, tags=tag_work_queue ) #print('rec_request:', rec_request) try: operation = self.client.reconcile_tags(request=rec_request) print("Waiting for operation to complete...") resp = operation.result() #print("resp:", resp) except Exception as e: msg = 'Error during reconcile_tags on entry {}'.format(entry.name) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status if tag_history and op_status != constants.ERROR: bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION) success = bqu.copy_tags(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, self.get_template(), uri, column_fields_list) print('Tag history completed successfully:', success) if success: op_status = constants.SUCCESS else: op_status = constants.ERROR return op_status def combine_queries(self, query_strings): large_query = "select " for query in query_strings: large_query += "({}), ".format(query) return large_query[0:-2] def apply_entry_config(self, fields, uri, job_uuid, config_uuid, template_uuid, tag_history): print('** apply_entry_config **') op_status = constants.SUCCESS bucket_name, filename = uri bucket = self.gcs_client.get_bucket(bucket_name) blob = bucket.get_blob(filename) entry_group_short_name = bucket_name.replace('-', '_') entry_group_full_name = 'projects/' + self.template_project + '/locations/' + self.template_region + '/entryGroups/' + bucket_name.replace('-', '_') # create the entry group is_entry_group = self.entry_group_exists(entry_group_full_name) print('is_entry_group: ', is_entry_group) if is_entry_group != True: self.create_entry_group(entry_group_short_name) # generate the entry id, replace '/' with '_' and remove the file extension from the name entry_id = filename.split('.')[0].replace('/', '_') try: entry_name = entry_group_full_name + '/entries/' + entry_id print('Info: entry_name: ', entry_name) entry = self.client.get_entry(name=entry_name) print('Info: entry already exists: ', entry.name) except Exception as e: msg = 'Entry does not exist {}'.format(entry_name) log_error(msg, e, job_uuid) # populate the entry entry = datacatalog.Entry() entry.name = filename entry.display_name = entry_id entry.type_ = 'FILESET' entry.gcs_fileset_spec.file_patterns = ['gs://' + bucket_name + '/' + filename] entry.fully_qualified_name = 'gs://' + bucket_name + '/' + filename entry.source_system_timestamps.create_time = datetime.utcnow() entry.source_system_timestamps.update_time = datetime.utcnow() # get the file's schema # download the file to App Engine's tmp directory tmp_file = '/tmp/' + entry_id blob.download_to_filename(filename=tmp_file) # validate that it's a parquet file try: parquet.ParquetFile(tmp_file) except Exception as e: # not a parquet file, ignore it msg = 'Error: {} is not a parquet file, ignoring it'.format(filename) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status schema = parquet.read_schema(tmp_file, memory_map=True) df = pd.DataFrame(({"column": name, "datatype": str(pa_dtype)} for name, pa_dtype in zip(schema.names, schema.types))) df = df.reindex(columns=["column", "datatype"], fill_value=pd.NA) #print('df: ', df) for index, row in df.iterrows(): entry.schema.columns.append( types.ColumnSchema( column=row['column'], type_=row['datatype'], description=None, mode=None ) ) # create the entry #print('entry request: ', entry) created_entry = self.client.create_entry(parent=entry_group_full_name, entry_id=entry_id, entry=entry) print('Info: created entry: ', created_entry.name) # get the number of rows in the file num_rows = parquet.ParquetFile(tmp_file).metadata.num_rows #print('num_rows: ', num_rows) # delete the tmp file ASAP to free up memory os.remove(tmp_file) # create the file metadata tag template_path = self.client.tag_template_path(self.template_project, self.template_region, self.template_id) tag = datacatalog.Tag() tag.template = template_path for field in fields: if field['field_id'] == 'name': string_field = datacatalog.TagField() string_field.string_value = filename tag.fields['name'] = string_field field['field_value'] = filename # field_value is used by the BQ exporter if field['field_id'] == 'bucket': string_field = datacatalog.TagField() string_field.string_value = bucket_name tag.fields['bucket'] = string_field field['field_value'] = bucket_name # field_value is used by the BQ exporter if field['field_id'] == 'path': string_field = datacatalog.TagField() string_field.string_value = 'gs://' + bucket_name + '/' + filename tag.fields['path'] = string_field field['field_value'] = 'gs://' + bucket_name + '/' + filename # field_value is used by the BQ exporter if field['field_id'] == 'type': enum_field = datacatalog.TagField() enum_field.enum_value.display_name = 'PARQUET' # hardcode file extension for now tag.fields['type'] = enum_field field['field_value'] = 'PARQUET' # field_value is used by the BQ exporter if field['field_id'] == 'size': double_field = datacatalog.TagField() double_field.double_value = blob.size tag.fields['size'] = double_field field['field_value'] = blob.size # field_value is used by the BQ exporter if field['field_id'] == 'num_rows': double_field = datacatalog.TagField() double_field.double_value = num_rows tag.fields['num_rows'] = double_field field['field_value'] = num_rows # field_value is used by the BQ exporter if field['field_id'] == 'created_time': datetime_field = datacatalog.TagField() datetime_field.timestamp_value = blob.time_created tag.fields['created_time'] = datetime_field field['field_value'] = blob.time_created # field_value is used by the BQ exporter if field['field_id'] == 'updated_time': datetime_field = datacatalog.TagField() datetime_field.timestamp_value = blob.time_created tag.fields['updated_time'] = datetime_field field['field_value'] = blob.time_created # field_value is used by the BQ exporter if field['field_id'] == 'storage_class': string_field = datacatalog.TagField() string_field.string_value = blob.storage_class tag.fields['storage_class'] = string_field field['field_value'] = blob.storage_class # field_value is used by the BQ exporter if field['field_id'] == 'content_encoding': if blob.content_encoding: string_field = datacatalog.TagField() string_field.string_value = blob.content_encoding tag.fields['content_encoding'] = string_field field['field_value'] = blob.content_encoding # field_value is used by the BQ exporter if field['field_id'] == 'content_language': if blob.content_language: string_field = datacatalog.TagField() string_field.string_value = blob.content_language tag.fields['content_language'] = string_field field['field_value'] = blob.content_language # field_value is used by the BQ exporter if field['field_id'] == 'media_link': string_field = datacatalog.TagField() string_field.string_value = blob.media_link tag.fields['media_link'] = string_field field['field_value'] = blob.media_link # field_value is used by the BQ exporter #print('tag request: ', tag) created_tag = self.client.create_tag(parent=entry_name, tag=tag) #print('created_tag: ', created_tag) if tag_history: bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION) template_fields = self.get_template() bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, '/'.join(uri), None, fields) return op_status def entry_group_exists(self, entry_group_full_name): request = datacatalog.GetEntryGroupRequest(name=entry_group_full_name) try: response = self.client.get_entry_group(request=request) return True except Exception as e: msg = 'Error entry goup does not exist {}'.format(entry_group_full_name) log_error(msg, e) return False def create_entry_group(self, entry_group_short_name): eg = datacatalog.EntryGroup() eg.display_name = entry_group_short_name entry_group = self.client.create_entry_group( parent='projects/' + self.template_project + '/locations/' + self.template_region, entry_group_id=entry_group_short_name, entry_group=eg) print('created entry_group: ', entry_group.name) return entry_group.name def apply_glossary_asset_config(self, fields, mapping_table, uri, job_uuid, config_uuid, template_uuid, tag_history, overwrite=False): # uri is either a BQ table/view path or GCS file path op_status = constants.SUCCESS is_gcs = False is_bq = False # look up the entry based on the resource type if isinstance(uri, list): is_gcs = True bucket = uri[0].replace('-', '_') filename = uri[1].split('.')[0].replace('/', '_') # extract the filename without the extension, replace '/' with '_' gcs_resource = '//datacatalog.googleapis.com/projects/' + self.template_project + '/locations/' + self.template_region + '/entryGroups/' + bucket + '/entries/' + filename #print('gcs_resource: ', gcs_resource) request = datacatalog.LookupEntryRequest() request.linked_resource=gcs_resource try: entry = self.client.lookup_entry(request) print('entry: ', entry.name) except Exception as e: msg = 'Unable to find entry in the catalog. Entry {} does not exist: {}'.format(gcs_resource, e) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status #print('entry found: ', entry) elif isinstance(uri, str): is_bq = True bigquery_resource = '//bigquery.googleapis.com/projects/' + uri print("bigquery_resource: " + bigquery_resource) request = datacatalog.LookupEntryRequest() request.linked_resource=bigquery_resource entry = self.client.lookup_entry(request) print('entry: ', entry.name) try: tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name) print('tag_exists: ', tag_exists) except Exception as e: msg = 'Error during check_if_tag_exists: {}'.format(e) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status if tag_exists and overwrite == False: msg = 'Info: tag already exists and overwrite set to False' error = {'job_uuid': job_uuid, 'msg': msg} print(json.dumps(info)) op_status = constants.SUCCESS return op_status if entry.schema == None: msg = 'Error entry {} does not have a schema in the catalog'.format(entry.name) error = {'job_uuid': job_uuid, 'msg': msg} print(json.dumps(info)) op_status = constants.ERROR return op_status # retrieve the schema columns from the entry column_schema_str = '' for column_schema in entry.schema.columns: column_schema_str += "'" + column_schema.column + "'," #print('column_schema_str: ', column_schema_str) mapping_table_formatted = mapping_table.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.') query_str = 'select canonical_name from `' + mapping_table_formatted + '` where source_name in (' + column_schema_str[0:-1] + ')' #print('query_str: ', query_str) rows = self.bq_client.query(query_str).result() tag = datacatalog.Tag() tag.template = self.template_path tag_is_empty = True for row in rows: canonical_name = row['canonical_name'] #print('canonical_name: ', canonical_name) for field in fields: if field['field_id'] == canonical_name: #print('found match') bool_field = datacatalog.TagField() bool_field.bool_value = True tag.fields[canonical_name] = bool_field field['field_value'] = True tag_is_empty = False break if tag_is_empty: print("Error: can't create the tag because it's empty") op_status = constants.ERROR return op_status if tag_exists: # tag already exists and overwrite is True tag.name = tag_id op_status = self.do_create_update_delete_action(job_uuid, 'update', tag) else: op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry) if tag_history: bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION) template_fields = self.get_template() if is_gcs: bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, '/'.join(uri), None, fields) if is_bq: bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, uri, None, fields) return op_status def apply_sensitive_column_config(self, fields, dlp_dataset, infotype_selection_table, infotype_classification_table, \ uri, create_policy_tags, taxonomy_id, job_uuid, config_uuid, template_uuid, \ tag_history, overwrite=False): if create_policy_tags: request = datacatalog.ListPolicyTagsRequest( parent=taxonomy_id ) try: page_result = self.ptm_client.list_policy_tags(request=request) except Exception as e: msg = 'Unable to retrieve the policy tag taxonomy for taxonomy_id {}'.format(taxonomy_id) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status policy_tag_names = [] # list of fully qualified policy tag names and sensitive categories for response in page_result: policy_tag_names.append((response.name, response.display_name)) policy_tag_requests = [] # stores the list of fully qualified policy tag names and table column names, # so that we can create the policy tags on the various sensitive fields # uri is a BQ table path op_status = constants.SUCCESS column = '' if isinstance(uri, str) == False: print('Error: url ' + str(url) + ' is not of type string.') op_status = constants.ERROR return op_status bigquery_resource = '//bigquery.googleapis.com/projects/' + uri #print("bigquery_resource: ", bigquery_resource) request = datacatalog.LookupEntryRequest() request.linked_resource=bigquery_resource try: entry = self.client.lookup_entry(request) except Exception as e: msg = 'Error looking up entry {} in the catalog: {}'.format(bigquery_resource, e) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status dlp_dataset = dlp_dataset.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.') infotype_selection_table = infotype_selection_table.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.') infotype_classification_table = infotype_classification_table.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.') dlp_table = dlp_dataset + '.' + uri.split('/')[4] infotype_fields = [] notable_infotypes = [] # get an array of infotypes associated with each field in the DLP findings table dlp_sql = 'select field, array_agg(infotype) infotypes ' dlp_sql += 'from (select distinct cl.record_location.field_id.name as field, info_type.name as infotype ' dlp_sql += 'from ' + dlp_table + ', unnest(location.content_locations) as cl ' dlp_sql += 'order by cl.record_location.field_id.name) ' dlp_sql += 'group by field' try: dlp_rows = self.bq_client.query(dlp_sql).result() except Exception as e: msg = 'Error querying DLP findings table: {}'.format(dlp_sql) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status dlp_row_count = 0 for dlp_row in dlp_rows: dlp_row_count += 1 field = dlp_row['field'] infotype_fields.append(field) infotypes = dlp_row['infotypes'] print('field ', field, ', infotypes [', infotypes, ']') is_sql = 'select notable_infotype ' is_sql += 'from ' + infotype_selection_table + ' i, ' infotype_count = len(infotypes) for i in range(0, infotype_count): is_sql += 'unnest(i.field_infotypes) as i' + str(i) + ', ' is_sql = is_sql[:-2] + ' ' for i, infotype in enumerate(infotypes): if i == 0: is_sql += 'where i' + str(i) + ' = "' + infotype + '" ' else: is_sql += 'and i' + str(i) + ' = "' + infotype + '" ' is_sql += 'order by array_length(i.field_infotypes) ' is_sql += 'limit 1' #print('is_sql: ', is_sql) try: ni_rows = self.bq_client.query(is_sql).result() except Exception as e: msg = 'Error querying infotype selection table: {}'.format(is_sql) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status for ni_row in ni_rows: notable_infotypes.append(ni_row['notable_infotype']) # there should be just one notable infotype per field # there are no DLP findings if dlp_row_count == 0: op_status = constants.SUCCESS return op_status # remove duplicate infotypes from notable list final_set = list(set(notable_infotypes)) print('final_set: ', final_set) # lookup classification using set of notable infotypes c_sql = 'select classification_result ' c_sql += 'from ' + infotype_classification_table + ' c, ' for i in range(0, len(final_set)): c_sql += 'unnest(c.notable_infotypes) as c' + str(i) + ', ' c_sql = c_sql[:-2] + ' ' for i, notable_infotype in enumerate(final_set): if i == 0: c_sql += 'where c' + str(i) + ' = "' + notable_infotype + '" ' else: c_sql += 'and c' + str(i) + ' = "' + notable_infotype + '" ' c_sql += 'order by array_length(c.notable_infotypes) ' c_sql += 'limit 1' #print('c_sql: ', c_sql) try: c_rows = self.bq_client.query(c_sql).result() except Exception as e: msg = 'Error querying infotype classification table: {}'.format(c_sql) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status classification_result = None for c_row in c_rows: classification_result = c_row['classification_result'] # we should end up with one classification result per table print('classification_result: ', classification_result) tag = datacatalog.Tag() tag.template = self.template_path # each element represents a field which needs to be tagged for infotype_field in infotype_fields: for field in fields: if 'sensitive_field' in field['field_id']: bool_field = datacatalog.TagField() if classification_result == 'Public_Information': bool_field.bool_value = False field['field_value'] = False else: bool_field.bool_value = True field['field_value'] = True tag.fields['sensitive_field'] = bool_field if 'sensitive_type' in field['field_id']: enum_field = datacatalog.TagField() enum_field.enum_value.display_name = classification_result tag.fields['sensitive_type'] = enum_field field['field_value'] = classification_result tag.column = infotype_field # DLP has a bug and sometimes the infotype field does not equal to the column name in the table print('tag.column: ', infotype_field) # check if a tag already exists on this column try: tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column=infotype_field) except Exception as e: msg = 'Error during check_if_tag_exists: {}'.format(entry.name) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status # tag already exists if tag_exists: if overwrite == False: # skip this sensitive column because it is already tagged continue tag.name = tag_id op_status = self.do_create_update_delete_action(job_uuid, 'update', tag) else: op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry) if op_status == constants.SUCCESS and create_policy_tags and classification_result != 'Public_Information': # add the column name and policy tag name to a list for policy_tag_name, policy_tag_category in policy_tag_names: if policy_tag_category == classification_result: policy_tag_requests.append((infotype_field, policy_tag_name)) if op_status == constants.SUCCESS and tag_history: bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION) template_fields = self.get_template() bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, uri, infotype_field, fields) # once we have created the regular tags, we can create/update the policy tags if create_policy_tags and len(policy_tag_requests) > 0: table_id = uri.replace('/datasets/', '.').replace('/tables/', '.') op_status = self.apply_policy_tags(table_id, policy_tag_requests) if op_status != constants.SUCCESS: msg = 'Error occurred when tagging {}'.format(uri) error = {'job_uuid': job_uuid, 'msg': msg} print(json.dumps(error)) return op_status def apply_policy_tags(self, table_id, policy_tag_requests): op_status = constants.SUCCESS table = self.bq_client.get_table(table_id) schema = table.schema new_schema = [] for field in schema: field_match = False for column, policy_tag_name in policy_tag_requests: if field.name == column: print('applying policy tag on', field.name) policy = bigquery.schema.PolicyTagList(names=[policy_tag_name,]) new_schema.append(bigquery.schema.SchemaField(field.name, field.field_type, field.mode, policy_tags=policy)) field_match = True break if field_match == False: new_schema.append(field) table.schema = new_schema try: table = self.bq_client.update_table(table, ["schema"]) except Exception as e: msg = 'Error occurred while updating the schema of {}'.format(table_id) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status def apply_export_config(self, config_uuid, target_project, target_dataset, target_region, uri): column_tag_records = [] table_tag_records = [] dataset_tag_records = [] export_status = constants.SUCCESS bqu = bq.BigQueryUtils(self.credentials, target_region) if isinstance(uri, str) == False: print('Error: url ' + str(url) + ' is not of type string.') export_status = constants.ERROR return export_status tagged_project = uri.split('/')[0] tagged_dataset = uri.split('/')[2] if '/tables/' in uri: target_table_id = 'catalog_report_table_tags' tagged_table = uri.split('/')[4] else: target_table_id = 'catalog_report_dataset_tags' tagged_table = None bigquery_resource = '//bigquery.googleapis.com/projects/' + uri #print("bigquery_resource: ", bigquery_resource) request = datacatalog.LookupEntryRequest() request.linked_resource=bigquery_resource try: entry = self.client.lookup_entry(request) except Exception as e: msg = 'Error looking up entry {} in catalog'.format(bigquery_resource) log_error(msg, e, job_uuid) export_status = constants.ERROR return export_status tag_list = self.client.list_tags(parent=entry.name, timeout=120) for tag in tag_list: print('tag.template:', tag.template) print('tag.column:', tag.column) # get tag template fields self.template_id = tag.template.split('/')[5] self.template_project = tag.template.split('/')[1] self.template_region = tag.template.split('/')[3] self.template_path = tag.template template_fields = self.get_template() if tag.column and len(tag.column) > 1: tagged_column = tag.column target_table_id = 'catalog_report_column_tags' else: tagged_column = None target_table_id = 'catalog_report_table_tags' for template_field in template_fields: #print('template_field:', template_field) field_id = template_field['field_id'] if field_id not in tag.fields: continue tagged_field = tag.fields[field_id] tagged_field_str = str(tagged_field) tagged_field_split = tagged_field_str.split('\n') #print('tagged_field_split:', tagged_field_split) split_index = 0 for split in tagged_field_split: if '_value:' in split: start_index = split.index(':', 0) + 1 #print('start_index:', start_index) field_value = split[start_index:].strip().replace('"', '').replace('<br>', ',') print('extracted field_value:', field_value) break elif 'enum_value' in split: field_value = tagged_field_split[split_index+1].replace('display_name:', '').replace('"', '').strip() print('extracted field_value:', field_value) break split_index += 1 # format record to be written current_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " UTC" if target_table_id in 'catalog_report_column_tags': column_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "table": tagged_table, "column": tagged_column, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts}) elif target_table_id in 'catalog_report_table_tags': table_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "table": tagged_table, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts}) elif target_table_id in 'catalog_report_dataset_tags': dataset_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts}) # write exported records to BQ if len(dataset_tag_records) > 0: target_table_id = target_project + '.' + target_dataset + '.catalog_report_dataset_tags' success = bqu.insert_exported_records(target_table_id, dataset_tag_records) if len(table_tag_records) > 0: target_table_id = target_project + '.' + target_dataset + '.catalog_report_table_tags' success = bqu.insert_exported_records(target_table_id, table_tag_records) if len(column_tag_records) > 0: target_table_id = target_project + '.' + target_dataset + '.catalog_report_column_tags' success = bqu.insert_exported_records(target_table_id, column_tag_records) return export_status def apply_import_config(self, job_uuid, config_uuid, data_asset_type, data_asset_region, tag_dict, tag_history, overwrite=False): #print(f'apply_import_config: {job_uuid}, {config_uuid}, {data_asset_type}, {data_asset_region}, {tag_dict}, {tag_history}') op_status = constants.SUCCESS if 'project' in tag_dict: project = tag_dict['project'] else: msg = "Error: project info missing from CSV" log_error_tag_dict(msg, None, job_uuid, tag_dict) op_status = constants.ERROR return op_status if data_asset_type == constants.BQ_ASSET: if 'dataset' not in tag_dict: msg = "Error: could not find the required dataset field in the CSV" log_error_tag_dict(msg, None, job_uuid, tag_dict) op_status = constants.ERROR return op_status else: entry_type = constants.DATASET dataset = tag_dict['dataset'] if 'table' in tag_dict: table = tag_dict['table'] entry_type = constants.BQ_TABLE if data_asset_type == constants.FILESET_ASSET: if 'entry_group' not in tag_dict or 'fileset' not in tag_dict: msg = "Error: could not find the required fields in the CSV. Missing entry_group or fileset or both" log_error_tag_dict(msg, None, job_uuid, tag_dict) op_status = constants.ERROR return op_status else: entry_type = constants.FILESET entry_group = tag_dict['entry_group'] fileset = tag_dict['fileset'] if data_asset_type == constants.SPAN_ASSET: if 'instance' not in tag_dict or 'database' not in tag_dict or 'table' not in tag_dict: msg = "Error: could not find the required fields in the CSV. The required fields for Spanner are instance, database, and table" log_error_tag_dict(msg, None, job_uuid, tag_dict) op_status = constants.ERROR return op_status else: entry_type = constants.SPAN_TABLE instance = tag_dict['instance'] database = tag_dict['database'] if 'schema' in tag_dict: schema = tag_dict['schema'] table = tag_dict['table'] table = f"`{schema}.{table}`" else: table = tag_dict['table'] if entry_type == constants.DATASET: resource = f'//bigquery.googleapis.com/projects/{project}/datasets/{dataset}' request = datacatalog.LookupEntryRequest() request.linked_resource=resource if entry_type == constants.BQ_TABLE: resource = f'//bigquery.googleapis.com/projects/{project}/datasets/{dataset}/tables/{table}' request = datacatalog.LookupEntryRequest() request.linked_resource=resource if entry_type == constants.FILESET: resource = f'//datacatalog.googleapis.com/projects/{project}/locations/{data_asset_region}/entryGroups/{entry_group}/entries/{fileset}' request = datacatalog.LookupEntryRequest() request.linked_resource=resource if entry_type == constants.SPAN_TABLE: resource = f'spanner:{project}.regional-{data_asset_region}.{instance}.{database}.{table}' request = datacatalog.LookupEntryRequest() request.fully_qualified_name=resource request.project=project request.location=data_asset_region try: entry = self.client.lookup_entry(request) except Exception as e: msg = "Error could not find {} entry for {}".format(entry_type, resource) log_error_tag_dict(msg, e, job_uuid, tag_dict) op_status = constants.ERROR return op_status # format uri for storing in tag history table if data_asset_type == constants.BQ_ASSET: uri = entry.linked_resource.replace('//bigquery.googleapis.com/projects/', '') if data_asset_type == constants.SPAN_ASSET: uri = entry.linked_resource.replace('///projects/', '').replace('instances', 'instance').replace('databases', 'database') + '/table/' + table.replace('`', '') if data_asset_type == constants.FILESET_ASSET: uri = entry.linked_resource.replace('//datacatalog.googleapis.com/projects/', '').replace('locations', 'location').replace('entryGroups', 'entry_group').replace('entries', 'entry') target_column = None if 'column' in tag_dict: target_column = tag_dict['column'] column_exists = self.column_exists_in_table(target_column, entry.schema.columns) if column_exists == False: msg = f"Error could not find column {target_column} in {resource}" log_error_tag_dict(msg, None, job_uuid, tag_dict) op_status = constants.ERROR return op_status uri = uri + '/column/' + target_column try: tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column=target_column) except Exception as e: msg = f"Error during check_if_tag_exists: {entry.name}" log_error_tag_dict(msg, e, job_uuid, tag_dict) op_status = constants.ERROR return op_status if tag_exists and overwrite == False: msg = "Info: Tag already exists and overwrite flag is False" log_info_tag_dict(msg, job_uuid, tag_dict) op_status = constants.SUCCESS return op_status tag_fields = [] template_fields = self.get_template() for field_name in tag_dict: if field_name == 'project' or field_name == 'dataset' or field_name == 'table' or \ field_name == 'column' or field_name == 'entry_group' or field_name == 'fileset' or \ field_name == 'instance' or field_name == 'database' or field_name == 'schema': continue field_type = None field_value = tag_dict[field_name].strip() for template_field in template_fields: if template_field['field_id'] == field_name: field_type = template_field['field_type'] break if field_type == None: print('Error while preparing the tag. The field ', field_name, ' was not found in the tag template ', self.template_id) op_status = constants.ERROR return op_status # this check allows for tags with empty enums to get created, otherwise the empty enum gets flagged because DC thinks that you are storing an empty string as the enum value if field_type == 'enum' and field_value == '': continue field = {'field_id': field_name, 'field_type': field_type, 'field_value': field_value} tag_fields.append(field) op_status = self.create_update_delete_tag(tag_fields, tag_exists, tag_id, job_uuid, config_uuid, 'IMPORT_TAG', tag_history, \ entry, uri, target_column) return op_status def apply_restore_config(self, job_uuid, config_uuid, tag_extract, tag_history, overwrite=False): op_status = constants.SUCCESS for json_obj in tag_extract: #print('json_obj: ', json_obj) entry_group = json_obj['entryGroupId'] entry_id = json_obj['id'] location_id = json_obj['locationId'] project_id = json_obj['projectId'] #print('entry_group: ', entry_group) #print('entry_id: ', entry_id) entry_name = 'projects/' + project_id + '/locations/' + location_id + '/entryGroups/' + entry_group + '/entries/' + entry_id print('entry_name: ', entry_name) try: entry = self.client.get_entry(name=entry_name) except Exception as e: msg = "Error couldn't find the entry: {}".format(entry_name) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status if 'columns' in json_obj: # column-level tag json_columns = json_obj['columns'] #print('json_columns: ', json_columns) for column_obj in json_columns: column_name = column_obj['name'].split(':')[1] column_tags = column_obj['tags'] fields = column_tags[0]['fields'] try: tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column=column_name) except Exception as e: msg = 'Error during check_if_tag_exists:{}'.format(entry.name) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status if tag_exists and overwrite == False: msg = 'Info: Tag already exists and overwrite flag is False' info = {'job_uuid': job_uuid, 'msg': msg} print(json.dumps(info)) op_status = constants.SUCCESS return op_status # create or update column-level tag uri = entry.linked_resource.replace('//bigquery.googleapis.com/projects/', '') + '/column/' + column_name op_status = self.create_update_delete_tag(fields, tag_exists, tag_id, job_uuid, config_uuid, 'RESTORE_TAG', tag_history, \ entry, uri, column_name) if 'tags' in json_obj: # table-level tag json_tags = json_obj['tags'] fields = json_tags[0]['fields'] #print('fields: ', fields) try: tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column='') except Exception as e: msg = 'Error during check_if_tag_exists:{}'.format(entry.name) log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status if tag_exists and overwrite == False: msg = 'Info: Tag already exists and overwrite flag is False' info = {'job_uuid': job_uuid, 'msg': msg} print(json.dumps(info)) op_status = constants.SUCCESS return op_status # create or update table-level tag uri = entry.linked_resource.replace('//bigquery.googleapis.com/projects/', '') op_status = self.create_update_delete_tag(fields, tag_exists, tag_id, job_uuid, config_uuid, 'RESTORE_TAG', tag_history, \ entry, uri) return op_status # used by multiple apply methods def create_update_delete_tag(self, fields, tag_exists, tag_id, job_uuid, config_uuid, config_type, tag_history, entry, uri, column_name=''): op_status = constants.SUCCESS valid_field = False num_fields = len(fields) num_empty_values = 0 tag = datacatalog.Tag() tag.template = self.template_path for field in fields: if 'name' in field: valid_field = True field_id = field['name'] field_type = field['type'] field_value = field['value'] # rename the keys, which will be used by tag history if tag_history: field['field_id'] = field['name'] field['field_type'] = field['type'] field['field_value'] = field['value'] del field['name'] del field['type'] del field['value'] elif 'field_id' in field: valid_field = True field_id = field['field_id'] field_type = field['field_type'].upper() field_value = field['field_value'] else: # export file contains invalid tags (e.g. a tagged field without a name) continue # keep track of empty values if field_value == '': num_empty_values += 1 if field_type == 'BOOL': bool_field = datacatalog.TagField() if isinstance(field_value, str): if field_value == 'TRUE': bool_field.bool_value = True else: bool_field.bool_value = False else: bool_field.bool_value = field_value tag.fields[field_id] = bool_field if field_type == 'STRING': string_field = datacatalog.TagField() string_field.string_value = str(field_value) tag.fields[field_id] = string_field if field_type == 'DOUBLE': float_field = datacatalog.TagField() float_field.double_value = float(field_value) tag.fields[field_id] = float_field if field_type == 'RICHTEXT': richtext_field = datacatalog.TagField() richtext_field.richtext_value = field_value.replace(',', '<br>') tag.fields[field_id] = richtext_field # For richtext values, replace '<br>' with ',' when exporting to BQ field['field_value'] = field_value.replace('<br>', ', ') if field_type == 'ENUM': enum_field = datacatalog.TagField() enum_field.enum_value.display_name = field_value tag.fields[field_id] = enum_field if field_type == 'DATETIME' or field_type == 'TIMESTAMP': # field_value may be empty or date value e.g. "2022-05-08" or datetime value e.g. "2022-05-08 15:00:00" if field_value == '': timestamp = '' else: if len(field_value) == 10: d = date(int(field_value[0:4]), int(field_value[5:7]), int(field_value[8:10])) dt = datetime.combine(d, dtime(00, 00)) # when no time is supplied, default to 12:00:00 AM UTC else: # raw timestamp format: 2022-05-11 21:18:20 d = date(int(field_value[0:4]), int(field_value[5:7]), int(field_value[8:10])) t = dtime(int(field_value[11:13]), int(field_value[14:16])) dt = datetime.combine(d, t) utc = pytz.timezone('UTC') timestamp = utc.localize(dt) datetime_field = datacatalog.TagField() datetime_field.timestamp_value = timestamp tag.fields[field_id] = datetime_field field['field_value'] = timestamp # store this value back in the field, so it can be recorded in tag history # exported file from DataCatalog can have invalid tags, skip tag creation if that's the case if valid_field == False: msg = f"Invalid field {field}" log_error(msg, error='', job_uuid=job_uuid) op_status = constants.ERROR return op_status if column_name != '': tag.column = column_name if tag_exists == True: tag.name = tag_id # delete tag if every field in it is empty if num_fields == num_empty_values: op_status = self.do_create_update_delete_action(job_uuid, 'delete', tag) else: op_status = self.do_create_update_delete_action(job_uuid, 'update', tag) else: # create the table only if it has at least one non-empty fields if num_fields != num_empty_values: op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry) # only write to tag history if the operation was successful if tag_history and op_status == constants.SUCCESS: bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION) template_fields = self.get_template() success = bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, uri, column_name, fields) if success == False: msg = 'Error occurred while writing to tag history table' log_error(msg, error='', job_uuid=job_uuid) op_status = constants.ERROR return op_status def do_create_update_delete_action(self, job_uuid, action, tag, entry=None): op_status = constants.SUCCESS try: print('do {}, tag: {}'.format(action, tag)) if action == 'delete': response = self.client.delete_tag(name=tag.name) if action == 'update': respect = self.client.update_tag(tag=tag) if action == 'create': response = self.client.create_tag(parent=entry.name, tag=tag) except Exception as e: msg = f'Error occurred during tag {action}: {tag}' log_error(msg, e, job_uuid) # if it's a quota issue, sleep and retry the operation if '429' in str(e) or '503' in str(e): msg = 'Info: sleep for 2 minutes due to {}'.format(e) log_info(msg, job_uuid) time.sleep(120) try: if action == 'delete': response = self.client.delete_tag(name=tag.name) if action == 'update': respect = self.client.update_tag(tag=tag) if action == 'create': response = self.client.create_tag(parent=entry.name, tag=tag) except Exception as e: msg = f'Error occurred during tag {action} after sleep: {tag}' log_error(msg, e, job_uuid) op_status = constants.ERROR return op_status else: op_status = constants.ERROR return op_status def search_catalog(self, bigquery_project, bigquery_dataset): linked_resources = {} scope = datacatalog.SearchCatalogRequest.Scope() scope.include_project_ids.append(bigquery_project) request = datacatalog.SearchCatalogRequest() request.scope = scope query = 'parent:' + bigquery_project + '.' + bigquery_dataset print('query string: ' + query) request.query = query request.page_size = 1 for result in self.client.search_catalog(request): print('result: ' + str(result)) resp = self.client.list_tags(parent=result.relative_resource_name) tags = list(resp.tags) tag_count = len(tags) index = result.linked_resource.rfind('/') table_name = result.linked_resource[index+1:] linked_resources[table_name] = tag_count return linked_resources def parse_query_expression(self, uri, query_expression, column=None): query_str = None # analyze query expression from_index = query_expression.rfind(" from ", 0) where_index = query_expression.rfind(" where ", 0) project_index = query_expression.rfind("$project", 0) dataset_index = query_expression.rfind("$dataset", 0) table_index = query_expression.rfind("$table", 0) from_clause_table_index = query_expression.rfind(" from $table", 0) from_clause_backticks_table_index = query_expression.rfind(" from `$table`", 0) column_index = query_expression.rfind("$column", 0) #print('table_index: ', table_index) #print('column_index: ', column_index) if project_index != -1: project_end = uri.find('/') project = uri[0:project_end] #print('project: ' + project) #print('project_index: ', project_index) if dataset_index != -1: dataset_start = uri.find('/datasets/') + 10 dataset_string = uri[dataset_start:] dataset_end = dataset_string.find('/') if dataset_end == -1: dataset = dataset_string[0:] else: dataset = dataset_string[0:dataset_end] print('dataset:', dataset) print('dataset_end:', dataset_end) print('dataset_index:', dataset_index) # $table referenced in from clause, use fully qualified table if from_clause_table_index > 0 or from_clause_backticks_table_index > 0: #print('$table referenced in from clause') qualified_table = uri.replace('/project/', '.').replace('/datasets/', '.').replace('/tables/', '.') #print('qualified_table:', qualified_table) #print('query_expression:', query_expression) query_str = query_expression.replace('$table', qualified_table) #print('query_str:', query_str) # $table is referenced somewhere in the expression, replace $table with actual table name else: if table_index != -1: #print('$table referenced somewhere, but not in the from clause') table_index = uri.rfind('/') + 1 table_name = uri[table_index:] #print('table_name: ' + table_name) query_str = query_expression.replace('$table', table_name) # $project referenced in where clause too if project_index > -1: if query_str == None: query_str = query_expression.replace('$project', project) else: query_str = query_str.replace('$project', project) #print('query_str: ', query_str) # $dataset referenced in where clause too if dataset_index > -1: if query_str == None: query_str = query_expression.replace('$dataset', dataset) else: query_str = query_str.replace('$dataset', dataset) print('query_str: ', query_str) # table not in query expression (e.g. select 'string') if table_index == -1 and query_str == None: query_str = query_expression if column_index != -1: if query_str == None: query_str = query_expression.replace('$column', column) else: query_str = query_str.replace('$column', column) #print('returning query_str:', query_str) return query_str def run_query(self, query_str, field_type, batch_mode, job_uuid): field_values = [] error_exists = False try: if batch_mode: batch_config = bigquery.QueryJobConfig( # run at batch priority which won't count toward concurrent rate limit priority=bigquery.QueryPriority.BATCH ) query_job = self.bq_client.query_and_wait(query_str, job_config=batch_config) job = self.bq_client.get_job(query_job.job_id, location=query_job.location) rows = job.result() else: print('query_str:', query_str) rows = self.bq_client.query_and_wait(query_str) # if query expression is well-formed, there should only be a single row returned with a single field_value # However, user may mistakenly run a query that returns a list of rows. In that case, grab only the top row. row_count = 0 for row in rows: row_count = row_count + 1 field_values.append(row[0]) if field_type != 'richtext' and row_count == 1: return field_values, error_exists # check row_count if row_count == 0: #error_exists = True print('sql query returned nothing:', query_str) except Exception as e: error_exists = True msg = 'Error occurred during run_query {}'.format(query_str) log_error(msg, e, job_uuid) #print('field_values: ', field_values) return field_values, error_exists def run_combined_query(self, combined_query, column, fields, job_uuid): error_exists = False try: rows = self.bq_client.query_and_wait(combined_query) row_count = 0 for row in rows: for i, field in enumerate(fields): field['field_value'] = row[i] row_count += 1 if row_count == 0: error_exists = True print('sql query returned empty set:', combined_query) except Exception as e: error_exists = True msg = 'Error occurred during run_combined_query {}'.format(combined_query) log_error(msg, e, job_uuid) return fields, error_exists def populate_tag_fields(self, tag, fields, job_uuid=None): for field in fields: tag, error_exists = self.populate_tag_field(tag, field['field_id'], field['field_type'], field['field_value'], job_uuid) return tag, error_exists def populate_tag_field(self, tag, field_id, field_type, field_values, job_uuid=None): error_exists = False # handle richtext types if type(field_values) == list: field_value = field_values[0] else: field_value = field_values if field_values == None: print('Cannot store null value in tag field', field_id) return tag, error_exists try: if field_type == "bool": bool_field = datacatalog.TagField() bool_field.bool_value = bool(field_value) tag.fields[field_id] = bool_field if field_type == "string": string_field = datacatalog.TagField() string_field.string_value = str(field_value) tag.fields[field_id] = string_field if field_type == "richtext": richtext_field = datacatalog.TagField() formatted_value = '<br>'.join(str(v) for v in field_values) richtext_field.richtext_value = str(formatted_value) tag.fields[field_id] = richtext_field if field_type == "double": float_field = datacatalog.TagField() float_field.double_value = float(field_value) tag.fields[field_id] = float_field if field_type == "enum": enum_field = datacatalog.TagField() enum_field.enum_value.display_name = field_value tag.fields[field_id] = enum_field if field_type == "datetime" or field_type == "timestamp": # expected format for datetime values in DC: 2020-12-02T16:34:14Z # however, field_value can be a date value e.g. "2022-05-08", a datetime value e.g. "2022-05-08 15:00:00" # or timestamp value e.g. datetime.datetime(2022, 9, 14, 18, 24, 31, 615000, tzinfo=datetime.timezone.utc) #print('field_value:', field_value) #print('field_value type:', type(field_value)) # we have a datetime value # example: 2024-03-30 18:29:48.621617+00:00 if type(field_value) == datetime: timestamp = Timestamp() timestamp.FromDatetime(field_value) # we have a date value elif type(field_value) == date: dt = datetime.combine(field_value, datetime.min.time()) timestamp = pytz.utc.localize(dt) # we have a date cast as a string elif len(str(field_value)) == 10: utc = pytz.timezone('UTC') d = date(int(field_value[0:4]), int(field_value[5:7]), int(field_value[8:10])) dt = datetime.combine(d, dtime(00, 00)) # when no time is supplied, default to 12:00:00 AM UTC timestamp = utc.localize(dt) # we have a timestamp with this format: '2022-12-05 15:05:26' elif len(str(field_value)) == 19: year = int(field_value[0:4]) month = int(field_value[5:7]) day = int(field_value[8:10]) hour = int(field_value[11:13]) minute = int(field_value[14:16]) second = int(field_value[17:19]) dt = datetime(year, month, day, hour, minute, second) timestamp = pytz.utc.localize(dt) # we have a timestamp cast as a string else: timestamp_value = field_value.isoformat() field_value = timestamp_value[0:19] + timestamp_value[26:32] + "Z" timestamp = Timestamp() timestamp.FromJsonString(field_value[0]) #print('timestamp:', timestamp) datetime_field = datacatalog.TagField() datetime_field.timestamp_value = timestamp tag.fields[field_id] = datetime_field except Exception as e: error_exists = True msg = "Error storing values {} into field {}".format(field_values, field_id) log_error(msg, e, job_uuid) return tag, error_exists def copy_tags(self, source_project, source_dataset, source_table, target_project, target_dataset, target_table, include_policy_tags=False): success = True # lookup the source entry linked_resource = '//bigquery.googleapis.com/projects/{0}/datasets/{1}/tables/{2}'.format(source_project, source_dataset, source_table) request = datacatalog.LookupEntryRequest() request.linked_resource = linked_resource source_entry = self.client.lookup_entry(request) if source_entry.bigquery_table_spec.table_source_type != types.TableSourceType.BIGQUERY_TABLE: success = False msg = 'Error {} is not a BQ table'.format(source_table) log_info(msg, None) print(json.dumps(msg)) return success # lookup the target entry linked_resource = '//bigquery.googleapis.com/projects/{0}/datasets/{1}/tables/{2}'.format(target_project, target_dataset, target_table) request = datacatalog.LookupEntryRequest() request.linked_resource = linked_resource target_entry = self.client.lookup_entry(request) if target_entry.bigquery_table_spec.table_source_type != types.TableSourceType.BIGQUERY_TABLE: success = False msg = 'Error {} is not a BQ table'.format(target_table) log_info(msg, None) print(json.dumps(error)) return success # look to see if the source table is tagged tag_list = self.client.list_tags(parent=source_entry.name, timeout=120) for source_tag in tag_list: print('source_tag.template:', source_tag.template) print('source_tag.column:', source_tag.column) # get tag template fields self.template_id = source_tag.template.split('/')[5] self.template_project = source_tag.template.split('/')[1] self.template_region = source_tag.template.split('/')[3] self.template_path = source_tag.template template_fields = self.get_template() # start a new target tag target_tag = datacatalog.Tag() target_tag.template = source_tag.template if source_tag.column: target_tag.column = source_tag.column for template_field in template_fields: #print('template_field:', template_field) if template_field['field_id'] in source_tag.fields: field_id = template_field['field_id'] tagged_field = source_tag.fields[field_id] print('field_id:', field_id) if tagged_field.bool_value: field_type = 'bool' field_value = tagged_field.bool_value if tagged_field.double_value: field_type = 'double' field_value = tagged_field.double_value if tagged_field.string_value: field_type = 'string' field_value = tagged_field.string_value if tagged_field.enum_value: field_type = 'enum' field_value = tagged_field.enum_value.display_name if tagged_field.timestamp_value: field_type = 'timestamp' field_value = tagged_field.timestamp_value if tagged_field.richtext_value: field_type = 'richtext' field_value = tagged_field.richtext_value target_tag, error_exists = self.populate_tag_field(target_tag, field_id, field_type, [field_value], None) # create the target tag tag_exists, tag_id = self.check_if_tag_exists(parent=target_entry.name, column=source_tag.column) if tag_exists == True: target_tag.name = tag_id try: print('tag update request: ', target_tag) response = self.client.update_tag(tag=target_tag) except Exception as e: success = False msg = 'Error occurred during tag update: {}'.format(target_tag) log_error(msg, e) else: try: print('tag create request: ', target_tag) response = self.client.create_tag(parent=target_entry.name, tag=target_tag) except Exception as e: success = False msg = 'Error occurred during tag create: {}'.format(target_tag) log_error(msg, e) # copy policy tags success = self.copy_policy_tags(source_project, source_dataset, source_table, target_project, target_dataset, target_table) return success def copy_policy_tags(self, source_project, source_dataset, source_table, target_project, target_dataset, target_table): success = True source_table_id = source_project + '.' + source_dataset + '.' + source_table target_table_id = target_project + '.' + target_dataset + '.' + target_table try: source_schema = self.bq_client.get_table(source_table_id).schema except Exception as e: success = False msg = 'Error occurred while retrieving the schema of {}'.format(source_table_id) log_error(msg, e) return success policy_tag_list = [] for field in source_schema: if field.policy_tags != None: policy_tag = field.policy_tags.names[0] pt_tuple = (field.name, policy_tag) policy_tag_list.append(pt_tuple) if len(policy_tag_list) == 0: return success print('policy_tag_list:', policy_tag_list) success = self.apply_policy_tags(target_table_id, policy_tag_list) return success # used to update the status of a data product tag as part of the product_registration_pipeline # https://github.com/GoogleCloudPlatform/datacatalog-tag-engine/tree/main/examples/product_registration_pipeline def update_tag_subset(self, template_id, template_project, template_region, entry_name, changed_fields): success = True tag_list = self.client.list_tags(parent=entry_name, timeout=120) for tag in tag_list: print('tag.template:', tag.template) # get tag template fields tagged_template_id = tag.template.split('/')[5] tagged_template_project = tag.template.split('/')[1] tagged_template_region = tag.template.split('/')[3] if tagged_template_id != template_id: continue if tagged_template_project != template_project: continue if tagged_template_region != template_region: continue # start a new target tag to overwrite the existing one target_tag = datacatalog.Tag() target_tag.template = tag.template target_tag.name = tag.name self.template_path = tag.template template_fields = self.get_template() for template_field in template_fields: #print('template_field:', template_field) field_id = template_field['field_id'] # skip this field if it's not in the tag if field_id not in tag.fields: continue tagged_field = tag.fields[field_id] if tagged_field.bool_value: field_type = 'bool' field_value = str(tagged_field.bool_value) if tagged_field.double_value: field_type = 'double' field_value = str(tagged_field.double_value) if tagged_field.string_value: field_type = 'string' field_value = tagged_field.string_value if tagged_field.enum_value: field_type = 'enum' field_value = str(tagged_field.enum_value.display_name) if tagged_field.timestamp_value: field_type = 'timestamp' field_value = str(tagged_field.timestamp_value) print('orig timestamp:', field_value) if tagged_field.richtext_value: field_type = 'richtext' field_value = str(tagged_field.richtext_value) # overwrite logic for changed_field in changed_fields: if changed_field['field_id'] == field_id: field_value = changed_field['field_value'] break target_tag, error_exists = self.populate_tag_field(target_tag, field_id, field_type, [field_value], None) if error_exists: msg = 'Error while populating the tag field. Aborting tag update.' error = {'msg': msg} print(json.dumps(error)) success = False return success # update the tag try: print('tag update request: ', target_tag) response = self.client.update_tag(tag=target_tag) except Exception as e: success = False msg = 'Error occurred during tag update: {}'.format(tag) log_error(msg, e) return success if __name__ == '__main__': import google.auth from google.auth import impersonated_credentials SCOPES = ['openid', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/userinfo.email'] source_credentials, _ = google.auth.default() target_service_account = config['DEFAULT']['TAG_CREATOR_SA'] credentials = impersonated_credentials.Credentials(source_credentials=source_credentials, target_principal=target_service_account, target_scopes=SCOPES, lifetime=1200) template_id = 'data_sensitivity' template_project = 'tag-engine-run' template_region = 'us-central1' job_uuid = 'df0ddb3e477511ef95dc42004e494300' config_uuid = '3404d03a477a11ef995442004e494300' data_asset_type = 'fileset' data_asset_region = 'us-central1' tag_dict = {'project': 'tag-engine-run', 'entry_group': 'sakila_eg', 'fileset': 'staff', 'column': 'first_name', 'sensitive_field': 'TRUE', 'sensitive_type': 'Sensitive_Personal_Identifiable_Information'} tag_history = True overwrite = True dcu = DataCatalogController(credentials, target_service_account, 'scohen@gcp.solutions', template_id, template_project, template_region) dcu.apply_import_config(job_uuid, config_uuid, data_asset_type, data_asset_region, tag_dict, tag_history, overwrite)