in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/main.py [0:0]
def _split_work():
    """Flask handler that fans a tag-engine job out into per-resource tasks.

    Reads a JSON body containing job_uuid, config_uuid, config_type and the
    tag creator/invoker service accounts, loads the config from Firestore,
    and — depending on the config type — enumerates the target URIs (tables,
    assets, export sources, CSV/backup files) and enqueues one task per unit
    of work via the task manager (tm). Job bookkeeping goes through jm/store.

    Returns:
        A Flask JSON response: {"success": true} once tasks are created and
        the job is marked RUNNING, or {"success": false} on any failure
        (missing config, credential error, unreadable/empty CSV, or a config
        whose asset type/region cannot be inferred).
    """
    json_request = request.get_json(force=True)
    print('json_request: ', json_request)

    job_uuid = json_request['job_uuid']
    config_uuid = json_request['config_uuid']
    config_type = json_request['config_type']
    tag_creator_sa = json_request['tag_creator_account']
    tag_invoker_sa = json_request['tag_invoker_account']

    config = store.read_config(tag_creator_sa, config_uuid, config_type)
    print('config: ', config)

    if not config:
        return jsonify(success=False)

    # get the credentials for the SA that is associated with this config
    credentials, success = get_target_credentials(tag_creator_sa)

    if not success:
        print('Error acquiring credentials from', tag_creator_sa)
        # BUGFIX: was `update_job_status(self, ...)` — `self` does not exist in a
        # module-level function (NameError). Route through the store helper, as
        # every other error path in this function does.
        store.update_job_status(config_uuid, config_type, 'ERROR')
        return jsonify(success=False)

    re = res.Resources(credentials)

    # dynamic table and dynamic column and sensitive column configs
    if 'included_tables_uris' in config:
        uris = list(re.get_resources(config.get('included_tables_uris'), config.get('excluded_tables_uris', None)))
        print('inside _split_work() uris: ', uris)

        jm.record_num_tasks(job_uuid, len(uris))
        jm.update_job_running(job_uuid)
        tm.create_config_uuid_tasks(tag_creator_sa, tag_invoker_sa, job_uuid, config_uuid, config_type, uris)

    # static asset config and glossary asset config
    if 'included_assets_uris' in config:
        uris = list(re.get_resources(config.get('included_assets_uris'), config.get('excluded_assets_uris', None)))
        print('inside _split_work() uris: ', uris)

        jm.record_num_tasks(job_uuid, len(uris))
        jm.update_job_running(job_uuid)
        tm.create_config_uuid_tasks(tag_creator_sa, tag_invoker_sa, job_uuid, config_uuid, config_type, uris)

    # export tag config
    if config_type == 'TAG_EXPORT':
        bqu = bq.BigQueryUtils(credentials, config['target_region'])

        # create report tables if they don't exist
        tables_created = bqu.create_report_tables(config['target_project'], config['target_dataset'])
        print('Info: created report tables:', tables_created)

        # tables already existed: honor the truncate write option
        if not tables_created and config['write_option'] == 'truncate':
            bqu.truncate_report_tables(config['target_project'], config['target_dataset'])
            print('Info: truncated report tables')

        # `.get` so a config without source_folder falls through to the
        # source_projects path instead of raising KeyError
        if config.get('source_folder'):
            uris = re.get_resources_by_folder(config['source_folder'])
        else:
            uris = re.get_resources_by_project(config['source_projects'])

        # BUGFIX: was printing the uris list under the "Number of uris" label
        # (the list itself is printed on the next line)
        print('Info: Number of uris:', len(uris))
        print('Info: uris:', uris)

        jm.record_num_tasks(job_uuid, len(uris))
        jm.update_job_running(job_uuid)
        tm.create_config_uuid_tasks(tag_creator_sa, tag_invoker_sa, job_uuid, config_uuid, config_type, uris)

    # import or restore tag config
    if config_type == 'TAG_IMPORT' or config_type == 'TAG_RESTORE':

        if config_type == 'TAG_IMPORT':
            try:
                csv_files = list(re.get_resources(config.get('metadata_import_location'), None))
                print('csv_files: ', csv_files)
            except Exception as e:
                msg = 'Error: unable to read CSV from {}'.format(config.get('metadata_import_location'))
                log_error(msg, e, job_uuid)
                store.update_job_status(config_uuid, config_type, 'ERROR')
                jm.set_job_status(job_uuid, 'ERROR')
                return jsonify(success=False)

            if len(csv_files) == 0:
                msg = 'Error: unable to read CSV from {}'.format(config.get('metadata_import_location'))
                error = {'job_uuid': job_uuid, 'msg': msg}
                print(json.dumps(error))
                store.update_job_status(config_uuid, config_type, 'ERROR')
                jm.set_job_status(job_uuid, 'ERROR')
                return jsonify(success=False)

            # flat list: one dict per tag across all CSV files
            extracted_tags = []
            for csv_file in csv_files:
                extracted_tags.extend(cp.CsvParser.extract_tags(credentials, csv_file))

            if len(extracted_tags) == 0:
                print('Error: unable to extract tags from CSV. Please verify the format of the CSV.')
                store.update_job_status(config_uuid, config_type, 'ERROR')
                jm.set_job_status(job_uuid, 'ERROR')
                return jsonify(success=False)

            # infer the data_asset_type from the CSV columns if not present in the config
            if 'data_asset_type' not in config or config.get('data_asset_type') is None:
                if extracted_tags[0].keys() >= {'dataset'}:
                    config['data_asset_type'] = 'bigquery'
                elif extracted_tags[0].keys() >= {'entry_group', 'fileset'}:
                    config['data_asset_type'] = 'fileset'
                elif extracted_tags[0].keys() >= {'instance', 'database'}:
                    config['data_asset_type'] = 'spanner'
                else:
                    print('Error: unable to determine the data asset type of your config (bigquery, fileset, or spanner). Please add data_asset_type to your config and verify the format of your CSV.')
                    store.update_job_status(config_uuid, config_type, 'ERROR')
                    jm.set_job_status(job_uuid, 'ERROR')
                    return jsonify(success=False)

                # save the inferred type to Firestore
                store.update_tag_import_config(config_uuid, config.get('data_asset_type'), None, None)

            # infer the data_asset_region from the defaults if not present in the config
            if 'data_asset_region' not in config or config.get('data_asset_region') is None:
                if config.get('data_asset_type') == 'bigquery':
                    config['data_asset_region'] = BIGQUERY_REGION
                elif config.get('data_asset_type') == 'fileset':
                    config['data_asset_region'] = FILESET_REGION
                elif config.get('data_asset_type') == 'spanner':
                    config['data_asset_region'] = SPANNER_REGION
                else:
                    print('Error: unable to determine the data asset region of your config (us-central1, etc.). Please add data_asset_region to your config or add the appropriate default region variable to tagengine.ini.')
                    store.update_job_status(config_uuid, config_type, 'ERROR')
                    jm.set_job_status(job_uuid, 'ERROR')
                    return jsonify(success=False)

                # save the inferred region to Firestore
                store.update_tag_import_config(config_uuid, None, config.get('data_asset_region'), None)

        if config_type == 'TAG_RESTORE':
            bkp_files = list(re.get_resources(config.get('metadata_export_location'), None))
            #print('bkp_files: ', bkp_files)

            # nested list: one list of tags per backup file
            extracted_tags = []
            for bkp_file in bkp_files:
                extracted_tags.append(bfp.BackupFileParser.extract_tags(credentials,
                                                                        config.get('source_template_id'),
                                                                        config.get('source_template_project'),
                                                                        bkp_file))

            # BUGFIX: was `extracted_tags == [[]]`, which only caught a single
            # empty backup file; any() catches N all-empty files as well
            if not any(extracted_tags):
                return jsonify(success=False)

        jm.record_num_tasks(job_uuid, len(extracted_tags))
        jm.update_job_running(job_uuid)
        tm.create_tag_extract_tasks(tag_creator_sa, tag_invoker_sa, job_uuid, config_uuid, config_type, extracted_tags)

    # update the status of the config, no matter which config type is running
    store.update_job_status(config_uuid, config_type, 'RUNNING')
    jm.set_job_status(job_uuid, 'RUNNING')

    return jsonify(success=True)