`run()` method

From `src/datatrove/pipeline/filters/base_filter.py`:


    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        """Filter the incoming documents in batches, yielding those that pass.

        Documents are grouped into batches of ``self.batch_size`` and scored via
        ``self.filter_batch``. Kept documents are yielded downstream; dropped ones
        update the drop statistics and, when an exclusion writer is configured,
        are written out (tagged with their drop reason, if any).
        """
        # Use the exclusion writer as a context manager when configured,
        # otherwise fall back to a no-op context (``writer`` is then None
        # but never dereferenced).
        exclusion_cm = self.exclusion_writer if self.exclusion_writer else contextlib.nullcontext()
        with exclusion_cm as writer:
            for batch in batched(data, self.batch_size):
                batched_mode = self.batch_size > 1
                if batched_mode:
                    self.stat_update("batches")
                # only label the timing as per-batch when真 batching is active
                with self.track_time("batch" if batched_mode else None):
                    results = self.filter_batch(batch)
                for document, raw_result in zip(batch, results):
                    self.stat_update(StatHints.total)
                    keep, reason = get_filter_result(raw_result)
                    if not keep:
                        # dropped: record stats, optionally persist the document
                        self.stat_update(StatHints.dropped)
                        if reason:
                            self.stat_update(f"dropped_{reason}")
                        if self.exclusion_writer:
                            if reason:
                                document.metadata["filter_reason"] = reason
                            writer.write(document, rank)
                        continue
                    # kept: record stats and forward downstream
                    self.stat_update(StatHints.forwarded)
                    self.update_doc_stats(document)
                    yield document