def run()

in src/datatrove/pipeline/dedup/exact_substrings.py


    def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
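        """Strip the duplicate byte ranges found for this rank's shard.

        Re-reads the tokenized sequence from stage 1 in lockstep with the
        original documents, removes the duplicated spans and yields the
        surviving documents.
        """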
        self.reset()
        self.rank = rank
        # load this rank's sequence and size files from stage 1, plus the duplicate byte-range file.
        sequence_file, size_file = self.get_all_files(rank=self.rank, world_size=world_size)
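        # stop early if this rank has no duplicate ranges to remove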
        if not self.dup_ranges:
            return
        # the original `data` stream is still needed: it carries the metadata that the sequence format drops.
        for doc, doc_content in zip(
            data,
            sequence_reader(
                self.sequence_folder.open(sequence_file, "rb"), self.sequence_folder.open(size_file, "rb")
            ),
        ):
            with self.stats.time_stats:
                # check that the two generators are in sync, i.e. that the docs' byte sizes are correct.
                assert doc.text == self.tokenizer.decode(read_bytes(doc_content), skip_special_tokens=False), (
                    f"{doc.text}\n\n{self.tokenizer.decode(read_bytes(doc_content))}"
                )
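                # strip the duplicated byte ranges; a falsy result means the doc should be dropped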
                to_yield = self.remove_duplicate(doc, doc_content)
            if to_yield:
                self.update_doc_stats(doc)
                yield doc

        # check that our byte counter matches the sequence offset of the next rank
        assert self.bytes_counter == self.sequence_bytes_offset[rank + 1] - self.sequence_bytes_offset[rank], (
            f"got {self.bytes_counter=}, expected = "
            f"{self.sequence_bytes_offset[rank + 1] - self.sequence_bytes_offset[rank]}"
        )
        assert self.exhausted_ranges, "One or more duplicate ranges have not been used"
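
For orientation, here is a minimal sketch of how this stage is usually wired together with the other classes in this module. The class names come from exact_substrings.py itself; the LocalPipelineExecutor and JsonlReader imports, the paths, and the exact constructor arguments are assumptions to check against the datatrove version in use. The byte-range file consumed by run() is expected to come from an external suffix-array dedup step executed between stages 2 and 3.

    # Hedged sketch of the full exact-substrings flow; class names follow this
    # module, but paths and constructor arguments are assumptions to verify.
    from datatrove.executor import LocalPipelineExecutor
    from datatrove.pipeline.dedup.exact_substrings import (
        ESDatasetToSequence,
        ESMergeSequences,
        ESRangeRemover,
    )
    from datatrove.pipeline.readers import JsonlReader

    TASKS = 4  # assumed shard count; must match between stages 1 and 3

    # stage 1: tokenize each shard into a sequence file plus a size file
    stage_1 = LocalPipelineExecutor(
        pipeline=[
            JsonlReader("data/raw"),  # hypothetical input path
            ESDatasetToSequence(output_folder="data/es"),
        ],
        tasks=TASKS,
    )

    # stage 2: merge the per-rank sequences into one file for the external
    # suffix-array tool (google-research/deduplicate-text-datasets), which
    # produces the byte ranges of duplicated substrings
    stage_2 = LocalPipelineExecutor(
        pipeline=[ESMergeSequences(data_folder="data/es", tasks_stage_1=TASKS)],
        tasks=1,
    )

    # stage 3: re-read the same shards and drop the duplicate ranges with the
    # run() method shown above
    stage_3 = LocalPipelineExecutor(
        pipeline=[
            JsonlReader("data/raw"),
            ESRangeRemover(sequence_folder="data/es"),
        ],
        tasks=TASKS,
    )

    stage_1.run()
    stage_2.run()
    # ... run the external dedup tool on the merged sequence here ...
    stage_3.run()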