in src/datatrove/pipeline/dedup/sentence_dedup.py [0:0]
def save_hashes(self, rank: int, signatures):
    """Sort one rank's sentence signatures and write them out bucketed by hash value.

    The hash range ``[0, hash_config.max]`` is split into ``self.finder_workers``
    contiguous buckets of width ``hash_config.max // self.finder_workers``; the last
    bucket extends up to ``hash_config.max`` so no hash is dropped. Bucket ``i``
    covers ``(i * width, (i + 1) * width]`` (bucket 0 also includes 0). The sorted
    records of bucket ``i`` are written as raw bytes to
    ``f"{i:04d}/{rank:05d}{ExtensionHelperSD.stage_1_signature}"`` inside
    ``self.output_folder``.

    Args:
        rank: index of the current task; used in the output file name.
        signatures: records convertible to the structured dtype ``(hash, doc, sent)``
            (presumably ``(hash, doc_index, sentence_index)`` tuples; confirm
            against the caller).

    Note:
        Buckets are written in order, and the loop stops as soon as every signature
        has been written. Buckets above the largest hash therefore get no file at all.
        With no signatures, only bucket 0's (empty) file is created.
    """
    # explicitly define little endianness
    signatures = np.array(
        signatures, dtype=[("hash", self.config.hash_config.np_descr), ("doc", "<u4"), ("sent", "<u2")]
    )
    # structured arrays are sorted field by field: hash first, then doc, then sent
    signatures.sort(axis=0)
    hashes = signatures["hash"]
    # Bucket boundaries are converted to the hash field's own scalar type so that
    # searchsorted compares integers of a single type. With a plain Python int,
    # legacy (pre-NEP 50) NumPy promotion can pair an unsigned 64-bit array with a
    # signed value and fall back to float64. That is inexact above 2**53 and casts
    # a copy of the searched array on every call.
    to_hash_type = hashes.dtype.type
    hashes_per_worker = self.config.hash_config.max // self.finder_workers
    last_bucket = self.finder_workers - 1
    left_idx = 0
    for hash_i in range(self.finder_workers):
        with self.output_folder.open(
            f"{hash_i:04d}/{rank:05d}{ExtensionHelperSD.stage_1_signature}", mode="wb"
        ) as f:
            # last bucket needs to have everything
            right_hash = (
                (hash_i + 1) * hashes_per_worker if hash_i != last_bucket else self.config.hash_config.max
            )
            # find the end of this bucket; side="right" makes right_hash an inclusive upper bound:
            # hashes[right_idx - 1] <= right_hash < hashes[right_idx]
            right_idx = left_idx + int(
                hashes[left_idx:].searchsorted(to_hash_type(right_hash), side="right")
            )
            # save to file
            if right_idx > left_idx:
                if self.output_folder.is_local():
                    # presumably tofile is reserved for local paths because it needs a real OS-level file
                    signatures[left_idx:right_idx].tofile(f)
                else:
                    f.write(signatures[left_idx:right_idx].tobytes())
            left_idx = right_idx
            # we've reached the end of our data
            if right_idx >= len(signatures):
                break