in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/main.py [0:0]
def _run_task():
creation_status = constants.ERROR
json_request = request.get_json(force=True)
job_uuid = json_request['job_uuid']
config_uuid = json_request['config_uuid']
config_type = json_request['config_type']
shard_uuid = json_request['shard_uuid']
task_uuid = json_request['task_uuid']
tag_creator_sa = json_request['tag_creator_account']
tag_invoker_sa = json_request['tag_invoker_account']
if 'uri' in json_request:
uri = json_request['uri']
else:
uri = None
#print('uri: ', uri)
if 'tag_extract' in json_request:
tag_extract = json_request['tag_extract']
#print('tag_extract: ', tag_extact)
else:
tag_extract = None
credentials, success = get_target_credentials(tag_creator_sa)
if success == False:
print('Error acquiring credentials from', tag_creator_sa)
tm.update_task_status(shard_uuid, task_uuid, 'ERROR')
# retrieve the config
tm.update_task_status(shard_uuid, task_uuid, 'RUNNING')
config = store.read_config(tag_creator_sa, config_uuid, config_type)
print('config: ', config)
if config_type == 'TAG_EXPORT':
dcc = controller.DataCatalogController(credentials)
elif config_type == 'TAG_IMPORT':
if 'template_id' not in config or 'template_project' not in config or 'template_region' not in config:
response = {
"status": "error",
"message": "Request JSON is missing required template parameters",
}
return jsonify(response), 400
dcc = controller.DataCatalogController(credentials, tag_creator_sa, tag_invoker_sa, \
config['template_id'], config['template_project'], config['template_region'])
elif config_type == 'TAG_RESTORE':
if 'target_template_id' not in config or 'target_template_project' not in config or 'target_template_region' not in config:
response = {
"status": "error",
"message": "Request JSON is missing some required target tag template parameters",
}
return jsonify(response), 400
if 'source_template_id' not in config or 'source_template_project' not in config or 'source_template_region' not in config:
response = {
"status": "error",
"message": "Request JSON is missing some required source tag template parameters",
}
return jsonify(response), 400
dcc = controller.DataCatalogController(credentials, tag_creator_sa, tag_invoker_sa, \
config['target_template_id'], config['target_template_project'],
config['target_template_region'])
else:
if 'template_uuid' not in config:
response = {
"status": "error",
"message": "Request JSON is missing some required template_uuid parameter",
}
return jsonify(response), 400
template_config = store.read_tag_template_config(config['template_uuid'])
dcc = controller.DataCatalogController(credentials, tag_creator_sa, tag_invoker_sa, \
template_config['template_id'], template_config['template_project'], \
template_config['template_region'])
if config_type == 'DYNAMIC_TAG_TABLE':
creation_status = dcc.apply_dynamic_table_config(config['fields'], uri, job_uuid, config_uuid, \
config['template_uuid'], config['tag_history'])
if config_type == 'DYNAMIC_TAG_COLUMN':
creation_status = dcc.apply_dynamic_column_config(config['fields'], config['included_columns_query'], uri, job_uuid, config_uuid, \
config['template_uuid'], config['tag_history'])
if config_type == 'STATIC_TAG_ASSET':
creation_status = dcc.apply_static_asset_config(config['fields'], uri, job_uuid, config_uuid, \
config['template_uuid'], config['tag_history'], \
config['overwrite'])
if config_type == 'ENTRY_CREATE':
creation_status = dcc.apply_entry_config(config['fields'], uri, job_uuid, config_uuid, \
config['template_uuid'], config['tag_history'])
if config_type == 'GLOSSARY_TAG_ASSET':
creation_status = dcc.apply_glossary_asset_config(config['fields'], config['mapping_table'], uri, job_uuid, config_uuid, \
config['template_uuid'], config['tag_history'], config['overwrite'])
if config_type == 'SENSITIVE_TAG_COLUMN':
creation_status = dcc.apply_sensitive_column_config(config['fields'], config['dlp_dataset'], config['infotype_selection_table'], \
config['infotype_classification_table'], uri, config['create_policy_tags'], \
config['taxonomy_id'], job_uuid, config_uuid, \
config['template_uuid'], config['tag_history'], \
config['overwrite'])
if config_type == 'TAG_EXPORT':
creation_status = dcc.apply_export_config(config['config_uuid'], config['target_project'], config['target_dataset'], config['target_region'], uri)
if config_type == 'TAG_IMPORT':
creation_status = dcc.apply_import_config(job_uuid, config_uuid, config['data_asset_type'], config['data_asset_region'], \
tag_extract, config['tag_history'], config['overwrite'])
if config_type == 'TAG_RESTORE':
creation_status = dcc.apply_restore_config(job_uuid, config_uuid, tag_extract, \
config['tag_history'], config['overwrite'])
if creation_status == constants.SUCCESS:
tm.update_task_status(shard_uuid, task_uuid, 'SUCCESS')
else:
tm.update_task_status(shard_uuid, task_uuid, 'ERROR')
# fan-in
tasks_success, tasks_failed, pct_complete = jm.calculate_job_completion(job_uuid)
print('tasks_success:', tasks_success)
print('tasks_failed:', tasks_failed)
print('pct_complete:', pct_complete)
if pct_complete == 100:
if tasks_failed > 0:
store.update_job_status(config_uuid, config_type, 'ERROR')
jm.set_job_status(job_uuid, 'ERROR')
store.update_scheduling_status(config_uuid, config_type, 'READY')
resp = jsonify(success=True)
else:
store.update_job_status(config_uuid, config_type, 'SUCCESS')
jm.set_job_status(job_uuid, 'SUCCESS')
resp = jsonify(success=False)
else:
store.update_job_status(config_uuid, config_type, 'RUNNING: {}% complete'.format(pct_complete))
jm.set_job_status(job_uuid, 'RUNNING: {}% complete'.format(pct_complete))
resp = jsonify(success=True)
return resp