in src/datatrove/pipeline/dedup/exact_substrings.py [0:0]
def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
    self.reset()
    self.rank = rank
    # load the sequence file and the size file (both from stage 1) and the byte-range file
    sequence_file, size_file = self.get_all_files(rank=self.rank, world_size=world_size)
    if not self.dup_ranges:
        return
    # `data` is still needed: the sequence format only stores token bytes, so the
    # document metadata is only available from the original docs
    for doc, doc_content in zip(
        data,
        sequence_reader(
            self.sequence_folder.open(sequence_file, "rb"), self.sequence_folder.open(size_file, "rb")
        ),
    ):
        with self.stats.time_stats:
            # check that the two generators are in sync, i.e. that the document byte sizes are correct
            assert doc.text == self.tokenizer.decode(read_bytes(doc_content), skip_special_tokens=False), (
                f"{doc.text}\n\n{self.tokenizer.decode(read_bytes(doc_content))}"
            )
            to_yield = self.remove_duplicate(doc, doc_content)
        if to_yield:
            self.update_doc_stats(doc)
            yield doc
    # check that the number of bytes we consumed matches the offset delta to the next rank
    assert self.bytes_counter == self.sequence_bytes_offset[rank + 1] - self.sequence_bytes_offset[rank], (
        f"got {self.bytes_counter=}, expected = "
        f"{self.sequence_bytes_offset[rank + 1] - self.sequence_bytes_offset[rank]}"
    )
    assert self.exhausted_ranges, "One or more duplicate ranges have not been used"
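
For context, `sequence_reader` pairs the concatenated token bytes in the sequence file with the per-document lengths in the size file, yielding one byte slice per document. A minimal sketch of that pairing, assuming a simplified format where the size file holds one little-endian uint32 length per document (datatrove's actual on-disk layout may differ):

import struct
from typing import BinaryIO, Iterator

def toy_sequence_reader(seq_f: BinaryIO, size_f: BinaryIO) -> Iterator[bytes]:
    """Yield one document's worth of token bytes at a time."""
    while True:
        raw_len = size_f.read(4)
        if not raw_len:  # size file exhausted: no more documents
            return
        (n_bytes,) = struct.unpack("<I", raw_len)
        yield seq_f.read(n_bytes)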
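
`remove_duplicate` (not shown here) drops the byte spans that stage 1 flagged as duplicated before the remaining bytes are decoded back into text. A hedged sketch of that core operation, with a hypothetical helper name and half-open [start, end) ranges:

def strip_ranges(data: bytes, dup_ranges: list[tuple[int, int]]) -> bytes:
    """Remove the given [start, end) byte ranges, assumed sorted and non-overlapping."""
    kept, cursor = [], 0
    for start, end in dup_ranges:
        kept.append(data[cursor:start])
        cursor = end
    kept.append(data[cursor:])
    return b"".join(kept)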
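
The final byte-counter assert relies on `sequence_bytes_offset` being the prefix sum of the byte counts each rank wrote in stage 1, so the bytes re-read by rank `r` must equal `offset[r + 1] - offset[r]`. A small self-contained illustration of that invariant (the values are hypothetical):

from itertools import accumulate

per_rank_bytes = [120, 75, 301]                   # hypothetical stage-1 byte counts
offsets = [0] + list(accumulate(per_rank_bytes))  # [0, 120, 195, 496]
for r, n_bytes in enumerate(per_rank_bytes):
    assert n_bytes == offsets[r + 1] - offsets[r]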