def save_hashes()

in src/datatrove/pipeline/dedup/url_dedup.py [0:0]


    def save_hashes(self, rank: int, signatures):
        """Sort this rank's signatures and write them out, split by hash range.

        The hash space ``[0, self.config.hash_config.max]`` is cut into
        ``self.finder_workers`` contiguous ranges ("buckets"). For every bucket
        that is reached, the raw bytes of the signatures whose hash falls in
        that range are written to
        ``{bucket:04d}/{rank:05d}{ExtensionHelperSD.stage_1_signature}`` inside
        ``self.output_folder``.

        Args:
            rank: rank of the current task; only used to name the output files.
            signatures: records convertible to the structured dtype returned by
                ``get_sig_dtype``. Element 1 of each record is its priority.

        Raises:
            AssertionError: if any priority is outside ``[1, priority_max]``.
        """
        sig_dtype = get_sig_dtype(self.config.hash_config)
        priority_max = np.iinfo(sig_dtype["priority"]).max

        # 0 will stay as is when negated below, so we can't use 0 as a priority.
        # The check is raised explicitly instead of using `assert` so that it
        # still runs under `python -O`; exception type and message are unchanged.
        if not all(1 <= sig[1] <= priority_max for sig in signatures):
            raise AssertionError(f"priority must be between 1 and {priority_max}")
        signatures = np.array(signatures, dtype=sig_dtype)

        # Ensure that the highest priority is always first: negate the priority,
        # sort ascending, then negate again to restore the original values.
        # NOTE(review): the sort compares the structured dtype's fields in their
        # declared order, so this presumably orders by priority among entries
        # with equal hash - confirm against get_sig_dtype.
        signatures["priority"] = -signatures["priority"]
        signatures.sort(axis=0)
        signatures["priority"] = -signatures["priority"]

        # Same code as in sentence_dedup
        hashes_per_worker = self.config.hash_config.max // self.finder_workers
        left_idx = 0
        for hash_i in range(self.finder_workers):
            # The file is opened before we know whether this bucket has any data,
            # so a bucket without matching signatures is opened but never written to.
            with self.output_folder.open(
                f"{hash_i:04d}/{rank:05d}{ExtensionHelperSD.stage_1_signature}",
                mode="wb",
            ) as f:
                # last bucket needs to have everything
                right_hash = (
                    (hash_i + 1) * hashes_per_worker if hash_i != self.finder_workers - 1 else np.iinfo(np.uint64).max
                )
                # find the end of this bucket. With side="right" this obeys:
                # signatures['hash'][right_idx - 1] <= right_hash < signatures['hash'][right_idx]
                right_idx = left_idx + signatures["hash"][left_idx:].searchsorted(right_hash, side="right")
                # save to file
                if right_idx > left_idx:
                    bts = signatures[left_idx:right_idx].tobytes()
                    f.write(bts)
                left_idx = right_idx
                # we've reached the end of our data: the remaining buckets are
                # not opened at all
                if right_idx >= len(signatures):
                    break