# Copyright 2020-2024 Google, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import requests, configparser, time
from datetime import datetime, date
from datetime import time as dtime
import pytz
from operator import itemgetter
import pandas as pd
from pyarrow import parquet
import json
import os

from google.api_core.gapic_v1.client_info import ClientInfo
from google.protobuf.timestamp_pb2 import Timestamp
from google.cloud import datacatalog
from google.cloud.datacatalog_v1 import types
from google.cloud.datacatalog import DataCatalogClient
from google.cloud import bigquery
from google.cloud import storage

import Resources as res
import BigQueryUtils as bq
import constants
from common import log_error, log_error_tag_dict, log_info, log_info_tag_dict

# Load deployment settings from tagengine.ini (path is relative to the working directory).
# ConfigParser.read() silently skips a missing file, so a missing file surfaces
# as a KeyError on the BIGQUERY_REGION lookup below.
config = configparser.ConfigParser()
config.read("tagengine.ini")

# Region passed as `location` to the BigQuery client and to BigQueryUtils.
BIGQUERY_REGION = config['DEFAULT']['BIGQUERY_REGION']

# User agent attached (via ClientInfo) to the Google Cloud clients created in DataCatalogController.
USER_AGENT = 'cloud-solutions/datacatalog-tag-engine-v2'

