def apply_dynamic_table_config()

in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]


    def apply_dynamic_table_config(self, fields, uri, job_uuid, config_uuid, template_uuid, tag_history, batch_mode=False):
        """Create or update a table-level tag whose values come from query expressions.

        For every entry in ``fields`` the query expression is expanded with
        ``parse_query_expression``, executed with ``run_query`` and the result
        is written into a new ``datacatalog.Tag`` via ``populate_tag_field``.
        The tag is then created on the looked-up entry, or updated in place
        when ``check_if_tag_exists`` reports an existing tag.

        Args:
            fields: Iterable of dicts, each providing the keys ``'field_id'``,
                ``'field_type'`` and ``'query_expression'``. For every field
                that is populated successfully a ``'field_value'`` key is
                written back into the dict so the exporter can read it.
            uri: Resource path appended to
                ``'//bigquery.googleapis.com/projects/'`` to form the linked
                resource used for the entry lookup.
            job_uuid: Forwarded to ``run_query``, ``populate_tag_field``,
                ``do_create_update_delete_action`` and ``copy_tag``.
            config_uuid: Not used by this method.
            template_uuid: Not used by this method.
            tag_history: When truthy and the tag write succeeded, the tag is
                also copied through ``BigQueryUtils.copy_tag``.
            batch_mode: Forwarded to ``run_query``.

        Returns:
            ``constants.ERROR`` when no field could be populated (the tag
            write is skipped) or when any field failed (the tag is still
            written with the fields that succeeded). Otherwise the status
            returned by ``do_create_update_delete_action``.
        """
        print('*** apply_dynamic_table_config ***')

        bigquery_resource = '//bigquery.googleapis.com/projects/' + uri

        request = datacatalog.LookupEntryRequest()
        request.linked_resource = bigquery_resource
        entry = self.client.lookup_entry(request)

        tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name)
        print("tag_exists: " + str(tag_exists))

        # create new tag
        tag = datacatalog.Tag()
        tag.template = self.template_path
        verified_field_count = 0

        # Sticky flag: once any field fails it stays set, so a later
        # successful field cannot hide an earlier failure.
        had_field_error = False

        for field in fields:
            field_id = field['field_id']
            field_type = field['field_type']
            query_expression = field['query_expression']

            # parse and run query in BQ
            query_str = self.parse_query_expression(uri, query_expression)
            print('returned query_str: ' + query_str)

            # note: field_values is of type list
            field_values, query_failed = self.run_query(query_str, field_type, batch_mode, job_uuid)
            print('field_values: ', field_values)
            print('error_exists: ', query_failed)

            if query_failed:
                had_field_error = True
                continue

            # An empty result is not an error; the field is simply left out.
            if not field_values:
                continue

            tag, populate_failed = self.populate_tag_field(tag, field_id, field_type, field_values, job_uuid)

            if populate_failed:
                had_field_error = True
                continue

            verified_field_count += 1

            # store the value back in the dict, so that it can be accessed by the exporter
            if field_type == 'richtext':
                formatted_value = ', '.join(str(v) for v in field_values)
            else:
                formatted_value = field_values[0]

            field['field_value'] = formatted_value

        if verified_field_count == 0:
            # tag is empty due to errors, skip tag creation
            return constants.ERROR

        if tag_exists:
            tag.name = tag_id
            write_status = self.do_create_update_delete_action(job_uuid, 'update', tag)
        else:
            write_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry)

        # The history copy depends only on whether the tag write itself succeeded.
        if write_status == constants.SUCCESS and tag_history:
            bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
            template_fields = self.get_template()
            bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, uri, None, fields)

        if had_field_error:
            # An error was encountered for at least one field: the tag was
            # still created / updated with the remaining fields, but the
            # error is reported back to the caller.
            return constants.ERROR

        return write_status