in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]
def populate_tag_field(self, tag, field_id, field_type, field_values, job_uuid=None):
    """Store a value in ``tag.fields[field_id]`` as a Data Catalog tag field.

    Args:
        tag: Tag object whose ``fields`` mapping is updated in place.
        field_id: Key under which the new field is stored in ``tag.fields``.
        field_type: One of "bool", "string", "richtext", "double", "enum",
            "datetime" or "timestamp". Any other value leaves ``tag`` untouched.
        field_values: Value to store. A list contributes its first element,
            except for "richtext", where every element is joined with ``<br>``.
            ``None`` and an empty list are skipped without raising.
        job_uuid: Optional identifier forwarded to ``log_error`` on failure.

    Returns:
        A ``(tag, error_exists)`` tuple. ``error_exists`` is True only when
        converting or storing the value raised an exception; the exception is
        logged through ``log_error`` rather than propagated.
    """
    error_exists = False

    if field_values is None:
        print('Cannot store null value in tag field', field_id)
        return tag, error_exists

    if isinstance(field_values, list):
        if not field_values:
            # An empty list has no first element; skip the field instead of
            # letting field_values[0] raise IndexError outside the try block.
            print('Cannot store empty value in tag field', field_id)
            return tag, error_exists
        field_value = field_values[0]
    else:
        field_value = field_values

    try:
        if field_type == "bool":
            # NOTE(review): bool() treats every non-empty string (including
            # "false") as True -- confirm callers pass real booleans.
            bool_field = datacatalog.TagField()
            bool_field.bool_value = bool(field_value)
            tag.fields[field_id] = bool_field

        elif field_type == "string":
            string_field = datacatalog.TagField()
            string_field.string_value = str(field_value)
            tag.fields[field_id] = string_field

        elif field_type == "richtext":
            richtext_field = datacatalog.TagField()
            if isinstance(field_values, str):
                # Joining a bare string would put <br> between its characters.
                formatted_value = field_values
            else:
                formatted_value = '<br>'.join(str(v) for v in field_values)
            richtext_field.richtext_value = str(formatted_value)
            tag.fields[field_id] = richtext_field

        elif field_type == "double":
            float_field = datacatalog.TagField()
            float_field.double_value = float(field_value)
            tag.fields[field_id] = float_field

        elif field_type == "enum":
            enum_field = datacatalog.TagField()
            enum_field.enum_value.display_name = field_value
            tag.fields[field_id] = enum_field

        elif field_type in ("datetime", "timestamp"):
            # expected format for datetime values in DC: 2020-12-02T16:34:14Z
            # however, field_value can be a date value e.g. "2022-05-08",
            # a datetime value e.g. "2022-05-08 15:00:00", or a timestamp value
            # e.g. datetime.datetime(2022, 9, 14, 18, 24, 31, 615000, tzinfo=datetime.timezone.utc)
            text = str(field_value)

            if isinstance(field_value, datetime):
                # datetime must be tested before date (datetime subclasses date);
                # isinstance also accepts datetime subclasses.
                timestamp = Timestamp()
                timestamp.FromDatetime(field_value)

            elif isinstance(field_value, date):
                # a plain date: midnight UTC
                dt = datetime.combine(field_value, datetime.min.time())
                timestamp = pytz.utc.localize(dt)

            elif len(text) == 10:
                # a date cast as a string, e.g. '2022-05-08';
                # when no time is supplied, default to 12:00:00 AM UTC
                d = date(int(text[0:4]), int(text[5:7]), int(text[8:10]))
                dt = datetime.combine(d, dtime(0, 0))
                timestamp = pytz.utc.localize(dt)

            elif len(text) == 19:
                # a timestamp with this format: '2022-12-05 15:05:26'
                dt = datetime(
                    int(text[0:4]), int(text[5:7]), int(text[8:10]),
                    int(text[11:13]), int(text[14:16]), int(text[17:19]),
                )
                timestamp = pytz.utc.localize(dt)

            else:
                # any other timestamp cast as a string,
                # e.g. '2024-03-30 18:29:48.621617+00:00' or '2020-12-02T16:34:14.5Z'
                text = text.strip()
                if text[-1:] in ('Z', 'z'):
                    # fromisoformat() only accepts a trailing 'Z' on Python 3.11+
                    text = text[:-1] + '+00:00'
                dt = datetime.fromisoformat(text)
                if dt.tzinfo is None:
                    # no offset supplied: interpret as UTC like the branches above
                    dt = pytz.utc.localize(dt)
                timestamp = dt

            datetime_field = datacatalog.TagField()
            datetime_field.timestamp_value = timestamp
            tag.fields[field_id] = datetime_field

    except Exception as e:
        error_exists = True
        msg = "Error storing values {} into field {}".format(field_values, field_id)
        log_error(msg, e, job_uuid)

    return tag, error_exists