in src/datatrove/pipeline/dedup/exact_substrings.py [0:0]
def get_duplicate_range(self, bytes_len: int):
    """Collect the duplicate ranges that overlap the current document window.

    The window spans from ``lo = self.bytes_counter`` to
    ``hi = self.bytes_counter + bytes_len + SEPARATOR_BYTES``. Starting at
    ``self.dup_ranges[self.range_idx]``, each range ``(A, B)`` is classified
    into one of four mutually exclusive categories:

        left    )   A lo   B hi     A --> lo, idx <-- idx + 1
        centre  )   lo A   B hi     idx <-- idx + 1
        right   )   lo A   hi B     B --> hi
        outside )   A lo   hi B     A --> lo, B --> hi

    ``left`` additionally requires ``B > lo + SEPARATOR_BYTES`` and ``right``
    requires ``A < hi - SEPARATOR_BYTES``, so a range that only touches the
    window within ``SEPARATOR_BYTES`` of its edge is not matched.
    NOTE(review): presumably this skips overlaps confined to separator
    bytes, and ``dup_ranges`` is presumably sorted by start offset - confirm
    against the code that fills it.

    Side effects: ``self.range_idx`` is advanced past every range that ends
    inside the window (``left`` and ``centre``); it is NOT advanced for
    ``right`` / ``outside`` ranges, which extend beyond ``hi``.
    ``self.exhausted_ranges`` is set once ``range_idx`` reaches the end of
    ``self.dup_ranges``.

    Args:
        bytes_len: length in bytes used to size the current window; it is
            also forwarded to ``self.normalize_range``.

    Returns:
        A list with one ``self.normalize_range(start, end, bytes_len)``
        result per overlapping range, in encounter order. Empty when no
        range overlaps the window or when all ranges were already consumed.
    """
    ranges = []

    if self.exhausted_ranges:
        return ranges
    # Guard against an empty `dup_ranges` (or a cursor already at the end):
    # without it the first lookup below raises IndexError instead of simply
    # reporting "no duplicates".
    if self.range_idx >= len(self.dup_ranges):
        self.exhausted_ranges = True
        return ranges

    upper_limit = self.bytes_counter + bytes_len + SEPARATOR_BYTES

    while True:
        current = self.dup_ranges[self.range_idx]
        a, b = current[0], current[1]

        left = a < self.bytes_counter and self.bytes_counter + SEPARATOR_BYTES < b <= upper_limit
        centre = self.bytes_counter <= a < b <= upper_limit
        right = self.bytes_counter <= a < upper_limit - SEPARATOR_BYTES and upper_limit < b
        outside = a < self.bytes_counter < upper_limit < b

        matches = (left, centre, right, outside)
        if not any(matches):
            # The current range does not overlap this window; leave the
            # cursor where it is and stop.
            break

        # Internal sanity check: the four conditions are mutually exclusive.
        assert sum(matches) == 1, f"{left=}, {centre=}, {right=}, {outside=}"

        if right:
            # Range runs past the window: clip its end and keep the cursor
            # on it.
            ranges.append(self.normalize_range(a, upper_limit, bytes_len))
            break
        if outside:
            # Range covers the whole window: clip both ends, keep the cursor.
            ranges.append(self.normalize_range(self.bytes_counter, upper_limit, bytes_len))
            break

        # left or centre: the range ends inside the window, so it is fully
        # consumed and the cursor moves on.
        self.range_idx += 1
        if left:
            # Clip the start to the beginning of the window.
            a = self.bytes_counter
        ranges.append(self.normalize_range(a, b, bytes_len))

        if self.range_idx >= len(self.dup_ranges):
            self.exhausted_ranges = True
            break

    return ranges