in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]
def update_tag_subset(self, template_id, template_project, template_region, entry_name, changed_fields):
success = True
tag_list = self.client.list_tags(parent=entry_name, timeout=120)
for tag in tag_list:
print('tag.template:', tag.template)
# get tag template fields
tagged_template_id = tag.template.split('/')[5]
tagged_template_project = tag.template.split('/')[1]
tagged_template_region = tag.template.split('/')[3]
if tagged_template_id != template_id:
continue
if tagged_template_project != template_project:
continue
if tagged_template_region != template_region:
continue
# start a new target tag to overwrite the existing one
target_tag = datacatalog.Tag()
target_tag.template = tag.template
target_tag.name = tag.name
self.template_path = tag.template
template_fields = self.get_template()
for template_field in template_fields:
#print('template_field:', template_field)
field_id = template_field['field_id']
# skip this field if it's not in the tag
if field_id not in tag.fields:
continue
tagged_field = tag.fields[field_id]
if tagged_field.bool_value:
field_type = 'bool'
field_value = str(tagged_field.bool_value)
if tagged_field.double_value:
field_type = 'double'
field_value = str(tagged_field.double_value)
if tagged_field.string_value:
field_type = 'string'
field_value = tagged_field.string_value
if tagged_field.enum_value:
field_type = 'enum'
field_value = str(tagged_field.enum_value.display_name)
if tagged_field.timestamp_value:
field_type = 'timestamp'
field_value = str(tagged_field.timestamp_value)
print('orig timestamp:', field_value)
if tagged_field.richtext_value:
field_type = 'richtext'
field_value = str(tagged_field.richtext_value)
# overwrite logic
for changed_field in changed_fields:
if changed_field['field_id'] == field_id:
field_value = changed_field['field_value']
break
target_tag, error_exists = self.populate_tag_field(target_tag, field_id, field_type, [field_value], None)
if error_exists:
msg = 'Error while populating the tag field. Aborting tag update.'
error = {'msg': msg}
print(json.dumps(error))
success = False
return success
# update the tag
try:
print('tag update request: ', target_tag)
response = self.client.update_tag(tag=target_tag)
except Exception as e:
success = False
msg = 'Error occurred during tag update: {}'.format(tag)
log_error(msg, e)
return success