def run()

in src/datatrove/pipeline/dedup/minhash.py

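In brief: run() loads every *.dups file produced by the bucket stage, builds a union-find over (file_id, doc_id) nodes from the duplicate pairs, and then, per input file, writes the doc ids to drop (*.remove) plus, optionally, cluster ids (*.clusters) and cluster sizes (*.sizes).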

    def run(self, data: DocumentsPipeline = None, _: int = 0, world_size: int = 1):
        dup_files = self.input_folder.list_files(glob_pattern="*.dups")
        assert (len(dup_files) % self.config.num_buckets) == 0, (
            "Number of .dups files should be divisible by number of buckets"
        )
        assert world_size == 1, "World size must be 1 for clustering"
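        # union-find over (file_id, doc_id) nodes: union_set maps node -> parent, set_size maps each root -> its cluster size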
        union_set = {}
        set_size = {}

        def parent(x):
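            # a node we have not seen yet is its own root (lazy insertion)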
            if x not in union_set or union_set[x] == x:
                union_set[x] = x
                return x
            # Path Compression
            union_set[x] = parent(union_set[x])
            return union_set[x]

        def union(v_a, v_b):
            root_a = parent(v_a)
            root_b = parent(v_b)

            if root_a != root_b:
                # Union by size
                size_a = set_size.get(root_a, 1)
                size_b = set_size.get(root_b, 1)
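                # keep the index sentinel as root: docs from the external index are never flagged for removal, their local duplicates are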
                if root_b == (SENTINEL, SENTINEL) or (root_a != (SENTINEL, SENTINEL) and size_a < size_b):
                    root_a, root_b = root_b, root_a
                # a is SENTINEL or #a >= #b
                union_set[root_b] = root_a  # make the smallest one join the biggest one to keep sets shallow
                set_size[root_a] = size_a + size_b
                set_size.pop(root_b, None)  # clear up space

        with self.track_time():
            logger.info("Loading dup files...")
            for dup_file in tqdm(dup_files, desc="Reading dup files"):
                with self.input_folder.open(dup_file, "rb") as dupf:
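                    # each record holds four uint32 values ("4I"): (file_a, doc_a, file_b, doc_b), i.e. one duplicate pair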
                    for f1, d1, f2, d2 in read_tuples_from_file(dupf, "4I", lines_to_buffer=self.lines_to_buffer):
                        a, b = (f1, d1), (f2, d2)
                        if self.ignore_index_matches and a == (SENTINEL, SENTINEL):
                            # if we are skipping matches with the index and "a" is from the index
                            continue
                        union(a, b)

            logger.info("Finished reading dup files.")
            ci = 0
            cluster_ids = {}
            with self.output_folder.get_output_file_manager(mode="wb") as output_mg:
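                # every node in union_set appeared in at least one duplicate pair; non-root nodes get written to .remove, roots are kept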
                for node in sorted(union_set):
                    self.stat_update("duplicates")
                    file, doc = node
                    p = parent(node)
                    if node != p:
                        output_mg.write(f"{file:06d}.remove", struct.pack("<I", doc))
                        self.stat_update("to_remove")

                    # additional metadata
                    if self.save_cluster_id:
                        if p not in cluster_ids:
                            cluster_ids[p] = ci
                            ci += 1
                            self.stat_update("clusters")
                        output_mg.write(f"{file:06d}.clusters", struct.pack("<I", doc))
                        output_mg.write(f"{file:06d}.clusters", struct.pack("<I", cluster_ids[p]))
                    if node == p:
                        # only run once per cluster
                        self.stat_update("cluster_size", value=set_size[p], unit="cluster")
                    if self.save_cluster_size:
                        output_mg.write(f"{file:06d}.sizes", struct.pack("<I", doc))
                        output_mg.write(f"{file:06d}.sizes", struct.pack("<I", set_size[p]))