build_obelics/09_06_line_dedup.py
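"""Line deduplication step of the OBELICS web-document pipeline.

Downloads a precomputed domain -> duplicated-texts mapping and one shard of
the web-document dataset from S3, restricts the mapping to the domains
actually present in the shard, strips the duplicated paragraphs from every
document, and uploads the deduplicated shard back to S3.

Usage: python 09_06_line_dedup.py <IDX_JOB>
where IDX_JOB is the index of the shard to process.
"""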
import json
import logging
import os
import sys
from urllib.parse import urlparse
from datasets import load_from_disk
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
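# Shard index for this job, passed as the first CLI argument (presumably by
# the job launcher); it selects the S3 shard paths and the scratch directory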
IDX_JOB = sys.argv[1]
PATH_SAVE_DISK_TMP_FILES = f"/scratch/storage_hugo_{IDX_JOB}/"
PATH_DOMAIN_TO_DUPLICATED_TEXTS_S3 = "s3://m4-datasets/webdocs/new_line_dedup_domain_to_duplicated_texts.json"
PATH_DOMAIN_TO_DUPLICATED_TEXTS_LOCAL = os.path.join(
    PATH_SAVE_DISK_TMP_FILES, "new_line_dedup_domain_to_duplicated_texts.json"
)
PATH_REDUCED_DOMAIN_TO_DUPLICATED_TEXTS_LOCAL = os.path.join(
    PATH_SAVE_DISK_TMP_FILES, "reduced_new_line_dedup_domain_to_duplicated_texts.json"
)
PATH_WEB_DOCS_S3 = (
    f"s3://m4-datasets/webdocs/web_document_dataset_filtered_imgurldedup_nsfwfiltered_urldedup_texts_only/{IDX_JOB}/"
)
PATH_WEB_DOCS_LOCAL = os.path.join(PATH_SAVE_DISK_TMP_FILES, "web_docs")
NUM_PROC = 20
PATH_SAVE_DISK_WEB_DOCS_LINE_DEDUP = os.path.join(PATH_SAVE_DISK_TMP_FILES, "web_docs_linededup")
PATH_SAVE_S3_WEB_DOCS_LINE_DEDUP = f"s3://m4-datasets/webdocs/web_document_dataset_filtered_imgurldedup_nsfwfiltered_urldedup_linededup_texts_only/{IDX_JOB}/"
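
# Callable applied via datasets.map below: for each document, split every text
# on paragraph boundaries ("\n\n") and drop the paragraphs listed as duplicated
# for the document's domain. Defining __reduce__ makes pickling (for the
# num_proc worker processes) carry only the JSON path, so each worker reloads
# the mapping itself rather than serializing the potentially large dict.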
class LineDeduplication:
    def __init__(self, path_domain_to_duplicated_texts):
        self.path_domain_to_duplicated_texts = path_domain_to_duplicated_texts
        with open(path_domain_to_duplicated_texts) as f:
            self.domain_to_duplicated_texts = json.load(f)

    def __call__(self, example):
        domain = urlparse(json.loads(example["general_metadata"])["url"]).netloc
        if domain not in self.domain_to_duplicated_texts:
            return example
        for idx in range(len(example["texts"])):
            if example["texts"][idx] is not None:
                example["texts"][idx] = "\n\n".join(
                    [
                        paragraph
                        for paragraph in example["texts"][idx].split("\n\n")
                        if paragraph not in self.domain_to_duplicated_texts[domain]
                    ]
                )
        return example

    def __reduce__(self):
        return self.__class__, (self.path_domain_to_duplicated_texts,)
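
# A worked example of the transformation (hypothetical record; assumes the
# mapping for "example.com" lists the paragraph "Share this article"):
#   line_dedup = LineDeduplication(PATH_REDUCED_DOMAIN_TO_DUPLICATED_TEXTS_LOCAL)
#   example = {
#       "general_metadata": '{"url": "https://example.com/post"}',
#       "texts": ["Intro\n\nShare this article\n\nBody", None],
#   }
#   line_dedup(example)["texts"]  # -> ["Intro\n\nBody", None]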

if __name__ == "__main__":
    # Start from a clean scratch directory for this job
    if os.path.exists(PATH_SAVE_DISK_TMP_FILES):
        os.system(f"rm -r {PATH_SAVE_DISK_TMP_FILES}")
    os.system(f"mkdir {PATH_SAVE_DISK_TMP_FILES}")

    logger.info("Starting downloading the dictionary to go from a domain to the associated duplicated texts")
    command_sync_s3 = f"aws s3 cp {PATH_DOMAIN_TO_DUPLICATED_TEXTS_S3} {PATH_DOMAIN_TO_DUPLICATED_TEXTS_LOCAL}"
    os.system(command_sync_s3)
    logger.info("Finished downloading the dictionary to go from a domain to the associated duplicated texts")

    logger.info("Starting loading the web docs")
    # The sync is repeated as a crude retry: "aws s3 sync" is idempotent, so a
    # rerun only fetches whatever a previous attempt missed
    command_sync_s3 = f"aws s3 sync {PATH_WEB_DOCS_S3} {PATH_WEB_DOCS_LOCAL}"
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    web_docs_dataset = load_from_disk(PATH_WEB_DOCS_LOCAL)
    logger.info("Finished loading the web docs")

    logger.info("Starting reducing the dictionary to go from a domain to the associated duplicated texts")
    # Keep only the domains present in this shard, so the mapping each worker
    # process reloads stays small
    domains_in_shard = {urlparse(json.loads(meta)["url"]).netloc for meta in web_docs_dataset["general_metadata"]}
    with open(PATH_DOMAIN_TO_DUPLICATED_TEXTS_LOCAL) as f:
        domain_to_duplicated_texts = json.load(f)
    reduced_domain_to_duplicated_texts = {k: v for k, v in domain_to_duplicated_texts.items() if k in domains_in_shard}
    with open(PATH_REDUCED_DOMAIN_TO_DUPLICATED_TEXTS_LOCAL, "w") as f:
        json.dump(reduced_domain_to_duplicated_texts, f)
    del domain_to_duplicated_texts
    del reduced_domain_to_duplicated_texts
    logger.info("Finished reducing the dictionary to go from a domain to the associated duplicated texts")

    logger.info("Starting line deduplicating documents")
    line_deduplication = LineDeduplication(
        path_domain_to_duplicated_texts=PATH_REDUCED_DOMAIN_TO_DUPLICATED_TEXTS_LOCAL
    )
    web_docs_dataset = web_docs_dataset.map(line_deduplication, num_proc=NUM_PROC)
    logger.info("Finished line deduplicating documents")

    logger.info("Starting saving the web document dataset after the line deduplication")
    web_docs_dataset.save_to_disk(PATH_SAVE_DISK_WEB_DOCS_LINE_DEDUP, num_proc=NUM_PROC)
    command_sync_s3 = f"aws s3 sync {PATH_SAVE_DISK_WEB_DOCS_LINE_DEDUP} {PATH_SAVE_S3_WEB_DOCS_LINE_DEDUP}"
    # Same crude retry as above, this time for the upload
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    logger.info("Finished saving the web document dataset after the line deduplication")

    logger.info("Starting deleting the tmp files")
    os.system(f"rm -r {PATH_SAVE_DISK_TMP_FILES}")
    logger.info("Finished deleting the tmp files")