class DataCatalogController:
    
    def __init__(self, credentials, tag_creator_account=None, tag_invoker_account=None,
                 template_id=None, template_project=None, template_region=None):
        """Store the tagging context and create the Google Cloud clients.

        Args:
            credentials: credentials object handed to every client created here.
            tag_creator_account: stored and later forwarded to the tag history writer.
            tag_invoker_account: stored and later forwarded to the tag history writer.
            template_id: id of the tag template to work with.
            template_project: project that owns the tag template.
            template_region: region of the tag template.

        `self.template_path` is only built when the template id, project and
        region are all provided; otherwise it is set to None.
        """
        self.credentials = credentials
        self.tag_creator_account = tag_creator_account
        self.tag_invoker_account = tag_invoker_account
        self.template_id = template_id
        self.template_project = template_project
        self.template_region = template_region
        self.client = DataCatalogClient(credentials=self.credentials, client_info=ClientInfo(user_agent=USER_AGENT))

        # identity comparison is the correct way to test for None
        if template_id is not None and template_project is not None and template_region is not None:
            self.template_path = self.client.tag_template_path(template_project, template_region, template_id)
        else:
            self.template_path = None

        self.bq_client = bigquery.Client(credentials=self.credentials, location=BIGQUERY_REGION, client_info=ClientInfo(user_agent=USER_AGENT))
        self.gcs_client = storage.Client(credentials=self.credentials, client_info=ClientInfo(user_agent=USER_AGENT))
        self.ptm_client = datacatalog.PolicyTagManagerClient(credentials=self.credentials, client_info=ClientInfo(user_agent=USER_AGENT))
    
    def get_template(self, included_fields=None):
        """Describe the fields of the tag template at `self.template_path`.

        Args:
            included_fields: optional list of dicts, each with a 'field_id' key
                and optionally 'field_value' and/or 'query_expression'. When
                given (and non-empty), only template fields whose id appears in
                this list are returned, and the truthy 'field_value' /
                'query_expression' of the first matching dict are copied onto
                the returned field.

        Returns:
            A list of dicts with keys 'field_id', 'display_name', 'field_type',
            'is_required' and 'order' (plus 'enum_values' for enum fields),
            sorted by 'order' in descending order. An empty list is returned
            when the template cannot be retrieved (the error is logged).
        """
        try:
            tag_template = self.client.get_tag_template(name=self.template_path)
        except Exception as e:
            msg = 'Error retrieving tag template {}'.format(self.template_path)
            log_error(msg, e)
            return []

        primitive = datacatalog.FieldType.PrimitiveType
        type_names = {
            primitive.DOUBLE: "double",
            primitive.STRING: "string",
            primitive.BOOL: "bool",
            primitive.TIMESTAMP: "datetime",
            primitive.RICHTEXT: "richtext",
            # an unset primitive type is treated as an enum field
            primitive.PRIMITIVE_TYPE_UNSPECIFIED: "enum",
        }

        fields = []

        for field_id, field_value in tag_template.fields.items():

            field_id = str(field_id)
            assigned_value = None
            query_expression = None

            if included_fields:
                # the first entry with a matching field_id wins
                included_field = next((f for f in included_fields if f['field_id'] == field_id), None)

                if included_field is None:
                    continue

                assigned_value = included_field.get('field_value')
                query_expression = included_field.get('query_expression')

            # unknown primitive types map to None
            field_type = type_names.get(field_value.type_.primitive_type)

            field = {
                'field_id': field_id,
                'display_name': field_value.display_name,
                'field_type': field_type,
                'is_required': field_value.is_required,
                'order': field_value.order,
            }

            if field_type == "enum":
                # Read the allowed values from the message itself rather than parsing its
                # text representation, which breaks on names containing ':' or '"'.
                field['enum_values'] = [allowed.display_name for allowed in field_value.type_.enum_type.allowed_values]

            if included_fields:
                if assigned_value:
                    field['field_value'] = assigned_value
                if query_expression:
                    field['query_expression'] = query_expression

            fields.append(field)

        return sorted(fields, key=itemgetter('order'), reverse=True)
    
        
    def check_if_tag_exists(self, parent, column=None):
        """Look for a tag on `parent` that was created from this controller's template.

        Args:
            parent: name of the entry whose tags are listed.
            column: column name for a column-level tag; None or '' selects the
                table-level tag (a tag whose column is empty).

        Returns:
            A (tag_exists, tag_id) tuple. tag_id is the name of the first
            matching tag, or "" when nothing matches.
        """
        print(f'enter check_if_tag_exists, parent: {parent}')

        wants_table_level = column in ('', None)

        for candidate in self.client.list_tags(parent=parent, timeout=120):

            # template path layout: projects/<project>/locations/<location>/tagTemplates/<id>
            path_parts = candidate.template.split('/')

            if wants_table_level:
                column_matches = candidate.column == ""
            else:
                column_matches = column.lower() == candidate.column

            template_matches = (path_parts[5], path_parts[1], path_parts[3]) == \
                (self.template_id, self.template_project, self.template_region)

            if column_matches and template_matches:
                return True, candidate.name

        return False, ""
    
    
    def apply_static_asset_config(self, fields, uri, job_uuid, config_uuid, template_uuid, tag_history, overwrite=False):
        """Create or update a static tag on a BigQuery or Cloud Storage asset.

        Args:
            fields: tag fields, forwarded to create_update_delete_tag.
            uri: either a BigQuery table/view path (str, appended to
                '//bigquery.googleapis.com/projects/') or a [bucket, filename]
                list identifying a Cloud Storage file.
            job_uuid: job identifier, used for logging and forwarded downstream.
            config_uuid: config identifier, forwarded downstream.
            template_uuid: not used by this method.
            tag_history: forwarded to create_update_delete_tag.
            overwrite: when False, an existing tag is left untouched.

        Returns:
            constants.ERROR when the uri type is unsupported, the Cloud Storage
            entry lookup fails or the existing-tag check fails;
            constants.SUCCESS when the tag exists and overwrite is False;
            otherwise the status returned by create_update_delete_tag.

        Note: a failed BigQuery entry lookup is not caught here and propagates
        to the caller.
        """
        # look up the entry based on the resource type
        if isinstance(uri, list):
            bucket = uri[0].replace('-', '_')
            filename = uri[1].split('.')[0].replace('/', '_') # extract the filename without extension, replace '/' with '_'
            gcs_resource = '//datacatalog.googleapis.com/projects/' + self.template_project + '/locations/' + self.template_region + '/entryGroups/' + bucket + '/entries/' + filename
            print('gcs_resource: ', gcs_resource)
            request = datacatalog.LookupEntryRequest()
            request.linked_resource = gcs_resource
            uri = '/'.join(uri)

            try:
                entry = self.client.lookup_entry(request)
                print('GCS entry:', entry.name)
            except Exception as e:
                msg = 'Unable to find the entry in the catalog. Entry {} does not exist'.format(gcs_resource)
                log_error(msg, e, job_uuid)
                return constants.ERROR

        elif isinstance(uri, str):
            bigquery_resource = '//bigquery.googleapis.com/projects/' + uri
            print("bigquery_resource: " + bigquery_resource)

            request = datacatalog.LookupEntryRequest()
            request.linked_resource = bigquery_resource
            entry = self.client.lookup_entry(request)
            print('entry: ', entry.name)

        else:
            # without this guard `entry` would be unbound below and the method
            # would fail with a confusing UnboundLocalError
            msg = 'Unsupported uri type {}, expected a BigQuery path or a [bucket, filename] list'.format(type(uri).__name__)
            log_error(msg, None, job_uuid)
            return constants.ERROR

        try:
            tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name)
            print('tag exists: ', tag_exists)

        except Exception as e:
            msg = 'Error during check_if_tag_exists {}'.format(entry.name)
            log_error(msg, e, job_uuid)
            return constants.ERROR

        if tag_exists and not overwrite:
            msg = 'Tag already exists and overwrite is False'
            log_info(msg)
            return constants.SUCCESS

        return self.create_update_delete_tag(fields, tag_exists, tag_id, job_uuid, config_uuid, 'STATIC_ASSET_TAG', tag_history, entry, uri)


    def apply_dynamic_table_config(self, fields, uri, job_uuid, config_uuid, template_uuid, tag_history, batch_mode=False):
        """Create or update a table-level tag whose values come from query expressions.

        For every field, its 'query_expression' is parsed, run in BigQuery and
        the result is written into the tag. Fields whose query fails or returns
        no values are skipped. The resolved value is also stored back into
        field['field_value'] so that it can be accessed by the exporter.

        Args:
            fields: list of dicts with 'field_id', 'field_type' and 'query_expression'.
            uri: BigQuery path, appended to '//bigquery.googleapis.com/projects/'.
            job_uuid: job identifier used for logging.
            config_uuid: not used by this method.
            template_uuid: not used by this method.
            tag_history: when truthy, a successfully written tag is copied to the tag history.
            batch_mode: forwarded to run_query.

        Returns:
            constants.ERROR when no field could be populated, when the tag write
            fails, or when any field hit an error (the tag is still written with
            the remaining fields in that case); constants.SUCCESS otherwise.
        """
        print('*** apply_dynamic_table_config ***')

        bigquery_resource = '//bigquery.googleapis.com/projects/' + uri

        request = datacatalog.LookupEntryRequest()
        request.linked_resource = bigquery_resource
        entry = self.client.lookup_entry(request)

        tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name)
        print("tag_exists: " + str(tag_exists))

        # create new tag
        tag = datacatalog.Tag()
        tag.template = self.template_path
        verified_field_count = 0

        # Sticky flag: remembers an error on ANY field, not just the last one processed.
        field_error = False

        for field in fields:
            field_id = field['field_id']
            field_type = field['field_type']
            query_expression = field['query_expression']

            # parse and run query in BQ
            query_str = self.parse_query_expression(uri, query_expression)
            print('returned query_str: ' + query_str)

            # note: field_values is of type list
            field_values, error_exists = self.run_query(query_str, field_type, batch_mode, job_uuid)
            print('field_values: ', field_values)
            print('error_exists: ', error_exists)

            if error_exists:
                field_error = True
                continue

            if not field_values:
                # nothing to store for this field
                continue

            tag, error_exists = self.populate_tag_field(tag, field_id, field_type, field_values, job_uuid)

            if error_exists:
                field_error = True
                continue

            verified_field_count = verified_field_count + 1

            # store the value back in the dict, so that it can be accessed by the exporter
            if field_type == 'richtext':
                formatted_value = ', '.join(str(v) for v in field_values)
            else:
                formatted_value = field_values[0]

            field['field_value'] = formatted_value

        if verified_field_count == 0:
            # tag is empty, skip tag creation
            return constants.ERROR

        if tag_exists:
            tag.name = tag_id
            op_status = self.do_create_update_delete_action(job_uuid, 'update', tag)
        else:
            op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry)

        if op_status == constants.SUCCESS and tag_history:
            bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
            template_fields = self.get_template()
            bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, uri, None, fields)

        if field_error:
            # the tag was written with the fields that worked, but the caller
            # still needs to know that at least one field failed
            op_status = constants.ERROR

        return op_status

    
    def column_exists_in_table(self, target_column, entry_columns):
        """Tell whether a (possibly nested) column is present in an entry's schema.

        Args:
            target_column: column name; nested columns use dot notation
                (e.g. 'address.city'), with any number of levels.
            entry_columns: iterable of column objects exposing `column` (the
                name) and `subcolumns` (the nested columns).

        Returns:
            True when every segment of the dotted path is found at its
            respective nesting level, False otherwise.
        """
        candidates = entry_columns

        # walk down the schema one path segment at a time
        for segment in target_column.split('.'):
            match = next((candidate for candidate in candidates if candidate.column == segment), None)

            if match is None:
                return False

            candidates = match.subcolumns

        return True
            
    
    def apply_dynamic_column_config(self, fields, columns_query, uri, job_uuid, config_uuid, template_uuid, tag_history, batch_mode=False):
        """Create or update column-level tags whose values come from query expressions.

        The columns to tag are the values returned by `columns_query`. For each
        column, the fields' query expressions are combined into a single query,
        run, and the results are written into a column-level tag. All tags are
        then sent to Data Catalog in one reconcile_tags request.

        Args:
            fields: list of dicts, each with a 'query_expression' key.
            columns_query: query expression returning the columns to tag.
            uri: BigQuery path, appended to '//bigquery.googleapis.com/projects/'.
            job_uuid: job identifier used for logging.
            config_uuid: not used by this method.
            template_uuid: not used by this method.
            tag_history: when truthy (and no error occurred), the tags are copied to the tag history.
            batch_mode: not used by this method.

        Returns:
            constants.SUCCESS or constants.ERROR. ERROR is returned when no
            column is found, a column is missing from the entry's schema, any
            column fails to be processed, the reconcile request fails, or the
            tag history copy fails.
        """
        print('*** apply_dynamic_column_config ***')

        tag_work_queue = [] # collection of Tag objects that will be passed to the API to be created or updated

        op_status = constants.SUCCESS

        columns_query = self.parse_query_expression(uri, columns_query)
        print('columns_query:', columns_query)

        rows = self.bq_client.query(columns_query).result()

        target_columns = [] # columns in the table which need to be tagged
        for row in rows:
            for column in row:
                print('column:', column)
                target_columns.append(column)

        if not target_columns:
            # no columns to tag
            msg = f"Error could not find columns to tag. Please check column_query parameter in your config. Current value: {columns_query}"
            log_error(msg, None, job_uuid)
            return constants.ERROR

        bigquery_resource = '//bigquery.googleapis.com/projects/' + uri

        request = datacatalog.LookupEntryRequest()
        request.linked_resource = bigquery_resource
        entry = self.client.lookup_entry(request)

        column_fields_list = [] # list<dictionaries> where dict = {column, fields}

        for target_column in target_columns:

            # fail quickly if a column is not found in the entry's schema
            if not self.column_exists_in_table(target_column, entry.schema.columns):
                # report the table path; the previous message referenced an undefined name
                msg = f"Error could not find column {target_column} in {uri}"
                log_error(msg, None, job_uuid)
                return constants.ERROR

            # initialize the new column-level tag
            tag = datacatalog.Tag()
            tag.template = self.template_path
            tag.column = target_column

            query_strings = []

            for field in fields:
                query_expression = field['query_expression']
                query_str = self.parse_query_expression(uri, query_expression, target_column)
                query_strings.append(query_str)

            # combine query expressions
            combined_query = self.combine_queries(query_strings)

            # run combined query, adding the results to the field_values for each field
            # Note: field_values is of type list
            fields, error_exists = self.run_combined_query(combined_query, target_column, fields, job_uuid)

            if error_exists:
                op_status = constants.ERROR
                continue

            # populate tag fields
            tag, error_exists = self.populate_tag_fields(tag, fields, job_uuid)

            if error_exists:
                op_status = constants.ERROR
                continue

            column_fields_list.append({"column": target_column, "fields": fields})
            tag_work_queue.append(tag)

        if len(tag_work_queue) == 0:
            return constants.ERROR

        # ready to create or update all the tags in work queue
        rec_request = datacatalog.ReconcileTagsRequest(
            parent=entry.name,
            tag_template=self.template_path,
            tags=tag_work_queue
        )

        try:
            operation = self.client.reconcile_tags(request=rec_request)
            print("Waiting for operation to complete...")
            operation.result()
        except Exception as e:
            msg = 'Error during reconcile_tags on entry {}'.format(entry.name)
            log_error(msg, e, job_uuid)
            return constants.ERROR

        if tag_history and op_status != constants.ERROR:
            bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
            success = bqu.copy_tags(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, self.get_template(), uri, column_fields_list)
            print('Tag history completed successfully:', success)

            op_status = constants.SUCCESS if success else constants.ERROR

        return op_status

    
    def combine_queries(self, query_strings):
        """Wrap each query in parentheses and select them all in one statement.

        Args:
            query_strings: iterable of SQL query strings.

        Returns:
            'select (q1), (q2), ...'. For an empty input the bare prefix
            'select ' is returned (the previous slicing-based implementation
            returned the truncated string 'selec').
        """
        return "select " + ", ".join("({})".format(query) for query in query_strings)
        
    
    def apply_entry_config(self, fields, uri, job_uuid, config_uuid, template_uuid, tag_history):
        """Register a Cloud Storage parquet file as a FILESET entry and tag it with file metadata.

        When the entry already exists nothing is created. Otherwise the file is
        downloaded to /tmp to read its parquet schema and row count, the entry
        is created in an entry group named after the bucket ('-' replaced by
        '_'), and a tag holding the file's metadata is attached to it.

        Args:
            fields: list of dicts with a 'field_id' key; recognised ids get a
                'field_value' stored back (used by the BQ exporter).
            uri: (bucket_name, filename) pair.
            job_uuid: job identifier used for logging.
            config_uuid: not used by this method.
            template_uuid: not used by this method.
            tag_history: when truthy, the created tag is copied to the tag history.

        Returns:
            constants.ERROR when the file is missing from the bucket or is not a
            parquet file; constants.SUCCESS otherwise.
        """
        print('** apply_entry_config **')

        bucket_name, filename = uri
        bucket = self.gcs_client.get_bucket(bucket_name)
        blob = bucket.get_blob(filename)

        entry_group_short_name = bucket_name.replace('-', '_')
        entry_group_full_name = 'projects/' + self.template_project + '/locations/' + self.template_region + '/entryGroups/' + entry_group_short_name

        # create the entry group
        is_entry_group = self.entry_group_exists(entry_group_full_name)
        print('is_entry_group: ', is_entry_group)

        if not is_entry_group:
            self.create_entry_group(entry_group_short_name)

        # generate the entry id, replace '/' with '_' and remove the file extension from the name
        entry_id = filename.split('.')[0].replace('/', '_')
        entry_name = entry_group_full_name + '/entries/' + entry_id
        print('Info: entry_name: ', entry_name)

        try:
            entry = self.client.get_entry(name=entry_name)
        except Exception as e:
            msg = 'Entry does not exist {}'.format(entry_name)
            log_error(msg, e, job_uuid)
        else:
            print('Info: entry already exists: ', entry.name)
            return constants.SUCCESS

        if blob is None:
            # get_blob returns None when the object is not in the bucket
            msg = 'Error: file gs://{}/{} does not exist'.format(bucket_name, filename)
            log_error(msg, None, job_uuid)
            return constants.ERROR

        gcs_path = 'gs://' + bucket_name + '/' + filename

        # get the file's schema and row count
        # download the file to App Engine's tmp directory
        tmp_file = '/tmp/' + entry_id

        try:
            blob.download_to_filename(filename=tmp_file)

            # validate that it's a parquet file
            try:
                parquet_file = parquet.ParquetFile(tmp_file)
            except Exception as e:
                # not a parquet file, ignore it
                msg = 'Error: {} is not a parquet file, ignoring it'.format(filename)
                log_error(msg, e, job_uuid)
                return constants.ERROR

            schema = parquet.read_schema(tmp_file, memory_map=True)
            num_rows = parquet_file.metadata.num_rows
        finally:
            # delete the tmp file on every path (including errors) to free up memory
            if os.path.exists(tmp_file):
                os.remove(tmp_file)

        # populate the entry
        now = datetime.utcnow()
        entry = datacatalog.Entry()
        entry.name = filename
        entry.display_name = entry_id
        entry.type_ = 'FILESET'
        entry.gcs_fileset_spec.file_patterns = [gcs_path]
        entry.fully_qualified_name = gcs_path
        entry.source_system_timestamps.create_time = now
        entry.source_system_timestamps.update_time = now

        for column_name, arrow_type in zip(schema.names, schema.types):
            entry.schema.columns.append(
                types.ColumnSchema(
                    column=column_name,
                    type_=str(arrow_type),
                    description=None,
                    mode=None
                )
            )

        # create the entry
        created_entry = self.client.create_entry(parent=entry_group_full_name, entry_id=entry_id, entry=entry)
        print('Info: created entry: ', created_entry.name)

        # create the file metadata tag
        tag = self._build_file_metadata_tag(fields, bucket_name, filename, blob, num_rows)
        self.client.create_tag(parent=entry_name, tag=tag)

        if tag_history:
            bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
            template_fields = self.get_template()
            bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, '/'.join(uri), None, fields)

        return constants.SUCCESS

    def _build_file_metadata_tag(self, fields, bucket_name, filename, blob, num_rows):
        """Build the file metadata tag and store each value back into its field dict."""
        gcs_path = 'gs://' + bucket_name + '/' + filename

        string_values = {
            'name': filename,
            'bucket': bucket_name,
            'path': gcs_path,
            'storage_class': blob.storage_class,
            'media_link': blob.media_link,
        }
        # these are only written when the blob actually carries a value
        optional_string_values = {
            'content_encoding': blob.content_encoding,
            'content_language': blob.content_language,
        }
        double_values = {
            'size': blob.size,
            'num_rows': num_rows,
        }
        timestamp_values = {
            'created_time': blob.time_created,
            # last-modified time of the blob (previously the creation time was used by mistake)
            'updated_time': blob.updated,
        }

        tag = datacatalog.Tag()
        tag.template = self.client.tag_template_path(self.template_project, self.template_region, self.template_id)

        for field in fields:
            field_id = field['field_id']
            tag_field = datacatalog.TagField()

            if field_id in string_values:
                value = string_values[field_id]
                tag_field.string_value = value
            elif field_id in optional_string_values:
                value = optional_string_values[field_id]
                if not value:
                    continue
                tag_field.string_value = value
            elif field_id in double_values:
                value = double_values[field_id]
                tag_field.double_value = value
            elif field_id in timestamp_values:
                value = timestamp_values[field_id]
                tag_field.timestamp_value = value
            elif field_id == 'type':
                value = 'PARQUET' # hardcode file extension for now
                tag_field.enum_value.display_name = value
            else:
                # not a file metadata field
                continue

            tag.fields[field_id] = tag_field
            field['field_value'] = value # field_value is used by the BQ exporter

        return tag


    def entry_group_exists(self, entry_group_full_name):
        """Tell whether the entry group can be retrieved from Data Catalog.

        Args:
            entry_group_full_name: full resource name of the entry group.

        Returns:
            True when get_entry_group succeeds. Any exception raised by the
            call (not only a missing group) is logged and reported as False.
        """
        request = datacatalog.GetEntryGroupRequest(name=entry_group_full_name)

        try:
            self.client.get_entry_group(request=request)
        except Exception as e:
            msg = 'Error entry group does not exist {}'.format(entry_group_full_name)
            log_error(msg, e)
            return False

        return True
    
    
    def create_entry_group(self, entry_group_short_name):
        """Create an entry group in the template's project and region.

        Args:
            entry_group_short_name: used both as the entry group id and as its
                display name.

        Returns:
            The resource name of the created entry group.
        """
        group_spec = datacatalog.EntryGroup()
        group_spec.display_name = entry_group_short_name

        location_path = '/'.join(['projects', self.template_project, 'locations', self.template_region])

        created_group = self.client.create_entry_group(
            parent=location_path,
            entry_group_id=entry_group_short_name,
            entry_group=group_spec,
        )

        print('created entry_group: ', created_group.name)
        return created_group.name
           

    def apply_glossary_asset_config(self, fields, mapping_table, uri, job_uuid, config_uuid, template_uuid, tag_history, overwrite=False):
        """Create or update a glossary tag on the catalog entry identified by uri.

        The column names found in the entry's schema are looked up in
        mapping_table (source_name -> canonical_name). Every element of fields
        whose 'field_id' equals a returned canonical name is set to True on the
        tag.

        Args:
            fields: list of dicts carrying a 'field_id' key. Matching elements
                get 'field_value' set to True in place.
            mapping_table: mapping table path. 'bigquery/project/' is stripped
                and the remaining '/dataset/' and '/' separators are turned
                into dots before it is used in the query.
            uri: a list [bucket, file path] for a GCS entry, or a string that is
                appended to '//bigquery.googleapis.com/projects/' for BigQuery.
            job_uuid: job identifier attached to log messages.
            config_uuid: not used by this method.
            template_uuid: not used by this method.
            tag_history: when truthy, the tag is also copied through
                BigQueryUtils.copy_tag.
            overwrite: when False, an existing tag is left untouched.

        Returns:
            constants.SUCCESS or constants.ERROR for the early exits, otherwise
            the status returned by do_create_update_delete_action.
        """
        # uri is either a BQ table/view path or GCS file path
        is_gcs = isinstance(uri, list)
        is_bq = isinstance(uri, str)

        # build the linked resource based on the resource type
        if is_gcs:
            bucket = uri[0].replace('-', '_')
            filename = uri[1].split('.')[0].replace('/', '_') # extract the filename without the extension, replace '/' with '_'
            linked_resource = '//datacatalog.googleapis.com/projects/' + self.template_project + '/locations/' + self.template_region + '/entryGroups/' + bucket + '/entries/' + filename
        elif is_bq:
            linked_resource = '//bigquery.googleapis.com/projects/' + uri
            print("bigquery_resource: " + linked_resource)
        else:
            # without this guard `entry` would be unbound further down
            msg = 'Error: uri {} is neither a list (GCS) nor a string (BigQuery)'.format(uri)
            error = {'job_uuid': job_uuid, 'msg': msg}
            print(json.dumps(error))
            return constants.ERROR

        request = datacatalog.LookupEntryRequest()
        request.linked_resource = linked_resource

        # both resource types share the same guarded lookup
        try:
            entry = self.client.lookup_entry(request)
            print('entry: ', entry.name)
        except Exception as e:
            msg = 'Unable to find entry in the catalog. Entry {} does not exist: {}'.format(linked_resource, e)
            log_error(msg, e, job_uuid)
            return constants.ERROR

        try:
            tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name)
            print('tag_exists: ', tag_exists)
        except Exception as e:
            msg = 'Error during check_if_tag_exists: {}'.format(e)
            log_error(msg, e, job_uuid)
            return constants.ERROR

        if tag_exists and overwrite == False:
            msg = 'Info: tag already exists and overwrite set to False'
            info = {'job_uuid': job_uuid, 'msg': msg}
            print(json.dumps(info))
            return constants.SUCCESS

        # an entry without columns cannot be matched against the mapping table
        # (and would otherwise produce an invalid `in ()` clause)
        schema_columns = entry.schema.columns if entry.schema is not None else []

        if not schema_columns:
            msg = 'Error entry {} does not have a schema in the catalog'.format(entry.name)
            error = {'job_uuid': job_uuid, 'msg': msg}
            print(json.dumps(error))
            return constants.ERROR

        # quoted, comma-separated list of the entry's column names
        column_schema_str = ','.join("'" + column_schema.column + "'" for column_schema in schema_columns)

        mapping_table_formatted = mapping_table.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.')

        query_str = 'select canonical_name from `' + mapping_table_formatted + '` where source_name in (' + column_schema_str + ')'
        #print('query_str: ', query_str)

        rows = self.bq_client.query(query_str).result()

        tag = datacatalog.Tag()
        tag.template = self.template_path

        tag_is_empty = True

        for row in rows:
            canonical_name = row['canonical_name']

            # flag the first requested field matching this canonical name
            for field in fields:
                if field['field_id'] == canonical_name:
                    bool_field = datacatalog.TagField()
                    bool_field.bool_value = True
                    tag.fields[canonical_name] = bool_field
                    field['field_value'] = True
                    tag_is_empty = False
                    break

        if tag_is_empty:
            print("Error: can't create the tag because it's empty")
            return constants.ERROR

        if tag_exists:
            # tag already exists and overwrite is True
            tag.name = tag_id
            op_status = self.do_create_update_delete_action(job_uuid, 'update', tag)
        else:
            op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry)

        if tag_history:
            bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
            template_fields = self.get_template()
            # GCS uris are stored as a single 'bucket/path' string
            history_uri = '/'.join(uri) if is_gcs else uri
            bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, history_uri, None, fields)

        return op_status
      
                 
    def apply_sensitive_column_config(self, fields, dlp_dataset, infotype_selection_table, infotype_classification_table, \
                                      uri, create_policy_tags, taxonomy_id, job_uuid, config_uuid, template_uuid, \
                                      tag_history, overwrite=False):
        """Tag the columns of a BigQuery table that appear in its DLP findings.

        Steps performed:
          1. optionally list the policy tags of taxonomy_id;
          2. look up the catalog entry of the table;
          3. read the infotypes found per field from the DLP findings table;
          4. resolve one notable infotype per field via infotype_selection_table;
          5. resolve a single classification for the table via
             infotype_classification_table;
          6. create/update one column-level tag per field found by DLP and,
             when requested, apply the matching policy tags to the table schema.

        Args:
            fields: list of dicts with a 'field_id' key; elements whose id
                contains 'sensitive_field' or 'sensitive_type' get their
                'field_value' set in place.
            dlp_dataset, infotype_selection_table, infotype_classification_table:
                paths from which 'bigquery/project/' is stripped and whose
                remaining '/dataset/' and '/' separators become dots.
            uri: string appended to '//bigquery.googleapis.com/projects/'. Its
                fifth '/'-separated segment is used as the DLP findings table
                name inside dlp_dataset.
            create_policy_tags: when truthy, policy tags whose display name
                equals the classification are applied to the tagged columns.
            taxonomy_id: parent passed to ListPolicyTagsRequest.
            job_uuid: job identifier attached to log messages.
            config_uuid: not used by this method.
            template_uuid: not used by this method.
            tag_history: when truthy, successfully written tags are copied
                through BigQueryUtils.copy_tag.
            overwrite: when False, columns that already carry a tag are skipped.

        Returns:
            constants.SUCCESS or constants.ERROR for the early exits, otherwise
            the status of the last tagging / policy tag operation performed.
        """
        if create_policy_tags:

            request = datacatalog.ListPolicyTagsRequest(
                parent=taxonomy_id
            )

            try:
                page_result = self.ptm_client.list_policy_tags(request=request)
            except Exception as e:
                msg = 'Unable to retrieve the policy tag taxonomy for taxonomy_id {}'.format(taxonomy_id)
                log_error(msg, e, job_uuid)
                op_status = constants.ERROR
                return op_status

            policy_tag_names = [] # list of fully qualified policy tag names and sensitive categories

            for response in page_result:
                policy_tag_names.append((response.name, response.display_name))

            policy_tag_requests = [] # stores the list of fully qualified policy tag names and table column names,
                                     # so that we can create the policy tags on the various sensitive fields

        # uri is a BQ table path
        op_status = constants.SUCCESS

        if isinstance(uri, str) == False:
            print('Error: uri ' + str(uri) + ' is not of type string.')
            op_status = constants.ERROR
            return op_status

        bigquery_resource = '//bigquery.googleapis.com/projects/' + uri
        #print("bigquery_resource: ", bigquery_resource)

        request = datacatalog.LookupEntryRequest()
        request.linked_resource=bigquery_resource

        try:
            entry = self.client.lookup_entry(request)
        except Exception as e:
            msg = 'Error looking up entry {} in the catalog: {}'.format(bigquery_resource, e)
            log_error(msg, e, job_uuid)
            op_status = constants.ERROR
            return op_status

        dlp_dataset = dlp_dataset.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.')
        infotype_selection_table = infotype_selection_table.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.')
        infotype_classification_table = infotype_classification_table.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.')
        # the findings table is named after the fifth segment of the uri
        dlp_table = dlp_dataset + '.' + uri.split('/')[4]

        infotype_fields = []
        notable_infotypes = []

        # get an array of infotypes associated with each field in the DLP findings table
        dlp_sql = 'select field, array_agg(infotype) infotypes '
        dlp_sql += 'from (select distinct cl.record_location.field_id.name as field, info_type.name as infotype '
        dlp_sql += 'from ' + dlp_table + ', unnest(location.content_locations) as cl '
        dlp_sql += 'order by cl.record_location.field_id.name) '
        dlp_sql += 'group by field'

        try:
            dlp_rows = self.bq_client.query(dlp_sql).result()

        except Exception as e:
            msg = 'Error querying DLP findings table: {}'.format(dlp_sql)
            log_error(msg, e, job_uuid)
            op_status = constants.ERROR
            return op_status

        dlp_row_count = 0

        for dlp_row in dlp_rows:

            dlp_row_count += 1

            field = dlp_row['field']
            infotype_fields.append(field)
            infotypes = dlp_row['infotypes']

            print('field ', field, ', infotypes [', infotypes, ']')

            # one unnest alias per infotype, each constrained to one infotype,
            # so only selection rows containing all of them match
            is_sql = 'select notable_infotype '
            is_sql += 'from ' + infotype_selection_table + ' i, '

            infotype_count = len(infotypes)

            for i in range(0, infotype_count):

                is_sql += 'unnest(i.field_infotypes) as i' + str(i) + ', '

            # drop the trailing ', '
            is_sql = is_sql[:-2] + ' '

            for i, infotype in enumerate(infotypes):

                if i == 0:
                    is_sql += 'where i' + str(i) + ' = "' + infotype + '" '
                else:
                    is_sql += 'and i' + str(i) + ' = "' + infotype + '" '

            # prefer the row with the fewest infotypes
            is_sql += 'order by array_length(i.field_infotypes) '
            is_sql += 'limit 1'

            #print('is_sql: ', is_sql)

            try:
                ni_rows = self.bq_client.query(is_sql).result()
            except Exception as e:
                msg = 'Error querying infotype selection table: {}'.format(is_sql)
                log_error(msg, e, job_uuid)
                op_status = constants.ERROR
                return op_status

            for ni_row in ni_rows:
                notable_infotypes.append(ni_row['notable_infotype']) # there should be just one notable infotype per field

        # there are no DLP findings
        if dlp_row_count == 0:
            op_status = constants.SUCCESS
            return op_status

        # remove duplicate infotypes from notable list
        final_set = list(set(notable_infotypes))
        print('final_set: ', final_set)

        # lookup classification using set of notable infotypes
        c_sql = 'select classification_result '
        c_sql += 'from ' + infotype_classification_table + ' c, '

        for i in range(0, len(final_set)):
            c_sql += 'unnest(c.notable_infotypes) as c' + str(i) + ', '

        # drop the trailing ', '
        c_sql = c_sql[:-2] + ' '

        for i, notable_infotype in enumerate(final_set):

            if i == 0:
                c_sql += 'where c' + str(i) + ' = "' + notable_infotype + '" '
            else:
                c_sql += 'and c' + str(i) + ' = "' + notable_infotype + '" '

        c_sql += 'order by array_length(c.notable_infotypes) '
        c_sql += 'limit 1'

        #print('c_sql: ', c_sql)

        try:
            c_rows = self.bq_client.query(c_sql).result()
        except Exception as e:
            msg = 'Error querying infotype classification table: {}'.format(c_sql)
            log_error(msg, e, job_uuid)
            op_status = constants.ERROR
            return op_status

        classification_result = None

        for c_row in c_rows:
            classification_result = c_row['classification_result'] # we should end up with one classification result per table

        print('classification_result: ', classification_result)

        tag = datacatalog.Tag()
        tag.template = self.template_path

        # each element represents a field which needs to be tagged
        for infotype_field in infotype_fields:

            for field in fields:
                if 'sensitive_field' in field['field_id']:
                    bool_field = datacatalog.TagField()

                    # only the 'Public_Information' classification is non-sensitive
                    if classification_result == 'Public_Information':
                        bool_field.bool_value = False
                        field['field_value'] = False
                    else:
                        bool_field.bool_value = True
                        field['field_value'] = True

                    tag.fields['sensitive_field'] = bool_field

                if 'sensitive_type' in field['field_id']:
                    enum_field = datacatalog.TagField()
                    enum_field.enum_value.display_name = classification_result
                    tag.fields['sensitive_type'] = enum_field
                    field['field_value'] = classification_result

            tag.column = infotype_field # DLP has a bug and sometimes the infotype field does not equal to the column name in the table
            print('tag.column: ', infotype_field)

            # check if a tag already exists on this column
            try:
                tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column=infotype_field)

            except Exception as e:
                msg = 'Error during check_if_tag_exists: {}'.format(entry.name)
                log_error(msg, e, job_uuid)
                op_status = constants.ERROR
                return op_status

            # tag already exists
            if tag_exists:

                if overwrite == False:
                    # skip this sensitive column because it is already tagged
                    continue

                tag.name = tag_id
                op_status = self.do_create_update_delete_action(job_uuid, 'update', tag)
            else:
                op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry)

            if op_status == constants.SUCCESS and create_policy_tags and classification_result != 'Public_Information':
                # add the column name and policy tag name to a list
                for policy_tag_name, policy_tag_category in policy_tag_names:
                    if policy_tag_category == classification_result:
                        policy_tag_requests.append((infotype_field, policy_tag_name))


            if op_status == constants.SUCCESS and tag_history:
                bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
                template_fields = self.get_template()
                bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, uri, infotype_field, fields)


        # once we have created the regular tags, we can create/update the policy tags
        if create_policy_tags and len(policy_tag_requests) > 0:
            table_id = uri.replace('/datasets/', '.').replace('/tables/', '.')
            op_status = self.apply_policy_tags(table_id, policy_tag_requests)

        if op_status != constants.SUCCESS:
            msg = 'Error occurred when tagging {}'.format(uri)
            error = {'job_uuid': job_uuid, 'msg': msg}
            print(json.dumps(error))

        return op_status

    
    def apply_policy_tags(self, table_id, policy_tag_requests, job_uuid=None):
        """Attach policy tags to columns of a BigQuery table.

        Args:
            table_id: table identifier passed to bq_client.get_table.
            policy_tag_requests: iterable of (column name, policy tag name)
                pairs. When a column appears more than once, the first pair
                wins.
            job_uuid: optional job identifier attached to the error log entry.

        Returns:
            constants.SUCCESS, or constants.ERROR when the schema update fails.
        """
        op_status = constants.SUCCESS
        table = self.bq_client.get_table(table_id)

        new_schema = []

        for field in table.schema:

            # first request targeting this column, if any
            policy_tag_name = next((tag_name for column, tag_name in policy_tag_requests if column == field.name), None)

            if policy_tag_name is None:
                new_schema.append(field)
                continue

            print('applying policy tag on', field.name)
            policy = bigquery.schema.PolicyTagList(names=[policy_tag_name,])

            # rebuild the field from its API representation so that the
            # description, nested sub-fields and type parameters are kept;
            # only the policy tags are replaced
            field_repr = field.to_api_repr()
            field_repr['policyTags'] = policy.to_api_repr()
            new_schema.append(bigquery.schema.SchemaField.from_api_repr(field_repr))

        table.schema = new_schema

        try:
            table = self.bq_client.update_table(table, ["schema"])

        except Exception as e:
            msg = 'Error occurred while updating the schema of {}'.format(table_id)
            log_error(msg, e, job_uuid)
            op_status = constants.ERROR

        return op_status
        
            
    def apply_export_config(self, config_uuid, target_project, target_dataset, target_region, uri, job_uuid=None):
        """Export the tags attached to a BigQuery dataset or table into report tables.

        Every tag on the catalog entry is flattened into one record per
        template field that is set on the tag. Records are routed to
        catalog_report_column_tags, catalog_report_table_tags or
        catalog_report_dataset_tags inside target_project.target_dataset.

        Note: this method overwrites self.template_id, self.template_project,
        self.template_region and self.template_path with the template of each
        tag it visits, before calling get_template().

        Args:
            config_uuid: not used by this method.
            target_project: project of the report tables.
            target_dataset: dataset of the report tables.
            target_region: region handed to BigQueryUtils.
            uri: string appended to '//bigquery.googleapis.com/projects/'.
                The first and third '/'-separated segments are recorded as
                project and dataset; when the uri contains '/tables/', the
                fifth segment is recorded as the table.
            job_uuid: optional job identifier attached to the error log entry.

        Returns:
            constants.SUCCESS, or constants.ERROR when uri is not a string or
            the catalog lookup fails.
        """
        column_tag_records = []
        table_tag_records = []
        dataset_tag_records = []

        export_status = constants.SUCCESS
        bqu = bq.BigQueryUtils(self.credentials, target_region)

        if isinstance(uri, str) == False:
            print('Error: uri ' + str(uri) + ' is not of type string.')
            export_status = constants.ERROR
            return export_status

        uri_parts = uri.split('/')
        tagged_project = uri_parts[0]
        tagged_dataset = uri_parts[2]

        # report table used for tags that are not attached to a column
        if '/tables/' in uri:
            entry_level_table_id = 'catalog_report_table_tags'
            tagged_table = uri_parts[4]
        else:
            entry_level_table_id = 'catalog_report_dataset_tags'
            tagged_table = None

        bigquery_resource = '//bigquery.googleapis.com/projects/' + uri
        #print("bigquery_resource: ", bigquery_resource)

        request = datacatalog.LookupEntryRequest()
        request.linked_resource=bigquery_resource

        try:
            entry = self.client.lookup_entry(request)
        except Exception as e:
            msg = 'Error looking up entry {} in catalog'.format(bigquery_resource)
            log_error(msg, e, job_uuid)
            export_status = constants.ERROR
            return export_status

        tag_list = self.client.list_tags(parent=entry.name, timeout=120)

        for tag in tag_list:
            print('tag.template:', tag.template)
            print('tag.column:', tag.column)

            # get tag template fields
            template_parts = tag.template.split('/')
            self.template_id = template_parts[5]
            self.template_project = template_parts[1]
            self.template_region = template_parts[3]
            self.template_path = tag.template
            template_fields = self.get_template()

            if tag.column:
                tagged_column = tag.column
                target_table_id = 'catalog_report_column_tags'
            else:
                tagged_column = None
                # dataset entries must go to the dataset report, not the table report
                target_table_id = entry_level_table_id

            for template_field in template_fields:

                field_id = template_field['field_id']

                if field_id not in tag.fields:
                    continue

                # the value is parsed out of the text rendering of the tag field
                tagged_field_split = str(tag.fields[field_id]).split('\n')

                # reset per field so a value from a previous field is never reused
                field_value = None

                for split_index, split in enumerate(tagged_field_split):
                    if '_value:' in split:
                        start_index = split.index(':', 0) + 1
                        field_value = split[start_index:].strip().replace('"', '').replace('<br>', ',')
                        print('extracted field_value:', field_value)
                        break
                    elif 'enum_value' in split:
                        # the enum display name is rendered on the following line
                        field_value = tagged_field_split[split_index+1].replace('display_name:', '').replace('"', '').strip()
                        print('extracted field_value:', field_value)
                        break

                if field_value is None:
                    print('could not extract a value for field', field_id, '- skipping')
                    continue

                # format record to be written; the timestamp is labelled UTC so it must be taken in UTC
                current_ts = datetime.now(pytz.utc).strftime("%Y-%m-%d %H:%M:%S") + " UTC"

                if target_table_id == 'catalog_report_column_tags':
                    column_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "table": tagged_table, "column": tagged_column, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts})

                elif target_table_id == 'catalog_report_table_tags':
                    table_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "table": tagged_table, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts})

                elif target_table_id == 'catalog_report_dataset_tags':
                    dataset_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts})

        # write exported records to BQ
        # NOTE(review): the return value of insert_exported_records is not reflected in export_status - confirm whether it should be
        if len(dataset_tag_records) > 0:
            target_table_id = target_project + '.' + target_dataset + '.catalog_report_dataset_tags'
            success = bqu.insert_exported_records(target_table_id, dataset_tag_records)

        if len(table_tag_records) > 0:
            target_table_id = target_project + '.' + target_dataset + '.catalog_report_table_tags'
            success = bqu.insert_exported_records(target_table_id, table_tag_records)

        if len(column_tag_records) > 0:
            target_table_id = target_project + '.' + target_dataset + '.catalog_report_column_tags'
            success = bqu.insert_exported_records(target_table_id, column_tag_records)

        return export_status
        
            
    def apply_import_config(self, job_uuid, config_uuid, data_asset_type, data_asset_region, tag_dict, tag_history, overwrite=False):
        """Create or update one tag described by an imported CSV record.

        tag_dict holds both the location of the asset ('project' plus, depending
        on data_asset_type, 'dataset'/'table', 'entry_group'/'fileset' or
        'instance'/'database'/'schema'/'table', and optionally 'column') and the
        tag field values. Every other key is treated as a tag field id and must
        exist in the tag template.

        Args:
            job_uuid: job identifier attached to log messages.
            config_uuid: forwarded to create_update_delete_tag.
            data_asset_type: one of constants.BQ_ASSET, constants.FILESET_ASSET
                or constants.SPAN_ASSET.
            data_asset_region: region used to address fileset and Spanner entries.
            tag_dict: dict of string values as described above.
            tag_history: forwarded to create_update_delete_tag.
            overwrite: when False, an existing tag is left untouched.

        Returns:
            constants.SUCCESS or constants.ERROR for the early exits, otherwise
            the status returned by create_update_delete_tag.
        """
        #print(f'apply_import_config: {job_uuid}, {config_uuid}, {data_asset_type}, {data_asset_region}, {tag_dict}, {tag_history}')

        op_status = constants.SUCCESS

        if 'project' in tag_dict:
            project = tag_dict['project']
        else:
            msg = "Error: project info missing from CSV"
            log_error_tag_dict(msg, None, job_uuid, tag_dict)
            op_status = constants.ERROR
            return op_status

        # stays None when data_asset_type matches none of the supported types
        entry_type = None

        if data_asset_type == constants.BQ_ASSET:
            if 'dataset' not in tag_dict:
                msg = "Error: could not find the required dataset field in the CSV"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                op_status = constants.ERROR
                return op_status
            else:
                entry_type = constants.DATASET
                dataset = tag_dict['dataset']

                # a table key narrows the target from the dataset to the table
                if 'table' in tag_dict:
                    table = tag_dict['table']
                    entry_type = constants.BQ_TABLE

        if data_asset_type == constants.FILESET_ASSET:
            if 'entry_group' not in tag_dict or 'fileset' not in tag_dict:
                msg = "Error: could not find the required fields in the CSV. Missing entry_group or fileset or both"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                op_status = constants.ERROR
                return op_status
            else:
                entry_type = constants.FILESET
                entry_group = tag_dict['entry_group']
                fileset = tag_dict['fileset']

        if data_asset_type == constants.SPAN_ASSET:
            if 'instance' not in tag_dict or 'database' not in tag_dict or 'table' not in tag_dict:
                msg = "Error: could not find the required fields in the CSV. The required fields for Spanner are instance, database, and table"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                op_status = constants.ERROR
                return op_status
            else:
                entry_type = constants.SPAN_TABLE
                instance = tag_dict['instance']
                database = tag_dict['database']

                if 'schema' in tag_dict:
                    # schema-qualified table names are wrapped in backticks
                    schema = tag_dict['schema']
                    table = tag_dict['table']
                    table = f"`{schema}.{table}`"
                else:
                    table = tag_dict['table']

        if entry_type is None:
            # fail cleanly instead of hitting unbound locals further down
            msg = "Error: unsupported data asset type {}".format(data_asset_type)
            log_error_tag_dict(msg, None, job_uuid, tag_dict)
            op_status = constants.ERROR
            return op_status

        if entry_type == constants.DATASET:
            resource = f'//bigquery.googleapis.com/projects/{project}/datasets/{dataset}'
            request = datacatalog.LookupEntryRequest()
            request.linked_resource=resource

        if entry_type == constants.BQ_TABLE:
            resource = f'//bigquery.googleapis.com/projects/{project}/datasets/{dataset}/tables/{table}'
            request = datacatalog.LookupEntryRequest()
            request.linked_resource=resource

        if entry_type == constants.FILESET:
            resource = f'//datacatalog.googleapis.com/projects/{project}/locations/{data_asset_region}/entryGroups/{entry_group}/entries/{fileset}'
            request = datacatalog.LookupEntryRequest()
            request.linked_resource=resource

        if entry_type == constants.SPAN_TABLE:
            # Spanner entries are looked up by fully qualified name rather than linked resource
            resource = f'spanner:{project}.regional-{data_asset_region}.{instance}.{database}.{table}'
            request = datacatalog.LookupEntryRequest()
            request.fully_qualified_name=resource
            request.project=project
            request.location=data_asset_region

        try:
            entry = self.client.lookup_entry(request)
        except Exception as e:
            msg = "Error could not find {} entry for {}".format(entry_type, resource)
            log_error_tag_dict(msg, e, job_uuid, tag_dict)
            op_status = constants.ERROR
            return op_status

        # format uri for storing in tag history table
        if data_asset_type == constants.BQ_ASSET:
            uri = entry.linked_resource.replace('//bigquery.googleapis.com/projects/', '')
        if data_asset_type == constants.SPAN_ASSET:
            uri = entry.linked_resource.replace('///projects/', '').replace('instances', 'instance').replace('databases', 'database') + '/table/' + table.replace('`', '')
        if data_asset_type == constants.FILESET_ASSET:
            uri = entry.linked_resource.replace('//datacatalog.googleapis.com/projects/', '').replace('locations', 'location').replace('entryGroups', 'entry_group').replace('entries', 'entry')

        target_column = None

        if 'column' in tag_dict:
            target_column = tag_dict['column']

            column_exists = self.column_exists_in_table(target_column, entry.schema.columns)

            if column_exists == False:
                msg = f"Error could not find column {target_column} in {resource}"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                op_status = constants.ERROR
                return op_status

            uri = uri + '/column/' + target_column

        try:
            tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column=target_column)

        except Exception as e:
            msg = f"Error during check_if_tag_exists: {entry.name}"
            log_error_tag_dict(msg, e, job_uuid, tag_dict)
            op_status = constants.ERROR
            return op_status

        if tag_exists and overwrite == False:
            msg = "Info: Tag already exists and overwrite flag is False"
            log_info_tag_dict(msg, job_uuid, tag_dict)
            op_status = constants.SUCCESS
            return op_status

        # keys that locate the asset rather than describe a tag field
        location_keys = ('project', 'dataset', 'table', 'column', 'entry_group', 'fileset',
                         'instance', 'database', 'schema')

        tag_fields = []
        template_fields = self.get_template()

        for field_name in tag_dict:

            if field_name in location_keys:
                continue

            field_type = None
            field_value = tag_dict[field_name].strip()

            for template_field in template_fields:
                if template_field['field_id'] == field_name:
                    field_type = template_field['field_type']
                    break

            if field_type == None:
                print('Error while preparing the tag. The field ', field_name, ' was not found in the tag template ', self.template_id)
                op_status = constants.ERROR
                return op_status

            # this check allows for tags with empty enums to get created, otherwise the empty enum gets flagged because DC thinks that you are storing an empty string as the enum value
            if field_type == 'enum' and field_value == '':
                continue

            field = {'field_id': field_name, 'field_type': field_type, 'field_value': field_value}
            tag_fields.append(field)


        op_status = self.create_update_delete_tag(tag_fields, tag_exists, tag_id, job_uuid, config_uuid, 'IMPORT_TAG', tag_history, \
                                                  entry, uri, target_column)

        return op_status
    

    def apply_restore_config(self, job_uuid, config_uuid, tag_extract, tag_history, overwrite=False):
        """Restore column-level and table-level tags from a tag extract.

        Each element of tag_extract is a dict with the keys 'entryGroupId',
        'id', 'locationId' and 'projectId' identifying a catalog entry, plus
        optionally 'columns' (a list of dicts with 'name' and 'tags') and/or
        'tags'. Only the 'fields' of the first tag of each list are restored.

        A target that already carries a tag while overwrite is False is skipped
        and processing continues with the next target. A failed lookup or tag
        existence check aborts the restore.

        Args:
            job_uuid: job identifier attached to log messages.
            config_uuid: forwarded to create_update_delete_tag.
            tag_extract: iterable of dicts as described above.
            tag_history: forwarded to create_update_delete_tag.
            overwrite: when False, existing tags are left untouched.

        Returns:
            constants.SUCCESS when every performed operation succeeded,
            constants.ERROR when a lookup or existence check failed, otherwise
            the status of the last failed create/update.
        """
        op_status = constants.SUCCESS

        for json_obj in tag_extract:
            #print('json_obj: ', json_obj)

            entry_group = json_obj['entryGroupId']
            entry_id = json_obj['id']
            location_id = json_obj['locationId']
            project_id = json_obj['projectId']

            entry_name = 'projects/' + project_id + '/locations/' + location_id + '/entryGroups/' + entry_group + '/entries/' + entry_id
            print('entry_name: ', entry_name)

            try:
                entry = self.client.get_entry(name=entry_name)

            except Exception as e:
                msg = "Error couldn't find the entry: {}".format(entry_name)
                log_error(msg, e, job_uuid)
                return constants.ERROR

            if 'columns' in json_obj:
                # column-level tags
                for column_obj in json_obj['columns']:

                    # the column name is the part after the first ':' of the extract's name
                    column_name = column_obj['name'].split(':')[1]
                    fields = column_obj['tags'][0]['fields']

                    try:
                        tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column=column_name)

                    except Exception as e:
                        msg = 'Error during check_if_tag_exists:{}'.format(entry.name)
                        log_error(msg, e, job_uuid)
                        return constants.ERROR

                    if tag_exists and overwrite == False:
                        msg = 'Info: Tag already exists and overwrite flag is False'
                        info = {'job_uuid': job_uuid, 'msg': msg}
                        print(json.dumps(info))
                        # skip only this column; the rest of the extract is still restored
                        continue

                    # create or update column-level tag
                    uri = entry.linked_resource.replace('//bigquery.googleapis.com/projects/', '') + '/column/' + column_name
                    tag_status = self.create_update_delete_tag(fields, tag_exists, tag_id, job_uuid, config_uuid, 'RESTORE_TAG', tag_history, \
                                                               entry, uri, column_name)

                    # remember failures so a later success cannot mask them
                    if tag_status != constants.SUCCESS:
                        op_status = tag_status

            if 'tags' in json_obj:
                # table-level tag
                fields = json_obj['tags'][0]['fields']

                try:
                    tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column='')

                except Exception as e:
                    msg = 'Error during check_if_tag_exists:{}'.format(entry.name)
                    log_error(msg, e, job_uuid)
                    return constants.ERROR

                if tag_exists and overwrite == False:
                    msg = 'Info: Tag already exists and overwrite flag is False'
                    info = {'job_uuid': job_uuid, 'msg': msg}
                    print(json.dumps(info))
                    # skip only this entry; the rest of the extract is still restored
                    continue

                # create or update table-level tag
                uri = entry.linked_resource.replace('//bigquery.googleapis.com/projects/', '')
                tag_status = self.create_update_delete_tag(fields, tag_exists, tag_id, job_uuid, config_uuid, 'RESTORE_TAG', tag_history, \
                                                           entry, uri)

                if tag_status != constants.SUCCESS:
                    op_status = tag_status

        return op_status
        
    # used by multiple apply methods
    def create_update_delete_tag(self, fields, tag_exists, tag_id, job_uuid, config_uuid, config_type, tag_history, entry, uri, column_name=''):
        """Build a tag from `fields` and create, update or delete it in Data Catalog.

        Args:
            fields: list of dicts, each keyed either by name/type/value or by
                field_id/field_type/field_value. Entries with neither a `name`
                nor a `field_id` key are skipped. The dicts are mutated in
                place: the name/type/value keys are renamed when `tag_history`
                is set, and `field_value` is rewritten for RICHTEXT and
                DATETIME/TIMESTAMP fields.
            tag_exists: when True the existing tag `tag_id` is updated, or
                deleted if every field value is the empty string; otherwise a
                new tag is created, provided at least one value is non-empty.
            tag_id: name of the existing tag (only used when `tag_exists`).
            job_uuid: passed to the logging helpers and to the tag history copy.
            config_uuid, config_type: accepted for call compatibility; not used
                by this method.
            tag_history: when truthy and the operation succeeded, the tag is
                also copied via BigQueryUtils.copy_tag.
            entry: parent entry of a newly created tag.
            uri: resource path handed to BigQueryUtils.copy_tag.
            column_name: column to attach the tag to; '' for a table-level tag.

        Returns:
            constants.SUCCESS or constants.ERROR.
        """

        op_status = constants.SUCCESS
        valid_field = False
        field = None  # keeps the "Invalid field" message well-defined when `fields` is empty

        num_fields = len(fields)
        num_empty_values = 0

        tag = datacatalog.Tag()
        tag.template = self.template_path

        for field in fields:

            if 'name' in field:
                valid_field = True
                field_id = field['name']
                field_type = field['type']
                field_value = field['value']

                # rename the keys, which will be used by tag history
                if tag_history:
                    field['field_id'] = field.pop('name')
                    field['field_type'] = field.pop('type')
                    field['field_value'] = field.pop('value')

            elif 'field_id' in field:
                valid_field = True
                field_id = field['field_id']
                field_type = field['field_type'].upper()
                field_value = field['field_value']

            else:
                # export file contains invalid tags (e.g. a tagged field without a name)
                continue

            # keep track of empty values
            is_empty = field_value == ''
            if is_empty:
                num_empty_values += 1

            if field_type == 'BOOL':
                bool_field = datacatalog.TagField()

                if isinstance(field_value, str):
                    # accept 'TRUE' in any letter case; every other string is False
                    bool_field.bool_value = field_value.strip().upper() == 'TRUE'
                else:
                    bool_field.bool_value = field_value

                tag.fields[field_id] = bool_field

            elif field_type == 'STRING':
                string_field = datacatalog.TagField()
                string_field.string_value = str(field_value)
                tag.fields[field_id] = string_field

            elif field_type == 'DOUBLE':
                # float('') raises ValueError, which would make the delete-on-all-empty
                # path below unreachable; an empty double is simply left unset
                if not is_empty:
                    float_field = datacatalog.TagField()
                    float_field.double_value = float(field_value)
                    tag.fields[field_id] = float_field

            elif field_type == 'RICHTEXT':
                richtext_field = datacatalog.TagField()
                richtext_field.richtext_value = field_value.replace(',', '<br>')
                tag.fields[field_id] = richtext_field

                # For richtext values, replace '<br>' with ',' when exporting to BQ
                field['field_value'] = field_value.replace('<br>', ', ')

            elif field_type == 'ENUM':
                enum_field = datacatalog.TagField()
                enum_field.enum_value.display_name = field_value
                tag.fields[field_id] = enum_field

            elif field_type == 'DATETIME' or field_type == 'TIMESTAMP':

                # field_value may be empty or date value e.g. "2022-05-08" or datetime value e.g. "2022-05-08 15:00:00"
                if is_empty:
                    timestamp = ''

                else:
                    d = date(int(field_value[0:4]), int(field_value[5:7]), int(field_value[8:10]))

                    if len(field_value) == 10:
                        t = dtime(0, 0)  # when no time is supplied, default to 12:00:00 AM UTC

                    else:
                        # raw timestamp format: 2022-05-11 21:18:20
                        hour = int(field_value[11:13])
                        minute = int(field_value[14:16])

                        # keep the seconds when the value carries them ("HH:MM:SS")
                        second = 0
                        if field_value[16:17] == ':' and field_value[17:19].isdigit():
                            second = int(field_value[17:19])

                        t = dtime(hour, minute, second)

                    timestamp = pytz.utc.localize(datetime.combine(d, t))

                    datetime_field = datacatalog.TagField()
                    datetime_field.timestamp_value = timestamp
                    tag.fields[field_id] = datetime_field

                field['field_value'] = timestamp  # store this value back in the field, so it can be recorded in tag history

        # exported file from DataCatalog can have invalid tags, skip tag creation if that's the case
        if not valid_field:
            msg = f"Invalid field {field}"
            log_error(msg, error='', job_uuid=job_uuid)
            op_status = constants.ERROR
            return op_status

        if column_name != '':
            tag.column = column_name

        if tag_exists == True:
            tag.name = tag_id

            # delete tag if every field in it is empty
            if num_fields == num_empty_values:
                op_status = self.do_create_update_delete_action(job_uuid, 'delete', tag)
            else:
                op_status = self.do_create_update_delete_action(job_uuid, 'update', tag)
        else:
            # create the tag only if it has at least one non-empty field
            if num_fields != num_empty_values:
                op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry)

        # only write to tag history if the operation was successful
        if tag_history and op_status == constants.SUCCESS:
            bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
            template_fields = self.get_template()
            success = bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, uri, column_name, fields)

            if success == False:
                msg = 'Error occurred while writing to tag history table'
                log_error(msg, error='', job_uuid=job_uuid)
                op_status = constants.ERROR

        return op_status
        
    
    def do_create_update_delete_action(self, job_uuid, action, tag, entry=None):
        """Run one tag operation against Data Catalog, retrying once on a 429/503 error.

        `action` is 'delete', 'update' or 'create'; any other value performs no
        API call. `entry` is only read for 'create', where it supplies the
        parent of the new tag. When the first attempt fails with an error whose
        text contains '429' or '503', the method sleeps for two minutes and
        tries exactly once more.

        Returns constants.SUCCESS, or constants.ERROR when the first attempt
        fails with any other error or the retry fails as well.
        """

        def _send_request():
            # independent checks on purpose: an unrecognised action falls through silently
            if action == 'delete':
                self.client.delete_tag(name=tag.name)

            if action == 'update':
                self.client.update_tag(tag=tag)

            if action == 'create':
                self.client.create_tag(parent=entry.name, tag=tag)

        succeeded = constants.SUCCESS

        try:
            print('do {}, tag: {}'.format(action, tag))
            _send_request()

        except Exception as first_error:
            log_error(f'Error occurred during tag {action}: {tag}', first_error, job_uuid)

            error_text = str(first_error)
            looks_like_quota_issue = '429' in error_text or '503' in error_text

            if not looks_like_quota_issue:
                return constants.ERROR

            # quota issue: wait, then retry the operation a single time
            log_info('Info: sleep for 2 minutes due to {}'.format(first_error), job_uuid)
            time.sleep(120)

            try:
                _send_request()

            except Exception as retry_error:
                log_error(f'Error occurred during tag {action} after sleep: {tag}', retry_error, job_uuid)
                return constants.ERROR

        return succeeded
       
            
    def search_catalog(self, bigquery_project, bigquery_dataset):
        """Count the tags attached to each catalog entry found under a BigQuery dataset.

        Searches Data Catalog, scoped to `bigquery_project`, with the query
        'parent:<project>.<dataset>' and lists the tags of every result.

        Returns:
            dict mapping the last path segment of each result's linked resource
            to the number of tags attached to that entry.
        """

        linked_resources = {}

        scope = datacatalog.SearchCatalogRequest.Scope()
        scope.include_project_ids.append(bigquery_project)

        request = datacatalog.SearchCatalogRequest()
        request.scope = scope

        query = 'parent:' + bigquery_project + '.' + bigquery_dataset
        print('query string: ' + query)

        request.query = query
        # NOTE(review): a page size of 1 means one search request per result -- confirm this is intended
        request.page_size = 1

        for result in self.client.search_catalog(request):
            print('result: ' + str(result))

            # Iterate the pager itself: reading `.tags` off it only exposes the most
            # recently fetched page, which undercounts entries with many tags.
            tag_pager = self.client.list_tags(parent=result.relative_resource_name)
            tag_count = sum(1 for _ in tag_pager)

            table_name = result.linked_resource.rsplit('/', 1)[-1]
            linked_resources[table_name] = tag_count

        return linked_resources

  
    def parse_query_expression(self, uri, query_expression, column=None):
        """Substitute the $project, $dataset, $table and $column placeholders in a query expression.

        Args:
            uri: resource path of the form '<project>/datasets/<dataset>/tables/<table>'.
            query_expression: SQL text that may contain the placeholders.
            column: replacement for $column; must be a string when the
                expression contains $column (str.replace raises TypeError otherwise).

        Returns:
            The expression with every placeholder replaced. When the expression
            contains ' from $table' or ' from `$table`', every $table becomes
            the fully qualified '<project>.<dataset>.<table>'; otherwise $table
            becomes the bare table name. An expression without placeholders is
            returned unchanged.
        """

        query_str = query_expression

        # $table referenced in from clause, use fully qualified table
        if ' from $table' in query_expression or ' from `$table`' in query_expression:
            qualified_table = uri.replace('/project/', '.').replace('/datasets/', '.').replace('/tables/', '.')
            query_str = query_str.replace('$table', qualified_table)

        # $table is referenced somewhere else in the expression, replace it with the actual table name
        elif '$table' in query_expression:
            table_name = uri[uri.rfind('/') + 1:]
            query_str = query_str.replace('$table', table_name)

        # $project and $dataset are substituted no matter how $table was referenced,
        # so a from-clause $table no longer leaves them behind as literal placeholders
        if '$project' in query_expression:
            project = uri.partition('/')[0]
            query_str = query_str.replace('$project', project)

        if '$dataset' in query_expression:
            dataset_start = uri.find('/datasets/') + len('/datasets/')
            dataset = uri[dataset_start:].split('/', 1)[0]
            query_str = query_str.replace('$dataset', dataset)

        if '$column' in query_expression:
            query_str = query_str.replace('$column', column)

        return query_str
    
    
    def run_query(self, query_str, field_type, batch_mode, job_uuid):
        """Run a query and collect the first column of its result.

        Args:
            query_str: SQL text to execute.
            field_type: type of the tag field being populated. For 'richtext'
                the first column of every row is collected; for any other type
                only the first row is used.
            batch_mode: when truthy, the query runs at batch priority.
            job_uuid: passed to log_error when the query fails.

        Returns:
            (field_values, error_exists): the collected values (empty when the
            query returned no rows, which is not treated as an error) and a
            flag that is True only when running the query raised.
        """

        field_values = []
        error_exists = False

        try:

            if batch_mode:

                batch_config = bigquery.QueryJobConfig(
                    # run at batch priority which won't count toward concurrent rate limit
                    priority=bigquery.QueryPriority.BATCH
                )

                # query_and_wait already returns the result rows; looking the job up
                # again is a redundant round trip and fails if no job id is available
                rows = self.bq_client.query_and_wait(query_str, job_config=batch_config)

            else:
                print('query_str:', query_str)
                rows = self.bq_client.query_and_wait(query_str)

            # if query expression is well-formed, there should only be a single row returned with a single field_value
            # However, user may mistakenly run a query that returns a list of rows. In that case, grab only the top row.
            for row in rows:
                field_values.append(row[0])

                if field_type != 'richtext':
                    return field_values, error_exists

            if not field_values:
                print('sql query returned nothing:', query_str)

        except Exception as e:
            error_exists = True
            msg = 'Error occurred during run_query {}'.format(query_str)
            log_error(msg, e, job_uuid)

        return field_values, error_exists
        

    def run_combined_query(self, combined_query, column, fields, job_uuid):
        """Run one query that yields a value per field and store the values on `fields`.

        The i-th column of each returned row is written to
        fields[i]['field_value']; when several rows come back, later rows
        overwrite earlier ones. `column` is accepted but not read here.

        Returns:
            (fields, error_exists): the same list that was passed in, and a
            flag that is True when the query raised or returned no rows.
        """

        failed = False

        try:
            got_row = False

            for record in self.bq_client.query_and_wait(combined_query):
                got_row = True

                for position, target in enumerate(fields):
                    target['field_value'] = record[position]

            if not got_row:
                failed = True
                print('sql query returned empty set:', combined_query)

        except Exception as e:
            failed = True
            log_error('Error occurred during run_combined_query {}'.format(combined_query), e, job_uuid)

        return fields, failed


    def populate_tag_fields(self, tag, fields, job_uuid=None):
        """Populate `tag` with every entry of `fields` via populate_tag_field.

        Each entry is a dict with 'field_id', 'field_type' and 'field_value'
        keys. All fields are attempted even after one of them fails.

        Returns:
            (tag, error_exists): the populated tag and a flag that is True when
            any field failed. An empty `fields` list yields (tag, False).
        """

        # accumulate, so an early failure is not masked by a later success
        any_error = False

        for field in fields:
            tag, field_error = self.populate_tag_field(tag, field['field_id'], field['field_type'], field['field_value'], job_uuid)
            any_error = any_error or field_error

        return tag, any_error
        
        
    def populate_tag_field(self, tag, field_id, field_type, field_values, job_uuid=None):
        """Set one field on `tag` from a value or a list of values.

        Args:
            tag: the tag being populated; its `fields` map is updated in place.
            field_id: key of the field inside the tag.
            field_type: one of 'bool', 'string', 'richtext', 'double', 'enum',
                'datetime' or 'timestamp'; any other type leaves the tag untouched.
            field_values: a single value or a list of values. Only 'richtext'
                uses every list element (joined with '<br>'); the other types
                use the first element.
            job_uuid: passed to log_error when the value cannot be stored.

        Returns:
            (tag, error_exists). A None value or an empty list is reported on
            stdout and is not an error.
        """

        error_exists = False

        # an empty list (e.g. a query that returned no rows) is handled like None
        # instead of raising IndexError on field_values[0]
        if field_values is None or (isinstance(field_values, list) and len(field_values) == 0):
            print('Cannot store null value in tag field', field_id)
            return tag, error_exists

        # handle richtext types
        if isinstance(field_values, list):
            field_value = field_values[0]
        else:
            field_value = field_values

        try:
            if field_type == "bool":
                bool_field = datacatalog.TagField()
                # bool('false') is True, so the textual form of False is handled explicitly
                is_false_text = isinstance(field_value, str) and field_value.strip().lower() == 'false'
                bool_field.bool_value = bool(field_value) and not is_false_text
                tag.fields[field_id] = bool_field
            if field_type == "string":
                string_field = datacatalog.TagField()
                string_field.string_value = str(field_value)
                tag.fields[field_id] = string_field
            if field_type == "richtext":
                richtext_field = datacatalog.TagField()
                # a scalar is wrapped first, otherwise a plain string would be joined character by character
                if isinstance(field_values, (list, tuple)):
                    richtext_values = field_values
                else:
                    richtext_values = [field_values]
                richtext_field.richtext_value = '<br>'.join(str(v) for v in richtext_values)
                tag.fields[field_id] = richtext_field
            if field_type == "double":
                float_field = datacatalog.TagField()
                float_field.double_value = float(field_value)
                tag.fields[field_id] = float_field
            if field_type == "enum":
                enum_field = datacatalog.TagField()
                enum_field.enum_value.display_name = field_value
                tag.fields[field_id] = enum_field
            if field_type == "datetime" or field_type == "timestamp":
                # expected format for datetime values in DC: 2020-12-02T16:34:14Z
                # however, field_value can be a date value e.g. "2022-05-08", a datetime value e.g. "2022-05-08 15:00:00"
                # or timestamp value e.g. datetime.datetime(2022, 9, 14, 18, 24, 31, 615000, tzinfo=datetime.timezone.utc)

                # we have a datetime value (isinstance also accepts datetime subclasses)
                # example: 2024-03-30 18:29:48.621617+00:00
                if isinstance(field_value, datetime):
                    timestamp = Timestamp()
                    timestamp.FromDatetime(field_value)
                # we have a date value
                elif isinstance(field_value, date):
                    dt = datetime.combine(field_value, datetime.min.time())
                    timestamp = pytz.utc.localize(dt)
                else:
                    text = str(field_value)

                    # we have a date cast as a string
                    if len(text) == 10:
                        d = date(int(text[0:4]), int(text[5:7]), int(text[8:10]))
                        dt = datetime.combine(d, dtime(0, 0)) # when no time is supplied, default to 12:00:00 AM UTC
                        timestamp = pytz.utc.localize(dt)
                    # we have a timestamp with this format: '2022-12-05 15:05:26'
                    elif len(text) == 19:
                        dt = datetime(int(text[0:4]), int(text[5:7]), int(text[8:10]),
                                      int(text[11:13]), int(text[14:16]), int(text[17:19]))
                        timestamp = pytz.utc.localize(dt)
                    # we have a timestamp cast as a string in some other ISO 8601 form,
                    # e.g. '2024-03-30 18:29:48.621617+00:00' or '2020-12-02T16:34:14Z'
                    else:
                        if text[-1:] in ('Z', 'z'):
                            text = text[:-1] + '+00:00'  # fromisoformat only accepts 'Z' from Python 3.11 on
                        dt = datetime.fromisoformat(text)
                        timestamp = dt if dt.tzinfo is not None else pytz.utc.localize(dt)

                datetime_field = datacatalog.TagField()
                datetime_field.timestamp_value = timestamp
                tag.fields[field_id] = datetime_field

        except Exception as e:
            error_exists = True
            msg = "Error storing values {} into field {}".format(field_values, field_id)
            log_error(msg, e, job_uuid)

        return tag, error_exists
    
    
    def copy_tags(self, source_project, source_dataset, source_table, target_project, target_dataset, target_table, include_policy_tags=False):
        """Copy every Data Catalog tag, and the policy tags, from one BigQuery table to another.

        Both tables are looked up in Data Catalog and must be plain BigQuery
        tables. For each tag on the source entry a tag with the same template,
        column and field values is created on the target entry, or updated when
        check_if_tag_exists reports one. While iterating, self.template_id,
        self.template_project, self.template_region and self.template_path are
        overwritten with the template of the tag being copied.

        NOTE(review): `include_policy_tags` is not consulted; copy_policy_tags
        is always called. Confirm the intended default before changing that.

        Returns:
            True when every tag and the policy tags were copied, else False.
        """

        success = True

        # lookup the source entry
        linked_resource = '//bigquery.googleapis.com/projects/{0}/datasets/{1}/tables/{2}'.format(source_project, source_dataset, source_table)

        request = datacatalog.LookupEntryRequest()
        request.linked_resource = linked_resource
        source_entry = self.client.lookup_entry(request)

        if source_entry.bigquery_table_spec.table_source_type != types.TableSourceType.BIGQUERY_TABLE:
            success = False
            msg = 'Error {} is not a BQ table'.format(source_table)
            log_info(msg, None)
            print(json.dumps(msg))
            return success

        # lookup the target entry
        linked_resource = '//bigquery.googleapis.com/projects/{0}/datasets/{1}/tables/{2}'.format(target_project, target_dataset, target_table)

        request = datacatalog.LookupEntryRequest()
        request.linked_resource = linked_resource
        target_entry = self.client.lookup_entry(request)

        if target_entry.bigquery_table_spec.table_source_type != types.TableSourceType.BIGQUERY_TABLE:
            success = False
            msg = 'Error {} is not a BQ table'.format(target_table)
            log_info(msg, None)
            print(json.dumps(msg))  # previously referenced an undefined name and raised NameError
            return success

        # look to see if the source table is tagged
        tag_list = self.client.list_tags(parent=source_entry.name, timeout=120)

        for source_tag in tag_list:
            print('source_tag.template:', source_tag.template)
            print('source_tag.column:', source_tag.column)

            # get tag template fields
            template_parts = source_tag.template.split('/')
            self.template_id = template_parts[5]
            self.template_project = template_parts[1]
            self.template_region = template_parts[3]
            self.template_path = source_tag.template
            template_fields = self.get_template()

            # start a new target tag
            target_tag = datacatalog.Tag()
            target_tag.template = source_tag.template

            if source_tag.column:
                target_tag.column = source_tag.column

            for template_field in template_fields:

                field_id = template_field['field_id']

                if field_id not in source_tag.fields:
                    continue

                tagged_field = source_tag.fields[field_id]
                print('field_id:', field_id)

                # Ask the message which member of its value oneof is set. Testing the
                # values for truthiness skips False, 0.0 and '' and then reuses the
                # type and value of the previously copied field.
                kind = type(tagged_field).pb(tagged_field).WhichOneof('kind')

                if kind is None:
                    continue

                field_type = kind[:-len('_value')]  # e.g. 'bool_value' -> 'bool'
                field_value = getattr(tagged_field, kind)

                if kind == 'enum_value':
                    field_value = field_value.display_name
                elif kind == 'timestamp_value':
                    # hand over a plain, timezone-aware datetime rather than a datetime subclass
                    field_value = datetime.combine(field_value.date(), field_value.timetz())

                target_tag, error_exists = self.populate_tag_field(target_tag, field_id, field_type, [field_value], None)

                if error_exists:
                    success = False

            # create the target tag
            tag_exists, tag_id = self.check_if_tag_exists(parent=target_entry.name, column=source_tag.column)

            if tag_exists == True:
                target_tag.name = tag_id

                try:
                    print('tag update request: ', target_tag)
                    self.client.update_tag(tag=target_tag)
                except Exception as e:
                    success = False
                    msg = 'Error occurred during tag update: {}'.format(target_tag)
                    log_error(msg, e)

            else:
                try:
                    print('tag create request: ', target_tag)
                    self.client.create_tag(parent=target_entry.name, tag=target_tag)
                except Exception as e:
                    success = False
                    msg = 'Error occurred during tag create: {}'.format(target_tag)
                    log_error(msg, e)

        # copy policy tags; keep any earlier failure instead of overwriting it
        policy_tags_copied = self.copy_policy_tags(source_project, source_dataset, source_table, target_project, target_dataset, target_table)

        return success and policy_tags_copied

    
    def copy_policy_tags(self, source_project, source_dataset, source_table, target_project, target_dataset, target_table):
        """Apply the policy tags found on the source table's columns to the target table.

        Reads the source table schema through the BigQuery client, collects a
        (column name, first policy tag name) pair for every column that carries
        a policy tag, and passes the pairs to apply_policy_tags for the target.

        Returns:
            False when the source schema cannot be read, True when there is
            nothing to copy, otherwise the result of apply_policy_tags.
        """

        success = True
        source_table_id = source_project + '.' + source_dataset + '.' + source_table
        target_table_id = target_project + '.' + target_dataset + '.' + target_table

        try:
            source_schema = self.bq_client.get_table(source_table_id).schema
        except Exception as e:
            success = False
            msg = 'Error occurred while retrieving the schema of {}'.format(source_table_id)
            log_error(msg, e)
            return success

        policy_tag_list = []

        for field in source_schema:
            policy_tags = field.policy_tags

            # a column can carry a policy tag list with no names; indexing it would raise IndexError
            if policy_tags is None or not policy_tags.names:
                continue

            policy_tag_list.append((field.name, policy_tags.names[0]))

        if not policy_tag_list:
            return success

        print('policy_tag_list:', policy_tag_list)
        success = self.apply_policy_tags(target_table_id, policy_tag_list)

        return success
    
    # used to update the status of a data product tag as part of the product_registration_pipeline
    # https://github.com/GoogleCloudPlatform/datacatalog-tag-engine/tree/main/examples/product_registration_pipeline    
    def update_tag_subset(self, template_id, template_project, template_region, entry_name, changed_fields):
        """Overwrite selected field values on the tags of one entry that use a given template.

        Every tag on `entry_name` whose template matches `template_id`,
        `template_project` and `template_region` is rebuilt from its current
        field values, with the values listed in `changed_fields` substituted,
        and then sent to update_tag. self.template_path is overwritten with the
        template of each matching tag.

        Args:
            changed_fields: list of dicts with 'field_id' and 'field_value'
                keys. Only fields already present on the tag are rewritten; a
                changed field that the tag does not carry is ignored.

        Returns:
            True on success. False as soon as a field cannot be populated (the
            update is aborted) or when update_tag fails for a tag.
        """

        success = True

        tag_list = self.client.list_tags(parent=entry_name, timeout=120)

        for tag in tag_list:
            print('tag.template:', tag.template)

            # get tag template fields
            template_parts = tag.template.split('/')
            tagged_template_id = template_parts[5]
            tagged_template_project = template_parts[1]
            tagged_template_region = template_parts[3]

            if tagged_template_id != template_id:
                continue

            if tagged_template_project != template_project:
                continue

            if tagged_template_region != template_region:
                continue

            # start a new target tag to overwrite the existing one
            target_tag = datacatalog.Tag()
            target_tag.template = tag.template
            target_tag.name = tag.name

            self.template_path = tag.template
            template_fields = self.get_template()

            for template_field in template_fields:

                field_id = template_field['field_id']

                # skip this field if it's not in the tag
                if field_id not in tag.fields:
                    continue

                tagged_field = tag.fields[field_id]

                # Ask the message which member of its value oneof is set. Testing the
                # values for truthiness skips False, 0.0 and '' and then reuses the
                # type and value of the previously processed field.
                kind = type(tagged_field).pb(tagged_field).WhichOneof('kind')

                if kind is None:
                    continue

                field_type = kind[:-len('_value')]  # e.g. 'bool_value' -> 'bool'

                # Values keep their native type: str(False) would be read back as True
                # by bool(), and a stringified timestamp is harder to parse again.
                field_value = getattr(tagged_field, kind)

                if kind == 'enum_value':
                    field_value = field_value.display_name
                elif kind == 'timestamp_value':
                    print('orig timestamp:', field_value)
                    # hand over a plain, timezone-aware datetime rather than a datetime subclass
                    field_value = datetime.combine(field_value.date(), field_value.timetz())

                # overwrite logic
                for changed_field in changed_fields:
                    if changed_field['field_id'] == field_id:
                        field_value = changed_field['field_value']
                        break

                target_tag, error_exists = self.populate_tag_field(target_tag, field_id, field_type, [field_value], None)

                if error_exists:
                    msg = 'Error while populating the tag field. Aborting tag update.'
                    error = {'msg': msg}
                    print(json.dumps(error))

                    success = False
                    return success

            # update the tag
            try:
                print('tag update request: ', target_tag)
                self.client.update_tag(tag=target_tag)
            except Exception as e:
                success = False
                msg = 'Error occurred during tag update: {}'.format(tag)
                log_error(msg, e)

        return success
                        
        
