# _split_work()
#
# From: 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/main.py [0:0]


def _split_work():
    """Split a tag-engine job into one task per target resource.

    Reads the job/config identifiers from the incoming Flask JSON request,
    loads the config (presumably from Firestore via ``store``), enumerates
    the resources the config applies to (tables, assets, CSV import files,
    or backup files, depending on config type), records the task count, and
    enqueues one task per resource via ``tm``.

    Returns:
        A Flask JSON response: ``{"success": true}`` once tasks are enqueued
        and the job is marked RUNNING, ``{"success": false}`` on any error
        (missing config, credential failure, unreadable/empty import files).
    """
    json_request = request.get_json(force=True)
    print('json_request: ', json_request)

    job_uuid = json_request['job_uuid']
    config_uuid = json_request['config_uuid']
    config_type = json_request['config_type']
    tag_creator_sa = json_request['tag_creator_account']
    tag_invoker_sa = json_request['tag_invoker_account']

    config = store.read_config(tag_creator_sa, config_uuid, config_type)
    print('config: ', config)

    # config not found: nothing to split
    if not config:
        return jsonify(success=False)

    # get the credentials for the SA that is associated with this config
    credentials, success = get_target_credentials(tag_creator_sa)

    if not success:
        print('Error acquiring credentials from', tag_creator_sa)
        # BUG FIX: the original called update_job_status(self, ...) here, but
        # `self` does not exist in a module-level function (NameError). Use the
        # same store/jm error pattern as every other error path in this handler.
        store.update_job_status(config_uuid, config_type, 'ERROR')
        jm.set_job_status(job_uuid, 'ERROR')
        return jsonify(success=False)

    # renamed from `re` to avoid shadowing the stdlib regex module
    resources = res.Resources(credentials)

    # dynamic table and dynamic column and sensitive column configs
    if 'included_tables_uris' in config:
        uris = list(resources.get_resources(config.get('included_tables_uris'),
                                            config.get('excluded_tables_uris', None)))

        print('inside _split_work() uris: ', uris)

        jm.record_num_tasks(job_uuid, len(uris))
        jm.update_job_running(job_uuid)
        tm.create_config_uuid_tasks(tag_creator_sa, tag_invoker_sa, job_uuid, config_uuid, config_type, uris)

    # static asset config and glossary asset config
    if 'included_assets_uris' in config:
        uris = list(resources.get_resources(config.get('included_assets_uris'),
                                            config.get('excluded_assets_uris', None)))

        print('inside _split_work() uris: ', uris)

        jm.record_num_tasks(job_uuid, len(uris))
        jm.update_job_running(job_uuid)
        tm.create_config_uuid_tasks(tag_creator_sa, tag_invoker_sa, job_uuid, config_uuid, config_type, uris)

    # export tag config
    if config_type == 'TAG_EXPORT':

        bqu = bq.BigQueryUtils(credentials, config['target_region'])

        # create report tables if they don't exist
        tables_created = bqu.create_report_tables(config['target_project'], config['target_dataset'])
        print('Info: created report tables:', tables_created)

        # tables already existed: honor the truncate write option
        if not tables_created and config['write_option'] == 'truncate':
            bqu.truncate_report_tables(config['target_project'], config['target_dataset'])
            print('Info: truncated report tables')

        # BUG FIX: config['source_folder'] raised KeyError when the key was
        # absent; .get() falls through to the source_projects branch instead.
        if config.get('source_folder'):
            uris = resources.get_resources_by_folder(config['source_folder'])
        else:
            uris = resources.get_resources_by_project(config['source_projects'])

        # BUG FIX: the original printed the uris list itself as the "number"
        print('Info: Number of uris:', len(uris))
        print('Info: uris:', uris)

        jm.record_num_tasks(job_uuid, len(uris))
        jm.update_job_running(job_uuid)
        tm.create_config_uuid_tasks(tag_creator_sa, tag_invoker_sa, job_uuid, config_uuid, config_type, uris)

    # import or restore tag config
    if config_type == 'TAG_IMPORT' or config_type == 'TAG_RESTORE':

        if config_type == 'TAG_IMPORT':

            try:
                csv_files = list(resources.get_resources(config.get('metadata_import_location'), None))
                print('csv_files: ', csv_files)
            except Exception as e:
                msg = 'Error: unable to read CSV from {}'.format(config.get('metadata_import_location'))
                log_error(msg, e, job_uuid)

                store.update_job_status(config_uuid, config_type, 'ERROR')
                jm.set_job_status(job_uuid, 'ERROR')
                return jsonify(success=False)

            if len(csv_files) == 0:
                msg = 'Error: unable to read CSV from {}'.format(config.get('metadata_import_location'))
                error = {'job_uuid': job_uuid, 'msg': msg}
                print(json.dumps(error))

                store.update_job_status(config_uuid, config_type, 'ERROR')
                jm.set_job_status(job_uuid, 'ERROR')
                return jsonify(success=False)

            # flat list of tag dicts, one entry per CSV row
            extracted_tags = []

            for csv_file in csv_files:
                extracted_tags.extend(cp.CsvParser.extract_tags(credentials, csv_file))

            if len(extracted_tags) == 0:
                print('Error: unable to extract tags from CSV. Please verify the format of the CSV.')
                store.update_job_status(config_uuid, config_type, 'ERROR')
                jm.set_job_status(job_uuid, 'ERROR')
                return jsonify(success=False)

            # infer the data_asset_type if not present in the config,
            # based on which identifying columns appear in the first tag
            if config.get('data_asset_type') is None:
                if extracted_tags[0].keys() >= {'dataset'}:
                    config['data_asset_type'] = 'bigquery'
                elif extracted_tags[0].keys() >= {'entry_group', 'fileset'}:
                    config['data_asset_type'] = 'fileset'
                elif extracted_tags[0].keys() >= {'instance', 'database'}:
                    config['data_asset_type'] = 'spanner'
                else:
                    print('Error: unable to determine the data asset type of your config (bigquery, fileset, or spanner). Please add data_asset_type to your config and verify the format of your CSV.')
                    store.update_job_status(config_uuid, config_type, 'ERROR')
                    jm.set_job_status(job_uuid, 'ERROR')
                    return jsonify(success=False)

                # save the update to Firestore
                store.update_tag_import_config(config_uuid, config.get('data_asset_type'), None, None)

            # infer the data_asset_region if not present in the config,
            # falling back to the per-asset-type default region constants
            if config.get('data_asset_region') is None:
                if config.get('data_asset_type') == 'bigquery':
                    config['data_asset_region'] = BIGQUERY_REGION
                elif config.get('data_asset_type') == 'fileset':
                    config['data_asset_region'] = FILESET_REGION
                elif config.get('data_asset_type') == 'spanner':
                    config['data_asset_region'] = SPANNER_REGION
                else:
                    print('Error: unable to determine the data asset region of your config (us-central1, etc.). Please add data_asset_region to your config or add the appropriate default region variable to tagengine.ini.')
                    store.update_job_status(config_uuid, config_type, 'ERROR')
                    jm.set_job_status(job_uuid, 'ERROR')
                    return jsonify(success=False)

                # save the update to Firestore
                store.update_tag_import_config(config_uuid, None, config.get('data_asset_region'), None)

        if config_type == 'TAG_RESTORE':
            bkp_files = list(resources.get_resources(config.get('metadata_export_location'), None))

            # list of per-file tag lists (note: append, not extend — the
            # downstream task creator receives one entry per backup file)
            extracted_tags = []

            for bkp_file in bkp_files:
                extracted_tags.append(bfp.BackupFileParser.extract_tags(credentials,
                                                                        config.get('source_template_id'),
                                                                        config.get('source_template_project'),
                                                                        bkp_file))

        # BUG FIX: the original test `extracted_tags == [[]]` only caught a
        # single empty backup file; `[]` (no files) and `[[], []]` (multiple
        # empty files) slipped through and enqueued empty work. any() catches
        # all of those, and the job is now marked ERROR like other failures.
        if not any(extracted_tags):
            store.update_job_status(config_uuid, config_type, 'ERROR')
            jm.set_job_status(job_uuid, 'ERROR')
            return jsonify(success=False)

        jm.record_num_tasks(job_uuid, len(extracted_tags))
        jm.update_job_running(job_uuid)
        tm.create_tag_extract_tasks(tag_creator_sa, tag_invoker_sa, job_uuid, config_uuid, config_type, extracted_tags)

    # update the status of the config, no matter which config type is running
    store.update_job_status(config_uuid, config_type, 'RUNNING')
    jm.set_job_status(job_uuid, 'RUNNING')
    return jsonify(success=True)