in src/datatrove/pipeline/tokens/tokenizer.py [0:0]
def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
    """Tokenize ``data`` and optionally shuffle the result by document and/or by chunk.

    The documents are first batch tokenized into an "unshuffled" file. If
    ``self.shuffle_documents`` is set, that file is copied in a random document
    order. If ``self.shuffle_chunk_size`` is not None, the current file is then
    copied again in a random chunk order. After each copy, the file it was made
    from is cleaned up and replaced by the new one.

    Args:
        data: DocumentsPipeline
            The data to be processed as a Generator typically created by a Reader initial pipeline step
        rank: int
            The rank of the process
        world_size: int
            The total number of processes (not read by the code in this method)
    """
    unshuf_filename = get_output_filename(self.save_filename, rank, "unshuffled")
    logger.info(f'Tokenizing in "{unshuf_filename}"...')
    outputfile: TokenizedFile = self.write_unshuffled(data, unshuf_filename)

    # nothing was tokenized: stop before any shuffling step
    if not len(outputfile):
        logger.warning("No data saved.")
        return

    # ---- document level shuffling ----
    if self.shuffle_documents:
        logger.info("Shuffling documents...")

        # When a chunk shuffling step follows, this copy is only an intermediate result:
        # it goes to local_working_dir (if one is set), is not split by max_tokens_per_file,
        # and gets an index.
        # NOTE(review): the folder choice tests the truthiness of shuffle_chunk_size, while the
        # max-tokens choice and the chunk step below test `is None`; a chunk size of 0 would be
        # treated differently by the two. Confirm whether that is intended.
        if self.local_working_dir and self.shuffle_chunk_size:
            doc_shuffle_folder = self.local_working_dir
        else:
            doc_shuffle_folder = self.output_folder

        if self.shuffle_chunk_size is None:
            doc_shuffle_max_tokens = self.max_tokens_per_file
        else:
            doc_shuffle_max_tokens = None

        # one random position per document of the unshuffled file
        doc_order = self.rand.permutation(len(outputfile.doc_ends))
        doc_shuffled_file = outputfile.copy(
            self.save_filename,
            doc_order,
            doc_shuffle_folder,
            max_tokens_per_file=doc_shuffle_max_tokens,
            rank=rank,
            sub_filename="doc_shuffled",
            # NOTE(review): this passes shuffle_chunk_size itself (an int or None) when
            # save_index is falsy, so the value is not necessarily a bool.
            save_index=self.save_index or self.shuffle_chunk_size,
        )

        # the shuffled copy takes the place of the file it was built from
        outputfile.cleanup()
        outputfile = doc_shuffled_file

    # ---- chunk level shuffling ----
    if self.shuffle_chunk_size is not None:
        logger.info("Shuffling chunks...")

        # chunk boundaries computed from the current file's document ends
        all_chunks_doc_ends = chunk_doc_ends(outputfile.doc_ends, self.shuffle_chunk_size)

        # one random position per chunk; the copy is written in that chunk order
        chunk_order = self.rand.permutation(len(all_chunks_doc_ends))
        chunk_shuffled_file = outputfile.copy(
            self.save_filename,
            chunk_order,
            self.output_folder,
            max_tokens_per_file=self.max_tokens_per_file,
            rank=rank,
            boundaries=all_chunks_doc_ends,
            sub_filename="chunk_shuffled",
            save_index=self.save_index,
        )

        # the shuffled copy takes the place of the file it was built from
        outputfile.cleanup()
        outputfile = chunk_shuffled_file