def run()

in src/datatrove/pipeline/dedup/minhash.py

Streams documents for a given rank through the final dedup filtering step: documents whose indices are listed in the rank's .remove file are dropped (and, if an exclusion_writer is configured, written out for inspection), while all other documents are forwarded. If no .remove file exists for the rank, a warning is logged and every document is forwarded unchanged. When load_cluster_ids or load_cluster_sizes is enabled, each document is additionally annotated with minhash_cluster_id / minhash_cluster_size metadata read from the matching .clusters / .sizes files.

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1):
        if not self.data_folder.isfile(f"{rank:06d}.remove"):
            logger.warning(f"No .remove file for {rank=}.")
            for doc in data:
                self.stat_update(StatHints.total, StatHints.forwarded)
                yield doc
            return

        # additional metadata files
        # cluster ids
        if self.load_cluster_ids and not self.data_folder.exists(f"{rank:06d}.clusters"):
            logger.warning(f"No .clusters file for {rank=}.")
            raise FileNotFoundError
        # cluster sizes
        if self.load_cluster_sizes and not self.data_folder.exists(f"{rank:06d}.sizes"):
            logger.warning(f"No .sizes file for {rank=}.")
            raise FileNotFoundError

        with self.data_folder.open(f"{rank:06d}.remove", "rb") as f:
            with self.exclusion_writer if self.exclusion_writer else contextlib.nullcontext() as exc_writer:

                def get_next():
                    # read the next doc index to drop: .remove is a flat sequence
                    # of little-endian uint32 values, one per removed document
                    data = f.read(struct.calcsize("I"))
                    if data:
                        return struct.unpack("<I", data)[0]

                def metadata_loader(file):
                    # stream (doc index, value) pairs from a .clusters/.sizes file
                    with self.data_folder.open(file, "rb") as metadata_f:
                        yield from read_tuples_from_file(metadata_f, "2I", lines_to_buffer=self.lines_to_buffer)

                if self.load_cluster_ids:
                    cluster_loader = metadata_loader(f"{rank:06d}.clusters")
                    next_cluster = next(cluster_loader, None)

                if self.load_cluster_sizes:
                    sizes_loader = metadata_loader(f"{rank:06d}.sizes")
                    next_size = next(sizes_loader, None)

                next_removal = get_next()
                for idx, doc in enumerate(data):
                    with self.track_time():
                        # load and save metadata
                        if self.load_cluster_ids:
                            if next_cluster and idx == next_cluster[0]:
                                doc.metadata["minhash_cluster_id"] = next_cluster[1]
                                next_cluster = next(cluster_loader, None)
                            else:
                                # no entry for this doc in the .clusters file
                                doc.metadata["minhash_cluster_id"] = -1

                        if self.load_cluster_sizes:
                            if next_size and idx == next_size[0]:
                                doc.metadata["minhash_cluster_size"] = next_size[1]
                                next_size = next(sizes_loader, None)
                            else:
                                # no entry in .sizes: the doc counts as a cluster of size 1
                                doc.metadata["minhash_cluster_size"] = 1

                        self.stat_update(StatHints.total)
                        if next_removal == idx:
                            # to remove
                            self.stat_update(StatHints.dropped)
                            if self.exclusion_writer:
                                exc_writer.write(doc, rank)
                            next_removal = get_next()
                            continue
                        self.stat_update(StatHints.forwarded)
                    yield doc
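
For reference, the binary layouts implied by the reads above: the .remove file is a flat stream of little-endian uint32 document indices, and the .clusters / .sizes files hold (document index, value) pairs read with the "2I" format. The snippet below is a minimal, hypothetical sketch that writes and reads such files with struct alone; it is not datatrove's actual writer code, and it assumes the "2I" pairs use little-endian byte order, mirroring how get_next() unpacks the .remove entries.

    import struct

    # hypothetical: produce a tiny .remove file (uint32 doc indices, little-endian)
    with open("000000.remove", "wb") as f:
        for doc_idx in (3, 7, 42):
            f.write(struct.pack("<I", doc_idx))

    # hypothetical: produce a tiny .clusters file of (doc index, cluster id) pairs
    with open("000000.clusters", "wb") as f:
        for doc_idx, cluster_id in ((3, 17), (7, 17), (42, 99)):
            f.write(struct.pack("<2I", doc_idx, cluster_id))

    # read them back the same way get_next() and metadata_loader() do
    with open("000000.remove", "rb") as f:
        while data := f.read(struct.calcsize("I")):
            print("drop doc", struct.unpack("<I", data)[0])

    with open("000000.clusters", "rb") as f:
        record_size = struct.calcsize("2I")
        while data := f.read(record_size):
            doc_idx, cluster_id = struct.unpack("<2I", data)
            print("doc", doc_idx, "-> cluster", cluster_id)

Note that entries in each file must be sorted by document index: run() consumes them in a single forward pass, advancing next_removal, next_cluster and next_size only as the enumerated document index catches up to them.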