def apply_import_config()

in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]


    def apply_import_config(self, job_uuid, config_uuid, data_asset_type, data_asset_region, tag_dict, tag_history, overwrite=False):
        """Create or update a Data Catalog tag described by one imported record.

        The record (``tag_dict``) carries both the keys that locate the target
        entry and the tag field values. The method resolves the entry through
        ``self.client.lookup_entry``, optionally checks that the requested
        column exists, builds the list of tag fields from the remaining keys
        and delegates the write to ``self.create_update_delete_tag``.

        Args:
            job_uuid: Job identifier, forwarded to the logging helpers and to
                ``create_update_delete_tag``.
            config_uuid: Config identifier, forwarded to
                ``create_update_delete_tag``.
            data_asset_type: One of ``constants.BQ_ASSET``,
                ``constants.FILESET_ASSET`` or ``constants.SPAN_ASSET``. Any
                other value is logged and reported as an error.
            data_asset_region: Region used in the fileset resource path and in
                the Spanner fully qualified name / lookup location.
            tag_dict: Mapping of column name to value (per the error messages,
                a row of the import CSV). Locator keys:
                ``project`` (always required); ``dataset`` and optional
                ``table`` for BigQuery; ``entry_group`` and ``fileset`` for
                filesets; ``instance``, ``database``, ``table`` and optional
                ``schema`` for Spanner; optional ``column`` for column-level
                tags. Every other key is treated as a tag template field id,
                and its value must support ``.strip()``.
            tag_history: Forwarded unchanged to ``create_update_delete_tag``.
            overwrite: When ``False`` and a tag already exists on the target,
                nothing is written and ``constants.SUCCESS`` is returned.

        Returns:
            ``constants.ERROR`` when validation, the entry lookup or the
            existing-tag check fails; ``constants.SUCCESS`` when an existing
            tag is left untouched; otherwise whatever
            ``create_update_delete_tag`` returns.
        """
        print(f'apply_import_config: {job_uuid}, {config_uuid}, {data_asset_type}, {data_asset_region}, {tag_dict}, {tag_history}')

        if 'project' not in tag_dict:
            msg = "Error: project info missing from CSV"
            log_error_tag_dict(msg, None, job_uuid, tag_dict)
            return constants.ERROR

        project = tag_dict['project']

        # Validate the locator keys for the asset type and derive the resource
        # string that identifies the entry in Data Catalog.
        if data_asset_type == constants.BQ_ASSET:
            if 'dataset' not in tag_dict:
                msg = "Error: could not find the required dataset field in the CSV"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                return constants.ERROR

            dataset = tag_dict['dataset']

            if 'table' in tag_dict:
                table = tag_dict['table']
                entry_type = constants.BQ_TABLE
                resource = f'//bigquery.googleapis.com/projects/{project}/datasets/{dataset}/tables/{table}'
            else:
                # No table given: the tag targets the dataset itself.
                entry_type = constants.DATASET
                resource = f'//bigquery.googleapis.com/projects/{project}/datasets/{dataset}'

        elif data_asset_type == constants.FILESET_ASSET:
            if 'entry_group' not in tag_dict or 'fileset' not in tag_dict:
                msg = "Error: could not find the required fields in the CSV. Missing entry_group or fileset or both"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                return constants.ERROR

            entry_type = constants.FILESET
            entry_group = tag_dict['entry_group']
            fileset = tag_dict['fileset']
            resource = f'//datacatalog.googleapis.com/projects/{project}/locations/{data_asset_region}/entryGroups/{entry_group}/entries/{fileset}'

        elif data_asset_type == constants.SPAN_ASSET:
            if 'instance' not in tag_dict or 'database' not in tag_dict or 'table' not in tag_dict:
                msg = "Error: could not find the required fields in the CSV. The required fields for Spanner are instance, database, and table"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                return constants.ERROR

            entry_type = constants.SPAN_TABLE
            instance = tag_dict['instance']
            database = tag_dict['database']
            table = tag_dict['table']

            if 'schema' in tag_dict:
                # Schema-qualified table names are wrapped in backticks inside
                # the fully qualified name; the backticks are stripped again
                # when the history uri is built below.
                schema = tag_dict['schema']
                table = f"`{schema}.{table}`"

            resource = f'spanner:{project}.regional-{data_asset_region}.{instance}.{database}.{table}'

        else:
            # Previously an unknown asset type fell through every branch and
            # crashed with UnboundLocalError on entry_type; report it like the
            # other validation failures instead.
            msg = f"Error: unsupported data asset type {data_asset_type}"
            log_error_tag_dict(msg, None, job_uuid, tag_dict)
            return constants.ERROR

        request = datacatalog.LookupEntryRequest()

        if data_asset_type == constants.SPAN_ASSET:
            # Spanner entries are looked up by fully qualified name, which also
            # needs the project and location set on the request.
            request.fully_qualified_name = resource
            request.project = project
            request.location = data_asset_region
        else:
            request.linked_resource = resource

        try:
            entry = self.client.lookup_entry(request)
        except Exception as e:
            msg = f"Error could not find {entry_type} entry for {resource}"
            log_error_tag_dict(msg, e, job_uuid, tag_dict)
            return constants.ERROR

        # format uri for storing in tag history table
        if data_asset_type == constants.BQ_ASSET:
            uri = entry.linked_resource.replace('//bigquery.googleapis.com/projects/', '')
        elif data_asset_type == constants.SPAN_ASSET:
            uri = entry.linked_resource.replace('///projects/', '').replace('instances', 'instance').replace('databases', 'database') + '/table/' + table.replace('`', '')
        else:
            # Only FILESET_ASSET can reach this branch: unsupported types were
            # rejected above.
            uri = entry.linked_resource.replace('//datacatalog.googleapis.com/projects/', '').replace('locations', 'location').replace('entryGroups', 'entry_group').replace('entries', 'entry')

        target_column = None

        if 'column' in tag_dict:
            target_column = tag_dict['column']

            if not self.column_exists_in_table(target_column, entry.schema.columns):
                msg = f"Error could not find column {target_column} in {resource}"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                return constants.ERROR

            uri = uri + '/column/' + target_column

        try:
            tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column=target_column)
        except Exception as e:
            msg = f"Error during check_if_tag_exists: {entry.name}"
            log_error_tag_dict(msg, e, job_uuid, tag_dict)
            return constants.ERROR

        # Kept as an equality test (not `not overwrite`) so that a None value
        # passed by a caller keeps its existing behaviour.
        if tag_exists and overwrite == False:  # noqa: E712
            msg = "Info: Tag already exists and overwrite flag is False"
            log_info_tag_dict(msg, job_uuid, tag_dict)
            return constants.SUCCESS

        # Keys that locate the entry; everything else is a tag field.
        locator_keys = {
            'project', 'dataset', 'table', 'column', 'entry_group',
            'fileset', 'instance', 'database', 'schema',
        }

        tag_fields = []
        template_fields = self.get_template()

        for field_name in tag_dict:

            if field_name in locator_keys:
                continue

            field_value = tag_dict[field_name].strip()

            # First template field whose id matches, or None when absent.
            field_type = next(
                (template_field['field_type']
                 for template_field in template_fields
                 if template_field['field_id'] == field_name),
                None,
            )

            if field_type is None:
                # Logged through the same helper as the other failures so the
                # error is recorded against the job rather than only printed.
                msg = f"Error while preparing the tag. The field {field_name} was not found in the tag template {self.template_id}"
                log_error_tag_dict(msg, None, job_uuid, tag_dict)
                return constants.ERROR

            # this check allows for tags with empty enums to get created, otherwise the empty enum gets flagged because DC thinks that you are storing an empty string as the enum value
            if field_type == 'enum' and field_value == '':
                continue

            tag_fields.append({'field_id': field_name, 'field_type': field_type, 'field_value': field_value})

        return self.create_update_delete_tag(tag_fields, tag_exists, tag_id, job_uuid, config_uuid, 'IMPORT_TAG', tag_history,
                                             entry, uri, target_column)