BigQueryUtils.py (372 lines of code) (raw):

# Copyright 2020-2023 Google, LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json, datetime, time, configparser import decimal from google.api_core.client_info import ClientInfo from google.cloud import bigquery from google.cloud.exceptions import NotFound import TagEngineStoreHandler as tesh USER_AGENT = 'cloud-solutions/datacatalog-tag-engine-v2' class BigQueryUtils: def __init__(self, credentials, region): self.region = region self.client = bigquery.Client(credentials=credentials, location=region, client_info=ClientInfo(user_agent=USER_AGENT)) # API method used by tag export function def create_report_tables(self, project, dataset): success, dataset_id = self.create_dataset(project, dataset) if success == False: return success created_dataset_table = self.report_table_create(project, dataset, 'catalog_report_dataset_tags', 'dataset') created_table_table = self.report_table_create(project, dataset, 'catalog_report_table_tags', 'table') created_column_table = self.report_table_create(project, dataset, 'catalog_report_column_tags', 'column') if created_dataset_table or created_table_table or created_column_table: print('Created report tables') return True else: return False # API method used by tag export function def truncate_report_tables(self, project, dataset): truncate_dataset_table = self.report_table_truncate(project, dataset, 'catalog_report_dataset_tags') truncate_table_table = self.report_table_truncate(project, dataset, 'catalog_report_table_tags') truncate_column_table = self.report_table_truncate(project, dataset, 'catalog_report_column_tags') if truncate_dataset_table and truncate_table_table and truncate_column_table: return True else: return False # API method used by tag export function to insert records def insert_exported_records(self, target_table_id, records): print('*** insert_exported_records into', target_table_id) success = True if target_table_id.endswith('catalog_report_column_tags'): schema = self.get_report_column_schema() rows_to_insert = records elif target_table_id.endswith('catalog_report_table_tags'): schema = self.get_report_table_schema() rows_to_insert = records elif target_table_id.endswith('catalog_report_dataset_tags'): schema = self.get_report_dataset_schema() rows_to_insert = records job_config = bigquery.LoadJobConfig(schema=schema, source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON) table_ref = bigquery.table.TableReference.from_string(target_table_id) try: job = self.client.load_table_from_json(rows_to_insert, table_ref, job_config=job_config) print('Inserted record into reporting table') print('job errors:', job.errors) except Exception as e: print('Error occurred while writing record into report table ', e) if '404' in str(e): print('Report table not ready to be written to. Sleeping for 5 seconds.') time.sleep(5) try: errors = self.client.insert_rows_json(target_table_id, rows_to_insert) except Exception as e: print("Error occurred during report_table_insert: {}".format(e)) success = False return success # API method used by tag history functions (Data Catalog and Dataplex) def copy_tag(self, tag_creator_account, tag_invoker_account, job_uuid, table_name, table_fields, tagged_table, tagged_column, tagged_values): exists, table_id, settings = self.history_table_exists(table_name) if exists != True: success, dataset_id = self.create_dataset(settings['bigquery_project'], settings['bigquery_dataset']) #print('created_dataset:', success) if success: table_id = self.create_history_table(dataset_id, table_name, table_fields) else: print('Error creating tag_history dataset') if tagged_column and tagged_column != "" and "/column/" not in tagged_table: asset_name = ("{}/column/{}".format(tagged_table, tagged_column)) else: asset_name = tagged_table asset_name = asset_name.replace("/datasets/", "/dataset/").replace("/tables/", "/table/") #print('asset_name: ', asset_name) success = self.insert_history_row(tag_creator_account, tag_invoker_account, job_uuid, table_id, asset_name, tagged_values) return success # API method used by tag history function def copy_tags(self, tag_creator_account, tag_invoker_account, job_uuid, table_name, table_fields, asset_table, column_fields_list): print('enter BigQueryUtils.copy_tags') print('asset_table:', asset_table) print('column_fields_list:', column_fields_list) rows_to_insert = [] success = True exists, table_id, settings = self.history_table_exists(table_name) if exists != True: success, dataset_id = self.create_dataset(settings['bigquery_project'], settings['bigquery_dataset']) #print('created_dataset:', success) if success: table_id = self.create_history_table(dataset_id, table_name, table_fields) else: print('Error creating tag_history dataset') for column_fields in column_fields_list: column = column_fields['column'] fields = column_fields['fields'] if column and column != "" and "/column/" not in asset_table: asset_name = ("{}/column/{}".format(asset_table, column)) else: success = False print("Error: could not find the tagged column in column_fields_list, therefore skipping tag history.") return success asset_name = asset_name.replace("/datasets/", "/dataset/").replace("/tables/", "/table/") #print('asset_name: ', asset_name) row = {'event_time': datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f') + ' UTC', 'asset_name': asset_name, 'tag_creator_account': tag_creator_account, 'tag_invoker_account': tag_invoker_account, 'job_uuid': job_uuid} for field in fields: field_id = field['field_id'] field_value = field['field_value'] if isinstance(field_value, decimal.Decimal): row[field_id] = float(field_value) elif isinstance(field_value, datetime.datetime) or isinstance(field_value, datetime.date): row[field_id] = field_value.isoformat() else: row[field_id]= json.dumps(field_value, default=str) #print('rows_to_insert:', row) rows_to_insert.append(row) success = self.load_history_rows(tag_creator_account, tag_invoker_account, table_id, rows_to_insert, job_uuid) return success # API method used by job metadata function def write_job_metadata(self, job_uuid, table_name, metadata, tag_creator_sa, tag_invoker_sa): exists, table_id, settings = self.job_metadata_table_exists(table_name) if exists != True: success, dataset_id = self.create_dataset(settings['bigquery_project'], settings['bigquery_dataset']) #print('created_dataset:', success) if success: table_id = self.create_job_metadata_table(dataset_id, table_name) else: print('Error creating tag_history dataset') success = self.insert_job_metadata_row(table_id, job_uuid, metadata, tag_creator_sa, tag_invoker_sa) return success ############### Internal processing methods ############### # used by both tag history and tag export def create_dataset(self, project, dataset): success = True dataset_id = bigquery.Dataset(project + '.' + dataset) dataset_id.location = self.region try: dataset_status = self.client.create_dataset(dataset_id, exists_ok=True) print("Created dataset {}".format(dataset_status.dataset_id)) except Exception as e: print('Error occurred in create_dataset ', dataset_id, '. Error message: ', e) success = False return success, dataset_id # used by tag export function def report_table_create(self, project, dataset, table, table_type): created = True table_id = project + '.' + dataset + '.' + table table_ref = bigquery.Table.from_string(table_id) try: table = self.client.get_table(table_ref) created = False return created except NotFound: if table_type == 'dataset': schema = self.get_report_dataset_schema() elif table_type == 'table': schema = self.get_report_table_schema() elif table_type == 'column': schema = self.get_report_column_schema() table = bigquery.Table(table_id, schema=schema) table.time_partitioning = bigquery.TimePartitioning(type_=bigquery.TimePartitioningType.DAY, field="export_time") table = self.client.create_table(table) print("Created table {}".format(table.table_id)) return created # used by tag export function def get_report_dataset_schema(self): schema = [ bigquery.SchemaField("project", "STRING", mode="REQUIRED"), bigquery.SchemaField("dataset", "STRING", mode="REQUIRED"), bigquery.SchemaField("tag_template", "STRING", mode="REQUIRED"), bigquery.SchemaField("tag_field", "STRING", mode="REQUIRED"), bigquery.SchemaField("tag_value", "STRING", mode="REQUIRED"), bigquery.SchemaField("export_time", "TIMESTAMP", mode="REQUIRED"), ] return schema # used by tag export function def get_report_table_schema(self): schema = [ bigquery.SchemaField("project", "STRING", mode="REQUIRED"), bigquery.SchemaField("dataset", "STRING", mode="REQUIRED"), bigquery.SchemaField("table", "STRING", mode="REQUIRED"), bigquery.SchemaField("tag_template", "STRING", mode="REQUIRED"), bigquery.SchemaField("tag_field", "STRING", mode="REQUIRED"), bigquery.SchemaField("tag_value", "STRING", mode="REQUIRED"), bigquery.SchemaField("export_time", "TIMESTAMP", mode="REQUIRED"), ] return schema # used by tag export function def get_report_column_schema(self): schema = [ bigquery.SchemaField("project", "STRING", mode="REQUIRED"), bigquery.SchemaField("dataset", "STRING", mode="REQUIRED"), bigquery.SchemaField("table", "STRING", mode="REQUIRED"), bigquery.SchemaField("column", "STRING", mode="REQUIRED"), bigquery.SchemaField("tag_template", "STRING", mode="REQUIRED"), bigquery.SchemaField("tag_field", "STRING", mode="REQUIRED"), bigquery.SchemaField("tag_value", "STRING", mode="REQUIRED"), bigquery.SchemaField("export_time", "TIMESTAMP", mode="REQUIRED"), ] return schema # used by tag export function def report_table_truncate(self, project, dataset, table): try: self.client.query('truncate table ' + project + '.' + dataset + '.' + table).result() except Exception as e: print('Error occurred during report_table_truncate ', e) # used by tag history function def history_table_exists(self, table_name): store = tesh.TagEngineStoreHandler() enabled, settings = store.read_tag_history_settings() if enabled == False: return enabled, settings bigquery_project = settings['bigquery_project'] bigquery_region = settings['bigquery_region'] bigquery_dataset = settings['bigquery_dataset'] dataset_id = self.client.dataset(bigquery_dataset, project=bigquery_project) table_id = dataset_id.table(table_name) try: self.client.get_table(table_id) exists = True print("Tag history table {} already exists.".format(table_name)) except NotFound: exists = False print("Tag history table {} not found.".format(table_name)) return exists, table_id, settings # used by tag history function def create_history_table(self, dataset_id, table_name, fields): #print('enter create_history_table()') #print('dataset_id:', dataset_id) #print('table_name:', table_name) #print('fields:', fields) schema = [bigquery.SchemaField('event_time', 'TIMESTAMP', mode='REQUIRED'), bigquery.SchemaField('asset_name', 'STRING', mode='REQUIRED'), bigquery.SchemaField('tag_creator_account', 'STRING', mode='REQUIRED'), bigquery.SchemaField('tag_invoker_account', 'STRING', mode='REQUIRED'), bigquery.SchemaField('job_uuid', 'STRING', mode='REQUIRED')] for field in fields: col_name = field['field_id'] if field['field_type'] == 'string': col_type = 'STRING' if field['field_type'] == 'enum': col_type = 'STRING' if field['field_type'] == 'double': col_type = 'NUMERIC' if field['field_type'] == 'bool': col_type = 'BOOLEAN' if field['field_type'] == 'timestamp': col_type = 'TIMESTAMP' if field['field_type'] == 'datetime': col_type = 'TIMESTAMP' # datetime fields should be mapped to timestamps in BQ because they actually contain a timezone if field['field_type'] == 'richtext': col_type = 'STRING' schema.append(bigquery.SchemaField(col_name, col_type, mode='NULLABLE')) # mode is always set to NULLABLE to be able to represent deleted tags table_id = dataset_id.table(table_name) table = bigquery.Table(table_id, schema=schema) table.time_partitioning = bigquery.TimePartitioning(type_=bigquery.TimePartitioningType.DAY, field="event_time") try: table = self.client.create_table(table, exists_ok=True) except Exception as e: print("Error creating tag_history table:", e) print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)) table_id = ("{}.{}.{}".format(table.project, table.dataset_id, table.table_id)) return table_id # writes tag history record def insert_history_row(self, tag_creator_account, tag_invoker_account, job_uuid, table_id, asset_name, tagged_values): success = True row = {'event_time': datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f') + ' UTC', 'asset_name': asset_name, 'tag_creator_account': tag_creator_account, 'tag_invoker_account': tag_invoker_account, 'job_uuid': job_uuid} for tagged_value in tagged_values: #print('tagged_value: ' + str(tagged_value)) if 'field_value' not in tagged_value: continue if tagged_value['field_value'] == '': continue if isinstance(tagged_value['field_value'], decimal.Decimal): row[tagged_value['field_id']] = float(tagged_value['field_value']) elif isinstance(tagged_value['field_value'], datetime.datetime) or isinstance(tagged_value['field_value'], datetime.date): row[tagged_value['field_id']] = tagged_value['field_value'].isoformat() else: row[tagged_value['field_id']]= json.dumps(tagged_value['field_value'], default=str) row[tagged_value['field_id']]= tagged_value['field_value'] #print('insert row: ' + str(row)) row_to_insert = [row,] try: status = self.client.insert_rows_json(table_id, row_to_insert) if len(status) > 0: print('Inserted row into tag history table. Return status: ', status) else: print('Inserted row into tag history table.') except Exception as e: print('Error while writing to tag history table:', e) if '404' in str(e): # table isn't quite ready to be written to print('Tag history table not ready to be written to. Sleeping for 5 seconds.') time.sleep(5) try: status = self.client.insert_rows_json(table_id, row_to_insert) print('Retrying insert row into tag history table. Return status: ', status) except Exception as e: print('Error occurred while writing to tag history table: {}'.format(e)) success = False return success # writes tag history records def load_history_rows(self, tag_creator_account, tag_invoker_account, table_id, rows_to_insert, job_uuid): print('enter load_history_rows') success = True try: table = self.client.get_table(table_id) #print("table schema: {}".format(table.schema)) job_config = bigquery.LoadJobConfig(schema=table.schema, source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, write_disposition='WRITE_APPEND') load_job = self.client.load_table_from_json(rows_to_insert, destination=table_id, job_config=job_config) load_job.result() if load_job.errors: print('Errors while writing to tag history table:', load_job.errors) except Exception as e: print('Error while writing to tag history table:', e) if '404' in str(e): # table isn't open for writes (it may have been just created) print('Tag history table not ready to be written to. Sleeping for 5 seconds.') time.sleep(5) try: load_job = self.client.load_table_from_json(rows_to_insert, table_ref, job_config=job_config) load_job.result() destination_table = self.client.get_table(table_id) except Exception as e: print('Error occurred while writing to tag history table: {}'.format(e)) success = False return success # used by job metadata function def job_metadata_table_exists(self, table_name): store = tesh.TagEngineStoreHandler() enabled, settings = store.read_job_metadata_settings() if enabled == False: return enabled, settings bigquery_project = settings['bigquery_project'] bigquery_region = settings['bigquery_region'] bigquery_dataset = settings['bigquery_dataset'] dataset_id = self.client.dataset(bigquery_dataset, project=bigquery_project) table_id = dataset_id.table(table_name) try: self.client.get_table(table_id) exists = True print("Job metadata table {} already exists.".format(table_name)) except NotFound: exists = False print("Job metadata table {} not found.".format(table_name)) return exists, table_id, settings # used by job metadata function def create_job_metadata_table(self, dataset_id, table_name): schema = [bigquery.SchemaField('event_time', 'TIMESTAMP', mode='REQUIRED'), bigquery.SchemaField('job_uuid', 'STRING', mode='REQUIRED'), bigquery.SchemaField('metadata', 'JSON', mode='REQUIRED'), bigquery.SchemaField('tag_creator_sa', 'STRING', mode='REQUIRED'), bigquery.SchemaField('tag_invoker_sa', 'STRING', mode='REQUIRED'), ] table_id = dataset_id.table(table_name) table = bigquery.Table(table_id, schema=schema) table.time_partitioning = bigquery.TimePartitioning(type_=bigquery.TimePartitioningType.DAY, field="event_time") table = self.client.create_table(table, exists_ok=True) print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)) table_id = ("{}.{}.{}".format(table.project, table.dataset_id, table.table_id)) return table_id # write job metadata record def insert_job_metadata_row(self, table_id, job_uuid, metadata, tag_creator_sa, tag_invoker_sa): print('enter insert_job_metadata_row') #print('job_uuid:', job_uuid) success = True row = {'event_time': datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f') + ' UTC', 'job_uuid': job_uuid, 'metadata': json.dumps(metadata), 'tag_creator_sa': tag_creator_sa, 'tag_invoker_sa': tag_invoker_sa} #print('row:', row) row_to_insert = [row,] try: status = self.client.insert_rows_json(table_id, row_to_insert) if len(status) > 0: print('Inserted row into job metadata table. Return status: ', status) except Exception as e: print('Error while writing to job metadata table:', e) if '404' in str(e): # table isn't quite ready to be written into print('Job metadata table not ready to be written to. Sleeping for 5 seconds.') time.sleep(5) try: status = self.client.insert_rows_json(table_id, row_to_insert) print('Retrying insert row into job metadata table. Return status: ', status) except Exception as e: print('Error occurred while writing into job metadata table: {}'.format(e)) success = False return success if __name__ == '__main__': import google.auth from google.auth import impersonated_credentials SCOPES = ['openid', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/userinfo.email'] source_credentials, _ = google.auth.default() config = configparser.ConfigParser() config.read("tagengine.ini") BIGQUERY_REGION = config['DEFAULT']['BIGQUERY_REGION'] target_service_account = config['DEFAULT']['TAG_CREATOR_SA'] credentials = impersonated_credentials.Credentials(source_credentials=source_credentials, target_principal=target_service_account, target_scopes=SCOPES, lifetime=1200) bqu = BigQueryUtils(credentials, BIGQUERY_REGION) job_uuid = '0890ccc8895d11eeb380af7e3e47c857' table_name = 'data_governance' metadata = {"source": "Collibra", "workflow": "process_sensitive_data"} bqu.write_job_metadata(job_uuid, table_name, metadata)