in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]
def apply_glossary_asset_config(self, fields, mapping_table, uri, job_uuid, config_uuid, template_uuid, tag_history, overwrite=False):
    """Create or update a glossary tag on a BigQuery or GCS catalog entry.

    The entry's schema column names are looked up in ``mapping_table``
    (queried for ``canonical_name`` where ``source_name`` matches a column
    name). Every returned canonical name that equals a ``field_id`` in
    ``fields`` is set to ``True`` on the tag.

    Args:
        fields: list of dicts with at least a ``'field_id'`` key. Matched
            entries get ``'field_value'`` set to True in place; the list is
            also what is passed to tag history.
        mapping_table: path to the mapping table. The code rewrites
            ``bigquery/project/<p>/dataset/<d>/<t>`` style paths into
            ``p.d.t`` (assumed format — confirm against callers).
        uri: either a ``[bucket, file_path]`` list for a GCS file, or a
            string appended to ``//bigquery.googleapis.com/projects/`` for a
            BigQuery table/view.
        job_uuid: id of the running job, used in logs and tag history.
        config_uuid: not used by this method.
        template_uuid: not used by this method.
        tag_history: when truthy, the resulting tag is also copied via
            ``BigQueryUtils.copy_tag``.
        overwrite: when False, an existing tag is left untouched.

    Returns:
        ``constants.ERROR`` on lookup/validation failures,
        ``constants.SUCCESS`` when the tag exists and overwrite is False,
        otherwise the status returned by ``do_create_update_delete_action``.
    """
    is_gcs = False

    # Build the linked resource name for the entry lookup.
    if isinstance(uri, list):
        # GCS file: the entry group is named after the bucket ('-' -> '_') and
        # the entry after the file path without its extension ('/' -> '_').
        is_gcs = True
        bucket = uri[0].replace('-', '_')
        filename = uri[1].split('.')[0].replace('/', '_')
        linked_resource = ('//datacatalog.googleapis.com/projects/' + self.template_project
                           + '/locations/' + self.template_region
                           + '/entryGroups/' + bucket + '/entries/' + filename)
    elif isinstance(uri, str):
        # BigQuery table or view path.
        linked_resource = '//bigquery.googleapis.com/projects/' + uri
        print("bigquery_resource: " + linked_resource)
    else:
        msg = 'Unsupported uri type {}: expected a list (GCS) or a str (BigQuery)'.format(type(uri).__name__)
        log_error(msg, TypeError(msg), job_uuid)
        return constants.ERROR

    # Both resource types go through the same guarded lookup so a missing
    # entry is logged and reported as ERROR instead of raising.
    request = datacatalog.LookupEntryRequest()
    request.linked_resource = linked_resource
    try:
        entry = self.client.lookup_entry(request)
    except Exception as e:
        msg = 'Unable to find entry in the catalog. Entry {} does not exist: {}'.format(linked_resource, e)
        log_error(msg, e, job_uuid)
        return constants.ERROR
    print('entry: ', entry.name)

    try:
        tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name)
        print('tag_exists: ', tag_exists)
    except Exception as e:
        msg = 'Error during check_if_tag_exists: {}'.format(e)
        log_error(msg, e, job_uuid)
        return constants.ERROR

    if tag_exists and not overwrite:
        msg = 'Info: tag already exists and overwrite set to False'
        info = {'job_uuid': job_uuid, 'msg': msg}
        print(json.dumps(info))
        return constants.SUCCESS

    # An unset message field on a proto-plus object typically reads as an
    # empty message rather than None, so also treat "no columns" as missing;
    # otherwise the IN () clause below would be invalid SQL.
    if entry.schema is None or not entry.schema.columns:
        msg = 'Error entry {} does not have a schema in the catalog'.format(entry.name)
        error = {'job_uuid': job_uuid, 'msg': msg}
        print(json.dumps(error))
        return constants.ERROR

    # Render each column name as a BigQuery string literal, escaping
    # backslashes and single quotes so unusual names cannot break the query.
    source_names = ','.join(
        "'" + column_schema.column.replace('\\', '\\\\').replace("'", "\\'") + "'"
        for column_schema in entry.schema.columns
    )
    mapping_table_formatted = mapping_table.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.')
    query_str = 'select canonical_name from `' + mapping_table_formatted + '` where source_name in (' + source_names + ')'
    rows = self.bq_client.query(query_str).result()

    tag = datacatalog.Tag()
    tag.template = self.template_path
    tag_is_empty = True
    for row in rows:
        canonical_name = row['canonical_name']
        # First field whose id matches the canonical name, if any.
        field = next((f for f in fields if f['field_id'] == canonical_name), None)
        if field is None:
            continue
        bool_field = datacatalog.TagField()
        bool_field.bool_value = True
        tag.fields[canonical_name] = bool_field
        field['field_value'] = True
        tag_is_empty = False

    if tag_is_empty:
        print("Error: can't create the tag because it's empty")
        return constants.ERROR

    if tag_exists:
        # Tag already exists and overwrite is True: update it in place.
        tag.name = tag_id
        op_status = self.do_create_update_delete_action(job_uuid, 'update', tag)
    else:
        op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry)

    if tag_history:
        bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
        template_fields = self.get_template()
        # GCS uris are passed as 'bucket/path'; BigQuery uris are passed as-is.
        asset = '/'.join(uri) if is_gcs else uri
        bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, asset, None, fields)

    return op_status