in src/datatrove/pipeline/filters/base_filter.py [0:0]
def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
    """Stream documents through this filter, yielding the ones that pass.

    Documents are consumed in batches of ``self.batch_size`` and judged by
    ``self.filter_batch``. Kept documents are forwarded (yielded); dropped
    documents update the drop statistics and, when an exclusion writer is
    configured, are persisted with their rejection reason in
    ``metadata["filter_reason"]``.

    Args:
        data: iterable/generator of documents to filter
        rank: shard rank, forwarded to the exclusion writer
        world_size: total number of shards (unused here, part of the pipeline contract)

    Yields:
        documents for which the filter returned a passing result
    """
    # Open the exclusion writer for the whole run; nullcontext() keeps the
    # `with` shape uniform when no writer is configured.
    writer_ctx = self.exclusion_writer if self.exclusion_writer else contextlib.nullcontext()
    with writer_ctx as writer:
        batched_mode = self.batch_size > 1
        for doc_batch in batched(data, self.batch_size):
            if batched_mode:
                self.stat_update("batches")
            # Time only the filtering itself — yielding happens outside so
            # downstream processing is not charged to this filter.
            with self.track_time("batch" if batched_mode else None):
                verdicts = self.filter_batch(doc_batch)
            for document, verdict in zip(doc_batch, verdicts):
                self.stat_update(StatHints.total)
                keep, reason = get_filter_result(verdict)
                if not keep:
                    # Dropped: record stats (per-reason when available) and
                    # optionally persist the document for inspection.
                    self.stat_update(StatHints.dropped)
                    if reason:
                        self.stat_update(f"dropped_{reason}")
                    if self.exclusion_writer:
                        if reason:
                            document.metadata["filter_reason"] = reason
                        writer.write(document, rank)
                    continue
                # Kept: account for it and forward downstream.
                self.stat_update(StatHints.forwarded)
                self.update_doc_stats(document)
                yield document