if __name__ == '__main__':

    # Ad-hoc driver: impersonates the tag creator service account named in
    # tagengine.ini and applies one hard-coded import config to a fileset column.
    import google.auth
    from google.auth import impersonated_credentials

    SCOPES = [
        'openid',
        'https://www.googleapis.com/auth/cloud-platform',
        'https://www.googleapis.com/auth/userinfo.email',
    ]

    source_credentials, _ = google.auth.default()
    target_service_account = config['DEFAULT']['TAG_CREATOR_SA']

    credentials = impersonated_credentials.Credentials(
        source_credentials=source_credentials,
        target_principal=target_service_account,
        target_scopes=SCOPES,
        lifetime=1200,
    )

    # tag template used for the run
    template_id = 'data_sensitivity'
    template_project = 'tag-engine-run'
    template_region = 'us-central1'

    # job and config identifiers, plus the asset being tagged
    job_uuid = 'df0ddb3e477511ef95dc42004e494300'
    config_uuid = '3404d03a477a11ef995442004e494300'
    data_asset_type = 'fileset'
    data_asset_region = 'us-central1'
    tag_dict = {
        'project': 'tag-engine-run',
        'entry_group': 'sakila_eg',
        'fileset': 'staff',
        'column': 'first_name',
        'sensitive_field': 'TRUE',
        'sensitive_type': 'Sensitive_Personal_Identifiable_Information',
    }
    tag_history = True
    overwrite = True

    dcu = DataCatalogController(
        credentials,
        tag_creator_account=target_service_account,
        tag_invoker_account='scohen@gcp.solutions',
        template_id=template_id,
        template_project=template_project,
        template_region=template_region,
    )
    dcu.apply_import_config(job_uuid, config_uuid, data_asset_type, data_asset_region, tag_dict, tag_history, overwrite)
    
        