in src/datatrove/pipeline/dedup/minhash.py
def run(self, data: DocumentsPipeline = None, bucket: int = 0, world_size: int = 1):
    """Merge the sorted signature files for this bucket into a single deduplicated index file."""
    assert data is None, "You should not use an input block before MinhashBuildIndex"
    assert world_size == self.config.num_buckets, "You must run exactly one task per bucket"

    # open one reader per signature file produced for this bucket by the signature stage
    sig_files = self.input_folder.list_files(subdirectory=f"bucket_{bucket:03d}")
    sig_readers = [
        read_sigs(file, file_i, self.config, lines_to_buffer=self.lines_to_buffer)
        for file_i, file in enumerate(self.input_folder.open_files(sig_files, mode="rb"))
    ]

    # seed a min-heap with the first signature from each reader (k-way merge)
    pq = [x for x in [next(sig_reader, None) for sig_reader in sig_readers] if x is not None]
    heapq.heapify(pq)
    logger.info("Finished initializing signatures priority queue.")

    # writes all the sigs for the entire bucket, sequentially
    out_f = self.output_folder.open(f"bucket_{bucket:03d}/{self.index_name}.minhash.index", mode="wb")
    last: HashSig | None = None
    with self.track_time():
        while pq:
            v: HashSig = heapq.heappop(pq)
            # write each distinct signature exactly once: the merged stream is
            # sorted, so duplicates arrive consecutively
            if not last or last.sig != v.sig:
                out_f.write(
                    struct.pack(
                        f"<{self.config.hashes_per_bucket}{self.config.hash_config.struct_format}",
                        *v.sig,
                    )
                )
            last = v
            # refill the heap from the reader that produced the popped signature
            next_sig = next(sig_readers[v.file_id], None)
            if next_sig:
                heapq.heappush(pq, next_sig)
    out_f.close()
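
# The index file written above is just back-to-back fixed-size packed
# signatures, little-endian, with no header or separators. A minimal reading
# sketch, assuming the same hashes_per_bucket and struct_format ("I" for
# 32-bit hashes, "Q" for 64-bit) as the MinhashConfig that produced the file;
# read_index is a hypothetical helper, not part of datatrove:
import struct

def read_index(path: str, hashes_per_bucket: int = 8, struct_format: str = "I"):
    # one fixed-size record per unique signature, matching the pack format above
    record = struct.Struct(f"<{hashes_per_bucket}{struct_format}")
    with open(path, "rb") as f:
        while chunk := f.read(record.size):
            yield record.unpack(chunk)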
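
# A usage sketch for scheduling this stage: the asserts in run() require no
# preceding input block and exactly one task per bucket. LocalPipelineExecutor,
# MinhashBuildIndex and MinhashConfig come from datatrove; the folder paths and
# index_name here are illustrative assumptions, not library defaults.
from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.dedup.minhash import MinhashBuildIndex, MinhashConfig

config = MinhashConfig()  # must match the config used by the signature stage

executor = LocalPipelineExecutor(
    pipeline=[
        MinhashBuildIndex(
            input_folder="minhash/signatures",  # assumed: bucket_XXX/ signature files from stage 1
            output_folder="minhash/index",
            config=config,
            index_name="my_index",
        )
    ],
    tasks=config.num_buckets,  # one task per bucket, matching the world_size assert
)
executor.run()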