in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]
def apply_export_config(self, config_uuid, target_project, target_dataset, target_region, uri):
    """Export the Data Catalog tags attached to one BigQuery resource to BigQuery report tables.

    The entry for ``uri`` is looked up in Data Catalog, every tag on it is
    flattened into one record per populated template field, and the records
    are inserted into ``catalog_report_dataset_tags``,
    ``catalog_report_table_tags`` or ``catalog_report_column_tags`` under
    ``target_project.target_dataset``.

    Args:
        config_uuid: Identifier of the export configuration (not used by this method).
        target_project: Project holding the report tables.
        target_dataset: Dataset holding the report tables.
        target_region: Region handed to ``bq.BigQueryUtils``.
        uri: Slash-separated path of the tagged resource, appended to
            ``//bigquery.googleapis.com/projects/``. Segment 0 is read as the
            project, segment 2 as the dataset and, when the path contains
            ``/tables/``, segment 4 as the table (presumably
            ``<project>/datasets/<dataset>[/tables/<table>]`` -- confirm with callers).

    Returns:
        ``constants.ERROR`` when ``uri`` is not a well-formed string or the
        catalog lookup fails, otherwise ``constants.SUCCESS``.

    Side effects:
        Overwrites ``self.template_id``, ``self.template_project``,
        ``self.template_region`` and ``self.template_path`` for every tag, so
        that ``self.get_template()`` resolves that tag's template.
    """
    # Function-scope import: only the `datetime` class is known to be in scope in this file.
    from datetime import timezone

    def _extract_field_value(lines):
        """Return the value parsed from a tag field's text lines, or None when none is found."""
        for index, line in enumerate(lines):
            if '_value:' in line:
                # Keep everything after the first colon, drop quotes, and turn '<br>' into ','.
                return line[line.index(':') + 1:].strip().replace('"', '').replace('<br>', ',')
            if 'enum_value' in line:
                # The enum's display name is read from the line that follows.
                if index + 1 < len(lines):
                    return lines[index + 1].replace('display_name:', '').replace('"', '').strip()
                return None
        return None

    column_tag_records = []
    table_tag_records = []
    dataset_tag_records = []

    export_status = constants.SUCCESS
    bqu = bq.BigQueryUtils(self.credentials, target_region)

    if not isinstance(uri, str):
        # The original referenced the undefined name `url` here and raised NameError.
        print('Error: url ' + str(uri) + ' is not of type string.')
        return constants.ERROR

    uri_parts = uri.split('/')
    is_table_uri = '/tables/' in uri

    # Reject paths that are too short to index instead of failing with IndexError.
    if len(uri_parts) < (5 if is_table_uri else 3):
        print('Error: uri ' + uri + ' does not have the expected number of path segments.')
        return constants.ERROR

    tagged_project = uri_parts[0]
    tagged_dataset = uri_parts[2]
    tagged_table = uri_parts[4] if is_table_uri else None

    # Report table used for tags that are not attached to a column.
    if is_table_uri:
        entry_level_table_id = 'catalog_report_table_tags'
    else:
        entry_level_table_id = 'catalog_report_dataset_tags'

    bigquery_resource = '//bigquery.googleapis.com/projects/' + uri

    request = datacatalog.LookupEntryRequest()
    request.linked_resource = bigquery_resource

    try:
        entry = self.client.lookup_entry(request)
    except Exception as e:
        msg = 'Error looking up entry {} in catalog'.format(bigquery_resource)
        # NOTE(review): the original passed an undefined `job_uuid`, which raised
        # NameError inside this handler. No job id exists in this scope, so None
        # is passed -- confirm log_error accepts it.
        log_error(msg, e, None)
        return constants.ERROR

    for tag in self.client.list_tags(parent=entry.name, timeout=120):
        print('tag.template:', tag.template)
        print('tag.column:', tag.column)

        # get tag template fields
        template_parts = tag.template.split('/')
        self.template_id = template_parts[5]
        self.template_project = template_parts[1]
        self.template_region = template_parts[3]
        self.template_path = tag.template
        template_fields = self.get_template()

        if tag.column:
            # Any non-empty column name counts; the original `len(...) > 1`
            # check sent single-character column names to the table report.
            tagged_column = tag.column
            target_table_id = 'catalog_report_column_tags'
        else:
            # The original always used the table report here, which left the
            # dataset report unreachable and filed dataset tags as table tags.
            tagged_column = None
            target_table_id = entry_level_table_id

        for template_field in template_fields:
            field_id = template_field['field_id']

            if field_id not in tag.fields:
                continue

            field_lines = str(tag.fields[field_id]).split('\n')
            field_value = _extract_field_value(field_lines)

            if field_value is None:
                # The original reused the previous field's value here (or raised
                # UnboundLocalError on the first field). Skip the record instead.
                print('Warning: unable to extract a value for field ' + str(field_id) + ', skipping.')
                continue

            print('extracted field_value:', field_value)

            # format record to be written; use real UTC so the " UTC" suffix is accurate
            current_ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + " UTC"

            if target_table_id == 'catalog_report_column_tags':
                column_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "table": tagged_table, "column": tagged_column, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts})
            elif target_table_id == 'catalog_report_table_tags':
                table_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "table": tagged_table, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts})
            elif target_table_id == 'catalog_report_dataset_tags':
                dataset_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts})

    # write exported records to BQ
    # NOTE(review): the return value of insert_exported_records is ignored, as in
    # the original, so an insert failure does not change export_status -- confirm
    # what it returns before wiring it in.
    target_prefix = target_project + '.' + target_dataset + '.'
    report_batches = (
        ('catalog_report_dataset_tags', dataset_tag_records),
        ('catalog_report_table_tags', table_tag_records),
        ('catalog_report_column_tags', column_tag_records),
    )

    for report_table, records in report_batches:
        if records:
            bqu.insert_exported_records(target_prefix + report_table, records)

    return export_status