def line_deduplicate_web_documents()

in obelics/processors/web_document_line_deduplication.py [0:0]


    def line_deduplicate_web_documents(self):
        """Line-deduplicate one shard of the web document dataset.

        Loads the previously computed mapping ``domain -> duplicated texts``
        from ``self.path_save_domain_to_duplicated_texts``, then for every
        document in shard ``self.id_shard_to_line_deduplicate``:
        removes every paragraph (``\\n\\n``-separated) that is listed as a
        duplicate for the document's domain, and drops the text/image/metadata
        entries whose text becomes empty after removal. The deduplicated shard
        is saved under ``self.path_save_line_deduplicated_sharded_dataset``.

        Side effects: creates the output directory, mutates
        ``self.domain_to_duplicated_texts``, and writes the shard to disk.
        """
        logger.info(
            f"Starting line deduplicating the web document dataset for shard {self.id_shard_to_line_deduplicate}"
        )
        with open(self.path_save_domain_to_duplicated_texts) as f:
            self.domain_to_duplicated_texts = json.load(f)

        def func_mac_line_deduplicate_web_documents(example):
            metadata = json.loads(example["metadata"])
            # The domain is taken from the first non-empty metadata entry's URL.
            domain = urlparse(self.remove_empty_els_in_list(metadata)[0]["document_url"]).netloc
            # Hoist the per-domain lookup out of the paragraph loop and convert
            # the JSON-decoded list to a set: membership tests become O(1)
            # instead of O(len(list)) per paragraph.
            duplicated_texts = set(self.domain_to_duplicated_texts[domain])

            indices_to_remove = set()
            for idx, text in enumerate(example["texts"]):
                if text is not None:
                    example["texts"][idx] = "\n\n".join(
                        [paragraph for paragraph in text.split("\n\n") if paragraph not in duplicated_texts]
                    )
                    # A document node whose text vanished entirely is dropped below,
                    # together with its image and metadata at the same position.
                    if not example["texts"][idx]:
                        indices_to_remove.add(idx)

            if indices_to_remove:
                example["texts"] = [el for ind, el in enumerate(example["texts"]) if ind not in indices_to_remove]
                example["images"] = [el for ind, el in enumerate(example["images"]) if ind not in indices_to_remove]
                example["metadata"] = json.dumps(
                    [el for ind, el in enumerate(metadata) if ind not in indices_to_remove]
                )

            return example

        # os.makedirs replaces the previous `os.system(f"mkdir -p ...")`: no shell
        # subprocess, no silent failure, and safe for paths containing spaces or
        # shell metacharacters.
        os.makedirs(self.path_save_line_deduplicated_sharded_dataset, exist_ok=True)

        path_subdataset = os.path.join(self.path_sharded_dataset, f"shard_{self.id_shard_to_line_deduplicate}")
        sub_ds = load_from_disk(path_subdataset)
        sub_ds_line_deduplicated = sub_ds.map(func_mac_line_deduplicate_web_documents, num_proc=self.num_proc)
        name_shard = os.path.basename(os.path.normpath(path_subdataset))
        sub_ds_line_deduplicated.save_to_disk(
            os.path.join(self.path_save_line_deduplicated_sharded_dataset, name_shard)
        )

        logger.info(
            f"Finished line deduplicating the web document dataset for shard {self.id_shard_to_line_deduplicate}"
        )