def run()

in src/datatrove/pipeline/tokens/tokenizer.py


    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        """Main method to run the tokenization.
            We first batch tokenize the documents and write them to a file.
            Then we shuffle the documents and write them to a new file if self.shuffle is True (and remove the original file)

        Args:
            data: DocumentsPipeline
                The data to be processed as a Generator typically created by a Reader initial pipeline step
            rank: int
                The rank of the process
            world_size: int
                The total number of processes
        """
        unshuf_filename = get_output_filename(self.save_filename, rank, "unshuffled")
        logger.info(f'Tokenizing in "{unshuf_filename}"...')
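        # write_unshuffled batch-tokenizes the documents and records each document's
        # end offset (doc_ends), which the shuffling steps below rely on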
        outputfile: TokenizedFile = self.write_unshuffled(data, unshuf_filename)
        if len(outputfile) == 0:
            logger.warning("No data saved.")
            return

        # document level shuffling
        if self.shuffle_documents:
            logger.info("Shuffling documents...")
            # get new TokenizedFile, shuffling docs from original one
            # if chunk-level shuffling still follows, we want to: write to local_working_dir (when set),
            # not apply max_tokens_per_file, and save the index
            new_file = outputfile.copy(
                self.save_filename,
                self.rand.permutation(len(outputfile.doc_ends)),
                self.output_folder
                if not self.local_working_dir or not self.shuffle_chunk_size
                else self.local_working_dir,
                max_tokens_per_file=self.max_tokens_per_file if self.shuffle_chunk_size is None else None,
                rank=rank,
                sub_filename="doc_shuffled",
                save_index=self.save_index or self.shuffle_chunk_size,
            )
            # remove and replace original file
            outputfile.cleanup()
            outputfile = new_file

        # chunk level shuffling
        if self.shuffle_chunk_size is not None:
            logger.info("Shuffling chunks...")

            all_chunks_doc_ends = chunk_doc_ends(outputfile.doc_ends, self.shuffle_chunk_size)
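            # the permutation below reorders whole chunks of documents (the boundaries
            # computed by chunk_doc_ends) rather than individual documents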
            # get new TokenizedFile, shuffling chunks of docs from the original one
            new_file = outputfile.copy(
                self.save_filename,
                self.rand.permutation(len(all_chunks_doc_ends)),
                self.output_folder,
                max_tokens_per_file=self.max_tokens_per_file,
                rank=rank,
                boundaries=all_chunks_doc_ends,
                sub_filename="chunk_shuffled",
                save_index=self.save_index,
            )
            # remove and replace original file
            outputfile.cleanup()
            outputfile = new_file
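
For context, here is a minimal sketch of how this step is typically wired into a pipeline: the executor calls run() once per task, passing that task's rank and the total world_size. The reader path, parameter names such as tokenizer_name_or_path, and the executor settings are illustrative assumptions and may differ between datatrove versions.

    from datatrove.executor import LocalPipelineExecutor
    from datatrove.pipeline.readers import JsonlReader
    from datatrove.pipeline.tokens import DocumentTokenizer

    # each task (rank) runs DocumentTokenizer.run() on its share of the documents,
    # producing its own set of tokenized (and optionally shuffled) output files
    pipeline = [
        JsonlReader("data/documents/"),  # any Reader that yields a DocumentsPipeline generator
        DocumentTokenizer(
            output_folder="data/tokenized/",
            tokenizer_name_or_path="gpt2",  # assumed parameter name; check your datatrove version
            shuffle_documents=True,  # enables the document-level shuffle shown in run() above
        ),
    ]

    LocalPipelineExecutor(pipeline=pipeline, tasks=4).run()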