pipeline/translate/splitter.py (49 lines of code) (raw):
#!/usr/bin/env python3
"""
Splits a dataset to chunks. Generates files in format file.00.zst, file.01.zst etc.
Example:
python splitter.py \
--output_dir=test_data \
--num_parts=10 \
--output_suffix=.ref \
test_data/corpus.en.zst
"""
import argparse
import os
from contextlib import ExitStack
from typing import Optional
from pipeline.common.downloads import count_lines, read_lines, write_lines
from pipeline.common.logging import get_logger
logger = get_logger(__file__)
def split_file(mono_path: str, output_dir: str, num_parts: int, output_suffix: str = ""):
"""
Split a file into fixed number of chunks.
For instance with:
mono_path = "corpus.en.zst"
output_dir = "artifacts"
num_parts = 20
output_suffix = ".ref"
Outputs:
.
├── corpus.en.zst
└── artifacts
├── file.1.ref.zst
├── file.2.ref.zst
├── file.3.ref.zst
├── ...
└── file.20.ref.zst
"""
os.makedirs(output_dir, exist_ok=True)
total_lines = count_lines(mono_path)
lines_per_part = (total_lines + num_parts - 1) // num_parts
logger.info(f"Splitting {mono_path} to {num_parts} chunks x {total_lines:,} lines")
line_writer = None
line_count = 0
file_index = 1
with read_lines(mono_path) as lines:
with ExitStack() as chunk_stack:
for line in lines:
if not line_writer or line_count >= lines_per_part:
# The current file is full or doesn't exist, start a new one.
if line_writer:
chunk_stack.close()
chunk_name = f"{output_dir}/file.{file_index}{output_suffix}.zst"
logger.info(f"Writing to file chunk: {chunk_name}")
line_writer = chunk_stack.enter_context(write_lines(chunk_name))
file_index += 1
line_count = 0
line_writer.write(line)
line_count += 1
logger.info("Done writing to files.")
def main(args: Optional[list[str]] = None) -> None:
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawTextHelpFormatter, # Preserves whitespace in the help text.
)
parser.add_argument("mono_path", type=str, help="Path to the compressed monolingual dataset")
parser.add_argument("--output_dir", type=str, help="Output directory to store split files")
parser.add_argument("--num_parts", type=int, help="Number of parts to split the file into")
parser.add_argument(
"--output_suffix", type=str, help="A suffix for output files, for example .ref", default=""
)
parsed_args = parser.parse_args(args)
split_file(
mono_path=parsed_args.mono_path,
output_dir=parsed_args.output_dir,
num_parts=parsed_args.num_parts,
output_suffix=parsed_args.output_suffix,
)
if __name__ == "__main__":
main()