def get_duplicate_range()

in src/datatrove/pipeline/dedup/exact_substrings.py [0:0]


    def get_duplicate_range(self, bytes_len: int):
        """Collect the duplicate ranges that overlap the current window.

        The window starts at ``self.bytes_counter`` and ends at
        ``upper_limit = self.bytes_counter + bytes_len + SEPARATOR_BYTES``.
        Starting at ``self.dup_ranges[self.range_idx]``, each range ``(A, B)`` is
        classified against that window.

        Ranges produced by deduplicate-text-dataset can fall in one of the following 4 categories

                   left    )  A   *    B    *       A --> *, idx <-- idx + 1
                   centre  )  *   A    B    *       idx <-- idx + 1
                   right   )  *   A    *    B       B --> *
                   outside )  A   *    *    B       A --> *, B --> *

        first  * is self.bytes_counter
        second * is upper_limit = self.bytes_counter + bytes_len + SEPARATOR_BYTES

        ``left`` and ``centre`` ranges end inside the window, so the cursor
        ``self.range_idx`` moves on to the next range. ``right`` and ``outside``
        ranges extend past the window, so the cursor stays on them and the scan
        stops. A range that matches none of the categories also stops the scan
        without moving the cursor.

        Side effects: advances ``self.range_idx`` and sets
        ``self.exhausted_ranges`` to True once the cursor reaches the end of
        ``self.dup_ranges``.

        Args:
            bytes_len: Length used to size the window; it is also forwarded to
                ``self.normalize_range``. Presumably the byte length of the
                current document -- confirm against the caller.

        Returns:
            A list with one ``self.normalize_range(...)`` result per overlapping
            range, in ``self.dup_ranges`` order. Empty when nothing overlaps or
            when the ranges are exhausted.
        """
        ranges = []

        if self.exhausted_ranges:
            return ranges

        # An empty range list (or a cursor already at the end) used to raise
        # IndexError on the lookup below; treat it as "nothing left to match".
        if self.range_idx >= len(self.dup_ranges):
            self.exhausted_ranges = True
            return ranges

        upper_limit = self.bytes_counter + bytes_len + SEPARATOR_BYTES

        while True:
            current = self.dup_ranges[self.range_idx]
            a, b = current[0], current[1]

            # left: starts before the window and ends inside it, more than
            # SEPARATOR_BYTES past the window start.
            left = a < self.bytes_counter and self.bytes_counter + SEPARATOR_BYTES < b <= upper_limit
            # centre: fully contained in the window.
            centre = self.bytes_counter <= a < b <= upper_limit
            # right: starts inside the window, more than SEPARATOR_BYTES before
            # its end, and ends after it.
            right = self.bytes_counter <= a < upper_limit - SEPARATOR_BYTES and upper_limit < b
            # outside: strictly covers the whole window.
            outside = a < self.bytes_counter < upper_limit < b

            if not (left or centre or right or outside):
                break

            assert sum([left, centre, right, outside]) == 1, f"{left=}, {centre=}, {right=}, {outside=}"

            if right:
                # Clip the end to the window; the cursor stays on this range
                # because it continues past upper_limit.
                ranges.append(self.normalize_range(a, upper_limit, bytes_len))
                break
            if outside:
                # Clip both ends to the window; the cursor stays on this range.
                ranges.append(self.normalize_range(self.bytes_counter, upper_limit, bytes_len))
                break

            # Only left/centre reach this point: the range ends inside the
            # window, so it is consumed and the cursor moves on.
            self.range_idx += 1
            if left:
                # Clip the start to the window.
                a = self.bytes_counter

            ranges.append(self.normalize_range(a, b, bytes_len))

            if self.range_idx == len(self.dup_ranges):
                self.exhausted_ranges = True
                break

        return ranges