def do_ngrams_in_buckets()

in scripts/clean_training_data/generate_13_grams.py [0:0]
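
Streams Pile documents, normalizes each one, extracts word n-grams, and appends each n-gram (tagged with its source document offset) to one of bucket_count on-disk buckets. Progress is checkpointed every 1000 documents so an interrupted run can resume.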


import json
import os
import pickle
from pathlib import Path

from tqdm import tqdm

# logger, Buckets, Janitor, word_ngrams, yield_pile, and the "terminate"
# stop flag are defined or imported elsewhere in the script.


def do_ngrams_in_buckets(n_value, working_directory, bucket_count):
    # Precomputed Pile statistics (expected in the current working directory):
    # total document count and per-file start offsets
    with open("pile_statistics.json", "r", encoding="utf-8") as f:
        pile_statistics = json.load(f)
    pile_document_count = pile_statistics["Document Count"]
    start_offsets = pile_statistics["File Start Offsets"]

    output_directory = os.path.join(working_directory, "output")
    os.makedirs(output_directory, exist_ok=True)

    logger.info(f"Generating {n_value}-grams and bucketing.")

    # Done marker: if a previous run finished, skip this stage entirely
    done_file = os.path.join(output_directory, "ngram_buckets.done")
    if os.path.exists(done_file):
        logger.info("ngrams already generated and bucketed, skipping")
        return

    # Checkpoint: resume from the last saved document offset if one exists
    checkpoint_file = os.path.join(working_directory, "pile_offset.ckpt")
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, "rb") as f:
            checkpoint_offset = pickle.load(f)
        iterate = True  # must fast-forward past already-processed documents
    else:
        checkpoint_offset = 0
        iterate = False

    logger.info(f"Starting at pile document index {checkpoint_offset}")
    buckets = Buckets(output_directory, bucket_count)

    janitor = Janitor()  # handles text normalization before n-gram extraction
    batch_size = 1000  # checkpoint frequency, in documents
    batch_counter = 0

    # The bar first tracks the fast-forward to the checkpoint, then is reset
    # below to track the full document count once the checkpoint is reached
    with tqdm(total=checkpoint_offset, dynamic_ncols=True, unit="docs") as progress:
        for offset, document in yield_pile(start_offsets, checkpoint_offset):
            # First document yielded after a resume: move the bar to the
            # current offset before fast-forwarding begins
            if iterate:
                logger.info(f"Iterating to offset {checkpoint_offset} from {offset}")
                progress.update(offset)
                iterate = False

            # Still fast-forwarding: documents before the checkpoint were
            # already processed on a previous run
            if offset < checkpoint_offset:
                progress.update()

                if terminate:
                    return
                continue

            # Reached the checkpoint: switch the bar to track the whole pile
            if offset == checkpoint_offset:
                progress.reset(total=pile_document_count)
                progress.update(checkpoint_offset)

            # Save a checkpoint every batch_size documents; a termination
            # request is only honored right after a checkpoint is written,
            # so no completed work is lost
            if batch_counter == batch_size:
                progress.update(batch_size)
                batch_counter = 0
                buckets.save_checkpoint()
                with open(checkpoint_file, "wb") as f:
                    pickle.dump(offset, f)
                if terminate:
                    buckets.close_buckets()
                    return

            # Normalize the document, then route each n-gram (paired with its
            # document offset) to the bucket selected by the n-gram key
            ngrams = word_ngrams(janitor.normalize_string(document), n_value)
            for ngram in ngrams:
                buckets.add_data(ngram, f"{ngram} {offset}")

            batch_counter += 1

    buckets.close_buckets()
    Path(done_file).touch()  # mark the stage complete so reruns skip it
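
Usage sketch (hypothetical, not part of the script excerpt): the function polls a module-level terminate flag, so a driver would typically register a signal handler and pass the three parameters in. The terminate name matches the excerpt; the handler wiring, argparse flag names, and defaults below are illustrative assumptions.

import argparse
import signal

terminate = False  # stop flag polled inside do_ngrams_in_buckets


def handler(signum, frame):
    # Request a graceful stop; honored right after the next checkpoint
    global terminate
    terminate = True


if __name__ == "__main__":
    signal.signal(signal.SIGINT, handler)

    # Flag names and defaults are assumptions for illustration
    parser = argparse.ArgumentParser(description="Generate and bucket n-grams from the Pile.")
    parser.add_argument("--working_directory", default=".")
    parser.add_argument("--n_value", type=int, default=13)
    parser.add_argument("--bucket_count", type=int, default=500)
    args = parser.parse_args()

    do_ngrams_in_buckets(args.n_value, args.working_directory, args.bucket_count)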