in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]
def copy_tags(self, source_project, source_dataset, source_table, target_project, target_dataset, target_table, include_policy_tags=False):
    """Copy the Data Catalog tags of a source BigQuery table onto a target table.

    Both tables are resolved to Data Catalog entries through their linked
    resource name. Each tag listed on the source entry is rebuilt field by
    field and then either updates the matching tag on the target entry (when
    check_if_tag_exists reports one) or is created as a new tag. Finally
    copy_policy_tags is called for the same source/target pair.

    Side effect: self.template_id, self.template_project, self.template_region
    and self.template_path are overwritten for every source tag, because
    get_template() is called without arguments after they are set.

    Args:
        source_project, source_dataset, source_table: identify the table to copy from.
        target_project, target_dataset, target_table: identify the table to copy to.
        include_policy_tags: accepted for interface compatibility.
            NOTE(review): this flag is not consulted; policy tags are always
            copied. Confirm the intended behavior with the callers before
            gating the copy_policy_tags call on it.

    Returns:
        False when either entry is not a BigQuery table, when any tag
        create/update call raised, or when copy_policy_tags returned a falsy
        value; True otherwise.
    """
    success = True

    # lookup the source entry
    linked_resource = '//bigquery.googleapis.com/projects/{0}/datasets/{1}/tables/{2}'.format(source_project, source_dataset, source_table)
    request = datacatalog.LookupEntryRequest()
    request.linked_resource = linked_resource
    source_entry = self.client.lookup_entry(request)

    if source_entry.bigquery_table_spec.table_source_type != types.TableSourceType.BIGQUERY_TABLE:
        msg = 'Error {} is not a BQ table'.format(source_table)
        log_info(msg, None)
        print(json.dumps(msg))
        return False

    # lookup the target entry
    linked_resource = '//bigquery.googleapis.com/projects/{0}/datasets/{1}/tables/{2}'.format(target_project, target_dataset, target_table)
    request = datacatalog.LookupEntryRequest()
    request.linked_resource = linked_resource
    target_entry = self.client.lookup_entry(request)

    if target_entry.bigquery_table_spec.table_source_type != types.TableSourceType.BIGQUERY_TABLE:
        msg = 'Error {} is not a BQ table'.format(target_table)
        log_info(msg, None)
        # print the message that was just built (the previous code referenced
        # an undefined name here and raised NameError instead of returning)
        print(json.dumps(msg))
        return False

    # look to see if the source table is tagged
    tag_list = self.client.list_tags(parent=source_entry.name, timeout=120)

    for source_tag in tag_list:
        print('source_tag.template:', source_tag.template)
        print('source_tag.column:', source_tag.column)

        # get tag template fields; the template path is split once and the
        # project, region and template id are read from positions 1, 3 and 5
        template_parts = source_tag.template.split('/')
        self.template_id = template_parts[5]
        self.template_project = template_parts[1]
        self.template_region = template_parts[3]
        self.template_path = source_tag.template
        template_fields = self.get_template()

        # start a new target tag
        target_tag = datacatalog.Tag()
        target_tag.template = source_tag.template

        if source_tag.column:
            target_tag.column = source_tag.column

        for template_field in template_fields:
            field_id = template_field['field_id']

            if field_id not in source_tag.fields:
                continue

            tagged_field = source_tag.fields[field_id]
            print('field_id:', field_id)

            # Reset per field so a field whose value is not detected can never
            # inherit the type/value of the previously processed field.
            field_type = None
            field_value = None

            if tagged_field.bool_value:
                field_type = 'bool'
                field_value = tagged_field.bool_value
            if tagged_field.double_value:
                field_type = 'double'
                field_value = tagged_field.double_value
            if tagged_field.string_value:
                field_type = 'string'
                field_value = tagged_field.string_value
            if tagged_field.enum_value:
                field_type = 'enum'
                field_value = tagged_field.enum_value.display_name
            if tagged_field.timestamp_value:
                field_type = 'timestamp'
                field_value = tagged_field.timestamp_value
            if tagged_field.richtext_value:
                field_type = 'richtext'
                field_value = tagged_field.richtext_value

            if field_type is None:
                # The checks above rely on truthiness, so a field holding
                # False, 0.0 or '' is not detected. Skip it rather than crash
                # on an unbound name or copy a stale value.
                # NOTE(review): detecting such values needs an explicit
                # presence check on the tag field - confirm the desired handling.
                msg = 'Unable to determine the value of field {}, field not copied'.format(field_id)
                log_info(msg, None)
                continue

            # NOTE(review): the error flag returned by populate_tag_field is
            # not acted upon, as before - confirm whether it should mark the
            # copy as failed.
            target_tag, error_exists = self.populate_tag_field(target_tag, field_id, field_type, [field_value], None)

        # create the target tag
        tag_exists, tag_id = self.check_if_tag_exists(parent=target_entry.name, column=source_tag.column)

        if tag_exists:
            target_tag.name = tag_id

            try:
                print('tag update request: ', target_tag)
                self.client.update_tag(tag=target_tag)
            except Exception as e:
                success = False
                msg = 'Error occurred during tag update: {}'.format(target_tag)
                log_error(msg, e)
        else:
            try:
                print('tag create request: ', target_tag)
                self.client.create_tag(parent=target_entry.name, tag=target_tag)
            except Exception as e:
                success = False
                msg = 'Error occurred during tag create: {}'.format(target_tag)
                log_error(msg, e)

    # copy policy tags; its result may only lower `success`, so a failure
    # recorded while copying the tags above is not overwritten
    policy_tags_copied = self.copy_policy_tags(source_project, source_dataset, source_table, target_project, target_dataset, target_table)

    if not policy_tags_copied:
        success = False

    return success