in muss/preprocessing.py [0:0]
def get_parallel_file_pair_preprocessor(file_pair_preprocessor, n_jobs):
    """Wrap a file-pair preprocessor so it runs over ``n_jobs`` chunks in parallel.

    Args:
        file_pair_preprocessor: Callable taking
            ``(complex_filepath, simple_filepath, output_complex_filepath,
            output_simple_filepath)``.
        n_jobs: Requested number of parallel jobs. When it is exactly ``1``,
            ``file_pair_preprocessor`` is returned unchanged. Otherwise it is
            resolved through ``get_real_n_jobs``.

    Returns:
        A callable with the same four-argument signature. It works in four steps:
        1. Split both input files into ``n_jobs`` temporary chunks (round robin).
        2. Run ``file_pair_preprocessor`` on each chunk pair in parallel.
        3. Merge the processed chunks back into the output files (round robin).
        4. Remove all temporary files, including when a step fails.
    """
    import os

    if n_jobs == 1:
        return file_pair_preprocessor
    n_jobs = get_real_n_jobs(n_jobs)

    @wraps(file_pair_preprocessor)
    def parallel_file_pair_preprocessor(
        complex_filepath, simple_filepath, output_complex_filepath, output_simple_filepath
    ):
        # Allocate every temp path before doing any work, so the `finally`
        # below can clean up all of them whichever step fails.
        temp_complex_filepaths = get_temp_filepaths(n_jobs)
        temp_simple_filepaths = get_temp_filepaths(n_jobs)
        preprocessed_temp_complex_filepaths = get_temp_filepaths(n_jobs)
        preprocessed_temp_simple_filepaths = get_temp_filepaths(n_jobs)
        all_temp_filepaths = [
            *temp_complex_filepaths,
            *temp_simple_filepaths,
            *preprocessed_temp_complex_filepaths,
            *preprocessed_temp_simple_filepaths,
        ]
        try:
            # Both files are split with the same round-robin scheme.
            # Presumably this keeps line i of the complex file and line i of
            # the simple file in the same chunk index — TODO confirm in split_file.
            split_file(complex_filepath, temp_complex_filepaths, round_robin=True)
            split_file(simple_filepath, temp_simple_filepaths, round_robin=True)
            tasks = [
                delayed(file_pair_preprocessor)(*paths)
                for paths in zip(
                    temp_complex_filepaths,
                    temp_simple_filepaths,
                    preprocessed_temp_complex_filepaths,
                    preprocessed_temp_simple_filepaths,
                )
            ]
            Parallel(n_jobs=n_jobs)(tasks)
            # Round-robin merge mirrors the split. This assumes the
            # preprocessor keeps one output line per input line — TODO confirm.
            merge_files(preprocessed_temp_complex_filepaths, output_complex_filepath, round_robin=True)
            merge_files(preprocessed_temp_simple_filepaths, output_simple_filepath, round_robin=True)
        finally:
            # Best-effort cleanup. After an early failure some temp paths may
            # never have been written, so only remove paths that exist as files.
            # Otherwise a cleanup error could hide the original exception.
            for temp_filepath in all_temp_filepaths:
                if os.path.isfile(temp_filepath):
                    os.remove(temp_filepath)

    return parallel_file_pair_preprocessor