in src/datatrove/pipeline/dedup/url_dedup.py
def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1):
    folders = self.data_folder.list_files(include_directories=True, recursive=False)
    # build this rank's expected file path per folder and check its existence instead of
    # listing all files, for performance reasons when having for instance 12k*10k files
    files = [
        f
        for f in [f"{folder}/{rank:05d}{ExtensionHelperSD.stage_2_duplicates}" for folder in folders]
        if self.data_folder.exists(f)
    ]
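    # `files` now holds at most one stage-2 duplicates file per results folder:
    # the one matching this worker's rank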

    logger.info(f"Loading duplicate indexes from {len(files)} results files.")

    dup_dtype = get_sig_dtype(self.config.hash_config)[2]
    all_dups = np.array([], dtype=dup_dtype)
    if files:
        with ThreadPoolExecutor() as pool:
            read_partial = partial(self.read_duplicates, dup_dtype=dup_dtype)
            all_dups = np.concatenate(
                list(
                    tqdm(
                        pool.map(read_partial, self.data_folder.open_files(files)),
                        total=len(files),
                    )
                ),
                axis=0,
            )
        all_dups.sort()

    logger.info("Loaded duplicate indexes.")
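    # all_dups is now a sorted array of document indices (for this rank) flagged as
    # duplicates in stage 2; a standalone sketch of this load pattern follows the listing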

    dups_doc_i = 0
    with self.exclusion_writer if self.exclusion_writer else contextlib.nullcontext() as writer:
        with self.stats.time_stats:
            for doc_idx, doc in enumerate(data):
                self.stat_update(StatHints.total)
                with self.stats.time_stats:
                    if dups_doc_i < all_dups.shape[0] and all_dups[dups_doc_i] == doc_idx:
                        if writer:
                            writer.write(doc, rank=rank)
                        self.stat_update(StatHints.dropped)
                        dups_doc_i += 1
                    else:
                        self.stat_update(StatHints.forwarded)
                        self.update_doc_stats(doc)
                        yield doc
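
The loading step reads every per-folder duplicates file concurrently and merges the results into one sorted array. Below is a minimal, self-contained sketch of that pattern using plain numpy and local files; read_dup_file, load_all_dups, the np.fromfile on-disk layout and the uint32 dtype are assumptions for illustration, not datatrove's actual helpers or format.

from concurrent.futures import ThreadPoolExecutor
from functools import partial

import numpy as np


def read_dup_file(path: str, dup_dtype) -> np.ndarray:
    # hypothetical reader: one flat binary file of duplicate document indices per folder
    return np.fromfile(path, dtype=dup_dtype)


def load_all_dups(paths: list[str], dup_dtype=np.uint32) -> np.ndarray:
    all_dups = np.array([], dtype=dup_dtype)
    if paths:
        with ThreadPoolExecutor() as pool:
            # read the files concurrently, then merge them into a single array
            arrays = list(pool.map(partial(read_dup_file, dup_dtype=dup_dtype), paths))
        all_dups = np.concatenate(arrays, axis=0)
        all_dups.sort()  # sorted order is what lets the filtering loop walk it with one pointer
    return all_dups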
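
The filtering loop relies on documents arriving in the same order as when their indices were recorded, so a single pointer into the sorted duplicates array decides in constant time per document whether to drop or forward it. The sketch below shows that pattern in isolation; the Doc dataclass and drop_duplicates function are hypothetical stand-ins for datatrove's Document stream, not part of the library.

from dataclasses import dataclass
from typing import Iterable, Iterator

import numpy as np


@dataclass
class Doc:
    text: str


def drop_duplicates(docs: Iterable[Doc], dup_indices: np.ndarray) -> Iterator[Doc]:
    dups_i = 0  # pointer into the sorted array of duplicate document indices
    for doc_idx, doc in enumerate(docs):
        if dups_i < dup_indices.shape[0] and dup_indices[dups_i] == doc_idx:
            dups_i += 1  # this document was flagged as a duplicate: skip it
            continue
        yield doc  # not a duplicate: forward downstream


# usage: document at index 2 was flagged as a duplicate and gets dropped
docs = [Doc("a"), Doc("b"), Doc("a"), Doc("c")]
kept = list(drop_duplicates(docs, np.array([2], dtype=np.uint32)))
assert [d.text for d in kept] == ["a", "b", "c"]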