# in src/datatrove/pipeline/dedup/minhash.py [0:0]
def check_can_skip_sig_writing(self, rank):
    """Decide whether this rank's signature files are already on disk and usable.

    Returns True only when skipping is enabled AND every bucket's sig file for
    this rank exists, all files share the same size, and that size is a
    non-zero multiple of one serialized signature record — i.e. the files
    plausibly hold the same number of complete documents.
    """
    if not self.skip_existing_sigs:
        return False
    sig_paths = [f"bucket_{bi:03d}/{rank:05d}.minhash.sig" for bi in range(self.config.num_buckets)]
    # every bucket must already have its file for this rank
    if not all(self.output_folder.exists(path) for path in sig_paths):
        return False
    # all files must describe the same number of docs, so their sizes must match
    sizes = [self.output_folder.size(path) for path in sig_paths]
    first_size = sizes[0]
    if not all(size == first_size for size in sizes):
        return False
    # one record = hashes_per_bucket hashes + one doc id ("I"); files must hold
    # a whole number of such records and at least one
    entry_size = struct.calcsize(f"<{self.config.hashes_per_bucket}{self.config.hash_config.struct_format}I")
    if first_size == 0 or first_size % entry_size != 0:
        return False
    logger.info(f"Found existing sig files with {first_size // entry_size} entries. Skipping sig writing step.")
    return True