build_obelics/09_04_get_domain_to_duplicated_texts.py

import json
import logging
import os
import sys

from datasets import load_from_disk
from tqdm import tqdm


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


NUM_SHARDS = 200
# Index of the job/shard processed by this run, passed on the command line
IDX_JOB = int(sys.argv[1])

PATH_SAVE_DISK_TMP_FILES = f"/scratch/storage_hugo_{IDX_JOB}/"

PATH_WEB_DOCS_S3 = (
    "s3://m4-datasets/webdocs/web_document_dataset_filtered_imgurldedup_nsfwfiltered_urldedup_texts_only/"
)
PATH_WEB_DOCS_LOCAL = os.path.join(PATH_SAVE_DISK_TMP_FILES, "web_docs")

PATH_LINE_DEDUP_DOMAIN_TO_POSITIONS_S3 = (
    f"s3://m4-datasets/webdocs/line_dedup_domain_to_positions_sharded/{IDX_JOB}/line_dedup_domain_to_positions.json"
)
PATH_LINE_DEDUP_DOMAIN_TO_POSITIONS_LOCAL = os.path.join(
    PATH_SAVE_DISK_TMP_FILES, "line_dedup_domain_to_positions.json"
)

PATH_SAVE_DISK_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS = os.path.join(
    PATH_SAVE_DISK_TMP_FILES, "line_dedup_domain_to_duplicated_texts.json"
)
PATH_SAVE_S3_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS = f"s3://m4-datasets/webdocs/line_dedup_domain_to_duplicated_texts_sharded/{IDX_JOB}/line_dedup_domain_to_duplicated_texts.json"


def get_domain_to_duplicated_texts(domain_to_positions):
    # Reorganize the mapping so that, for each dataset shard, we know which domains
    # have documents in it and at which positions
    shard_to_domain_to_positions = {
        str(idx_shard): {
            domain: domain_to_positions[domain][str(idx_shard)]
            for domain in domain_to_positions
            if str(idx_shard) in domain_to_positions[domain]
        }
        for idx_shard in range(NUM_SHARDS)
    }
    domain_to_duplicated_texts = {}
    for idx_shard in tqdm(range(NUM_SHARDS)):
        ds_shard = load_from_disk(os.path.join(PATH_WEB_DOCS_LOCAL, str(idx_shard)), keep_in_memory=True)
        for domain in shard_to_domain_to_positions[str(idx_shard)]:
            if domain not in domain_to_duplicated_texts:
                domain_to_duplicated_texts[domain] = {}
            positions = shard_to_domain_to_positions[str(idx_shard)][domain]
            for pos in positions:
                # Split each non-empty text of the document into paragraphs and count
                # how many times each paragraph appears within the domain
                tot_texts = [txt for txt in ds_shard[pos]["texts"] if txt]
                tot_texts = [text.split("\n\n") for text in tot_texts]
                tot_texts = [paragraph for text in tot_texts for paragraph in text]
                for text in tot_texts:
                    domain_to_duplicated_texts[domain][text] = domain_to_duplicated_texts[domain].get(text, 0) + 1
    # Keep only the paragraphs that appear more than once within a domain
    domain_to_duplicated_texts = {
        domain: {k: v for k, v in domain_to_duplicated_texts[domain].items() if v > 1}
        for domain in domain_to_duplicated_texts
    }
    return domain_to_duplicated_texts


if __name__ == "__main__":
    if os.path.exists(PATH_SAVE_DISK_TMP_FILES):
        os.system(f"rm -r {PATH_SAVE_DISK_TMP_FILES}")
    os.system(f"mkdir {PATH_SAVE_DISK_TMP_FILES}")

    logger.info(
        "Starting downloading the web document dataset (texts only) and the dictionary to go from a domain to"
        " positions"
    )
    command_sync_s3 = f"aws s3 sync {PATH_WEB_DOCS_S3} {PATH_WEB_DOCS_LOCAL}"
    # The sync is repeated, presumably to catch files missed because of transient S3 errors
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    command_sync_s3 = f"aws s3 cp {PATH_LINE_DEDUP_DOMAIN_TO_POSITIONS_S3} {PATH_LINE_DEDUP_DOMAIN_TO_POSITIONS_LOCAL}"
    os.system(command_sync_s3)
    with open(PATH_LINE_DEDUP_DOMAIN_TO_POSITIONS_LOCAL) as f:
        domain_to_positions = json.load(f)
    logger.info(
        "Finished downloading the web document dataset (texts only) and the dictionary to go from a domain to"
        " positions"
    )

    logger.info("Starting finding the duplicated texts for each domain")
    domain_to_duplicated_texts = get_domain_to_duplicated_texts(domain_to_positions)
    logger.info("Finished finding the duplicated texts for each domain")

    logger.info("Starting saving the domain to duplicated texts")
    with open(PATH_SAVE_DISK_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS, "w") as f:
        json.dump(domain_to_duplicated_texts, f)
    command_sync_s3 = (
        "aws s3 cp"
        f" {PATH_SAVE_DISK_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS} {PATH_SAVE_S3_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS}"
    )
    os.system(command_sync_s3)
    logger.info("Finished saving the domain to duplicated texts")

    logger.info("Starting deleting the tmp files")
    os.system(f"rm -r {PATH_SAVE_DISK_TMP_FILES}")
    logger.info("Finished deleting the tmp files")
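
Below is a minimal, self-contained sketch (not part of the script above) of how the saved JSON, which maps each domain to the paragraphs that occur more than once within it, could plausibly be consumed by a later deduplication step. The `strip_duplicated_paragraphs` helper and the toy inputs are illustrative assumptions, not code from this repository; the real downstream logic lives in later scripts of the pipeline.

def strip_duplicated_paragraphs(texts, duplicated_paragraphs):
    # Hypothetical helper: drop paragraphs flagged as duplicated within the domain.
    # `texts` follows the same convention as the script above: a list of (possibly
    # None/empty) text fields whose paragraphs are separated by "\n\n".
    cleaned = []
    for text in texts:
        if not text:
            cleaned.append(text)
            continue
        kept = [p for p in text.split("\n\n") if p not in duplicated_paragraphs]
        cleaned.append("\n\n".join(kept))
    return cleaned

# Toy example with made-up values
domain_to_duplicated_texts = {"example.com": {"Subscribe to our newsletter": 3}}
texts = ["Subscribe to our newsletter\n\nActual article content", None]
print(strip_duplicated_paragraphs(texts, domain_to_duplicated_texts["example.com"]))
# ['Actual article content', None]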