in obelics/processors/web_document_line_deduplication.py
# Module-level imports used by the method below; the original file defines
# these (and its logger) at the top of the module.
import json
import logging
import os
from urllib.parse import urlparse

from datasets import load_from_disk

logger = logging.getLogger(__name__)

def line_deduplicate_web_documents(self):
    logger.info(
        f"Starting line deduplicating the web document dataset for shard {self.id_shard_to_line_deduplicate}"
    )
    # Load the precomputed map from domain to the paragraphs flagged as
    # duplicated within that domain.
    with open(self.path_save_domain_to_duplicated_texts) as f:
        self.domain_to_duplicated_texts = json.load(f)

    def func_map_line_deduplicate_web_documents(example):
        metadata = json.loads(example["metadata"])
        # All nodes of a document share the same source URL, so the domain
        # is read from the first non-empty metadata entry.
        domain = urlparse(self.remove_empty_els_in_list(metadata)[0]["document_url"]).netloc
        indices_to_remove = set()
        for idx in range(len(example["texts"])):
            if example["texts"][idx] is not None:
                # Drop every paragraph flagged as duplicated for this domain.
                example["texts"][idx] = "\n\n".join(
                    [
                        paragraph
                        for paragraph in example["texts"][idx].split("\n\n")
                        if paragraph not in self.domain_to_duplicated_texts[domain]
                    ]
                )
                # If no paragraph survives, mark the whole node for removal.
                if not example["texts"][idx]:
                    indices_to_remove.add(idx)
        if indices_to_remove:
            # Remove the same indices from texts, images, and metadata so the
            # three parallel lists stay aligned.
            example["texts"] = [el for ind, el in enumerate(example["texts"]) if ind not in indices_to_remove]
            example["images"] = [el for ind, el in enumerate(example["images"]) if ind not in indices_to_remove]
            example["metadata"] = json.dumps(
                [el for ind, el in enumerate(metadata) if ind not in indices_to_remove]
            )
        return example

    # Portable equivalent of `mkdir -p`.
    os.makedirs(self.path_save_line_deduplicated_sharded_dataset, exist_ok=True)
    path_subdataset = os.path.join(self.path_sharded_dataset, f"shard_{self.id_shard_to_line_deduplicate}")
    sub_ds = load_from_disk(path_subdataset)
    sub_ds_line_deduplicated = sub_ds.map(func_map_line_deduplicate_web_documents, num_proc=self.num_proc)
    name_shard = os.path.basename(os.path.normpath(path_subdataset))
    sub_ds_line_deduplicated.save_to_disk(
        os.path.join(self.path_save_line_deduplicated_sharded_dataset, name_shard)
    )
    logger.info(
        f"Finished line deduplicating the web document dataset for shard {self.id_shard_to_line_deduplicate}"
    )
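
For reference, a self-contained sketch of the core paragraph-filtering step, with made-up sample data; the domain, URL, and flagged paragraph below are invented for illustration and do not come from the dataset or the source file.

# Standalone illustration of the filtering applied per text node above.
from urllib.parse import urlparse

# Hypothetical duplicated-texts map, shaped like the JSON the method loads.
domain_to_duplicated_texts = {"example.com": ["Subscribe to our newsletter"]}

text = "Real content.\n\nSubscribe to our newsletter"
domain = urlparse("https://example.com/a").netloc  # -> "example.com"
filtered = "\n\n".join(
    paragraph
    for paragraph in text.split("\n\n")
    if paragraph not in domain_to_duplicated_texts[domain]
)
assert filtered == "Real content."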