def apply_glossary_asset_config()

in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]


    def apply_glossary_asset_config(self, fields, mapping_table, uri, job_uuid, config_uuid, template_uuid, tag_history, overwrite=False):
        """Create or update a glossary tag on a BigQuery or GCS Data Catalog entry.

        The entry's schema column names are looked up in ``mapping_table`` as
        ``source_name`` values. Every returned ``canonical_name`` that matches a
        ``field_id`` in ``fields`` becomes a boolean ``True`` field on the tag.

        Args:
            fields: list of dicts with at least a ``field_id`` key. Each matched
                dict gets ``field_value`` set to True in place. The same list is
                passed to the tag-history copy.
            mapping_table: mapping table path. Presumably of the form
                ``bigquery/project/<p>/dataset/<d>/<table>`` (it is rewritten to
                ``<p>.<d>.<table>``); confirm against callers.
            uri: either a BigQuery resource path (str, appended to
                ``//bigquery.googleapis.com/projects/``) or a GCS
                ``[bucket, file_path]`` list.
            job_uuid: job identifier included in log/error output.
            config_uuid: not used by this method; kept for interface compatibility.
            template_uuid: not used by this method; kept for interface compatibility.
            tag_history: when truthy, the applied values are also copied via
                ``bq.BigQueryUtils.copy_tag``.
            overwrite: when False, an already existing tag is left untouched.

        Returns:
            ``constants.ERROR`` on any lookup/query failure or an empty tag,
            ``constants.SUCCESS`` when the tag exists and ``overwrite`` is False,
            otherwise the status returned by ``do_create_update_delete_action``.
        """
        is_gcs = isinstance(uri, list)
        is_bq = isinstance(uri, str)

        # build the linked resource name used to look up the catalog entry
        if is_gcs:
            bucket = uri[0].replace('-', '_')
            # extract the filename without the extension, replace '/' with '_'
            filename = uri[1].split('.')[0].replace('/', '_')
            linked_resource = ('//datacatalog.googleapis.com/projects/' + self.template_project
                               + '/locations/' + self.template_region
                               + '/entryGroups/' + bucket + '/entries/' + filename)
        elif is_bq:
            linked_resource = '//bigquery.googleapis.com/projects/' + uri
            print("bigquery_resource: " + linked_resource)
        else:
            # previously this fell through with `entry` unbound
            error = {'job_uuid': job_uuid,
                     'msg': 'Error unsupported uri type {}: expected a BigQuery path (str) or a GCS [bucket, path] list'.format(type(uri).__name__)}
            print(json.dumps(error))
            return constants.ERROR

        request = datacatalog.LookupEntryRequest()
        request.linked_resource = linked_resource

        # both resource types get the same error handling (the BQ path used to raise)
        try:
            entry = self.client.lookup_entry(request)
        except Exception as e:
            msg = 'Unable to find entry in the catalog. Entry {} does not exist: {}'.format(linked_resource, e)
            log_error(msg, e, job_uuid)
            return constants.ERROR
        print('entry: ', entry.name)

        try:
            tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name)
            print('tag_exists: ', tag_exists)
        except Exception as e:
            msg = 'Error during check_if_tag_exists: {}'.format(e)
            log_error(msg, e, job_uuid)
            return constants.ERROR

        if tag_exists and not overwrite:
            info = {'job_uuid': job_uuid, 'msg': 'Info: tag already exists and overwrite set to False'}
            print(json.dumps(info))
            return constants.SUCCESS

        # a proto message field is normally an empty message rather than None, so
        # also reject an empty column list (it would yield invalid SQL: "in ()")
        schema = entry.schema
        if schema is None or not schema.columns:
            msg = 'Error entry {} does not have a schema in the catalog'.format(entry.name)
            error = {'job_uuid': job_uuid, 'msg': msg}
            print(json.dumps(error))
            return constants.ERROR

        # quote each column name as a BigQuery string literal; escape backslashes and
        # single quotes so an unusual column name cannot break out of the literal
        source_names = ','.join(
            "'" + column_schema.column.replace('\\', '\\\\').replace("'", "\\'") + "'"
            for column_schema in schema.columns)

        mapping_table_formatted = mapping_table.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.')
        query_str = 'select canonical_name from `' + mapping_table_formatted + '` where source_name in (' + source_names + ')'

        try:
            rows = self.bq_client.query(query_str).result()
        except Exception as e:
            msg = 'Error querying mapping table {}: {}'.format(mapping_table_formatted, e)
            log_error(msg, e, job_uuid)
            return constants.ERROR

        # index template fields by id (first occurrence wins, as with the original scan)
        fields_by_id = {}
        for field in fields:
            fields_by_id.setdefault(field['field_id'], field)

        tag = datacatalog.Tag()
        tag.template = self.template_path
        tag_is_empty = True

        for row in rows:
            canonical_name = row['canonical_name']
            field = fields_by_id.get(canonical_name)
            if field is None:
                continue
            bool_field = datacatalog.TagField()
            bool_field.bool_value = True
            tag.fields[canonical_name] = bool_field
            # mutate the caller's field so the tag-history copy sees the applied value
            field['field_value'] = True
            tag_is_empty = False

        if tag_is_empty:
            print("Error: can't create the tag because it's empty")
            return constants.ERROR

        if tag_exists:
            # tag already exists and overwrite is True
            tag.name = tag_id
            op_status = self.do_create_update_delete_action(job_uuid, 'update', tag)
        else:
            op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry)

        if tag_history:
            bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
            template_fields = self.get_template()
            # GCS uris are recorded as "bucket/path"; BQ uris as given
            history_uri = '/'.join(uri) if is_gcs else uri
            bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, history_uri, None, fields)

        return op_status