def run()

in src/datatrove/pipeline/tokens/merger.py [0:0]


    def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        """Merge the tokenized files found in ``input_folder`` into new output files.

        Every ``*.ds`` file is paired with its ``*.ds.index`` file (document ends) and, when
        ``save_loss_metadata`` is set, with its ``*.ds.loss`` file. Pieces are then read from the
        input files in the order given by ``self.get_ordering`` and appended to
        ``{NNN}_{save_filename}.ds`` files created through ``TokenizedFile`` in ``output_folder``.

        A new output file is started whenever ``len(output_file)`` has reached
        ``max_tokens_per_file``, and merging stops once the "tokens" stat has reached
        ``max_tokens``. Both limits are only applied when they are greater than 0.

        The world_size must be 1 for this pipeline step merging the results of the previous
        parallel step.

        Args:
            data: DocumentsPipeline
                Not read by this method; the inputs are the files listed from ``input_folder``.
            rank: int
                The rank of the process. Not read by this method.
            world_size: int
                The total number of processes. Must be 1.

        Raises:
            AssertionError: if ``world_size`` is not 1, or if the number of ``.ds``, ``.ds.index``
                and ``.ds.loss`` files differ.
        """
        assert world_size == 1, "world_size must be 1 for DocumentTokenizerMerger"
        datafiles = self.input_folder.list_files(glob_pattern="*.ds")
        datafiles_index = self.input_folder.list_files(glob_pattern="*.ds.index")
        # without loss metadata, use placeholders so that the length check below still applies
        datafiles_loss = (
            self.input_folder.list_files(glob_pattern="*.ds.loss")
            if self.save_loss_metadata
            else ([None] * len(datafiles))
        )
        assert len(datafiles) == len(datafiles_index) == len(datafiles_loss), (
            "Mismatch between number of .ds, .ds.index and/or .ds.loss files "
            f"({len(datafiles)} vs {len(datafiles_index)} vs {len(datafiles_loss)})"
        )

        # token_size is the number of bytes per token; it stays at 2 unless the metadata says otherwise
        tokenizer_name_or_path, token_size = None, 2
        if self.save_final_metadata:
            metadata_path = f"{datafiles[0]}.metadata"
            if self.input_folder.isfile(metadata_path):
                with self.input_folder.open(metadata_path, "rt") as f:
                    tokenizer_name_or_path = f.read().splitlines()[0]
                if "|" in tokenizer_name_or_path:
                    # the first metadata line is "<tokenizer>|<token_size>"; split on the LAST "|" so that
                    # a tokenizer name/path which itself contains "|" does not break the unpacking
                    tokenizer_name_or_path, token_size = tokenizer_name_or_path.rsplit("|", 1)
                    token_size = int(token_size)

        # we handle doc ends differently if we are just shuffling documents or shuffling chunks with mixes of documents
        doc_ends = [load_doc_ends(self.input_folder.open(file, "rb")) for file in datafiles_index]
        # document mode: each read ends on a document end; no doc ends are passed to write_bytes (None)
        boundaries_to_load = doc_ends
        windows_with_doc_ends = doc_ends
        doc_ends_to_write = [iter([]) for _ in range(len(doc_ends))]
        if self.shuffle_chunk_size:
            # chunk mode: read boundaries are multiples of shuffle_chunk_size, up to the last full chunk
            boundaries_to_load = [
                [(i + 1) * self.shuffle_chunk_size for i in range(file_doc_ends[-1] // self.shuffle_chunk_size)]
                if len(file_doc_ends) > 0
                else []
                for file_doc_ends in doc_ends
            ]
            windows_with_doc_ends = [
                chunk_doc_ends(file_doc_ends, self.shuffle_chunk_size) for file_doc_ends in doc_ends
            ]
            # one lazy generator per input file, yielding each window's doc ends shifted by the window's
            # offset (windowi * shuffle_chunk_size), i.e. made relative to the start of that window
            doc_ends_to_write = [
                (
                    [b - windowi * self.shuffle_chunk_size for b in window]
                    for windowi, window in enumerate(file_windows)
                )
                for file_windows in windows_with_doc_ends
            ]

        # dataloaders
        token_inputs = list(
            map(
                partial(get_data_reader, nb_bytes=token_size),
                self.input_folder.open_files(datafiles),
                boundaries_to_load,
            )
        )
        # the loss readers use the same boundaries with nb_bytes=1
        loss_inputs = (
            list(
                map(
                    partial(get_data_reader, nb_bytes=1),
                    self.input_folder.open_files(datafiles_loss),
                    boundaries_to_load,
                )
            )
            if self.save_loss_metadata
            else None
        )

        ordering = self.get_ordering(windows_with_doc_ends)

        def open_output_file(file_index):
            # single place where output files are created, so all of them share the same settings
            return TokenizedFile(
                output_folder=self.output_folder,
                filename=f"{file_index:03d}_{self.save_filename}.ds",
                save_loss_metadata=self.save_loss_metadata,
                upload_block_size=self.upload_block_size,
                tokenizer_name_or_path=tokenizer_name_or_path,
                save_final_metadata=self.save_final_metadata,
                token_size=token_size,
            )

        file_ct = 0
        output_file = open_output_file(file_ct)
        for input_file_id in tqdm(
            ordering, desc="Merging documents", unit="documents", total=len(ordering), disable=not self.progress
        ):
            # global token budget reached: stop merging
            if 0 < self.max_tokens <= self.stats["tokens"].total:
                break
            # current output file is full: close it and start the next numbered one
            if 0 < self.max_tokens_per_file <= len(output_file):
                output_file.close()
                file_ct += 1
                output_file = open_output_file(file_ct)
            # copy tokens and loss
            tokens = next(token_inputs[input_file_id])
            output_file.write_bytes(tokens, next(doc_ends_to_write[input_file_id], None))
            if loss_inputs:
                output_file.write_loss_bytes(next(loss_inputs[input_file_id]))
            # tokens holds raw bytes, so divide by the bytes per token to count tokens
            self.stat_update("tokens", value=len(tokens) // token_size)
        # cleanup
        output_file.close()
        if self.save_final_metadata:
            # save final total metadata file
            output_file.write_final_metadata(self.stats["tokens"].total, filename=f"{self.save_filename}.ds")