scripts/clean_training_data/generate_13_grams.py

import json
import logging
import os
import pickle
from pathlib import Path

from tqdm import tqdm

# Janitor and word_ngrams come from the project's janitor module; Buckets,
# yield_pile, and the module-level `terminate` flag (set by a signal handler)
# are assumed to be defined elsewhere in this script.

logger = logging.getLogger(__name__)


def do_ngrams_in_buckets(n_value, working_directory, bucket_count):
    pile_statistics = json.load(open("pile_statistics.json", "r", encoding="utf-8"))
    pile_document_count = pile_statistics["Document Count"]
    start_offsets = pile_statistics["File Start Offsets"]
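
    # "Document Count" and "File Start Offsets" come from a precomputed
    # statistics file; the start offsets give the global document index at
    # which each pile file begins, which drives the resume logic below.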

    output_directory = os.path.join(working_directory, "output")
    os.makedirs(output_directory, exist_ok=True)

    logger.info(f"Generating {n_value}-grams and bucketing.")

    # Done file: marks a completed run so re-invocations can skip straight out.
    done_file = os.path.join(output_directory, "ngram_buckets.done")
    if os.path.exists(done_file):
        logger.info("ngrams already generated and bucketed, skipping")
        return

    # Checkpoint: pile_offset.ckpt records the document index reached at the
    # last save; when present, the loop below fast-forwards to that point.
    checkpoint_file = os.path.join(working_directory, "pile_offset.ckpt")
    if os.path.exists(checkpoint_file):
        checkpoint_offset = pickle.load(open(checkpoint_file, "rb"))
        iterate = True
    else:
        checkpoint_offset = 0
        iterate = False

    logger.info(f"Starting at pile document index {checkpoint_offset}")
    buckets = Buckets(output_directory, bucket_count)

    janitor = Janitor()
    batch_size = 1000
    batch_counter = 0
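
    # The progress bar's initial total is checkpoint_offset: while skipping
    # already-processed documents it shows fast-forward progress, then it is
    # reset to the full pile document count once the checkpoint is reached.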
    with tqdm(total=checkpoint_offset, dynamic_ncols=True, unit="docs") as progress:
        for offset, document in yield_pile(start_offsets, checkpoint_offset):
            if iterate:
                logger.info(f"Iterating to offset {checkpoint_offset} from {offset}")
                progress.update(offset)
                iterate = False

            if offset < checkpoint_offset:
                progress.update()
                if terminate:
                    return
                continue
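
            # First document past the checkpoint: switch the bar over to
            # tracking overall completion.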
            if offset == checkpoint_offset:
                progress.reset(total=pile_document_count)
                progress.update(checkpoint_offset)

            # Save a checkpoint every batch_size documents; only honor a
            # terminate request once the checkpoint has been written.
            if batch_counter == batch_size:
                progress.update(batch_size)
                batch_counter = 0
                buckets.save_checkpoint()
                pickle.dump(offset, open(checkpoint_file, "wb"))
                if terminate:
                    buckets.close_buckets()
                    return
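
            # Normalize the document and append each n-gram to its bucket,
            # tagged with the document's global index so any later hit can be
            # traced back to its source document.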
            ngrams = word_ngrams(janitor.normalize_string(document), n_value)
            for ngram in ngrams:
                buckets.add_data(ngram, f"{ngram} {offset}")

            batch_counter += 1

    buckets.close_buckets()
    Path(done_file).touch()
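

# Illustrative invocation; the real script presumably wires these arguments up
# from its command line, and the values here are hypothetical:
#
#     do_ngrams_in_buckets(13, working_directory="pile", bucket_count=500)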