# Excerpt: run() method
# From: src/datatrove/pipeline/dedup/minhash.py



    def run(self, data: DocumentsPipeline = None, bucket: int = 0, world_size: int = 1):
        """Merge all signature files of one bucket into a single sorted, deduplicated index file.

        Performs a k-way merge over the per-file signature readers using a heap and writes
        each distinct signature exactly once, in sorted order, to the bucket's index file.

        Args:
            data: must be None — this block does not accept an input pipeline.
            bucket: index of the bucket this task processes (one task per bucket).
            world_size: total number of tasks; must equal ``config.num_buckets``.
        """
        assert data is None, "You should not use an input block before MinhashBuildIndex"
        assert world_size == self.config.num_buckets, "You must run exactly one task per bucket"
        sig_files = self.input_folder.list_files(subdirectory=f"bucket_{bucket:03d}")
        sig_readers = [
            read_sigs(file, file_i, self.config, lines_to_buffer=self.lines_to_buffer)
            for file_i, file in enumerate(self.input_folder.open_files(sig_files, mode="rb"))
        ]

        # seed the heap with the first signature from each reader (empty readers are skipped)
        pq = [x for x in [next(sig_reader, None) for sig_reader in sig_readers] if x is not None]
        heapq.heapify(pq)
        logger.info("Finished initializing signatures priority queue.")

        # precompile the struct format once instead of re-parsing it for every signature
        # (equivalent to the "<%d{fmt}" % n string passed to struct.pack previously)
        packer = struct.Struct(f"<{self.config.hashes_per_bucket}{self.config.hash_config.struct_format}")

        # writes all the sigs for the entire bucket, sequentially; the context manager
        # guarantees the output file is closed even if an exception is raised mid-merge
        with self.output_folder.open(f"bucket_{bucket:03d}/{self.index_name}.minhash.index", mode="wb") as out_f:
            last: HashSig | None = None
            with self.track_time():
                while pq:
                    v: HashSig = heapq.heappop(pq)
                    # heap pops in sorted order, so equal signatures come out consecutively:
                    # write a signature only the first time we see it (dedup)
                    if not last or last.sig != v.sig:
                        out_f.write(packer.pack(*v.sig))
                    last = v
                    # refill the heap from the reader that produced the popped signature
                    next_sig = next(sig_readers[v.file_id], None)
                    if next_sig:
                        heapq.heappush(pq, next_sig)