in src/datatrove/pipeline/dedup/sentence_dedup.py [0:0]
def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1):
    """Stage-2 duplicate finder: merge all stage-1 signature files for this hash
    bucket and emit (doc_id, sent_id) pairs for every signature that collides
    with an earlier one.

    Performs a k-way merge (via a heap) over all signature readers, sorted by
    hash value, and writes duplicates for this rank to
    ``{rank:04d}/<file_stem><stage_2_duplicates>``.

    Args:
        data: unused here; kept for pipeline-step signature compatibility.
        rank: index of this task; selects which hash-bucket subdirectory to read
            (when world_size > 1) and names the output subdirectory.
        world_size: total number of finder tasks. Must match the number of hash
            buckets produced in stage 1.

    Raises:
        ValueError: if world_size == 1 but stage-1 signatures exist for more
            than one hash bucket (i.e. stage 1 was run with finder_workers > 1).
    """
    with self.stats.time_stats:
        if world_size == 1:
            # check that there was not a mistake in setting this values
            # Single task: it must consume *all* buckets, so anything outside
            # "0000/" means stage 1 was sharded and tasks is misconfigured.
            sig_files = self.data_folder.list_files(glob_pattern="*/*" + ExtensionHelperSD.stage_1_signature)
            if any(not sig_file.startswith("0000/") for sig_file in sig_files):
                raise ValueError(
                    f"{world_size=} but found sig files for different hash buckets. Set tasks=finder_workers"
                )
        else:
            # Multi-task: each rank only reads its own hash-bucket subdirectory.
            sig_files = self.data_folder.list_files(
                subdirectory=f"{rank:04d}", glob_pattern=ExtensionHelperSD.stage_1_signature
            )
        # One lazy reader per signature file; file_i is used later to refill
        # the heap from the reader a popped entry came from.
        sig_readers = [
            read_sigs(file, file_i, config=self.config, lines_to_buffer=self.lines_to_buffer)
            for file_i, file in enumerate(self.data_folder.open_files(sig_files))
        ]
        index_files = self.index_folder.list_files() if self.index_folder else None
        if index_files:
            logger.info(f"Found index file(s): {', '.join(index_files)}")
            # Index readers get ids offset past the dataset readers and are
            # tagged with index_file=True so their entries are never reported
            # as duplicates themselves.
            sig_readers.extend(
                [
                    read_sigs(
                        file,
                        len(sig_readers) + file_i,
                        config=self.config,
                        index_file=True,
                        lines_to_buffer=self.lines_to_buffer,
                    )
                    for file_i, file in enumerate(self.index_folder.open_files(index_files))
                ]
            )
        logger.info(f"Initializing pq with {len(sig_readers)} files.")
        # Seed the priority queue with the first signature of each reader;
        # the thread pool parallelizes the initial (possibly remote) reads.
        # Exhausted/empty readers yield None and are dropped by the `if x`.
        with ThreadPoolExecutor() as executor:
            pq = [
                x
                for x in tqdm(
                    executor.map(lambda x: next(x, None), sig_readers),
                    total=len(sig_readers),
                    desc="Initializing pq...",
                )
                if x
            ]
        heapq.heapify(pq)
        logger.info("PQ initialized.")
        output_mg = self.output_folder.get_output_file_manager(mode="wb")
        # Duplicates are written as little-endian (uint32 doc_id, uint16 sent_id).
        packer = struct.Struct("<IH")
        last: HashSig | None = None
        while pq:
            # Pop the globally smallest signature (readers are pre-sorted by
            # hash, so this is a streaming k-way merge).
            v: HashSig = heapq.heappop(pq)
            if (
                last and last.hash_value == v.hash_value and not v.is_from_index()
            ):  # we never want to match samples from the index itself
                out_filename = f"{rank:04d}/{v.file_stem}{ExtensionHelperSD.stage_2_duplicates}"
                # the previous one we are matching against is part of the index
                # OR there are no index files
                # OR we are also matching within the main dataset
                if last.is_from_index() or not index_files or not self.config.only_dedup_in_index:
                    output_mg.write(out_filename, packer.pack(v.doc_id, v.sent_id))
            # Keep `last` pointing at an index entry for as long as one matches
            # the current hash: only advance it when the hash changes or when
            # `last` is a plain dataset entry.
            if not last or last.hash_value != v.hash_value or not last.is_from_index():
                last = v
            # Refill the heap from the reader this entry came from.
            new_v = next(sig_readers[v.file_id], None)
            if new_v:
                heapq.heappush(pq, new_v)
        output_mg.close()