def _run_task()

in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/main.py [0:0]


def _run_task():
    
    creation_status = constants.ERROR
    
    json_request = request.get_json(force=True)
    job_uuid = json_request['job_uuid']
    config_uuid = json_request['config_uuid']
    config_type = json_request['config_type']
    shard_uuid = json_request['shard_uuid']
    task_uuid = json_request['task_uuid']
    tag_creator_sa = json_request['tag_creator_account']
    tag_invoker_sa = json_request['tag_invoker_account']
    
    if 'uri' in json_request:
        uri = json_request['uri']
    else:
        uri = None
        #print('uri: ', uri)
        
    if 'tag_extract' in json_request:
        tag_extract = json_request['tag_extract']
        #print('tag_extract: ', tag_extact)
    else:
        tag_extract = None
        
    
    credentials, success = get_target_credentials(tag_creator_sa)
    
    if success == False:
        print('Error acquiring credentials from', tag_creator_sa)
        tm.update_task_status(shard_uuid, task_uuid, 'ERROR')   
    
    # retrieve the config 
    tm.update_task_status(shard_uuid, task_uuid, 'RUNNING')
    
    config = store.read_config(tag_creator_sa, config_uuid, config_type)
    print('config: ', config)
           
    if config_type == 'TAG_EXPORT':
        dcc = controller.DataCatalogController(credentials)
    
    elif config_type == 'TAG_IMPORT':
        
        if 'template_id' not in config or 'template_project' not in config or 'template_region' not in config:
            response = {
                    "status": "error",
                    "message": "Request JSON is missing required template parameters",
            }
            return jsonify(response), 400
        
        dcc = controller.DataCatalogController(credentials, tag_creator_sa, tag_invoker_sa, \
                                               config['template_id'], config['template_project'], config['template_region'])
    
    elif config_type == 'TAG_RESTORE':
        
        if 'target_template_id' not in config or 'target_template_project' not in config or 'target_template_region' not in config:
            response = {
                    "status": "error",
                    "message": "Request JSON is missing some required target tag template parameters",
            }
            return jsonify(response), 400
        if 'source_template_id' not in config or 'source_template_project' not in config or 'source_template_region' not in config:
            response = {
                    "status": "error",
                    "message": "Request JSON is missing some required source tag template parameters",
            }
            return jsonify(response), 400
        
        dcc = controller.DataCatalogController(credentials, tag_creator_sa, tag_invoker_sa, \
                                                config['target_template_id'], config['target_template_project'], 
                                                config['target_template_region'])
    else:
        if 'template_uuid' not in config:
            response = {
                    "status": "error",
                    "message": "Request JSON is missing some required template_uuid parameter",
            }
            return jsonify(response), 400
            
        template_config = store.read_tag_template_config(config['template_uuid'])
        dcc = controller.DataCatalogController(credentials, tag_creator_sa, tag_invoker_sa, \
                                               template_config['template_id'], template_config['template_project'], \
                                               template_config['template_region'])
            
    
    if config_type == 'DYNAMIC_TAG_TABLE':
        creation_status = dcc.apply_dynamic_table_config(config['fields'], uri, job_uuid, config_uuid, \
                                                         config['template_uuid'], config['tag_history'])                                               
    if config_type == 'DYNAMIC_TAG_COLUMN':
        creation_status = dcc.apply_dynamic_column_config(config['fields'], config['included_columns_query'], uri, job_uuid, config_uuid, \
                                                          config['template_uuid'], config['tag_history'])
        
    if config_type == 'STATIC_TAG_ASSET':
        creation_status = dcc.apply_static_asset_config(config['fields'], uri, job_uuid, config_uuid, \
                                                        config['template_uuid'], config['tag_history'], \
                                                        config['overwrite'])                                                   
    if config_type == 'ENTRY_CREATE':
        creation_status = dcc.apply_entry_config(config['fields'], uri, job_uuid, config_uuid, \
                                                 config['template_uuid'], config['tag_history']) 
    if config_type == 'GLOSSARY_TAG_ASSET':
        creation_status = dcc.apply_glossary_asset_config(config['fields'], config['mapping_table'], uri, job_uuid, config_uuid, \
                                                    config['template_uuid'], config['tag_history'], config['overwrite'])
    if config_type == 'SENSITIVE_TAG_COLUMN':
        creation_status = dcc.apply_sensitive_column_config(config['fields'], config['dlp_dataset'], config['infotype_selection_table'], \
                                                            config['infotype_classification_table'], uri, config['create_policy_tags'], \
                                                            config['taxonomy_id'], job_uuid, config_uuid, \
                                                            config['template_uuid'], config['tag_history'], \
                                                            config['overwrite'])
    if config_type == 'TAG_EXPORT':
        creation_status = dcc.apply_export_config(config['config_uuid'], config['target_project'], config['target_dataset'], config['target_region'], uri)
    
    if config_type == 'TAG_IMPORT':
        creation_status = dcc.apply_import_config(job_uuid, config_uuid, config['data_asset_type'], config['data_asset_region'], \
                                                  tag_extract, config['tag_history'], config['overwrite'])
    if config_type == 'TAG_RESTORE':
        creation_status = dcc.apply_restore_config(job_uuid, config_uuid, tag_extract, \
                                                   config['tag_history'], config['overwrite'])
                                              
    if creation_status == constants.SUCCESS:
        tm.update_task_status(shard_uuid, task_uuid, 'SUCCESS')
    else:
        tm.update_task_status(shard_uuid, task_uuid, 'ERROR')
    
    # fan-in
    tasks_success, tasks_failed, pct_complete = jm.calculate_job_completion(job_uuid)
    print('tasks_success:', tasks_success)
    print('tasks_failed:', tasks_failed)
    print('pct_complete:', pct_complete)
        
    if pct_complete == 100:
        if tasks_failed > 0:
            store.update_job_status(config_uuid, config_type, 'ERROR')
            jm.set_job_status(job_uuid, 'ERROR')
            store.update_scheduling_status(config_uuid, config_type, 'READY')
            resp = jsonify(success=True)
        else:
            store.update_job_status(config_uuid, config_type, 'SUCCESS')
            jm.set_job_status(job_uuid, 'SUCCESS')
            resp = jsonify(success=False)
    else:
        store.update_job_status(config_uuid, config_type, 'RUNNING: {}% complete'.format(pct_complete))
        jm.set_job_status(job_uuid, 'RUNNING: {}% complete'.format(pct_complete))
        resp = jsonify(success=True)
    
    return resp