in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]
def create_update_delete_tag(self, fields, tag_exists, tag_id, job_uuid, config_uuid, config_type, tag_history, entry, uri, column_name=''):
    """Build a Data Catalog tag from ``fields`` and create, update or delete it.

    Each entry of ``fields`` is a dict using either the ``name``/``type``/``value``
    keys or the ``field_id``/``field_type``/``field_value`` keys. Entries with
    neither ``name`` nor ``field_id`` are skipped.

    Side effects on ``fields`` (the caller's dicts are mutated in place):
      * when ``tag_history`` is truthy, ``name``/``type``/``value`` keys are
        renamed to ``field_id``/``field_type``/``field_value``;
      * RICHTEXT entries get ``field_value`` rewritten with ``'<br>'`` replaced
        by ``', '``;
      * DATETIME/TIMESTAMP entries get ``field_value`` replaced by the parsed
        UTC timestamp (or ``''`` when the value is empty).

    Args:
        fields: sized iterable of field dicts as described above.
        tag_exists: when ``== True`` the existing tag ``tag_id`` is updated, or
            deleted if every usable field is empty; otherwise a new tag is
            created, provided at least one usable field is non-empty.
        tag_id: name assigned to the tag when ``tag_exists`` is true.
        job_uuid: passed to the create/update/delete action, to error logging
            and to the tag history writer.
        config_uuid: not used by this method.
        config_type: not used by this method.
        tag_history: when truthy and the operation succeeded, the tag is also
            copied to the tag history table.
        entry: passed to the 'create' action.
        uri: passed to the tag history writer.
        column_name: when not ``''``, the tag is attached to this column.

    Returns:
        ``constants.ERROR`` when no usable field was found or the tag history
        write reported failure; otherwise the status returned by
        ``do_create_update_delete_action`` (``constants.SUCCESS`` when no action
        was needed).
    """
    op_status = constants.SUCCESS
    num_valid_fields = 0   # only entries that actually contribute to the tag
    num_empty_values = 0
    last_field = None      # last entry visited, reported when nothing was usable

    tag = datacatalog.Tag()
    tag.template = self.template_path

    for field in fields:
        last_field = field

        if 'name' in field:
            field_id = field['name']
            field_type = field['type']
            field_value = field['value']

            # rename the keys, which will be used by tag history
            if tag_history:
                field['field_id'] = field['name']
                field['field_type'] = field['type']
                field['field_value'] = field['value']
                del field['name']
                del field['type']
                del field['value']

        elif 'field_id' in field:
            field_id = field['field_id']
            field_type = field['field_type'].upper()
            field_value = field['field_value']

        else:
            # export file contains invalid tags (e.g. a tagged field without a name)
            continue

        num_valid_fields += 1

        # keep track of empty values
        if field_value == '':
            num_empty_values += 1

        if field_type == 'BOOL':
            bool_field = datacatalog.TagField()
            if isinstance(field_value, str):
                # only the exact string 'TRUE' maps to True
                bool_field.bool_value = (field_value == 'TRUE')
            else:
                bool_field.bool_value = field_value
            tag.fields[field_id] = bool_field

        elif field_type == 'STRING':
            string_field = datacatalog.TagField()
            string_field.string_value = str(field_value)
            tag.fields[field_id] = string_field

        elif field_type == 'DOUBLE':
            # NOTE(review): float('') raises ValueError, so an empty DOUBLE value
            # is not tolerated here even though empty values are counted above.
            float_field = datacatalog.TagField()
            float_field.double_value = float(field_value)
            tag.fields[field_id] = float_field

        elif field_type == 'RICHTEXT':
            richtext_field = datacatalog.TagField()
            richtext_field.richtext_value = field_value.replace(',', '<br>')
            tag.fields[field_id] = richtext_field
            # For richtext values, replace '<br>' with ',' when exporting to BQ
            field['field_value'] = field_value.replace('<br>', ', ')

        elif field_type == 'ENUM':
            enum_field = datacatalog.TagField()
            enum_field.enum_value.display_name = field_value
            tag.fields[field_id] = enum_field

        elif field_type == 'DATETIME' or field_type == 'TIMESTAMP':
            # field_value may be empty or date value e.g. "2022-05-08" or datetime value e.g. "2022-05-08 15:00:00"
            if field_value == '':
                # NOTE(review): '' is assigned to timestamp_value below; confirm
                # the client library accepts an empty string for this field.
                timestamp = ''
            else:
                d = date(int(field_value[0:4]), int(field_value[5:7]), int(field_value[8:10]))

                if len(field_value) == 10:
                    # when no time is supplied, default to 12:00:00 AM UTC
                    t = dtime(0, 0)
                else:
                    # raw timestamp format: 2022-05-11 21:18:20
                    # Seconds are optional: they are used only when a valid
                    # ':SS' part is present, otherwise they default to 0.
                    seconds_text = field_value[17:19]
                    has_seconds = (
                        field_value[16:17] == ':'
                        and len(seconds_text) == 2
                        and seconds_text.isdecimal()
                        and int(seconds_text) < 60
                    )
                    seconds = int(seconds_text) if has_seconds else 0
                    t = dtime(int(field_value[11:13]), int(field_value[14:16]), seconds)

                dt = datetime.combine(d, t)
                utc = pytz.timezone('UTC')
                timestamp = utc.localize(dt)

            datetime_field = datacatalog.TagField()
            datetime_field.timestamp_value = timestamp
            tag.fields[field_id] = datetime_field
            field['field_value'] = timestamp  # store this value back in the field, so it can be recorded in tag history

    # exported file from DataCatalog can have invalid tags, skip tag creation if that's the case
    if num_valid_fields == 0:
        # last_field is None when `fields` was empty
        msg = f"Invalid field {last_field}"
        log_error(msg, error='', job_uuid=job_uuid)
        op_status = constants.ERROR
        return op_status

    if column_name != '':
        tag.column = column_name

    # skipped (invalid) entries are not part of the tag, so they must not
    # prevent an otherwise empty tag from being recognised as empty
    all_values_empty = (num_valid_fields == num_empty_values)

    # explicit comparison kept on purpose: callers may pass non-bool values
    if tag_exists == True:  # noqa: E712
        tag.name = tag_id

        # delete tag if every field in it is empty
        action = 'delete' if all_values_empty else 'update'
        op_status = self.do_create_update_delete_action(job_uuid, action, tag)
    elif not all_values_empty:
        # create the tag only if it has at least one non-empty field
        op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry)

    # only write to tag history if the operation was successful
    if tag_history and op_status == constants.SUCCESS:
        bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
        template_fields = self.get_template()
        success = bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, uri, column_name, fields)

        # explicit comparison kept on purpose: only a literal False-equal result is an error
        if success == False:  # noqa: E712
            msg = 'Error occurred while writing to tag history table'
            log_error(msg, error='', job_uuid=job_uuid)
            op_status = constants.ERROR

    return op_status