# Copyright 2024-2025 Google, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import requests, configparser
from operator import itemgetter
import json
import os

from google.protobuf import struct_pb2
from google.protobuf import json_format

from google.api_core.gapic_v1.client_info import ClientInfo
from google.cloud import dataplex
from google.cloud.dataplex import CatalogServiceClient
from google.cloud import bigquery

import Resources as res
import BigQueryUtils as bq
import constants
from common import log_error, log_error_tag_dict, log_info, log_info_tag_dict

# Load deployment settings from tagengine.ini. The path is relative, so the
# file is looked up in the process's current working directory.
config = configparser.ConfigParser()
config.read("tagengine.ini")

# Region used as the BigQuery client location and as the location segment of
# the BigQuery entry paths built by DataplexController.
BIGQUERY_REGION = config['DEFAULT']['BIGQUERY_REGION']
# User agent attached to the Dataplex and BigQuery clients created by DataplexController.
USER_AGENT = 'cloud-solutions/datacatalog-tag-engine-v3'

class DataplexController:
    
    def __init__(self, credentials, tag_creator_account=None, tag_invoker_account=None, \
                 aspect_type_id=None, aspect_type_project=None, aspect_type_region=None):
        """Store the caller's settings and build the Dataplex and BigQuery clients.

        Args:
            credentials: credentials object handed to both API clients.
            tag_creator_account: stored on the instance as-is.
            tag_invoker_account: stored on the instance as-is.
            aspect_type_id: id of the aspect type this controller works with.
            aspect_type_project: project that owns the aspect type.
            aspect_type_region: region of the aspect type.

        ``self.aspect_type_path`` is only resolved when all three aspect type
        arguments are supplied; otherwise it is None.
        """
        self.credentials = credentials
        self.tag_creator_account = tag_creator_account
        self.tag_invoker_account = tag_invoker_account
        self.aspect_type_id = aspect_type_id
        self.aspect_type_project = aspect_type_project
        self.aspect_type_region = aspect_type_region

        self.client = CatalogServiceClient(
            credentials=self.credentials,
            client_info=ClientInfo(user_agent=USER_AGENT),
        )

        # the resource path needs project, region and id, in that order
        aspect_type_parts = (aspect_type_project, aspect_type_region, aspect_type_id)
        if None in aspect_type_parts:
            self.aspect_type_path = None
        else:
            self.aspect_type_path = self.client.aspect_type_path(*aspect_type_parts)

        self.bq_client = bigquery.Client(
            credentials=self.credentials,
            location=BIGQUERY_REGION,
            client_info=ClientInfo(user_agent=USER_AGENT),
        )
        
    # note: included_fields is either a list of field dicts or None
    # None = return every field defined in the aspect type
    def get_aspect_type(self, included_fields=None):
        
        print('enter get_aspect_type()')
        #print('included_fields:', included_fields)
              
        aspect_fields = []
        
        try:
            aspect_type = self.client.get_aspect_type(name=self.aspect_type_path)
            #print('aspect_type:', aspect_type)
        
        except Exception as e:
            msg = f'Error retrieving aspect type {self.aspect_type_path}'
            log_error(msg, e)
            return fields
        
        record_fields = aspect_type.metadata_template.record_fields
        #print('record_fields:', record_fields)

        for field in record_fields:
            
            match_found = False
            
            if included_fields != None:
                
                for included_field in included_fields:
                
                    if included_field['field_id'] == field.name:
                    
                        print('found_match:', field.name)
                    
                        match_found = True
                    
                        if 'field_value' in included_field:
                            assigned_value = included_field['field_value']
                        else:
                            assigned_value = None
                        
                        if 'query_expression' in included_field:
                            query_expression = included_field['query_expression']
                        else:
                            query_expression = None
                    
                        break
            
            if included_fields != None and match_found == False:
                continue
            
            enum_values = []

            if field.type_ == "enum":   
                for enum_value in field.enum_values:
                    enum_values.append(enum_value.name)
            
            # populate aspect_field dict
            aspect_field = {}
            aspect_field['field_id'] = field.name
            aspect_field['field_type'] = field.type_
            aspect_field['display_name'] = field.annotations.display_name
            aspect_field['is_required'] = field.constraints.required
            aspect_field['order'] = field.annotations.display_order
            
            if aspect_field['field_type'] == "enum":
                aspect_field['enum_values'] = enum_values
                
            if included_fields:
                if assigned_value:
                   aspect_field['field_value'] = assigned_value
                if query_expression:
                   aspect_field['query_expression'] = query_expression

            aspect_fields.append(aspect_field)
        
        sorted_aspect_fields = sorted(aspect_fields, key=itemgetter('order'))
        #print('sorted_aspect_fields:', sorted_aspect_fields)
                            
        return sorted_aspect_fields
    
    
    def check_column_exists(self, aspects, target_column):
        """Report whether ``target_column`` appears in the entry's schema aspect.

        Args:
            aspects: mapping of aspect id to aspect payload (``entry.aspects``).
                The schema is read from the aspect whose id ends with
                ``.global.schema``.
            target_column: column name; nested columns use dot notation
                (``parent.child``) and are resolved one level at a time through
                each field's ``fields`` list.

        Returns:
            True when the column is found; False otherwise, including when the
            entry carries no schema aspect.
        """
        print('*** enter check_column_exists ***')
        print('target_column:', target_column)
        print('aspects:', aspects)

        schema_aspect = next(
            (payload for aspect_id, payload in aspects.items() if aspect_id.endswith('.global.schema')),
            None,
        )

        if schema_aspect is None:
            # Without a schema aspect there is nothing to compare against. The
            # previous code fell through and inspected an unrelated aspect (or
            # hit an unbound name when the mapping was empty).
            return False

        aspect_dict = json_format.MessageToDict(schema_aspect._pb)
        top_level_fields = aspect_dict.get('data', {}).get('fields', [])

        # exact match against a top-level field name
        if any(field.get('name') == target_column for field in top_level_fields):
            return True

        # walk the dotted path, descending into the sub-fields at every step
        candidates = top_level_fields

        for part in target_column.split('.'):
            matched = next((field for field in candidates if field.get('name') == part), None)

            if matched is None:
                return False

            # a field without sub-fields simply ends the descent
            candidates = matched.get('fields', [])

        return True
    
    
    def check_aspect_exists(self, aspect_type_path, aspects):
        """Look for an aspect of the given type among an entry's aspects.

        Args:
            aspect_type_path: dotted aspect key. Everything up to and including
                the first dot is dropped before matching, so the comparison
                does not depend on the leading component.
            aspects: mapping keyed by aspect id (``entry.aspects``).

        Returns:
            ``(True, aspect_id)`` for the first aspect id that ends with the
            shortened path, or ``(False, "")`` when none does.
        """
        print('enter check_aspect_exists')
        print('aspect_type_path:', aspect_type_path)

        aspect_type_path_short = aspect_type_path.partition('.')[2]

        for aspect_id in aspects:
            if aspect_id.endswith(aspect_type_path_short):
                return True, aspect_id

        # Previously the id of the last aspect iterated leaked out here even
        # though nothing matched; report an empty id instead.
        return False, ""
    
    
    def apply_import_config(self, job_uuid, config_uuid, data_asset_type, data_asset_region, tag_dict, tag_history, overwrite=False):
        """Create or update one aspect described by a single row of an import CSV.

        ``tag_dict`` identifies the target entry through ``project`` plus the
        keys required by the asset type, and optionally a ``column``. Every
        other key is treated as an aspect field and must be defined in the
        aspect type.

        Args:
            job_uuid: forwarded to the log helpers and to create_update_delete_aspect.
            config_uuid: forwarded to create_update_delete_aspect.
            data_asset_type: one of constants.BQ_ASSET, constants.FILESET_ASSET,
                constants.SPAN_ASSET or constants.SQL_ASSET.
            data_asset_region: location segment of the entry path and of the
                Cloud SQL entry names.
            tag_dict: dict holding the values of one CSV row.
            tag_history: forwarded to create_update_delete_aspect.
            overwrite: when False, an aspect that already exists is left as is.

        Returns:
            constants.ERROR when the row cannot be processed, constants.SUCCESS
            when the aspect already exists and ``overwrite`` is False, otherwise
            the status returned by create_update_delete_aspect.
        """
        print("*** apply_import_config ***")

        if 'project' not in tag_dict:
            msg = "Error: project info missing from CSV"
            log_error_tag_dict(msg, None, job_uuid, tag_dict)
            return constants.ERROR

        project = tag_dict['project']

        # stays None when data_asset_type is not one of the supported asset types
        entry_type = None

        if data_asset_type == constants.BQ_ASSET:
            if 'dataset' not in tag_dict:
                msg = "Error: could not find the required dataset field in the CSV"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                return constants.ERROR

            entry_type = constants.DATASET
            dataset = tag_dict['dataset']

            if 'table' in tag_dict:
                table = tag_dict['table']
                entry_type = constants.BQ_TABLE

        if data_asset_type == constants.FILESET_ASSET:
            if 'entry_group' not in tag_dict or 'fileset' not in tag_dict:
                msg = "Error: could not find the required fields in the CSV. Missing entry_group or fileset or both"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                return constants.ERROR

            entry_type = constants.FILESET
            entry_group = tag_dict['entry_group']
            entry_name = tag_dict['fileset']

        if data_asset_type == constants.SPAN_ASSET:
            if 'instance' not in tag_dict or 'database' not in tag_dict:
                msg = "Error: could not find the required fields in the CSV. The required fields for Spanner are instance and database"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                return constants.ERROR

            instance = tag_dict['instance']
            database = tag_dict['database']
            schema = tag_dict.get('schema')

            # the most specific key present decides the entry type
            if 'table' in tag_dict:
                table = tag_dict['table']
                entry_type = constants.SPAN_TABLE
            elif 'schema' in tag_dict:
                entry_type = constants.SPAN_SCHEMA
            else:
                entry_type = constants.SPAN_DATABASE

        if data_asset_type == constants.SQL_ASSET:
            if 'instance' not in tag_dict or 'database' not in tag_dict:
                msg = "Error: could not find the required fields in the CSV. The required fields for Cloud SQL are instance and database"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                return constants.ERROR

            instance = tag_dict['instance']
            database = tag_dict['database']
            schema = tag_dict.get('schema')

            # the most specific key present decides the entry type
            if 'table' in tag_dict:
                table = tag_dict['table']
                entry_type = constants.SQL_TABLE
            elif 'schema' in tag_dict:
                entry_type = constants.SQL_SCHEMA
            else:
                entry_type = constants.SQL_DATABASE

        if entry_type is None:
            # previously an unknown asset type surfaced as an UnboundLocalError below
            msg = f"Error: unsupported data asset type {data_asset_type}"
            log_error_tag_dict(msg, None, job_uuid, tag_dict)
            return constants.ERROR

        # BQ entry types (table, dataset)
        if entry_type == constants.BQ_TABLE:
            entry_name = f'bigquery.googleapis.com/projects/{project}/datasets/{dataset}/tables/{table}'
            entry_group = '@bigquery'

        if entry_type == constants.DATASET:
            entry_name = f'bigquery.googleapis.com/projects/{project}/datasets/{dataset}'
            entry_group = '@bigquery'

        # Spanner entry types (table, schema, database)
        if entry_type == constants.SPAN_TABLE:
            entry_group = '@spanner'

            if schema:
                entry_name = f'spanner.googleapis.com/projects/{project}/instances/{instance}/databases/{database}/tables/{schema}.{table}'
            else:
                entry_name = f'spanner.googleapis.com/projects/{project}/instances/{instance}/databases/{database}/tables/{table}'

        if entry_type == constants.SPAN_SCHEMA:
            entry_group = '@spanner'
            entry_name = f'spanner.googleapis.com/projects/{project}/instances/{instance}/databases/{database}/tables/{schema}'

        if entry_type == constants.SPAN_DATABASE:
            entry_group = '@spanner'
            entry_name = f'spanner.googleapis.com/projects/{project}/instances/{instance}/databases/{database}'

        # Cloud SQL entry types (table, schema, database)
        if entry_type == constants.SQL_TABLE:
            entry_group = '@cloudsql'

            if schema:
                entry_name = f'cloudsql.googleapis.com/projects/{project}/locations/{data_asset_region}/instances/{instance}/databases/{database}/schemas/{schema}/tables/{table}'
            else:
                entry_name = f'cloudsql.googleapis.com/projects/{project}/locations/{data_asset_region}/instances/{instance}/databases/{database}/tables/{table}'

        if entry_type == constants.SQL_SCHEMA:
            entry_group = '@cloudsql'
            entry_name = f'cloudsql.googleapis.com/projects/{project}/locations/{data_asset_region}/instances/{instance}/databases/{database}/schemas/{schema}'

        if entry_type == constants.SQL_DATABASE:
            entry_group = '@cloudsql'
            entry_name = f'cloudsql.googleapis.com/projects/{project}/locations/{data_asset_region}/instances/{instance}/databases/{database}'

        entry_path = f'projects/{project}/locations/{data_asset_region}/entryGroups/{entry_group}/entries/{entry_name}'

        entry_request = dataplex.GetEntryRequest(
            name=entry_path,
            view=dataplex.EntryView.ALL
        )

        try:
            entry = self.client.get_entry(request=entry_request)
        except Exception as e:
            msg = f"Error could not locate entry {entry_name}"
            log_error_tag_dict(msg, e, job_uuid, tag_dict)
            return constants.ERROR

        # format uri for tag history table
        if data_asset_type == constants.BQ_ASSET:
            uri = entry.name.replace('bigquery.googleapis.com/projects/', '')

        if data_asset_type == constants.FILESET_ASSET:
            uri = entry.name.replace('projects/', '')

        if data_asset_type == constants.SPAN_ASSET:
            uri = entry.name.replace('spanner.googleapis.com/projects/', '')

        if data_asset_type == constants.SQL_ASSET:
            uri = entry.name.replace('cloudsql.googleapis.com/projects/', '')

        target_column = None

        if 'column' in tag_dict:
            target_column = tag_dict['column']

            column_exists = self.check_column_exists(entry.aspects, target_column)
            print('column_exists:', column_exists)

            if column_exists == False:
                msg = f"Error could not find target column {target_column} in {entry.name}"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                return constants.ERROR

            uri = uri + '/column/' + target_column
            aspect_type_path = f'{self.aspect_type_project}.{self.aspect_type_region}.{self.aspect_type_id}@Schema.{target_column}'
        else:
            aspect_type_path = f'{self.aspect_type_project}.{self.aspect_type_region}.{self.aspect_type_id}'

        try:
            aspect_exists, aspect_id = self.check_aspect_exists(aspect_type_path, entry.aspects)
            print('aspect_exists:', aspect_exists)

        except Exception as e:
            msg = f"Error during check_if_aspect_exists: {entry.name}"
            log_error_tag_dict(msg, e, job_uuid, tag_dict)
            return constants.ERROR

        if aspect_exists and overwrite == False:
            msg = "Info: Aspect already exists and overwrite flag is False"
            log_info_tag_dict(msg, job_uuid, tag_dict)
            return constants.SUCCESS

        # keys that locate the entry rather than describe an aspect field
        locator_keys = ('project', 'dataset', 'table', 'column', 'entry_group', 'fileset',
                        'instance', 'database', 'schema')

        # field id -> field type; the first definition wins
        field_types = {}
        for aspect_type_field in self.get_aspect_type():
            field_types.setdefault(aspect_type_field['field_id'], aspect_type_field['field_type'])

        aspect_fields = []

        for field_name in tag_dict:

            if field_name in locator_keys:
                continue

            field_value = tag_dict[field_name]

            if field_name not in field_types:
                # the message used to be printed without the f prefix, so the
                # placeholders were emitted literally and nothing was logged
                msg = f'Error preparing the aspect. {field_name} was not found in {self.aspect_type_id}'
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                return constants.ERROR

            field_type = field_types[field_name]

            try:
                if field_type == 'bool':
                    field_value = field_value in ('True', 'TRUE', 'true')

                # compare for equality: `in ('datetime')` is a substring test on a plain string
                elif field_type == 'datetime':
                    # timestamp needs to look like this: "2024-07-31T05:00:00.000Z"
                    if len(field_value) == 10:
                        field_value = f"{field_value}T12:00:00.00Z"
                    if len(field_value) == 19:
                        ts = field_value.replace(' ', 'T')
                        field_value = f"{ts}.00Z"

                elif field_type == 'double':
                    field_value = float(field_value)

                elif field_type == 'int':
                    field_value = int(field_value)

            except (TypeError, ValueError) as e:
                # a malformed cell fails this row instead of raising out of the import
                msg = f'Error preparing the aspect. Could not convert the value of {field_name} to {field_type}'
                log_error_tag_dict(msg, e, job_uuid, tag_dict)
                return constants.ERROR

            # this check allows for aspects with empty enums to get created, otherwise the empty enum
            # gets flagged because an empty string would be stored as the enum value
            if field_type == 'enum' and field_value == '':
                continue

            aspect_fields.append({'field_id': field_name, 'field_type': field_type, 'field_value': field_value})

        return self.create_update_delete_aspect(aspect_fields, aspect_type_path, entry_path, job_uuid, config_uuid,
                                                'IMPORT_TAG', tag_history, uri, target_column)
        
    
    def apply_dynamic_table_config(self, fields, uri, job_uuid, config_uuid, aspect_type_uuid, tag_history):
        """Evaluate each field's query expression and attach the aspect to a BigQuery entry.

        Args:
            fields: list of field dicts carrying ``field_type`` and
                ``query_expression``. ``field_value`` is written onto each dict
                whose query returns at least one row.
            uri: BigQuery resource path without the leading ``projects/``;
                its first segment is the project.
            job_uuid: forwarded to the log helpers and to create_update_delete_aspect.
            config_uuid: forwarded to create_update_delete_aspect.
            aspect_type_uuid: not used by this method.
            tag_history: forwarded to create_update_delete_aspect.

        Returns:
            constants.ERROR when the entry cannot be found or no field produced
            a value; otherwise the status returned by create_update_delete_aspect.
        """
        print('*** apply_dynamic_table_config ***')

        bigquery_project = uri.split('/')[0]

        # TO DO: allow user to overwrite default region with config value
        bigquery_region = BIGQUERY_REGION # default to the region from the ini file

        entry_name = f'bigquery.googleapis.com/projects/{uri}'
        entry_path = f'projects/{bigquery_project}/locations/{bigquery_region}/entryGroups/@bigquery/entries/{entry_name}'

        entry_request = dataplex.GetEntryRequest(
            name=entry_path,
            view=dataplex.EntryView.ALL
        )

        try:
            # only used to confirm that the entry exists before running any query
            self.client.get_entry(request=entry_request)
        except Exception as e:
            msg = f"Error could not locate entry {entry_name}"
            log_error(msg, e, job_uuid)
            return constants.ERROR

        # run query expressions
        verified_field_count = 0

        for field in fields:
            field_type = field['field_type']

            # parse the query expression
            query_str = self.parse_query_expression(uri, field['query_expression'])
            print('returned query_str: ' + query_str)

            # run the SQL query
            # note: field_values is of type list
            field_values, error_exists = self.run_query(query_str, field_type, job_uuid)

            if error_exists or field_values == []:
                continue

            verified_field_count += 1

            if field_type == 'richtext':
                field['field_value'] = ', '.join(str(v) for v in field_values)
            else:
                field['field_value'] = field_values[0]

        if verified_field_count == 0:
            # aspect is empty due to SQL errors, skip aspect creation
            return constants.ERROR

        # Fields whose query failed or returned nothing have no value to write;
        # passing them on would raise a KeyError when the aspect data is built.
        populated_fields = [field for field in fields if 'field_value' in field]

        aspect_type_path = f'{self.aspect_type_project}.{self.aspect_type_region}.{self.aspect_type_id}'

        return self.create_update_delete_aspect(populated_fields, aspect_type_path, entry_path, job_uuid, \
                                                config_uuid, 'DYNAMIC_TAG_TABLE', tag_history, uri, None)
    
    
    def parse_query_expression(self, uri, query_expression, column=None):
        """Substitute the $-placeholders of a query expression with values taken from ``uri``.

        Supported placeholders:
            $table   - the fully qualified ``project.dataset.table`` when the
                       expression contains `` from $table`` (optionally in
                       backticks); otherwise just the last segment of ``uri``.
            $project - the first segment of ``uri``.
            $dataset - the segment following ``/datasets/`` in ``uri``.
            $column  - the ``column`` argument.

        Args:
            uri: resource path such as ``project/datasets/ds/tables/tbl``.
            query_expression: SQL text that may contain placeholders.
            column: column name; must be a string when the expression
                references ``$column``.

        Returns:
            The expression with every referenced placeholder replaced. An
            expression without placeholders is returned unchanged.
        """
        query_str = query_expression

        if " from $table" in query_expression or " from `$table`" in query_expression:
            # $table referenced in from clause, use fully qualified table
            qualified_table = uri.replace('/datasets/', '.').replace('/tables/', '.')
            query_str = query_str.replace('$table', qualified_table)

        elif "$table" in query_expression:
            # $table is referenced elsewhere in the expression, use the bare table name
            query_str = query_str.replace('$table', uri[uri.rfind('/') + 1:])

        # $project / $dataset are substituted wherever they occur, including in
        # expressions that also use $table in the from clause
        if "$project" in query_expression:
            query_str = query_str.replace('$project', uri.split('/')[0])

        if "$dataset" in query_expression:
            _, found, remainder = uri.partition('/datasets/')
            dataset = remainder.split('/')[0] if found else ''
            query_str = query_str.replace('$dataset', dataset)

        if "$column" in query_expression:
            query_str = query_str.replace('$column', column)

        return query_str
    
    
    def run_query(self, query_str, field_type, job_uuid):
        
        field_values = []
        error_exists = False
            
        try:
            #print('query_str:', query_str)
            rows = self.bq_client.query_and_wait(query_str)
            
            # if query expression is well-formed, there should only be a single row returned with a single field_value
            # However, user may mistakenly run a query that returns a list of rows. In that case, grab only the top row.  
            row_count = 0

            for row in rows:
                row_count = row_count + 1
                field_values.append(row[0])
            
                if field_type != 'richtext' and row_count == 1:
                    return field_values, error_exists
        
            # check row_count
            if row_count == 0:
                #error_exists = True
                print('sql query returned nothing:', query_str)
        
        except Exception as e:
            error_exists = True
            msg = 'Error occurred during run_query {}'.format(query_str)
            log_error(msg, e, job_uuid)
            
        #print('field_values: ', field_values)
        
        return field_values, error_exists
    
    
    def apply_dynamic_column_config(self, fields, columns_query, uri, job_uuid, config_uuid, aspect_type_uuid, tag_history):
        """Attach a column-level aspect to every column returned by ``columns_query``.

        For each target column the fields' query expressions are combined into
        one query, its single result row supplies the field values, and the
        aspect is written at path ``Schema.<column>`` of the BigQuery entry.

        Args:
            fields: list of field dicts carrying ``query_expression``;
                ``field_value`` is written onto them for each column.
            columns_query: query expression whose result values are the
                names of the columns to tag.
            uri: BigQuery resource path without the leading ``projects/``;
                its first segment is the project.
            job_uuid: forwarded to the log helpers and to create_update_delete_aspect.
            config_uuid: forwarded to create_update_delete_aspect.
            aspect_type_uuid: not used by this method.
            tag_history: forwarded to create_update_delete_aspect.

        Returns:
            constants.ERROR when the entry or a column cannot be found, when
            the columns query fails or is empty, when any column's combined
            query fails, or when writing an aspect fails; otherwise the status
            of the last aspect write.
        """
        print('*** apply_dynamic_column_config ***')

        op_status = constants.SUCCESS

        bigquery_project = uri.split('/')[0]

        # TO DO: allow user to overwrite default region with config value
        bigquery_region = BIGQUERY_REGION # default to the region from the ini file

        entry_name = f'bigquery.googleapis.com/projects/{uri}'
        entry_path = f'projects/{bigquery_project}/locations/{bigquery_region}/entryGroups/@bigquery/entries/{entry_name}'

        entry_request = dataplex.GetEntryRequest(
            name=entry_path,
            view=dataplex.EntryView.ALL
        )

        try:
            entry = self.client.get_entry(request=entry_request)
        except Exception as e:
            msg = f"Error could not locate entry {entry_name}"
            log_error(msg, e, job_uuid)
            return constants.ERROR

        target_columns = [] # columns in the table which need to be tagged

        columns_query = self.parse_query_expression(uri, columns_query)

        try:
            # handled like every other remote call in this class: log and report ERROR
            rows = self.bq_client.query(columns_query).result()

            for row in rows:
                for column in row:
                    print('column:', column)
                    target_columns.append(column)

        except Exception as e:
            msg = f"Error occurred while running the columns query {columns_query}"
            log_error(msg, e, job_uuid)
            return constants.ERROR

        if not target_columns:
            # no columns to tag
            msg = f"Error could not find columns to tag. Please check column_query parameter in your config. Current value: {columns_query}"
            log_error(msg, None, job_uuid)
            return constants.ERROR

        # remembers a failed column so that a later success cannot mask it
        had_error = False

        for target_column in target_columns:

            # fail quickly if a column is not found in the entry's schema
            if not self.check_column_exists(entry.aspects, target_column):
                msg = f"Error could not find column {target_column} in {entry.name}"
                log_error(msg, None, job_uuid)
                return constants.ERROR

            query_strings = [
                self.parse_query_expression(uri, field['query_expression'], target_column)
                for field in fields
            ]

            print('query_strings:', query_strings)

            # combine query expressions
            combined_query = self.combine_queries(query_strings)

            # run combined query, storing the results as the field_value of each field
            fields, error_exists = self.run_combined_query(combined_query, target_column, fields, job_uuid)

            print('fields:', fields)

            if error_exists:
                had_error = True
                continue

            aspect_type_path = f'{self.aspect_type_project}.{self.aspect_type_region}.{self.aspect_type_id}@Schema.{target_column}'
            uri_column = f'{uri}/column/{target_column}'

            op_status = self.create_update_delete_aspect(fields, aspect_type_path, entry_path, job_uuid, \
                                                         config_uuid, 'DYNAMIC_TAG_COLUMN', tag_history, uri_column, target_column)

            # fail fast if aspect does not get created, updated or deleted
            if op_status == constants.ERROR:
                return op_status

        return constants.ERROR if had_error else op_status
        
    
    def combine_queries(self, query_strings):
        """Wrap each query in parentheses and select them all in one statement.

        Args:
            query_strings: iterable of SQL query strings, one per field.

        Returns:
            ``"select (q1), (q2), ..."``. With no queries the bare
            ``"select "`` prefix is returned; the previous slice-based
            implementation cut it down to ``"selec"``.
        """
        return "select " + ", ".join("({})".format(query) for query in query_strings)
        
     
    def run_combined_query(self, combined_query, column, fields, job_uuid):
        """Run combined_query through self.bq_client and copy the result into fields.

        For each returned row, the value at position i is stored under the
        'field_value' key of fields[i]; if several rows come back, later rows
        overwrite earlier ones.

        Args:
            combined_query: SQL text handed to query_and_wait.
            column: not used by this method.
            fields: list of dicts, updated in place.
            job_uuid: forwarded to log_error when an exception is caught.

        Returns:
            A (fields, error_exists) tuple. error_exists is True when an
            exception was raised while running or reading the query, or when
            the query produced no rows.
        """
        failed = False

        try:
            result_rows = self.bq_client.query_and_wait(combined_query)

            # enumerate(..., start=1) leaves rows_read at the number of rows
            # consumed; it stays 0 when the result set is empty.
            rows_read = 0
            for rows_read, result_row in enumerate(result_rows, start=1):
                for position, field in enumerate(fields):
                    field['field_value'] = result_row[position]

            if not rows_read:
                failed = True
                print('sql query returned empty set:', combined_query)

        except Exception as exc:
            failed = True
            log_error('Error occurred during run_combined_query {}'.format(combined_query), exc, job_uuid)

        return fields, failed
        
           
    def create_update_delete_aspect(self, aspect_fields, aspect_type_path, entry_path, job_uuid, config_uuid, config_type, tag_history, uri, target_column):
        """Write an aspect built from aspect_fields onto an entry via update_entry.

        The 'field_id' / 'field_value' pairs of aspect_fields become the aspect's
        data, the aspect is stored in the entry's aspects map under
        aspect_type_path, and the entry is submitted with an update mask of
        "aspects". When tag_history is truthy, the fields are additionally
        passed to BigQueryUtils.copy_tag.

        Args:
            aspect_fields: iterable of dicts, each providing 'field_id' and
                'field_value'.
            aspect_type_path: key used for the aspect in entry.aspects.
            entry_path: value assigned to the entry's name.
            job_uuid: passed to log_error and to copy_tag.
            config_uuid: not used by this method.
            config_type: not used by this method.
            tag_history: when truthy, copy_tag is called after a successful update.
            uri: passed to copy_tag.
            target_column: passed to copy_tag.

        Returns:
            constants.SUCCESS when the entry update (and, if requested, the
            history copy) succeeded; constants.ERROR otherwise.
        """
        # Building the payload can fail on a field that lacks one of the two keys
        # or on a value that a protobuf Struct cannot represent. Report that the
        # same way as a failed update instead of letting the exception escape.
        try:
            aspect_data_dict = {field['field_id']: field['field_value'] for field in aspect_fields}
            aspect_data_struct = struct_pb2.Struct()
            json_format.ParseDict(aspect_data_dict, aspect_data_struct, ignore_unknown_fields=False)
        except (KeyError, TypeError, ValueError, json_format.ParseError) as e:
            msg = "Error while building the aspect data"
            log_error(msg, error=str(e), job_uuid=job_uuid)
            return constants.ERROR

        aspect = dataplex.Aspect()
        aspect.path = "Schema.descriptor"
        aspect.data = aspect_data_struct

        entry = dataplex.Entry()
        entry.name = entry_path
        entry.aspects = {aspect_type_path: aspect}

        try:
            update_entry_request = dataplex.UpdateEntryRequest(
                entry=entry,
                update_mask="aspects",
            )
            self.client.update_entry(request=update_entry_request)
        except Exception as e:
            msg = "Error while updating the entry"
            log_error(msg, error=str(e), job_uuid=job_uuid)
            return constants.ERROR

        if tag_history:
            bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
            aspect_type_fields = self.get_aspect_type()
            success = bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.aspect_type_id, aspect_type_fields, uri, target_column, aspect_fields)

            # copy_tag's return contract is defined outside this method, so the
            # equality test is kept as-is rather than tightened to `not success`.
            if success == False:
                msg = 'Error occurred while writing to tag history table'
                log_error(msg, error='', job_uuid=job_uuid)
                return constants.ERROR

        return constants.SUCCESS
    
                            
