DataplexController.py
# Copyright 2024-2025 Google, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests, configparser
from operator import itemgetter
import json
import os
from google.protobuf import struct_pb2
from google.protobuf import json_format
from google.api_core.gapic_v1.client_info import ClientInfo
from google.cloud import dataplex
from google.cloud.dataplex import CatalogServiceClient
from google.cloud import bigquery
import Resources as res
import BigQueryUtils as bq
import constants
from common import log_error, log_error_tag_dict, log_info, log_info_tag_dict
config = configparser.ConfigParser()
config.read("tagengine.ini")
BIGQUERY_REGION = config['DEFAULT']['BIGQUERY_REGION']
USER_AGENT = 'cloud-solutions/datacatalog-tag-engine-v3'
class DataplexController:
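    # Creates, updates, and deletes Dataplex Catalog aspects on behalf of Tag Engine configs.
    # Wraps the Dataplex CatalogServiceClient for entry/aspect operations and a BigQuery client
    # for evaluating the query expressions used by dynamic configs.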
def __init__(self, credentials, tag_creator_account=None, tag_invoker_account=None, \
aspect_type_id=None, aspect_type_project=None, aspect_type_region=None):
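        # aspect_type_id, aspect_type_project, and aspect_type_region are optional;
        # the aspect type path is only precomputed when all three are provided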
self.credentials = credentials
self.tag_creator_account = tag_creator_account
self.tag_invoker_account = tag_invoker_account
self.aspect_type_id = aspect_type_id
self.aspect_type_project = aspect_type_project
self.aspect_type_region = aspect_type_region
self.client = CatalogServiceClient(credentials=self.credentials, client_info=ClientInfo(user_agent=USER_AGENT))
if aspect_type_id != None and aspect_type_project != None and aspect_type_region != None:
self.aspect_type_path = self.client.aspect_type_path(aspect_type_project, aspect_type_region, aspect_type_id)
else:
self.aspect_type_path = None
self.bq_client = bigquery.Client(credentials=self.credentials, location=BIGQUERY_REGION, client_info=ClientInfo(user_agent=USER_AGENT))
    # note: included_fields can be populated or None
    # None means we want to return all of the fields defined in the aspect type
def get_aspect_type(self, included_fields=None):
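        # returns the aspect type fields sorted by display order, carrying over any assigned values
        # or query expressions from included_fields; returns an empty list if the aspect type cannot be retrieved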
print('enter get_aspect_type()')
#print('included_fields:', included_fields)
aspect_fields = []
try:
aspect_type = self.client.get_aspect_type(name=self.aspect_type_path)
#print('aspect_type:', aspect_type)
except Exception as e:
msg = f'Error retrieving aspect type {self.aspect_type_path}'
log_error(msg, e)
            return aspect_fields
record_fields = aspect_type.metadata_template.record_fields
#print('record_fields:', record_fields)
for field in record_fields:
match_found = False
if included_fields != None:
for included_field in included_fields:
if included_field['field_id'] == field.name:
print('found_match:', field.name)
match_found = True
if 'field_value' in included_field:
assigned_value = included_field['field_value']
else:
assigned_value = None
if 'query_expression' in included_field:
query_expression = included_field['query_expression']
else:
query_expression = None
break
if included_fields != None and match_found == False:
continue
enum_values = []
if field.type_ == "enum":
for enum_value in field.enum_values:
enum_values.append(enum_value.name)
# populate aspect_field dict
aspect_field = {}
aspect_field['field_id'] = field.name
aspect_field['field_type'] = field.type_
aspect_field['display_name'] = field.annotations.display_name
aspect_field['is_required'] = field.constraints.required
aspect_field['order'] = field.annotations.display_order
if aspect_field['field_type'] == "enum":
aspect_field['enum_values'] = enum_values
if included_fields:
if assigned_value:
aspect_field['field_value'] = assigned_value
if query_expression:
aspect_field['query_expression'] = query_expression
aspect_fields.append(aspect_field)
sorted_aspect_fields = sorted(aspect_fields, key=itemgetter('order'))
#print('sorted_aspect_fields:', sorted_aspect_fields)
return sorted_aspect_fields
def check_column_exists(self, aspects, target_column):
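        # returns True if target_column is present in the entry's schema aspect
        # target_column may be a nested column written as parent.child (one level of nesting only)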
print('*** enter check_column_exists ***')
print('target_column:', target_column)
print('aspects:', aspects)
# figure out if the target column is nested
if len(target_column.split('.')) > 1:
is_nested_column = True
parent_column = target_column.split('.')[0]
child_column = target_column.split('.')[1]
else:
is_nested_column = False
column_exists = False
for aspect_id, aspect_payload in aspects.items():
if aspect_id.endswith('.global.schema'):
break
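        # aspect_payload now refers to the schema aspect located by the loop above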
aspect_dict = json_format.MessageToDict(aspect_payload._pb)
for field in aspect_dict['data']['fields']:
if field['name'] == target_column:
column_exists = True
break
if is_nested_column and field['name'] == parent_column:
subfields = field['fields']
for subfield in subfields:
if subfield['name'] == child_column:
column_exists = True
break
return column_exists
def check_aspect_exists(self, aspect_type_path, aspects):
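        # checks whether the entry already carries an aspect of the given type
        # returns (aspect_exists, aspect_id); aspect_id is only meaningful when aspect_exists is True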
print('enter check_aspect_exists')
print('aspect_type_path:', aspect_type_path)
aspect_type_path_short = '.'.join(aspect_type_path.split('.')[1:])
aspect_exists = False
aspect_id = ""
for aspect_id, aspect_data in aspects.items():
if aspect_id.endswith(aspect_type_path_short):
aspect_exists = True
break
return aspect_exists, aspect_id
def apply_import_config(self, job_uuid, config_uuid, data_asset_type, data_asset_region, tag_dict, tag_history, overwrite=False):
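        # applies one imported tag (one CSV row) to a BigQuery, fileset, Spanner, or Cloud SQL entry:
        # resolves the entry from tag_dict, validates the target column if one is given, converts the
        # remaining CSV fields to their aspect field types, and writes the aspect
        # returns constants.SUCCESS or constants.ERROR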
print("*** apply_import_config ***")
op_status = constants.SUCCESS
if 'project' in tag_dict:
project = tag_dict['project']
else:
msg = "Error: project info missing from CSV"
log_error_tag_dict(msg, None, job_uuid, tag_dict)
op_status = constants.ERROR
return op_status
if data_asset_type == constants.BQ_ASSET:
if 'dataset' not in tag_dict:
msg = "Error: could not find the required dataset field in the CSV"
log_error_tag_dict(msg, None, job_uuid, tag_dict)
op_status = constants.ERROR
return op_status
else:
entry_type = constants.DATASET
dataset = tag_dict['dataset']
if 'table' in tag_dict:
table = tag_dict['table']
entry_type = constants.BQ_TABLE
if data_asset_type == constants.FILESET_ASSET:
if 'entry_group' not in tag_dict or 'fileset' not in tag_dict:
msg = "Error: could not find the required fields in the CSV. Missing entry_group or fileset or both"
log_error_tag_dict(msg, None, job_uuid, tag_dict)
op_status = constants.ERROR
return op_status
else:
entry_type = constants.FILESET
entry_group = tag_dict['entry_group']
entry_name = tag_dict['fileset']
if data_asset_type == constants.SPAN_ASSET:
if 'instance' not in tag_dict or 'database' not in tag_dict:
msg = "Error: could not find the required fields in the CSV. The required fields for Spanner are instance and database"
log_error_tag_dict(msg, None, job_uuid, tag_dict)
op_status = constants.ERROR
return op_status
else:
instance = tag_dict['instance']
database = tag_dict['database']
if 'table' in tag_dict:
table = tag_dict['table']
entry_type = constants.SPAN_TABLE
if 'schema' in tag_dict:
schema = tag_dict['schema']
else:
schema = None
else:
table = None
if 'schema' in tag_dict:
schema = tag_dict['schema']
entry_type = constants.SPAN_SCHEMA
else:
schema = None
entry_type = constants.SPAN_DATABASE
if data_asset_type == constants.SQL_ASSET:
if 'instance' not in tag_dict or 'database' not in tag_dict:
msg = "Error: could not find the required fields in the CSV. The required fields for Cloud SQL are instance and database"
log_error_tag_dict(msg, None, job_uuid, tag_dict)
op_status = constants.ERROR
return op_status
else:
instance = tag_dict['instance']
database = tag_dict['database']
if 'table' in tag_dict:
table = tag_dict['table']
entry_type = constants.SQL_TABLE
if 'schema' in tag_dict:
schema = tag_dict['schema']
else:
schema = None
else:
table = None
if 'schema' in tag_dict:
schema = tag_dict['schema']
entry_type = constants.SQL_SCHEMA
else:
schema = None
entry_type = constants.SQL_DATABASE
# BQ entry types (table, dataset)
if entry_type == constants.BQ_TABLE:
entry_name = f'bigquery.googleapis.com/projects/{project}/datasets/{dataset}/tables/{table}'
entry_group = '@bigquery'
if entry_type == constants.DATASET:
entry_name = f'bigquery.googleapis.com/projects/{project}/datasets/{dataset}'
entry_group = '@bigquery'
# Spanner entry types (table, schema, database)
if entry_type == constants.SPAN_TABLE:
entry_group = '@spanner'
if schema:
entry_name = f'spanner.googleapis.com/projects/{project}/instances/{instance}/databases/{database}/tables/{schema}.{table}'
else:
entry_name = f'spanner.googleapis.com/projects/{project}/instances/{instance}/databases/{database}/tables/{table}'
if entry_type == constants.SPAN_SCHEMA:
entry_group = '@spanner'
entry_name = f'spanner.googleapis.com/projects/{project}/instances/{instance}/databases/{database}/tables/{schema}'
if entry_type == constants.SPAN_DATABASE:
entry_group = '@spanner'
entry_name = f'spanner.googleapis.com/projects/{project}/instances/{instance}/databases/{database}'
# Cloud SQL entry types (table, database)
if entry_type == constants.SQL_TABLE:
entry_group = '@cloudsql'
if schema:
entry_name = f'cloudsql.googleapis.com/projects/{project}/locations/{data_asset_region}/instances/{instance}/databases/{database}/schemas/{schema}/tables/{table}'
else:
entry_name = f'cloudsql.googleapis.com/projects/{project}/locations/{data_asset_region}/instances/{instance}/databases/{database}/tables/{table}'
if entry_type == constants.SQL_SCHEMA:
entry_group = '@cloudsql'
entry_name = f'cloudsql.googleapis.com/projects/{project}/locations/{data_asset_region}/instances/{instance}/databases/{database}/schemas/{schema}'
if entry_type == constants.SQL_DATABASE:
entry_group = '@cloudsql'
entry_name = f'cloudsql.googleapis.com/projects/{project}/locations/{data_asset_region}/instances/{instance}/databases/{database}'
entry_path = f'projects/{project}/locations/{data_asset_region}/entryGroups/{entry_group}/entries/{entry_name}'
entry_request = dataplex.GetEntryRequest(
name=entry_path,
view=dataplex.EntryView.ALL
)
try:
entry = self.client.get_entry(request=entry_request)
#print('entry:', entry)
except Exception as e:
msg = f"Error could not locate entry {entry_name}"
log_error_tag_dict(msg, e, job_uuid, tag_dict)
op_status = constants.ERROR
return op_status
# format uri for tag history table
if data_asset_type == constants.BQ_ASSET:
uri = entry.name.replace('bigquery.googleapis.com/projects/', '')
if data_asset_type == constants.FILESET_ASSET:
uri = entry.name.replace('projects/', '')
if data_asset_type == constants.SPAN_ASSET:
uri = entry.name.replace('spanner.googleapis.com/projects/', '')
if data_asset_type == constants.SQL_ASSET:
uri = entry.name.replace('cloudsql.googleapis.com/projects/', '')
target_column = None
if 'column' in tag_dict:
target_column = tag_dict['column']
column_exists = self.check_column_exists(entry.aspects, target_column)
print('column_exists:', column_exists)
if column_exists == False:
msg = f"Error could not find target column {target_column} in {entry.name}"
log_error_tag_dict(msg, None, job_uuid, tag_dict)
op_status = constants.ERROR
return op_status
uri = uri + '/column/' + target_column
aspect_type_path = f'{self.aspect_type_project}.{self.aspect_type_region}.{self.aspect_type_id}@Schema.{target_column}'
else:
aspect_type_path = f'{self.aspect_type_project}.{self.aspect_type_region}.{self.aspect_type_id}'
try:
aspect_exists, aspect_id = self.check_aspect_exists(aspect_type_path, entry.aspects)
print('aspect_exists:', aspect_exists)
except Exception as e:
msg = f"Error during check_if_aspect_exists: {entry.name}"
log_error_tag_dict(msg, e, job_uuid, tag_dict)
op_status = constants.ERROR
return op_status
if aspect_exists and overwrite == False:
msg = "Info: Aspect already exists and overwrite flag is False"
log_info_tag_dict(msg, job_uuid, tag_dict)
op_status = constants.SUCCESS
return op_status
aspect_fields = []
aspect_type_fields = self.get_aspect_type()
#print("aspect_type_fields:", aspect_type_fields)
for field_name in tag_dict:
            if field_name in ('project', 'dataset', 'table', 'column', 'entry_group', 'fileset',
                              'instance', 'database', 'schema'):
continue
found_field = False
field_value = tag_dict[field_name]
for aspect_type_field in aspect_type_fields:
if field_name == aspect_type_field['field_id']:
field_type = aspect_type_field['field_type']
found_field = True
break
            if not found_field:
                print(f'Error preparing the aspect. {field_name} was not found in {self.aspect_type_id}')
op_status = constants.ERROR
return op_status
if field_type == 'bool':
if field_value in ('True', 'TRUE', 'true'):
field_value = True
else:
field_value = False
            elif field_type == 'datetime':
# timestamp needs to look like this: "2024-07-31T05:00:00.000Z"
if len(field_value) == 10:
field_value = f"{field_value}T12:00:00.00Z"
if len(field_value) == 19:
ts = field_value.replace(' ', 'T')
field_value = f"{ts}.00Z"
elif field_type == 'double':
field_value = float(field_value)
elif field_type == 'int':
field_value = int(field_value)
# this check allows for aspects with empty enums to get created, otherwise the empty enum gets flagged because DC thinks that you are storing an empty string as the enum value
if field_type == 'enum' and field_value == '':
continue
aspect_fields.append({'field_id': field_name, 'field_type': field_type, 'field_value': field_value})
op_status = self.create_update_delete_aspect(aspect_fields, aspect_type_path, entry_path, job_uuid, config_uuid, 'IMPORT_TAG', tag_history, uri, target_column)
return op_status
def apply_dynamic_table_config(self, fields, uri, job_uuid, config_uuid, aspect_type_uuid, tag_history):
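        # applies a dynamic table-level config by evaluating each field's query expression in BigQuery
        # uri is expected in the form project/datasets/dataset/tables/table
        # fields whose queries fail or return no values are skipped; if none can be resolved, no aspect is written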
print('*** apply_dynamic_table_config ***')
#print('fields:', fields)
#print('uri:', uri)
#print('job_uuid:', job_uuid)
#print('config_uuid:', config_uuid)
#print('aspect_type_uuid:', aspect_type_uuid)
#print('tag_history:', tag_history)
op_status = constants.SUCCESS
error_exists = False
bigquery_project = uri.split('/')[0]
# TO DO: allow user to overwrite default region with config value
bigquery_region = BIGQUERY_REGION # default to the region from the ini file
entry_name = f'bigquery.googleapis.com/projects/{uri}'
entry_path = f'projects/{bigquery_project}/locations/{bigquery_region}/entryGroups/@bigquery/entries/{entry_name}'
entry_request = dataplex.GetEntryRequest(
name=entry_path,
view=dataplex.EntryView.ALL
)
try:
entry = self.client.get_entry(request=entry_request)
#print('entry:', entry)
except Exception as e:
msg = f"Error could not locate entry {entry_name}"
log_error(msg, e, job_uuid)
op_status = constants.ERROR
return op_status
# run query expressions
verified_field_count = 0
for field in fields:
field_id = field['field_id']
field_type = field['field_type']
query_expression = field['query_expression']
# parse the query expression
query_str = self.parse_query_expression(uri, query_expression)
print('returned query_str: ' + query_str)
# run the SQL query
# note: field_values is of type list
field_values, error_exists = self.run_query(query_str, field_type, job_uuid)
if error_exists or field_values == []:
continue
verified_field_count = verified_field_count + 1
if field_type == 'richtext':
formatted_value = ', '.join(str(v) for v in field_values)
else:
formatted_value = field_values[0]
field['field_value'] = formatted_value
if verified_field_count == 0:
# aspect is empty due to SQL errors, skip aspect creation
op_status = constants.ERROR
return op_status
aspect_type_path = f'{self.aspect_type_project}.{self.aspect_type_region}.{self.aspect_type_id}'
op_status = self.create_update_delete_aspect(fields, aspect_type_path, entry_path, job_uuid, \
config_uuid, 'DYNAMIC_TAG_TABLE', tag_history, uri, None)
return op_status
def parse_query_expression(self, uri, query_expression, column=None):
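        # substitutes the $project, $dataset, $table, and $column placeholders in a query expression
        # when $table appears in the from clause, it is replaced with the fully qualified project.dataset.table name;
        # otherwise it is replaced with the bare table name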
query_str = None
# analyze query expression
from_index = query_expression.rfind(" from ", 0)
where_index = query_expression.rfind(" where ", 0)
project_index = query_expression.rfind("$project", 0)
dataset_index = query_expression.rfind("$dataset", 0)
table_index = query_expression.rfind("$table", 0)
from_clause_table_index = query_expression.rfind(" from $table", 0)
from_clause_backticks_table_index = query_expression.rfind(" from `$table`", 0)
column_index = query_expression.rfind("$column", 0)
if project_index != -1:
project_end = uri.find('/')
project = uri[0:project_end]
if dataset_index != -1:
dataset_start = uri.find('/datasets/') + 10
dataset_string = uri[dataset_start:]
dataset_end = dataset_string.find('/')
if dataset_end == -1:
dataset = dataset_string[0:]
else:
dataset = dataset_string[0:dataset_end]
# $table referenced in from clause, use fully qualified table
if from_clause_table_index > 0 or from_clause_backticks_table_index > 0:
qualified_table = uri.replace('/project/', '.').replace('/datasets/', '.').replace('/tables/', '.')
query_str = query_expression.replace('$table', qualified_table)
# $table is referenced somewhere in the expression, replace $table with actual table name
else:
            if table_index != -1:
                # use a separate index so table_index still records whether $table appears in the expression
                table_start = uri.rfind('/') + 1
                table_name = uri[table_start:]
                query_str = query_expression.replace('$table', table_name)
# $project referenced in where clause too
if project_index > -1:
if query_str == None:
query_str = query_expression.replace('$project', project)
else:
query_str = query_str.replace('$project', project)
#print('query_str: ', query_str)
# $dataset referenced in where clause too
if dataset_index > -1:
if query_str == None:
query_str = query_expression.replace('$dataset', dataset)
else:
query_str = query_str.replace('$dataset', dataset)
#print('query_str: ', query_str)
# table not in query expression (e.g. select 'string')
if table_index == -1 and query_str == None:
query_str = query_expression
if column_index != -1:
if query_str == None:
query_str = query_expression.replace('$column', column)
else:
query_str = query_str.replace('$column', column)
#print('returning query_str:', query_str)
return query_str
def run_query(self, query_str, field_type, job_uuid):
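        # runs a query expression in BigQuery and returns (field_values, error_exists)
        # for non-richtext fields only the first value is used; for richtext fields every returned row is
        # collected so the caller can join them into a single value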
field_values = []
error_exists = False
try:
#print('query_str:', query_str)
rows = self.bq_client.query_and_wait(query_str)
# if query expression is well-formed, there should only be a single row returned with a single field_value
# However, user may mistakenly run a query that returns a list of rows. In that case, grab only the top row.
row_count = 0
for row in rows:
row_count = row_count + 1
field_values.append(row[0])
if field_type != 'richtext' and row_count == 1:
return field_values, error_exists
# check row_count
if row_count == 0:
#error_exists = True
print('sql query returned nothing:', query_str)
except Exception as e:
error_exists = True
msg = 'Error occurred during run_query {}'.format(query_str)
log_error(msg, e, job_uuid)
#print('field_values: ', field_values)
return field_values, error_exists
def apply_dynamic_column_config(self, fields, columns_query, uri, job_uuid, config_uuid, aspect_type_uuid, tag_history):
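        # applies a dynamic column-level config to every column returned by columns_query
        # for each target column, the field query expressions are combined into a single select statement,
        # run once in BigQuery, and written as a column-scoped aspect (aspect_type@Schema.column)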
print('*** apply_dynamic_column_config ***')
#print('fields:', fields)
#print('columns_query:', columns_query)
#print('uri:', uri)
#print('job_uuid:', job_uuid)
#print('config_uuid:', config_uuid)
#print('aspect_type_uuid:', aspect_type_uuid)
#print('tag_history:', tag_history)
op_status = constants.SUCCESS
error_exists = False
bigquery_project = uri.split('/')[0]
# TO DO: allow user to overwrite default region with config value
bigquery_region = BIGQUERY_REGION # default to the region from the ini file
entry_name = f'bigquery.googleapis.com/projects/{uri}'
entry_path = f'projects/{bigquery_project}/locations/{bigquery_region}/entryGroups/@bigquery/entries/{entry_name}'
entry_request = dataplex.GetEntryRequest(
name=entry_path,
view=dataplex.EntryView.ALL
)
try:
entry = self.client.get_entry(request=entry_request)
#print('entry:', entry)
except Exception as e:
msg = f"Error could not locate entry {entry_name}"
log_error(msg, e, job_uuid)
op_status = constants.ERROR
return op_status
target_columns = [] # columns in the table which need to be tagged
columns_query = self.parse_query_expression(uri, columns_query)
#print('columns_query:', columns_query)
rows = self.bq_client.query(columns_query).result()
num_columns = 0
for row in rows:
for column in row:
print('column:', column)
target_columns.append(column)
num_columns += 1
if num_columns == 0:
# no columns to tag
msg = f"Error could not find columns to tag. Please check column_query parameter in your config. Current value: {columns_query}"
log_error(msg, None, job_uuid)
op_status = constants.ERROR
return op_status
column_fields_list = [] # list<dictionaries> where dict = {column, fields}
for target_column in target_columns:
#print('target_column:', target_column)
# fail quickly if a column is not found in the entry's schema
column_exists = self.check_column_exists(entry.aspects, target_column)
if column_exists != True:
msg = f"Error could not find column {target_column} in {entry.name}"
log_error(msg, None, job_uuid)
op_status = constants.ERROR
return op_status
verified_field_count = 0
query_strings = []
for field in fields:
query_expression = field['query_expression']
query_str = self.parse_query_expression(uri, query_expression, target_column)
query_strings.append(query_str)
print('query_strings:', query_strings)
# combine query expressions
combined_query = self.combine_queries(query_strings)
# run combined query, adding the results to the field_values for each field
# Note: field_values is of type list
fields, error_exists = self.run_combined_query(combined_query, target_column, fields, job_uuid)
print('fields:', fields)
if error_exists:
op_status = constants.ERROR
continue
aspect_type_path = f'{self.aspect_type_project}.{self.aspect_type_region}.{self.aspect_type_id}@Schema.{target_column}'
uri_column = f'{uri}/column/{target_column}'
op_status = self.create_update_delete_aspect(fields, aspect_type_path, entry_path, job_uuid, \
config_uuid, 'DYNAMIC_TAG_COLUMN', tag_history, uri_column, target_column)
# fail fast if aspect does not get created, updated or deleted
if op_status == constants.ERROR:
return op_status
return op_status
def combine_queries(self, query_strings):
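        # builds a single statement of the form: select (query1), (query2), ..., (queryN)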
large_query = "select "
for query in query_strings:
large_query += "({}), ".format(query)
return large_query[0:-2]
def run_combined_query(self, combined_query, column, fields, job_uuid):
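        # runs the combined query and maps the i-th selected value of the result row onto fields[i]
        # returns (fields, error_exists)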
error_exists = False
try:
rows = self.bq_client.query_and_wait(combined_query)
row_count = 0
for row in rows:
for i, field in enumerate(fields):
field['field_value'] = row[i]
row_count += 1
if row_count == 0:
error_exists = True
print('sql query returned empty set:', combined_query)
except Exception as e:
error_exists = True
msg = 'Error occurred during run_combined_query {}'.format(combined_query)
log_error(msg, e, job_uuid)
return fields, error_exists
def create_update_delete_aspect(self, aspect_fields, aspect_type_path, entry_path, job_uuid, config_uuid, config_type, tag_history, uri, target_column):
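        # builds the aspect payload from aspect_fields and writes it onto the entry via update_entry
        # with an 'aspects' update mask, then optionally records the tag in the BigQuery tag history table
        # returns constants.SUCCESS or constants.ERROR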
#print("enter create_update_delete_tag")
#print("aspect_fields:", aspect_fields)
#print("aspect_type_path:", aspect_type_path)
#print("entry_path:", entry_path)
#print("job_uuid:", job_uuid)
#print("config_uuid:", config_uuid)
#print("config_type:", config_type)
#print("tag_history:", tag_history)
#print("uri:", uri)
op_status = constants.SUCCESS
valid_field = False
num_fields = len(aspect_fields)
num_empty_values = 0
aspect = dataplex.Aspect()
aspect.path = "Schema.descriptor"
aspect_data_dict = {}
for field in aspect_fields:
aspect_data_dict[field['field_id']] = field['field_value']
aspect_data_struct = struct_pb2.Struct()
json_format.ParseDict(aspect_data_dict, aspect_data_struct, ignore_unknown_fields=False)
aspect.data = aspect_data_struct
entry = dataplex.Entry()
entry.name = entry_path
entry.aspects = {aspect_type_path: aspect}
#print("submitting entry for update:", entry)
try:
update_entry_request = dataplex.UpdateEntryRequest(
entry=entry,
update_mask="aspects",
)
resp = self.client.update_entry(request=update_entry_request)
#print('update entry resp:', resp)
except Exception as e:
msg = f"Error while updating the entry"
log_error(msg, error=str(e), job_uuid=job_uuid)
op_status = constants.ERROR
return op_status
if tag_history and op_status == constants.SUCCESS:
bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
aspect_type_fields = self.get_aspect_type()
success = bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.aspect_type_id, aspect_type_fields, uri, target_column, aspect_fields)
if success == False:
msg = 'Error occurred while writing to tag history table'
log_error(msg, error='', job_uuid=job_uuid)
op_status = constants.ERROR
return op_status
if __name__ == '__main__':
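    # ad hoc test: impersonate the Tag Creator service account and apply a sample import config to a fileset entry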
import google.auth
from google.auth import impersonated_credentials
SCOPES = ['openid', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/userinfo.email']
source_credentials, _ = google.auth.default()
target_service_account = config['DEFAULT']['TAG_CREATOR_SA']
credentials = impersonated_credentials.Credentials(source_credentials=source_credentials,
target_principal=target_service_account,
target_scopes=SCOPES,
lifetime=1200)
aspect_type_id = 'data-governance'
aspect_type_project = 'tag-engine-develop'
aspect_type_region = 'us-central1'
aspect_type_uuid = 'Bofcfg9kkkFz4d0Dk2SM'
job_uuid = '238f7420f7a211ef915a42004e494300'
config_uuid = 'b8a4616ef79e11efa14242004e494300'
data_asset_type = 'fileset'
data_asset_region = 'us-central1'
tag_dict = {'project': 'tag-engine-develop', 'entry_group': 'sakila_eg', 'fileset': 'city', 'data_domain': 'LOGISTICS', 'broad_data_category': 'CONTENT', 'data_creation': '2023-11-10', 'data_ownership': 'THIRD_PARTY_OPS', 'data_asset_owner': 'John Smith', 'data_confidentiality': 'PUBLIC', 'data_retention': 'DAYS_90', 'data_asset_documentation': 'https://dev.mysql.com/doc/sakila/en/sakila-structure.html'}
tag_history = True
dpc = DataplexController(credentials, target_service_account, 'scohen@gcp.solutions', aspect_type_id, aspect_type_project, aspect_type_region)
dpc.apply_import_config(job_uuid, config_uuid, data_asset_type, data_asset_region, tag_dict, tag_history)