in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/BigQueryUtils.py [0:0]
def copy_tags(self, tag_creator_account, tag_invoker_account, job_uuid, table_name, table_fields, asset_table, column_fields_list):
    """Append one tag-history row per tagged column to the BigQuery history table.

    Reads the tag-history settings (target BigQuery project/dataset) from the
    Tag Engine store, creates the dataset and the history table on first use,
    builds one row per entry in column_fields_list, and bulk-loads the rows.

    Args:
        tag_creator_account: account that created the tag (recorded in each row).
        tag_invoker_account: account that invoked the tagging job (recorded in each row).
        job_uuid: identifier of the tagging job (recorded in each row).
        table_name: name of the tag-history table to write to.
        table_fields: field definitions used if the history table must be created.
        asset_table: asset path of the tagged table (e.g. .../datasets/D/tables/T).
        column_fields_list: list of dicts with keys 'column' and 'fields', where
            'fields' is a list of {'field_id': ..., 'field_value': ...} dicts.

    Returns:
        bool: True on success; False if the dataset or table could not be
        created, or if a tagged column entry was missing/invalid.
    """
    print('enter BigQueryUtils.copy_tags')
    print('asset_table:', asset_table)
    print('column_fields_list:', column_fields_list)

    store = tesh.TagEngineStoreHandler()
    enabled, history_settings = store.read_tag_history_settings()
    bigquery_project = history_settings['bigquery_project']
    bigquery_dataset = history_settings['bigquery_dataset']

    rows_to_insert = []
    success = True

    # Ensure the tag-history dataset exists, creating it on first use.
    dataset_exists = self.dataset_exists(bigquery_project=bigquery_project, bigquery_dataset=bigquery_dataset)
    if not dataset_exists:
        success, dataset_id = self.create_dataset(project=bigquery_project, dataset=bigquery_dataset)
        if not success:
            print('Error creating tag_history dataset')
            return False
    else:
        dataset_id = DatasetReference(project=bigquery_project, dataset_id=bigquery_dataset)

    # Ensure the history table exists, creating it on first use.
    # NOTE: renamed the third unpacked value to table_settings — the original
    # shadowed the store settings dict read above (never used afterwards).
    exists, table_id, table_settings = self.history_table_exists(table_name)
    if not exists:
        table_id = self.create_history_table(dataset_id, table_name, table_fields)
        if not table_id:
            print("Error creating history table.")
            return False  # Stop if table creation fails

    for column_fields in column_fields_list:
        column = column_fields['column']
        fields = column_fields['fields']

        # Column-level tags only: build the per-column asset path. If the
        # column is missing, or asset_table already carries a /column/
        # segment, abort the whole history write (matches original behavior:
        # rows collected so far are discarded).
        if column and "/column/" not in asset_table:
            asset_name = "{}/column/{}".format(asset_table, column)
        else:
            success = False
            print("Error: could not find the tagged column in column_fields_list, therefore skipping tag history.")
            return success

        # History rows store singular path segments (dataset/table).
        asset_name = asset_name.replace("/datasets/", "/dataset/").replace("/tables/", "/table/")

        # utcnow() is deprecated (Python 3.12+); now(timezone.utc) yields the
        # identical strftime output here since the format has no tz directive.
        row = {'event_time': datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f') + ' UTC',
               'asset_name': asset_name,
               'tag_creator_account': tag_creator_account,
               'tag_invoker_account': tag_invoker_account,
               'job_uuid': job_uuid}

        for field in fields:
            field_id = field['field_id']
            field_value = field['field_value']

            # Normalize tag values to BigQuery-loadable scalars.
            if isinstance(field_value, decimal.Decimal):
                row[field_id] = float(field_value)
            elif isinstance(field_value, (datetime.datetime, datetime.date)):
                row[field_id] = field_value.isoformat()
            else:
                # default=str stringifies any non-JSON-native type.
                row[field_id] = json.dumps(field_value, default=str)

        rows_to_insert.append(row)

    success = self.load_history_rows(tag_creator_account, tag_invoker_account, table_id, rows_to_insert, job_uuid)
    return success