if __name__ == '__main__':
    # Manual run: impersonate the service account named by TAG_CREATOR_SA in the
    # loaded config and apply one hard-coded import config through DataplexController.
    import google.auth
    from google.auth import impersonated_credentials

    SCOPES = [
        'openid',
        'https://www.googleapis.com/auth/cloud-platform',
        'https://www.googleapis.com/auth/userinfo.email',
    ]

    source_credentials, _ = google.auth.default()
    target_service_account = config['DEFAULT']['TAG_CREATOR_SA']

    credentials = impersonated_credentials.Credentials(
        source_credentials=source_credentials,
        target_principal=target_service_account,
        target_scopes=SCOPES,
        lifetime=1200,
    )

    # Aspect type that the controller is bound to.
    aspect_type_id = 'data-governance'
    aspect_type_project = 'tag-engine-develop'
    aspect_type_region = 'us-central1'
    aspect_type_uuid = 'Bofcfg9kkkFz4d0Dk2SM'

    # Arguments for the apply_import_config call below.
    job_uuid = '238f7420f7a211ef915a42004e494300'
    config_uuid = 'b8a4616ef79e11efa14242004e494300'
    data_asset_type = 'fileset'
    data_asset_region = 'us-central1'
    tag_history = True
    tag_dict = {
        'project': 'tag-engine-develop',
        'entry_group': 'sakila_eg',
        'fileset': 'city',
        'data_domain': 'LOGISTICS',
        'broad_data_category': 'CONTENT',
        'data_creation': '2023-11-10',
        'data_ownership': 'THIRD_PARTY_OPS',
        'data_asset_owner': 'John Smith',
        'data_confidentiality': 'PUBLIC',
        'data_retention': 'DAYS_90',
        'data_asset_documentation': 'https://dev.mysql.com/doc/sakila/en/sakila-structure.html',
    }

    dpc = DataplexController(
        credentials,
        tag_creator_account=target_service_account,
        tag_invoker_account='scohen@gcp.solutions',
        aspect_type_id=aspect_type_id,
        aspect_type_project=aspect_type_project,
        aspect_type_region=aspect_type_region,
    )

    dpc.apply_import_config(job_uuid, config_uuid, data_asset_type, data_asset_region, tag_dict, tag_history)
    
   