in src/datatrove/pipeline/dedup/sentence_dedup.py [0:0]
def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
    """Stage-3 filter of sentence deduplication.

    Loads the duplicate-sentence indexes produced at stage 2 for this `rank`,
    then streams `data`, removing the flagged duplicated sentences from each
    document. A document is dropped (and, if `self.exclusion_writer` is set,
    written there instead) when the filtered text is empty or falls below the
    configured minimum word / sentence thresholds.

    Args:
        data: incoming pipeline of documents, consumed in order; doc position
            in the stream must match the `doc` indexes recorded at stage 2.
        rank: this task's rank; selects which stage-2 result file to read
            from each results folder.
        world_size: unused here, kept for the standard pipeline-step signature.

    Yields:
        Documents that survive filtering, with `doc.text` replaced by the
        deduplicated text when sentences were removed.
    """
    # Each stage-2 task wrote its output into its own folder; we only need the
    # one file per folder that matches this rank.
    folders = self.data_folder.list_files(include_directories=True, recursive=False)
    # Construct the candidate paths directly and probe with exists() instead of
    # recursively listing everything:
    # for performance reasons when having for instance 12k*10k files
    files = [
        f
        for f in [f"{folder}/{rank:05d}{ExtensionHelperSD.stage_2_duplicates}" for folder in folders]
        if self.data_folder.exists(f)
    ]
    logger.info(f"Loading duplicate indexes from {len(files)} results files.")
    # Structured array of (document index, sentence index) duplicate pairs.
    # Starts empty so the no-files case still works below.
    all_dups = np.array([], dtype=[("doc", "<u4"), ("sent", "<u2")])
    if files:
        # Read the result files concurrently; ordering does not matter because
        # the concatenated array is sorted immediately afterwards.
        with ThreadPoolExecutor() as pool:
            all_dups = np.concatenate(
                list(tqdm(pool.map(self.read_duplicates, self.data_folder.open_files(files)), total=len(files))),
                axis=0,
            )
    # Lexicographic sort on (doc, sent) groups each document's duplicate
    # sentences into one contiguous span of the array.
    all_dups.sort()
    # doc_starts[i] = offset in all_dups where the i-th distinct document's
    # span of duplicates begins.
    _, doc_starts = np.unique(all_dups["doc"], return_index=True)
    logger.info("Loaded duplicate indexes.")
    # Cursor into doc_starts: the next document (in stream order) known to
    # have duplicates. Advanced in lockstep with the enumerate() below.
    dups_doc_i = 0
    with self.exclusion_writer if self.exclusion_writer else contextlib.nullcontext() as writer:
        for doc_idx, doc in enumerate(data):
            self.stat_update(StatHints.total)
            with self.stats.time_stats:
                if dups_doc_i >= len(doc_starts) or all_dups["doc"][doc_starts[dups_doc_i]] > doc_idx:
                    # No duplicates recorded for this document: keep text as-is.
                    # original_formatted stays None — nothing was removed.
                    filtered_text, original_formatted = doc.text, None
                else:
                    # This document's duplicates occupy the half-open span
                    # [doc_starts[i], doc_starts[i+1]) of all_dups; the last
                    # span runs to the end of the array (right bound None).
                    sents_span_l, sents_span_r = (
                        doc_starts[dups_doc_i],
                        doc_starts[dups_doc_i + 1] if dups_doc_i + 1 < len(doc_starts) else None,
                    )
                    filtered_text, original_formatted = self.remove_dup_sentences(
                        doc, all_dups["sent"][sents_span_l:sents_span_r]
                    )
                    dups_doc_i += 1
            # NOTE(review): keep/drop decision placed outside time_stats so the
            # downstream consumer's time isn't charged to this step — confirm
            # against the original formatting.
            if (
                (
                    filtered_text == doc.text  # no change
                    or (
                        (
                            # min doc words
                            self.config.min_doc_words <= 0
                            or len(self.tokenizer.word_tokenize(filtered_text)) >= self.config.min_doc_words
                        )
                        and (
                            # min num sentences
                            self.config.min_num_sentences <= 0
                            or len(split_into_sentences(filtered_text, self.language))
                            >= self.config.min_num_sentences
                        )
                    )
                )
                and filtered_text  # can not be completely empty
            ):  # document is kept
                self.update_doc_stats(doc)
                if not filtered_text == doc.text and writer:
                    # Sentences were removed: archive the original (formatted
                    # with removals marked) before swapping in the filtered text.
                    writer.write(dataclasses.replace(doc, text=original_formatted), rank=rank)
                doc.text = filtered_text
                yield doc
            elif writer:
                # Document dropped: route it to the exclusion writer.
                # NOTE(review): original_formatted is None when no duplicates
                # were found (doc dropped only by the thresholds/empty check) —
                # confirm the writer handles text=None as intended.
                doc.text = original_formatted
                writer.write(doc, rank=rank)