scripts/clean_training_data/process_sorted_buckets.py (80 lines of code) (raw):

""" Processes each sorted bucket, creating a new file listing all ngrams that matched more then 10 unique documents with their unique document counts. Uses multiprocessing and very little memory as we stream from presorted buckets. Will use a lot of disk though. Arguments --------- --working_directory (-dir) Directory containing the sorted buckets, processed files will be deposited here. Default: current directory --move_dir (-move) Directory to move processed 13grams too. Default: Do nothing --process_count (-procs) Number of processes to use. Default: 4 """ import argparse import glob import logging import os import re import shutil from pathlib import Path from tqdm import tqdm from tqdm_multiprocess import TqdmMultiProcessPool from tqdm_multiprocess.logger import setup_logger_tqdm from scripts.clean_training_data.archiver import TextArchive, TextReader logger = logging.getLogger(__name__) # Multiprocessed def process_bucket( bucket_file_path, processed_directory, move_dir, tqdm_func, global_tqdm ): bucket_id = re.sub("\D", "", os.path.basename(bucket_file_path)) # noqa: W605 done_file = os.path.join( processed_directory, f"ngram_bucket_processing_{bucket_id}.done" ) if os.path.exists(done_file): logger.info(f"bucket {bucket_id} already processed, skipping") return # For managing tqdm file_size = os.path.getsize(bucket_file_path) bucket_progress = tqdm_func( total=file_size, dynamic_ncols=True, unit="byte", unit_scale=1 ) current_file_position = 0 update_frequency = 100 * 1000000 # 100mb update_counter = 0 # Iterate through and output ngrams which occur in more then 10 documents bucket = TextReader(bucket_file_path) output_file_path = bucket_file_path + ".processed" output_archive = TextArchive(output_file_path, mode="wb") current_ngram = "" current_ngram_document_ids = set() for line in bucket.read(): [ngram, document_id] = line.rsplit(" ", 1) # Write ngram if more then 10 unique document occurrences if ngram != current_ngram: if len(current_ngram_document_ids) > 10: output_archive.add_data( f"{current_ngram} {len(current_ngram_document_ids)}" ) current_ngram = ngram current_ngram_document_ids = set() current_ngram_document_ids.add(document_id) # Update tqdm update_counter += bucket.fh.tell() - current_file_position current_file_position = bucket.fh.tell() if update_counter > update_frequency: bucket_progress.update(update_counter) update_counter = 0 # Remainder if len(current_ngram_document_ids) > 10: output_archive.add_data(f"{current_ngram} {len(current_ngram_document_ids)}") output_archive.commit() Path(done_file).touch() if move_dir: shutil.move(output_file_path, move_dir) global_tqdm.update() def process_sorted_buckets(working_directory, move_dir, process_count): bucket_file_paths = glob.glob(os.path.join(working_directory, "*.bkt.txt.sorted")) processed_directory = os.path.join(working_directory, "processed") os.makedirs(processed_directory, exist_ok=True) pool = TqdmMultiProcessPool(process_count) tasks = [ (process_bucket, (bucket_file, processed_directory, move_dir)) for bucket_file in bucket_file_paths ] global_tqdm = tqdm(total=len(bucket_file_paths), dynamic_ncols=True, unit="bucket") def on_done(_): return None def on_error(_): return None _ = pool.map(global_tqdm, tasks, on_error, on_done) parser = argparse.ArgumentParser(description="Process 13 grams from sorted buckets.") parser.add_argument("-dir", "--working_directory", default="") parser.add_argument("-move", "--move_dir", default="") parser.add_argument("-procs", "--process_count", type=int, default=4) if __name__ == "__main__": logfile_path = "process13grams.log" setup_logger_tqdm(logfile_path) args = parser.parse_args() process_sorted_buckets(args.working_directory, args.move_dir, args.process_count)