def copy_tags()

in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/BigQueryUtils.py [0:0]


    def copy_tags(self, tag_creator_account, tag_invoker_account, job_uuid, table_name, table_fields, asset_table, column_fields_list):
        """Record one tag-history row per tagged column and load them into BigQuery.

        The target project and dataset are read from the tag history settings.
        The dataset and the history table are created when they do not exist
        yet, then one row is built for each entry of ``column_fields_list`` and
        the whole batch is handed to ``self.load_history_rows``.

        Args:
            tag_creator_account: Stored in every row and forwarded to
                ``load_history_rows``.
            tag_invoker_account: Stored in every row and forwarded to
                ``load_history_rows``.
            job_uuid: Stored in every row and forwarded to ``load_history_rows``.
            table_name: History table name, passed to ``history_table_exists``
                and ``create_history_table``.
            table_fields: Passed to ``create_history_table`` when the table has
                to be created.
            asset_table: Table-level asset path. It must not already contain
                ``"/column/"``; the column name is appended to it.
            column_fields_list: Iterable of dicts with the keys ``'column'``
                (non-empty column name) and ``'fields'`` (iterable of dicts
                with ``'field_id'`` and ``'field_value'``).

        Returns:
            ``False`` when the dataset or the history table cannot be created,
            or when an entry has no usable column (or ``asset_table`` already
            is a column path). Otherwise the value returned by
            ``self.load_history_rows``.

        Raises:
            KeyError: If the settings or an entry of ``column_fields_list``
                lack one of the keys read here.
        """
        print('enter BigQueryUtils.copy_tags')
        print('asset_table:', asset_table)
        print('column_fields_list:', column_fields_list)

        store = tesh.TagEngineStoreHandler()
        # Only the settings dict is needed; the "enabled" flag is not consulted here.
        _, settings = store.read_tag_history_settings()

        bigquery_project = settings['bigquery_project']
        bigquery_dataset = settings['bigquery_dataset']

        if self.dataset_exists(bigquery_project=bigquery_project, bigquery_dataset=bigquery_dataset):
            dataset_id = DatasetReference(project=bigquery_project, dataset_id=bigquery_dataset)
        else:
            created, dataset_id = self.create_dataset(project=bigquery_project, dataset=bigquery_dataset)

            if not created:
                print('Error creating tag_history dataset')
                return False

        # The third return value is not used by this method.
        exists, table_id, _ = self.history_table_exists(table_name)

        if not exists:
            table_id = self.create_history_table(dataset_id, table_name, table_fields)
            if not table_id:
                print("Error creating history table.")
                return False  # Stop if table creation fails

        rows_to_insert = []

        for column_fields in column_fields_list:

            column = column_fields['column']
            fields = column_fields['fields']

            # A row needs a column name, and asset_table must still be a
            # table-level path so the column suffix is appended exactly once.
            if not column or "/column/" in asset_table:
                print("Error: could not find the tagged column in column_fields_list, therefore skipping tag history.")
                return False

            asset_name = "{}/column/{}".format(asset_table, column)
            # Rewrite the plural path segments to their singular form.
            asset_name = asset_name.replace("/datasets/", "/dataset/").replace("/tables/", "/table/")

            # Timezone-aware replacement for the deprecated utcnow(); the
            # format has no %z, so the emitted string is unchanged.
            event_time = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f') + ' UTC'

            row = {
                'event_time': event_time,
                'asset_name': asset_name,
                'tag_creator_account': tag_creator_account,
                'tag_invoker_account': tag_invoker_account,
                'job_uuid': job_uuid,
            }

            for field in fields:
                field_id = field['field_id']
                field_value = field['field_value']

                if isinstance(field_value, decimal.Decimal):
                    row[field_id] = float(field_value)
                elif isinstance(field_value, (datetime.datetime, datetime.date)):
                    row[field_id] = field_value.isoformat()
                else:
                    # Everything else is JSON-encoded; values json cannot
                    # encode natively fall back to str().
                    row[field_id] = json.dumps(field_value, default=str)

            rows_to_insert.append(row)

        return self.load_history_rows(tag_creator_account, tag_invoker_account, table_id, rows_to_insert, job_uuid)