# src/datatrove/pipeline/dedup/minhash.py
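
# NOTE: excerpt only. The full module is assumed to provide these top-level imports
# (paths follow datatrove's layout; verify against the actual file):
#   import contextlib
#   import struct
#   from datatrove.data import DocumentsPipeline
#   from datatrove.utils.binaryio import read_tuples_from_file
#   from datatrove.utils.typeshelper import StatHints
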
def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1):
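    """Filter out documents whose index for this `rank` is listed in the `{rank:06d}.remove`
    file written by the minhash clustering stage. Optionally attaches
    `minhash_cluster_id` / `minhash_cluster_size` metadata read from the matching
    `.clusters` / `.sizes` files; dropped documents go to `exclusion_writer`, if set.
    """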
    if not self.data_folder.isfile(f"{rank:06d}.remove"):
        logger.warning(f"No .remove file for {rank=}.")
        for doc in data:
            self.stat_update(StatHints.total, StatHints.forwarded)
            yield doc
        return
    # additional metadata files
    # cluster ids
    if self.load_cluster_ids and not self.data_folder.exists(f"{rank:06d}.clusters"):
        logger.warning(f"No .clusters file for {rank=}.")
        raise FileNotFoundError
    # cluster sizes
    if self.load_cluster_sizes and not self.data_folder.exists(f"{rank:06d}.sizes"):
        logger.warning(f"No .sizes file for {rank=}.")
        raise FileNotFoundError
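
    # {rank:06d}.remove is a flat sequence of little-endian uint32 doc indices,
    # expected in ascending order (the streaming comparison below relies on this)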
    with self.data_folder.open(f"{rank:06d}.remove", "rb") as f:
        with self.exclusion_writer if self.exclusion_writer else contextlib.nullcontext() as exc_writer:
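
            # next doc index scheduled for removal; implicitly returns None at EOF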
            def get_next():
                data = f.read(struct.calcsize("I"))
                if data:
                    return struct.unpack("<I", data)[0]
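
            # stream (doc_index, value) pairs of uint32 ("2I") from a metadata file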
            def metadata_loader(file):
                with self.data_folder.open(file, "rb") as metadata_f:
                    yield from read_tuples_from_file(metadata_f, "2I", lines_to_buffer=self.lines_to_buffer)
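
            # prime each requested metadata stream with its first (doc_index, value) pair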
            if self.load_cluster_ids:
                cluster_loader = metadata_loader(f"{rank:06d}.clusters")
                next_cluster = next(cluster_loader, None)
            if self.load_cluster_sizes:
                sizes_loader = metadata_loader(f"{rank:06d}.sizes")
                next_size = next(sizes_loader, None)
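
            # all index streams are sorted, so one forward pass over the docs can
            # consume .remove/.clusters/.sizes in lockstep (a merge join on idx)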
            next_removal = get_next()
            for idx, doc in enumerate(data):
                with self.track_time():
                    # load and save metadata
                    if self.load_cluster_ids:
                        if next_cluster and idx == next_cluster[0]:
                            doc.metadata["minhash_cluster_id"] = next_cluster[1]
                            next_cluster = next(cluster_loader, None)
                        else:
                            # doc was not part of any duplicate cluster
                            doc.metadata["minhash_cluster_id"] = -1
                    if self.load_cluster_sizes:
                        if next_size and idx == next_size[0]:
                            doc.metadata["minhash_cluster_size"] = next_size[1]
                            next_size = next(sizes_loader, None)
                        else:
                            # doc not listed: it is alone in its "cluster"
                            doc.metadata["minhash_cluster_size"] = 1
                self.stat_update(StatHints.total)
                if next_removal == idx:
                    # to remove
                    self.stat_update(StatHints.dropped)
                    if self.exclusion_writer:
                        exc_writer.write(doc, rank)
                    next_removal = get_next()
                    continue
                self.stat_update(StatHints.forwarded)
                yield doc
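
# Usage sketch: where this filter sits in a minhash dedup pipeline. A hedged example,
# not part of this module; the class names are datatrove's public API, while the
# folder paths and task count below are placeholders.
#
#   from datatrove.executor import LocalPipelineExecutor
#   from datatrove.pipeline.dedup import MinhashDedupFilter
#   from datatrove.pipeline.readers import JsonlReader
#
#   stage4 = LocalPipelineExecutor(
#       pipeline=[
#           # must re-read the docs in the same order as the signature stage,
#           # since .remove files store positional indices per rank
#           JsonlReader("input_data/"),
#           MinhashDedupFilter(input_folder="minhash/remove_ids", load_cluster_ids=True),
#       ],
#       tasks=4,  # one task per rank: each reads its own {rank:06d}.remove
#   )
#   stage4.run()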