5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py
def apply_sensitive_column_config(self, fields, dlp_dataset, infotype_selection_table, infotype_classification_table, \
uri, create_policy_tags, taxonomy_id, job_uuid, config_uuid, template_uuid, \
tag_history, overwrite=False):
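"""Tag sensitive columns of a BigQuery table based on DLP inspection findings.

Reads the per-column infotypes from the DLP findings table, maps each
column's infotypes to a notable infotype (via infotype_selection_table) and
the resulting set to a classification (via infotype_classification_table),
then creates or updates a Data Catalog tag on each sensitive column. If
create_policy_tags is set, also attaches the matching policy tag from
taxonomy_id. Returns constants.SUCCESS or constants.ERROR.
"""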
if create_policy_tags:
request = datacatalog.ListPolicyTagsRequest(
parent=taxonomy_id
)
try:
page_result = self.ptm_client.list_policy_tags(request=request)
except Exception as e:
msg = 'Unable to retrieve the policy tag taxonomy for taxonomy_id {}'.format(taxonomy_id)
log_error(msg, e, job_uuid)
op_status = constants.ERROR
return op_status
policy_tag_names = [] # list of fully qualified policy tag names and sensitive categories
for response in page_result:
policy_tag_names.append((response.name, response.display_name))
policy_tag_requests = [] # stores the list of fully qualified policy tag names and table column names,
# so that we can create the policy tags on the various sensitive fields
# uri is a BQ table path
op_status = constants.SUCCESS
if not isinstance(uri, str):
print('Error: uri ' + str(uri) + ' is not a string.')
op_status = constants.ERROR
return op_status
bigquery_resource = '//bigquery.googleapis.com/projects/' + uri
#print("bigquery_resource: ", bigquery_resource)
request = datacatalog.LookupEntryRequest()
request.linked_resource=bigquery_resource
try:
entry = self.client.lookup_entry(request)
except Exception as e:
msg = 'Error looking up entry {} in the catalog'.format(bigquery_resource)
log_error(msg, e, job_uuid)
op_status = constants.ERROR
return op_status
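# dlp_dataset and the two mapping tables arrive as paths of the form
# 'bigquery/project/<project>/dataset/<dataset>'; convert them to dotted
# BigQuery identifiers ('<project>.<dataset>').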
dlp_dataset = dlp_dataset.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.')
infotype_selection_table = infotype_selection_table.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.')
infotype_classification_table = infotype_classification_table.replace('bigquery/project/', '').replace('/dataset/', '.').replace('/', '.')
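# uri has the form '<project>/datasets/<dataset>/tables/<table>', so element 4
# of the split is the table name; the findings table in dlp_dataset is assumed
# to share that name.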
dlp_table = dlp_dataset + '.' + uri.split('/')[4]
infotype_fields = []
notable_infotypes = []
# get an array of infotypes associated with each field in the DLP findings table
dlp_sql = 'select field, array_agg(infotype) infotypes '
dlp_sql += 'from (select distinct cl.record_location.field_id.name as field, info_type.name as infotype '
dlp_sql += 'from `' + dlp_table + '`, unnest(location.content_locations) as cl '
dlp_sql += 'order by cl.record_location.field_id.name) '
dlp_sql += 'group by field'
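# For reference, the generated query has this shape (table name illustrative):
#   select field, array_agg(infotype) infotypes
#   from (select distinct cl.record_location.field_id.name as field, info_type.name as infotype
#         from `project.dataset.dlp_findings`, unnest(location.content_locations) as cl
#         order by cl.record_location.field_id.name)
#   group by field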
try:
dlp_rows = self.bq_client.query(dlp_sql).result()
except Exception as e:
msg = 'Error querying DLP findings table: {}'.format(dlp_sql)
log_error(msg, e, job_uuid)
op_status = constants.ERROR
return op_status
dlp_row_count = 0
for dlp_row in dlp_rows:
dlp_row_count += 1
field = dlp_row['field']
infotype_fields.append(field)
infotypes = dlp_row['infotypes']
print('field ', field, ', infotypes [', infotypes, ']')
is_sql = 'select notable_infotype '
is_sql += 'from `' + infotype_selection_table + '` i, '
infotype_count = len(infotypes)
for i in range(infotype_count):
is_sql += 'unnest(i.field_infotypes) as i' + str(i) + ', '
is_sql = is_sql[:-2] + ' '
for i, infotype in enumerate(infotypes):
if i == 0:
is_sql += 'where i' + str(i) + ' = "' + infotype + '" '
else:
is_sql += 'and i' + str(i) + ' = "' + infotype + '" '
is_sql += 'order by array_length(i.field_infotypes) '
is_sql += 'limit 1'
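# For a field with infotypes ["EMAIL_ADDRESS", "PERSON_NAME"], the generated
# query looks like this (table name illustrative):
#   select notable_infotype
#   from `project.dataset.infotype_selection` i, unnest(i.field_infotypes) as i0, unnest(i.field_infotypes) as i1
#   where i0 = "EMAIL_ADDRESS" and i1 = "PERSON_NAME"
#   order by array_length(i.field_infotypes) limit 1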
#print('is_sql: ', is_sql)
try:
ni_rows = self.bq_client.query(is_sql).result()
except Exception as e:
msg = 'Error querying infotype selection table: {}'.format(is_sql)
log_error(msg, e, job_uuid)
op_status = constants.ERROR
return op_status
for ni_row in ni_rows:
notable_infotypes.append(ni_row['notable_infotype']) # there should be just one notable infotype per field
# there are no DLP findings
if dlp_row_count == 0:
op_status = constants.SUCCESS
return op_status
# remove duplicate infotypes from notable list
final_set = list(set(notable_infotypes))
print('final_set: ', final_set)
# lookup classification using set of notable infotypes
c_sql = 'select classification_result '
c_sql += 'from `' + infotype_classification_table + '` c, '
for i in range(len(final_set)):
c_sql += 'unnest(c.notable_infotypes) as c' + str(i) + ', '
c_sql = c_sql[:-2] + ' '
for i, notable_infotype in enumerate(final_set):
if i == 0:
c_sql += 'where c' + str(i) + ' = "' + notable_infotype + '" '
else:
c_sql += 'and c' + str(i) + ' = "' + notable_infotype + '" '
c_sql += 'order by array_length(c.notable_infotypes) '
c_sql += 'limit 1'
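# The generated query has the same shape as is_sql above, e.g. (values illustrative):
#   select classification_result
#   from `project.dataset.infotype_classification` c, unnest(c.notable_infotypes) as c0
#   where c0 = "PERSON_NAME"
#   order by array_length(c.notable_infotypes) limit 1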
#print('c_sql: ', c_sql)
try:
c_rows = self.bq_client.query(c_sql).result()
except Exception as e:
msg = 'Error querying infotype classification table: {}'.format(c_sql)
log_error(msg, e, job_uuid)
op_status = constants.ERROR
return op_status
classification_result = None
for c_row in c_rows:
classification_result = c_row['classification_result'] # we should end up with one classification result per table
print('classification_result: ', classification_result)
tag = datacatalog.Tag()
tag.template = self.template_path
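# The tag template referenced by self.template_path is assumed to define a
# boolean field 'sensitive_field' and an enum field 'sensitive_type' whose
# allowed values include the classification results.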
# each element represents a field which needs to be tagged
for infotype_field in infotype_fields:
for field in fields:
if 'sensitive_field' in field['field_id']:
bool_field = datacatalog.TagField()
if classification_result == 'Public_Information':
bool_field.bool_value = False
field['field_value'] = False
else:
bool_field.bool_value = True
field['field_value'] = True
tag.fields['sensitive_field'] = bool_field
if 'sensitive_type' in field['field_id']:
enum_field = datacatalog.TagField()
enum_field.enum_value.display_name = classification_result
tag.fields['sensitive_type'] = enum_field
field['field_value'] = classification_result
tag.column = infotype_field # DLP has a bug: sometimes the infotype field does not match the column name in the table
print('tag.column: ', infotype_field)
# check if a tag already exists on this column
try:
tag_exists, tag_id = self.check_if_tag_exists(parent=entry.name, column=infotype_field)
except Exception as e:
msg = 'Error during check_if_tag_exists: {}'.format(entry.name)
log_error(msg, e, job_uuid)
op_status = constants.ERROR
return op_status
# tag already exists
if tag_exists:
if not overwrite:
# skip this sensitive column because it is already tagged
continue
tag.name = tag_id
op_status = self.do_create_update_delete_action(job_uuid, 'update', tag)
else:
op_status = self.do_create_update_delete_action(job_uuid, 'create', tag, entry)
if op_status == constants.SUCCESS and create_policy_tags and classification_result != 'Public_Information':
# add the column name and policy tag name to a list
for policy_tag_name, policy_tag_category in policy_tag_names:
if policy_tag_category == classification_result:
policy_tag_requests.append((infotype_field, policy_tag_name))
if op_status == constants.SUCCESS and tag_history:
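# Tag history: copy the tag values into the tag-history table in BigQuery
# (BIGQUERY_REGION, tag_creator_account, and tag_invoker_account are assumed
# to come from module/controller state).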
bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
template_fields = self.get_template()
bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, uri, infotype_field, fields)
# once we have created the regular tags, we can create/update the policy tags
if create_policy_tags and len(policy_tag_requests) > 0:
table_id = uri.replace('/datasets/', '.').replace('/tables/', '.')
op_status = self.apply_policy_tags(table_id, policy_tag_requests)
if op_status != constants.SUCCESS:
msg = 'Error occurred when tagging {}'.format(uri)
error = {'job_uuid': job_uuid, 'msg': msg}
print(json.dumps(error))
return op_status