in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/BigQueryUtils.py [0:0]
def copy_tag(self, tag_creator_account, tag_invoker_account, job_uuid, table_name, table_fields, tagged_table, tagged_column, tagged_values):
    """Write one tag-history row, creating the dataset and table on demand.

    The target BigQuery project and dataset are read from the tag history
    settings held by the Tag Engine store. If the dataset or the history
    table is missing it is created first; a failure in either creation step
    prints an error and aborts with ``False``.

    Args:
        tag_creator_account: Forwarded unchanged to ``insert_history_row``.
        tag_invoker_account: Forwarded unchanged to ``insert_history_row``.
        job_uuid: Forwarded unchanged to ``insert_history_row``.
        table_name: Name of the history table to look up or create.
        table_fields: Passed to ``create_history_table`` when the table
            has to be created.
        tagged_table: Resource path of the tagged table. Any ``/datasets/``
            and ``/tables/`` segments are rewritten to ``/dataset/`` and
            ``/table/`` before the row is written.
        tagged_column: Optional column name. When non-empty, and the path
            does not already contain ``/column/``, it is appended to the
            path as ``/column/<tagged_column>``.
        tagged_values: Forwarded unchanged to ``insert_history_row``.

    Returns:
        ``False`` if the dataset or history table could not be created,
        otherwise whatever ``insert_history_row`` returns.
    """
    # Only the settings are used here; the "enabled" flag is not consulted.
    _enabled, history_settings = tesh.TagEngineStoreHandler().read_tag_history_settings()
    project = history_settings['bigquery_project']
    dataset = history_settings['bigquery_dataset']

    # Resolve a dataset reference, creating the dataset when it is missing.
    if self.dataset_exists(bigquery_project=project, bigquery_dataset=dataset):
        dataset_id = DatasetReference(project=project, dataset_id=dataset)
    else:
        created, dataset_id = self.create_dataset(project=project, dataset=dataset)
        if not created:
            print('Error creating tag_history dataset')
            return False

    # Resolve the history table, creating it when it is missing.
    found, table_id, _table_settings = self.history_table_exists(table_name)
    if not found:
        table_id = self.create_history_table(dataset_id, table_name, table_fields)
        if not table_id:
            print("Error creating history table.")
            return False

    # Append the column only when one was given and the path lacks one;
    # the truthiness test already excludes the empty string.
    asset_name = tagged_table
    if tagged_column and "/column/" not in tagged_table:
        asset_name = "{}/column/{}".format(tagged_table, tagged_column)

    # Normalise plural path segments to their singular form.
    for plural, singular in (("/datasets/", "/dataset/"), ("/tables/", "/table/")):
        asset_name = asset_name.replace(plural, singular)

    return self.insert_history_row(
        tag_creator_account,
        tag_invoker_account,
        job_uuid,
        table_id,
        asset_name,
        tagged_values,
    )