in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]
def apply_dynamic_column_config(self, fields, columns_query, uri, job_uuid, config_uuid, template_uuid, tag_history, batch_mode=False):
    """Create or update dynamic column-level tags on a BigQuery table.

    Runs `columns_query` to discover which columns to tag, builds one
    Data Catalog Tag per column by evaluating each field's query
    expression, then reconciles all tags in a single API call.
    Optionally copies the tags to BigQuery-based tag history.

    Args:
        fields: list of field dicts from the tag config; each must contain
            a 'query_expression' key. NOTE: this list is re-bound each
            iteration by run_combined_query, which injects field_values.
        columns_query: query expression (with placeholders) that returns
            the column names to tag.
        uri: BigQuery table path ('project/dataset/table').
        job_uuid: job identifier used for error logging.
        config_uuid: config identifier (currently unused in this method).
        template_uuid: tag template identifier (currently unused here).
        tag_history: if True, write the applied tags to tag history.
        batch_mode: reserved flag; not used in this method.

    Returns:
        constants.SUCCESS or constants.ERROR.
    """
    print('*** apply_dynamic_column_config ***')

    tag_work_queue = []  # Tag objects to be created/updated via reconcile_tags
    op_status = constants.SUCCESS
    target_columns = []  # columns in the table which need to be tagged

    # Resolve placeholders in the column query and run it to find the
    # set of columns this config applies to.
    columns_query = self.parse_query_expression(uri, columns_query)
    print('columns_query:', columns_query)
    rows = self.bq_client.query(columns_query).result()

    num_columns = 0
    for row in rows:
        for column in row:
            print('column:', column)
            target_columns.append(column)
            num_columns += 1

    if num_columns == 0:
        # no columns to tag
        msg = f"Error could not find columns to tag. Please check column_query parameter in your config. Current value: {columns_query}"
        log_error(msg, None, job_uuid)
        op_status = constants.ERROR
        return op_status

    # Look up the Data Catalog entry for the BigQuery table so we can
    # validate columns against its schema and reconcile tags on it.
    bigquery_resource = '//bigquery.googleapis.com/projects/' + uri
    request = datacatalog.LookupEntryRequest()
    request.linked_resource = bigquery_resource
    entry = self.client.lookup_entry(request)

    column_fields_list = []  # list of dicts {column, fields}, used for tag history

    for target_column in target_columns:

        # fail quickly if a column is not found in the entry's schema
        column_exists = self.column_exists_in_table(target_column, entry.schema.columns)

        if not column_exists:
            # BUG FIX: original referenced undefined name `resource` here,
            # raising NameError instead of logging; `uri` is the table path.
            msg = f"Error could not find column {target_column} in {uri}"
            log_error(msg, None, job_uuid)
            op_status = constants.ERROR
            return op_status

        # initialize the new column-level tag
        tag = datacatalog.Tag()
        tag.template = self.template_path
        tag.column = target_column

        # Build one query string per field, then combine them so we hit
        # BigQuery once per column instead of once per field.
        query_strings = []
        for field in fields:
            query_str = self.parse_query_expression(uri, field['query_expression'], target_column)
            query_strings.append(query_str)

        combined_query = self.combine_queries(query_strings)

        # run combined query, adding the results to the field_values for each field
        # Note: field_values is of type list
        fields, error_exists = self.run_combined_query(combined_query, target_column, fields, job_uuid)

        # NOTE(review): once op_status is set to ERROR for one column it is
        # never reset, even if later columns (and reconcile_tags) succeed.
        # Preserved as-is — confirm this sticky-error behavior is intended.
        if error_exists:
            op_status = constants.ERROR
            continue

        # populate tag fields
        tag, error_exists = self.populate_tag_fields(tag, fields, job_uuid)

        if error_exists:
            op_status = constants.ERROR
            continue

        column_fields_list.append({"column": target_column, "fields": fields})
        tag_work_queue.append(tag)

    # outer loop ends here
    if len(tag_work_queue) == 0:
        # every column errored out; nothing to reconcile
        op_status = constants.ERROR
        return op_status

    # ready to create or update all the tags in work queue
    rec_request = datacatalog.ReconcileTagsRequest(
        parent=entry.name,
        tag_template=self.template_path,
        tags=tag_work_queue
    )

    try:
        operation = self.client.reconcile_tags(request=rec_request)
        print("Waiting for operation to complete...")
        resp = operation.result()
    except Exception as e:
        msg = 'Error during reconcile_tags on entry {}'.format(entry.name)
        log_error(msg, e, job_uuid)
        op_status = constants.ERROR
        return op_status

    # Only record tag history when everything above succeeded.
    if tag_history and op_status != constants.ERROR:
        bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
        success = bqu.copy_tags(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, self.get_template(), uri, column_fields_list)
        print('Tag history completed successfully:', success)
        if success:
            op_status = constants.SUCCESS
        else:
            op_status = constants.ERROR

    return op_status