def apply_dynamic_column_config()

in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]


    def apply_dynamic_column_config(self, fields, columns_query, uri, job_uuid, config_uuid, template_uuid, tag_history, batch_mode=False):
        """Create or update column-level tags whose target columns and values come from queries.

        The columns to tag are produced by running ``columns_query`` (after
        ``parse_query_expression``) against BigQuery. Every value of every result row is
        treated as a column name. For each such column:

        - each field's ``query_expression`` is parsed;
        - the parsed expressions are combined into one query and run;
        - the results populate a ``datacatalog.Tag`` built on ``self.template_path``.

        All successfully built tags are then sent to the table's Data Catalog entry in a
        single ``reconcile_tags`` call. When ``tag_history`` is truthy and no error
        occurred, the tags are also copied via ``BigQueryUtils.copy_tags``.

        Args:
            fields: list of field dicts, each with a ``'query_expression'`` key. For each
                column it is passed to, and replaced by the return value of,
                ``run_combined_query``.
            columns_query: query expression yielding the column names to tag.
            uri: table path appended to ``//bigquery.googleapis.com/projects/`` to form
                the linked resource used for the Data Catalog entry lookup.
            job_uuid: job identifier passed to ``log_error`` and ``copy_tags``.
            config_uuid: not used by this method; kept for interface compatibility.
            template_uuid: not used by this method; kept for interface compatibility.
            tag_history: when truthy, copy the applied tags via ``copy_tags``.
            batch_mode: not used by this method; kept for interface compatibility.

        Returns:
            ``constants.SUCCESS`` or ``constants.ERROR``. The method returns
            ``constants.ERROR`` immediately if the column query yields nothing or a column
            is missing from the entry's schema. Per-column query/tag-population failures
            are skipped (other columns are still processed) but make the result
            ``constants.ERROR``.

        Exceptions raised by the column query or by ``lookup_entry`` are not caught here.
        Exceptions from ``reconcile_tags`` are logged and turned into ``constants.ERROR``.
        """
        print('*** apply_dynamic_column_config ***')

        op_status = constants.SUCCESS

        columns_query = self.parse_query_expression(uri, columns_query)
        print('columns_query:', columns_query)

        # Every value of every result row is treated as a column name to tag.
        target_columns = [] # columns in the table which need to be tagged
        for row in self.bq_client.query(columns_query).result():
            for column in row:
                print('column:', column)
                target_columns.append(column)

        if not target_columns:
            # no columns to tag
            msg = f"Error could not find columns to tag. Please check column_query parameter in your config. Current value: {columns_query}"
            log_error(msg, None, job_uuid)
            return constants.ERROR

        bigquery_resource = '//bigquery.googleapis.com/projects/' + uri

        request = datacatalog.LookupEntryRequest()
        request.linked_resource = bigquery_resource
        entry = self.client.lookup_entry(request)

        tag_work_queue = [] # Tag objects to be created or updated by reconcile_tags
        column_fields_list = [] # list<dictionaries> where dict = {column, fields}

        for target_column in target_columns:

            # fail quickly if a column is not found in the entry's schema
            if not self.column_exists_in_table(target_column, entry.schema.columns):
                # Uses bigquery_resource: the former `resource` name was undefined and
                # raised NameError instead of logging this message.
                msg = f"Error could not find column {target_column} in {bigquery_resource}"
                log_error(msg, None, job_uuid)
                return constants.ERROR

            # initialize the new column-level tag
            tag = datacatalog.Tag()
            tag.template = self.template_path
            tag.column = target_column

            # Parse each field's query expression for this column, then run them together
            # as one combined query.
            query_strings = [
                self.parse_query_expression(uri, field['query_expression'], target_column)
                for field in fields
            ]
            combined_query = self.combine_queries(query_strings)

            # run combined query, adding the results to the field_values for each field
            # Note: field_values is of type list
            fields, error_exists = self.run_combined_query(combined_query, target_column, fields, job_uuid)
            if error_exists:
                op_status = constants.ERROR
                continue

            # populate tag fields
            tag, error_exists = self.populate_tag_fields(tag, fields, job_uuid)
            if error_exists:
                op_status = constants.ERROR
                continue

            # NOTE(review): `fields` is whatever run_combined_query returned and is reused
            # for the next column. If that helper appends to the same field dicts, values
            # may accumulate across columns and every entry below would share one object.
            # Confirm against run_combined_query.
            column_fields_list.append({"column": target_column, "fields": fields})
            tag_work_queue.append(tag)

        if not tag_work_queue:
            # no column produced a tag, so there is nothing to reconcile
            return constants.ERROR

        # ready to create or update all the tags in work queue
        rec_request = datacatalog.ReconcileTagsRequest(
            parent=entry.name,
            tag_template=self.template_path,
            tags=tag_work_queue
        )

        try:
            operation = self.client.reconcile_tags(request=rec_request)
            print("Waiting for operation to complete...")
            operation.result()  # block until the long-running reconcile finishes
        except Exception as e:
            msg = 'Error during reconcile_tags on entry {}'.format(entry.name)
            log_error(msg, e, job_uuid)
            return constants.ERROR

        if tag_history and op_status != constants.ERROR:
            bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
            success = bqu.copy_tags(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, self.get_template(), uri, column_fields_list)
            print('Tag history completed successfully:', success)
            op_status = constants.SUCCESS if success else constants.ERROR

        return op_status