in src/datatrove/pipeline/dedup/minhash.py [0:0]
def run(self, data: DocumentsPipeline = None, _: int = 0, world_size: int = 1):
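    """
    Single-machine clustering step: reads all duplicate pairs from the *.dups files in
    `input_folder`, groups them into connected components with a union-find structure, and
    writes, per input file, the ids of documents to remove (plus optional cluster id and
    cluster size metadata).
    """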
    dup_files = self.input_folder.list_files(glob_pattern="*.dups")
    assert (len(dup_files) % self.config.num_buckets) == 0, (
        "Number of .dups files should be divisible by number of buckets"
    )
    assert world_size == 1, "World size must be 1 for clustering"
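    # union-find ("disjoint set") over (file_id, doc_id) tuples: union_set maps each node to
    # its parent, set_size tracks the number of documents under each root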
    union_set = {}
    set_size = {}

    def parent(x):
        if x not in union_set or union_set[x] == x:
            union_set[x] = x
            return x
        # Path Compression
        union_set[x] = parent(union_set[x])
        return union_set[x]

    def union(v_a, v_b):
        root_a = parent(v_a)
        root_b = parent(v_b)
        if root_a != root_b:
            # Union by size
            size_a = set_size.get(root_a, 1)
            size_b = set_size.get(root_b, 1)
            if root_b == (SENTINEL, SENTINEL) or (root_a != (SENTINEL, SENTINEL) and size_a < size_b):
                root_a, root_b = root_b, root_a
            # a is SENTINEL or #a >= #b
            union_set[root_b] = root_a  # make the smallest one join the biggest one to keep sets shallow
            set_size[root_a] = size_a + size_b
            set_size.pop(root_b, None)  # clear up space
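    # e.g. union((0, 1), (0, 2)) followed by union((0, 2), (3, 5)) links the three documents
    # into a single component: parent((3, 5)) then returns the shared root (0, 1) and
    # set_size[(0, 1)] == 3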
    with self.track_time():
        logger.info("Loading dup files...")
        for dup_file in tqdm(dup_files, desc="Reading dup files"):
            with self.input_folder.open(dup_file, "rb") as dupf:
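                # each "4I" record is one duplicate pair: (file_id, doc_id) of doc "a"
                # followed by (file_id, doc_id) of doc "b"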
                for f1, d1, f2, d2 in read_tuples_from_file(dupf, "4I", lines_to_buffer=self.lines_to_buffer):
                    a, b = (f1, d1), (f2, d2)
                    if self.ignore_index_matches and a == (SENTINEL, SENTINEL):
                        # if we are skipping matches with the index and "a" is from the index
                        continue
                    union(a, b)
        logger.info("Finished reading dup files.")
        ci = 0
        cluster_ids = {}
        with self.output_folder.get_output_file_manager(mode="wb") as output_mg:
            for node in sorted(union_set):
                self.stat_update("duplicates")
                file, doc = node
                p = parent(node)
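                # only the root of each component is kept; all other members are removed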
                if node != p:
                    output_mg.write(f"{file:06d}.remove", struct.pack("<I", doc))
                    self.stat_update("to_remove")
                # additional metadata
                if self.save_cluster_id:
                    if p not in cluster_ids:
                        cluster_ids[p] = ci
                        ci += 1
                        self.stat_update("clusters")
                    output_mg.write(f"{file:06d}.clusters", struct.pack("<I", doc))
                    output_mg.write(f"{file:06d}.clusters", struct.pack("<I", cluster_ids[p]))
                if node == p:
                    # only run once per cluster
                    self.stat_update("cluster_size", value=set_size[p], unit="cluster")
                    if self.save_cluster_size:
                        output_mg.write(f"{file:06d}.sizes", struct.pack("<I", doc))
                        output_mg.write(f"{file:06d}.sizes", struct.pack("<I", set_size[p]))