in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]
def apply_dynamic_table_config(self, fields, uri, job_uuid, config_uuid, template_uuid, tag_history, batch_mode=False):
    """Build a tag from per-field query results and create or update it on a BigQuery entry.

    For every field, the field's query expression is parsed against ``uri``,
    executed via ``self.run_query`` and the returned values are written into
    a new ``datacatalog.Tag``. The tag is then created on the looked-up entry,
    or updated in place when ``self.check_if_tag_exists`` reports an existing
    tag. Fields that fail or return no values are skipped; the tag is still
    written as long as at least one field was populated.

    Args:
        fields: iterable of dicts, each read for the keys ``'field_id'``,
            ``'field_type'`` and ``'query_expression'``. Every successfully
            populated field dict gets a ``'field_value'`` key written back
            (mutated in place) before ``fields`` is handed to ``copy_tag``.
        uri: resource path appended to
            ``'//bigquery.googleapis.com/projects/'`` to form the linked
            resource that is looked up in Data Catalog.
        job_uuid: passed through to the query, tag-population, tag-write and
            tag-history helpers.
        config_uuid: not used by this method.
        template_uuid: not used by this method.
        tag_history: when truthy and the tag write succeeded, the tag is also
            copied through ``BigQueryUtils.copy_tag``.
        batch_mode: forwarded to ``self.run_query``.

    Returns:
        ``constants.ERROR`` when no field could be populated (no tag is
        written), or when at least one field failed while querying or
        populating (the tag is still written with the remaining fields).
        Otherwise the status returned by
        ``self.do_create_update_delete_action``.
    """
    print('*** apply_dynamic_table_config ***')

    bigquery_resource = '//bigquery.googleapis.com/projects/' + uri

    request = datacatalog.LookupEntryRequest()
    request.linked_resource = bigquery_resource
    entry = self.client.lookup_entry(request)

    tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name)
    print("tag_exists: " + str(tag_exists))

    # Always start from a fresh tag; for an update only the name is reused.
    tag = datacatalog.Tag()
    tag.template = self.template_path

    verified_field_count = 0
    # Sticky flag: a failure on any field must survive later successful
    # fields, otherwise only the last field's outcome would be reported.
    any_field_failed = False

    for field in fields:
        field_id = field['field_id']
        field_type = field['field_type']
        query_expression = field['query_expression']

        # parse and run query in BQ
        query_str = self.parse_query_expression(uri, query_expression)
        print('returned query_str: ' + query_str)

        # note: field_values is of type list
        field_values, error_exists = self.run_query(query_str, field_type, batch_mode, job_uuid)
        print('field_values: ', field_values)
        print('error_exists: ', error_exists)

        if error_exists:
            any_field_failed = True
            continue

        # An empty result is not an error; the field is simply left out.
        if field_values == []:
            continue

        tag, error_exists = self.populate_tag_field(tag, field_id, field_type, field_values, job_uuid)

        if error_exists:
            any_field_failed = True
            continue

        verified_field_count += 1

        # store the value back in the dict, so that it can be accessed by the exporter
        if field_type == 'richtext':
            formatted_value = ', '.join(str(v) for v in field_values)
        else:
            formatted_value = field_values[0]

        field['field_value'] = formatted_value

    if verified_field_count == 0:
        # tag is empty (errors or no query results), skip tag creation
        return constants.ERROR

    # Explicit comparison kept as-is: check_if_tag_exists is defined outside
    # this block, so its exact return type is not visible here.
    if tag_exists == True:
        tag.name = tag_id
        write_status = self.do_create_update_delete_action(job_uuid, 'update', tag)
    else:
        write_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry)

    if write_status == constants.SUCCESS and tag_history:
        bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
        template_fields = self.get_template()
        bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, uri, None, fields)

    if any_field_failed:
        # The tag was written with the fields that succeeded, but the caller
        # still needs to know that some fields could not be populated.
        return constants.ERROR

    return write_status