in src/datatrove/pipeline/dedup/url_dedup.py [0:0]
def save_hashes(self, rank: int, signatures):
    """Sort this rank's signatures and write them into one file per finder worker.

    Each element of ``signatures`` must be convertible to a record of
    ``get_sig_dtype(self.config.hash_config)``; its second field (``sig[1]``) is
    the priority. The records are sorted so that, among equal sort keys, the
    highest priority comes first. They are then split into
    ``self.finder_workers`` contiguous hash ranges. The slice for range
    ``hash_i`` is written as raw bytes to
    ``f"{hash_i:04d}/{rank:05d}{ExtensionHelperSD.stage_1_signature}"`` inside
    ``self.output_folder``. Once all records have been written, the remaining
    (empty) buckets get no file.

    Args:
        rank: index of the current task; used in the output file names.
        signatures: sequence of signature tuples whose second element is the
            priority.

    Raises:
        ValueError: if any priority is outside ``[1, max of the priority dtype]``.
    """
    sig_dtype = get_sig_dtype(self.config.hash_config)
    priority_max = np.iinfo(sig_dtype["priority"]).max
    # 0 will stay as is under the negation below, so we can't use 0 as a priority.
    # Presumably the priority dtype is unsigned (so -p wraps to 2**bits - p) — confirm.
    # Explicit check instead of `assert` so the validation survives `python -O`.
    if not all(1 <= sig[1] <= priority_max for sig in signatures):
        raise ValueError(f"priority must be between 1 and {priority_max}")
    signatures = np.array(signatures, dtype=sig_dtype)
    # Ensure that the highest priority is always first: flip priorities, sort
    # ascending, flip back. A structured-array sort compares fields in
    # declaration order (presumably hash first, then priority — verify get_sig_dtype).
    signatures["priority"] = -signatures["priority"]
    signatures.sort(axis=0)
    signatures["priority"] = -signatures["priority"]
    # Same code as in sentence_dedup
    hashes_per_worker = self.config.hash_config.max // self.finder_workers
    n_signatures = len(signatures)
    left_idx = 0
    for hash_i in range(self.finder_workers):
        with self.output_folder.open(
            f"{hash_i:04d}/{rank:05d}{ExtensionHelperSD.stage_1_signature}",
            mode="wb",
        ) as f:
            if hash_i == self.finder_workers - 1:
                # last bucket needs to have everything that is left; taking the
                # array end directly avoids depending on the hash dtype's max value
                right_idx = n_signatures
            else:
                right_hash = (hash_i + 1) * hashes_per_worker
                # find the first hash that does NOT go in this bucket. With side="right":
                # signatures['hash'][right_idx - 1] <= right_hash < signatures['hash'][right_idx]
                right_idx = left_idx + signatures["hash"][left_idx:].searchsorted(right_hash, side="right")
            # save to file
            if right_idx > left_idx:
                f.write(signatures[left_idx:right_idx].tobytes())
            left_idx = right_idx
            # we've reached the end of our data
            if right_idx >= n_signatures:
                break