def run()

in src/datatrove/pipeline/dedup/sentence_dedup.py

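This is the final stage of sentence deduplication: each rank loads the duplicate
(doc, sentence) index written at stage 2, walks its shard of documents in order, strips
the flagged sentences, and drops documents that fall below the configured minimum size.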

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        """step method for Filters.
        Drops documents that if .filter() is False

        SentenceDedupFilter reads a DocumentPipeline and removes duplicated sentences found at stage 2
        """
        folders = self.data_folder.list_files(include_directories=True, recursive=False)
        # build each rank's expected filename per folder instead of listing all files,
        # for performance with very large runs (e.g. 12k * 10k files)
        files = [
            f
            for f in [f"{folder}/{rank:05d}{ExtensionHelperSD.stage_2_duplicates}" for folder in folders]
            if self.data_folder.exists(f)
        ]

        logger.info(f"Loading duplicate indexes from {len(files)} results files.")

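        # duplicates are (doc_id, sentence_id) pairs in a numpy structured array; sorting
        # orders by doc first, then sentence, making each document's entries contiguous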
        all_dups = np.array([], dtype=[("doc", "<u4"), ("sent", "<u2")])
        if files:
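            # read this rank's duplicate files in parallel and merge them into one array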
            with ThreadPoolExecutor() as pool:
                all_dups = np.concatenate(
                    list(tqdm(pool.map(self.read_duplicates, self.data_folder.open_files(files)), total=len(files))),
                    axis=0,
                )
            all_dups.sort()
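        # offset of the first entry for each distinct doc id in the sorted array, i.e.
        # where each document's block of duplicate sentence ids starts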
        _, doc_starts = np.unique(all_dups["doc"], return_index=True)

        logger.info("Loaded duplicate indexes.")

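        # doc_starts and the incoming documents are both ordered by document index, so a
        # single forward cursor (dups_doc_i) matches each document to its duplicate block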
        dups_doc_i = 0
        with self.exclusion_writer if self.exclusion_writer else contextlib.nullcontext() as writer:
            for doc_idx, doc in enumerate(data):
                self.stat_update(StatHints.total)
                with self.stats.time_stats:
                    if dups_doc_i >= len(doc_starts) or all_dups["doc"][doc_starts[dups_doc_i]] > doc_idx:
                        filtered_text, original_formatted = doc.text, None
                    else:
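                        # this doc's duplicate sentence ids occupy a contiguous slice of the
                        # sorted array; a None upper bound means "through to the end"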
                        sents_span_l, sents_span_r = (
                            doc_starts[dups_doc_i],
                            doc_starts[dups_doc_i + 1] if dups_doc_i + 1 < len(doc_starts) else None,
                        )
                        filtered_text, original_formatted = self.remove_dup_sentences(
                            doc, all_dups["sent"][sents_span_l:sents_span_r]
                        )
                        dups_doc_i += 1

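                # keep the document only if it is non-empty and either unchanged or still
                # above the min-words / min-sentences thresholds after dedup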
                if filtered_text and (  # cannot be completely empty
                    filtered_text == doc.text  # no change
                    or (
                        (
                            # min doc words
                            self.config.min_doc_words <= 0
                            or len(self.tokenizer.word_tokenize(filtered_text)) >= self.config.min_doc_words
                        )
                        and (
                            # min num sentences
                            self.config.min_num_sentences <= 0
                            or len(split_into_sentences(filtered_text, self.language))
                            >= self.config.min_num_sentences
                        )
                    )
                ):  # document is kept
                    self.update_doc_stats(doc)
                    if filtered_text != doc.text and writer:
                        writer.write(dataclasses.replace(doc, text=original_formatted), rank=rank)
                    doc.text = filtered_text
                    yield doc
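                # document dropped: optionally write the formatted original to the exclusion writer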
                elif writer:
                    doc.text = original_formatted
                    writer.write(doc, rank=rank)
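
For context, here is a minimal sketch of how this filter is typically wired up as the last
stage of sentence deduplication. The reader class, paths, config values and task count below
are illustrative assumptions, not taken from this file:

    from datatrove.executor import LocalPipelineExecutor
    from datatrove.pipeline.dedup import SentenceDedupFilter
    from datatrove.pipeline.dedup.sentence_dedup import SentDedupConfig
    from datatrove.pipeline.readers import JsonlReader

    # thresholds checked by run() above; the values here are only examples
    config = SentDedupConfig(min_doc_words=50, min_num_sentences=3)

    stage3 = LocalPipelineExecutor(
        pipeline=[
            # must yield documents in the same order as the signature stage,
            # since run() matches duplicates to documents by position (doc_idx)
            JsonlReader("data/input"),
            SentenceDedupFilter(data_folder="data/sent_dedup", config=config),
        ],
        tasks=4,  # one task per rank; each rank loads its {rank:05d} duplicates files
    )
    stage3.run()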