# From 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py
def apply_import_config(self, job_uuid, config_uuid, data_asset_type, data_asset_region, tag_dict, tag_history, overwrite=False):
    """Create or update a Data Catalog tag on one data asset described by a CSV row.

    Resolves the Data Catalog entry for the asset identified in ``tag_dict``
    (a BigQuery dataset or table, a fileset entry, or a Spanner table), builds
    the tag fields from the remaining CSV columns against the tag template,
    and delegates the write to ``create_update_delete_tag``.

    Args:
        job_uuid: identifier of the running import job (used in error/info logs).
        config_uuid: identifier of the import config being applied.
        data_asset_type: one of ``constants.BQ_ASSET``, ``constants.FILESET_ASSET``,
            or ``constants.SPAN_ASSET``.
        data_asset_region: region of the asset; used in fileset and Spanner lookups.
        tag_dict: one CSV row as a dict. Must contain the asset-identifying keys
            for the asset type plus the tag-template field values.
        tag_history: passed through to ``create_update_delete_tag`` to control
            tag-history recording.
        overwrite: when False, an already-existing tag is left untouched and
            SUCCESS is returned.

    Returns:
        ``constants.SUCCESS`` or ``constants.ERROR``.
    """
    print(f'apply_import_config: {job_uuid}, {config_uuid}, {data_asset_type}, {data_asset_region}, {tag_dict}, {tag_history}')

    if 'project' not in tag_dict:
        msg = "Error: project info missing from CSV"
        log_error_tag_dict(msg, None, job_uuid, tag_dict)
        return constants.ERROR
    project = tag_dict['project']

    # Determine the entry type and pull out the asset-identifying fields.
    entry_type = None
    if data_asset_type == constants.BQ_ASSET:
        if 'dataset' not in tag_dict:
            msg = "Error: could not find the required dataset field in the CSV"
            log_error_tag_dict(msg, None, job_uuid, tag_dict)
            return constants.ERROR
        dataset = tag_dict['dataset']
        if 'table' in tag_dict:
            table = tag_dict['table']
            entry_type = constants.BQ_TABLE
        else:
            entry_type = constants.DATASET
    elif data_asset_type == constants.FILESET_ASSET:
        if 'entry_group' not in tag_dict or 'fileset' not in tag_dict:
            msg = "Error: could not find the required fields in the CSV. Missing entry_group or fileset or both"
            log_error_tag_dict(msg, None, job_uuid, tag_dict)
            return constants.ERROR
        entry_type = constants.FILESET
        entry_group = tag_dict['entry_group']
        fileset = tag_dict['fileset']
    elif data_asset_type == constants.SPAN_ASSET:
        if 'instance' not in tag_dict or 'database' not in tag_dict or 'table' not in tag_dict:
            msg = "Error: could not find the required fields in the CSV. The required fields for Spanner are instance, database, and table"
            log_error_tag_dict(msg, None, job_uuid, tag_dict)
            return constants.ERROR
        entry_type = constants.SPAN_TABLE
        instance = tag_dict['instance']
        database = tag_dict['database']
        table = tag_dict['table']
        if 'schema' in tag_dict:
            # Spanner fully-qualified names quote schema-qualified tables.
            table = f"`{tag_dict['schema']}.{table}`"

    if entry_type is None:
        # Bug fix: an unrecognized data_asset_type previously fell through to an
        # UnboundLocalError on entry_type; fail explicitly with a logged error.
        msg = f"Error: unrecognized data asset type {data_asset_type}"
        log_error_tag_dict(msg, None, job_uuid, tag_dict)
        return constants.ERROR

    # Build the lookup request for the chosen entry type.
    request = datacatalog.LookupEntryRequest()
    if entry_type == constants.DATASET:
        resource = f'//bigquery.googleapis.com/projects/{project}/datasets/{dataset}'
        request.linked_resource = resource
    elif entry_type == constants.BQ_TABLE:
        resource = f'//bigquery.googleapis.com/projects/{project}/datasets/{dataset}/tables/{table}'
        request.linked_resource = resource
    elif entry_type == constants.FILESET:
        resource = f'//datacatalog.googleapis.com/projects/{project}/locations/{data_asset_region}/entryGroups/{entry_group}/entries/{fileset}'
        request.linked_resource = resource
    elif entry_type == constants.SPAN_TABLE:
        # Spanner entries are looked up by fully-qualified name, not linked resource.
        resource = f'spanner:{project}.regional-{data_asset_region}.{instance}.{database}.{table}'
        request.fully_qualified_name = resource
        request.project = project
        request.location = data_asset_region

    try:
        entry = self.client.lookup_entry(request)
    except Exception as e:
        msg = "Error could not find {} entry for {}".format(entry_type, resource)
        log_error_tag_dict(msg, e, job_uuid, tag_dict)
        return constants.ERROR

    # Format the uri for storing in the tag history table.
    if data_asset_type == constants.BQ_ASSET:
        uri = entry.linked_resource.replace('//bigquery.googleapis.com/projects/', '')
    elif data_asset_type == constants.SPAN_ASSET:
        uri = entry.linked_resource.replace('///projects/', '').replace('instances', 'instance').replace('databases', 'database') + '/table/' + table.replace('`', '')
    else:  # constants.FILESET_ASSET (only remaining possibility after the guard above)
        uri = entry.linked_resource.replace('//datacatalog.googleapis.com/projects/', '').replace('locations', 'location').replace('entryGroups', 'entry_group').replace('entries', 'entry')

    # Column-level tag: verify the target column exists in the entry's schema.
    target_column = None
    if 'column' in tag_dict:
        target_column = tag_dict['column']
        if not self.column_exists_in_table(target_column, entry.schema.columns):
            msg = f"Error could not find column {target_column} in {resource}"
            log_error_tag_dict(msg, None, job_uuid, tag_dict)
            return constants.ERROR
        uri = uri + '/column/' + target_column

    try:
        tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column=target_column)
    except Exception as e:
        msg = f"Error during check_if_tag_exists: {entry.name}"
        log_error_tag_dict(msg, e, job_uuid, tag_dict)
        return constants.ERROR

    if tag_exists and not overwrite:
        msg = "Info: Tag already exists and overwrite flag is False"
        log_info_tag_dict(msg, job_uuid, tag_dict)
        return constants.SUCCESS

    # Every CSV column that is not an asset-identifying field is a tag field.
    asset_id_fields = {'project', 'dataset', 'table', 'column', 'entry_group',
                       'fileset', 'instance', 'database', 'schema'}
    tag_fields = []
    template_fields = self.get_template()
    for field_name, raw_value in tag_dict.items():
        if field_name in asset_id_fields:
            continue
        field_value = raw_value.strip()
        field_type = next((tf['field_type'] for tf in template_fields
                           if tf['field_id'] == field_name), None)
        if field_type is None:
            print('Error while preparing the tag. The field ', field_name, ' was not found in the tag template ', self.template_id)
            return constants.ERROR
        # this check allows for tags with empty enums to get created, otherwise the empty enum gets flagged because DC thinks that you are storing an empty string as the enum value
        if field_type == 'enum' and field_value == '':
            continue
        tag_fields.append({'field_id': field_name, 'field_type': field_type, 'field_value': field_value})

    return self.create_update_delete_tag(tag_fields, tag_exists, tag_id, job_uuid, config_uuid, 'IMPORT_TAG', tag_history,
                                         entry, uri, target_column)