def apply_sensitive_column_config()

in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]


    def apply_sensitive_column_config(self, fields, dlp_dataset, infotype_selection_table, infotype_classification_table, \
                                      uri, create_policy_tags, taxonomy_id, job_uuid, config_uuid, template_uuid, \
                                      tag_history, overwrite=False):
        """Tag every column that appears in a table's DLP findings with a sensitivity classification.

        Steps, in order:
          1. When `create_policy_tags` is set, list the policy tags under `taxonomy_id`.
          2. Look up the Data Catalog entry for '//bigquery.googleapis.com/projects/' + uri.
          3. Query the DLP findings table (`dlp_dataset` + '.' + the fifth '/'-separated
             segment of `uri`) for the distinct infotypes found on each field.
          4. For each field, query `infotype_selection_table` for a single notable infotype.
          5. De-duplicate the notable infotypes and query `infotype_classification_table`
             for a single `classification_result`.
          6. For each field with findings, create a column-level tag (or update it when one
             already exists and `overwrite` is set), then optionally copy it to tag history.
          7. When `create_policy_tags` is set, apply the policy tag whose display name equals
             the classification to each tagged column.

        Args:
            fields: list of dicts with a 'field_id' key. Entries whose id contains
                'sensitive_field' or 'sensitive_type' get their 'field_value' set in place.
            dlp_dataset: DLP findings dataset, e.g. 'bigquery/project/p/dataset/d'.
            infotype_selection_table: table path in the same 'bigquery/project/...' form.
            infotype_classification_table: table path in the same 'bigquery/project/...' form.
            uri: BigQuery table path (string) appended to the '//bigquery.googleapis.com/projects/'
                prefix; presumably '<project>/datasets/<dataset>/tables/<table>' -- verify against caller.
            create_policy_tags: whether to also apply policy tags to the tagged columns.
            taxonomy_id: parent taxonomy used when listing policy tags.
            job_uuid: job identifier passed to error logging and the tag actions.
            config_uuid: not used by this method.
            template_uuid: not used by this method.
            tag_history: whether to copy each successfully written tag via BigQueryUtils.copy_tag.
            overwrite: when False, columns that already carry a tag are skipped.

        Returns:
            constants.ERROR if any lookup or query fails; constants.SUCCESS when there are no
            DLP findings; otherwise the status of the last tag action (or of apply_policy_tags
            when policy tags were applied).
        """

        def to_bq_name(resource_path):
            # e.g. 'bigquery/project/p/dataset/d' -> 'p.d'
            return resource_path.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.')

        def build_lookup_sql(result_column, table, alias, array_column, values):
            # Builds: select <col> from `<table>` <a>, unnest(<a>.<arr>) as <a>0, ...
            #         where <a>0 = "<v0>" and <a>1 = "<v1>" ... order by array_length(<a>.<arr>) limit 1
            # i.e. the row with the shortest array that contains every requested value.
            array_ref = alias + '.' + array_column
            from_items = ['`' + table + '` ' + alias]
            from_items += ['unnest(' + array_ref + ') as ' + alias + str(i) for i in range(len(values))]
            sql = 'select ' + result_column + ' from ' + ', '.join(from_items) + ' '

            predicates = [alias + str(i) + ' = "' + value + '"' for i, value in enumerate(values)]
            if predicates:
                sql += 'where ' + ' and '.join(predicates) + ' '

            sql += 'order by array_length(' + array_ref + ') limit 1'
            return sql

        if create_policy_tags:

            request = datacatalog.ListPolicyTagsRequest(
                parent=taxonomy_id
            )

            try:
                page_result = self.ptm_client.list_policy_tags(request=request)
            except Exception as e:
                msg = 'Unable to retrieve the policy tag taxonomy for taxonomy_id {}'.format(taxonomy_id)
                log_error(msg, e, job_uuid)
                return constants.ERROR

            # (fully qualified policy tag name, display name) pairs; the display name is
            # compared against the classification result further down
            policy_tag_names = [(policy_tag.name, policy_tag.display_name) for policy_tag in page_result]

            # (column name, fully qualified policy tag name) pairs to apply once the
            # regular tags have been written
            policy_tag_requests = []

        # uri is a BQ table path
        op_status = constants.SUCCESS

        if not isinstance(uri, str):
            # the original message referenced an undefined name (`url`) and raised NameError
            print('Error: uri ' + str(uri) + ' is not of type string.')
            return constants.ERROR

        bigquery_resource = '//bigquery.googleapis.com/projects/' + uri

        request = datacatalog.LookupEntryRequest()
        request.linked_resource = bigquery_resource

        try:
            entry = self.client.lookup_entry(request)
        except Exception as e:
            msg = 'Error looking up entry {} in the catalog: {}'.format(bigquery_resource, e)
            log_error(msg, e, job_uuid)
            return constants.ERROR

        dlp_dataset = to_bq_name(dlp_dataset)
        infotype_selection_table = to_bq_name(infotype_selection_table)
        infotype_classification_table = to_bq_name(infotype_classification_table)
        # the DLP findings table is named after the source table (fifth path segment of uri)
        dlp_table = dlp_dataset + '.' + uri.split('/')[4]

        infotype_fields = []
        notable_infotypes = []

        # get an array of infotypes associated with each field in the DLP findings table
        dlp_sql = (
            'select field, array_agg(infotype) infotypes '
            'from (select distinct cl.record_location.field_id.name as field, info_type.name as infotype '
            'from `' + dlp_table + '`, unnest(location.content_locations) as cl '
            'order by cl.record_location.field_id.name) '
            'group by field'
        )

        try:
            dlp_rows = self.bq_client.query(dlp_sql).result()
        except Exception as e:
            msg = 'Error querying DLP findings table: {}'.format(dlp_sql)
            log_error(msg, e, job_uuid)
            return constants.ERROR

        for dlp_row in dlp_rows:

            field = dlp_row['field']
            infotypes = dlp_row['infotypes']
            infotype_fields.append(field)

            print('field ', field, ', infotypes [', infotypes, ']')

            is_sql = build_lookup_sql('notable_infotype', infotype_selection_table, 'i',
                                      'field_infotypes', infotypes)

            try:
                ni_rows = self.bq_client.query(is_sql).result()
            except Exception as e:
                msg = 'Error querying infotype selection table: {}'.format(is_sql)
                log_error(msg, e, job_uuid)
                return constants.ERROR

            # there should be just one notable infotype per field
            notable_infotypes.extend(ni_row['notable_infotype'] for ni_row in ni_rows)

        # there are no DLP findings
        if not infotype_fields:
            return constants.SUCCESS

        # remove duplicate infotypes from notable list
        final_set = list(set(notable_infotypes))
        print('final_set: ', final_set)

        # lookup classification using set of notable infotypes
        c_sql = build_lookup_sql('classification_result', infotype_classification_table, 'c',
                                 'notable_infotypes', final_set)

        try:
            c_rows = self.bq_client.query(c_sql).result()
        except Exception as e:
            msg = 'Error querying infotype classification table: {}'.format(c_sql)
            log_error(msg, e, job_uuid)
            return constants.ERROR

        classification_result = None

        # we should end up with one classification result per table
        for c_row in c_rows:
            classification_result = c_row['classification_result']

        print('classification_result: ', classification_result)

        # everything other than 'Public_Information' (including no result) counts as sensitive
        is_sensitive = classification_result != 'Public_Information'

        # NOTE(review): one Tag object is reused for every column, so a tag.name set
        # for an update stays on the object for later columns -- confirm that is harmless.
        tag = datacatalog.Tag()
        tag.template = self.template_path

        # each element represents a field which needs to be tagged
        for infotype_field in infotype_fields:

            for field in fields:
                if 'sensitive_field' in field['field_id']:
                    bool_field = datacatalog.TagField()
                    bool_field.bool_value = is_sensitive
                    field['field_value'] = is_sensitive
                    tag.fields['sensitive_field'] = bool_field

                if 'sensitive_type' in field['field_id']:
                    enum_field = datacatalog.TagField()
                    enum_field.enum_value.display_name = classification_result
                    tag.fields['sensitive_type'] = enum_field
                    field['field_value'] = classification_result

            # DLP has a bug and sometimes the infotype field does not equal to the column name in the table
            tag.column = infotype_field
            print('tag.column: ', infotype_field)

            # check if a tag already exists on this column
            try:
                tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column=infotype_field)
            except Exception as e:
                msg = 'Error during check_if_tag_exists: {}'.format(entry.name)
                log_error(msg, e, job_uuid)
                return constants.ERROR

            # NOTE(review): op_status is overwritten for every column, so a failure on one
            # column is masked when a later column succeeds.
            if tag_exists:

                # kept as an equality test so that overwrite=None still updates, as before
                if overwrite == False:
                    # skip this sensitive column because it is already tagged
                    continue

                tag.name = tag_id
                op_status = self.do_create_update_delete_action(job_uuid, 'update', tag)
            else:
                op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry)

            if op_status != constants.SUCCESS:
                continue

            if create_policy_tags and is_sensitive:
                # remember which policy tag(s) to put on this column
                policy_tag_requests.extend(
                    (infotype_field, policy_tag_name)
                    for policy_tag_name, policy_tag_category in policy_tag_names
                    if policy_tag_category == classification_result
                )

            if tag_history:
                bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
                template_fields = self.get_template()
                bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, uri, infotype_field, fields)

        # once we have created the regular tags, we can create/update the policy tags
        if create_policy_tags and policy_tag_requests:
            table_id = uri.replace('/datasets/', '.').replace('/tables/', '.')
            op_status = self.apply_policy_tags(table_id, policy_tag_requests)

        if op_status != constants.SUCCESS:
            msg = 'Error occurred when tagging {}'.format(uri)
            error = {'job_uuid': job_uuid, 'msg': msg}
            print(json.dumps(error))

        return op_status