build_obelics/09_02_get_domain_to_positions.py (62 lines of code) (raw):

import json import logging import os from urllib.parse import urlparse from datasets import load_from_disk from tqdm import tqdm logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(name)s - %(message)s", datefmt="%m/%d/%Y %H:%M:%S", ) logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) NUM_SHARDS = 200 PATH_SAVE_DISK_TMP_FILES = "/scratch/storage_hugo/" PATH_WEB_DOCS_S3 = ( "s3://m4-datasets/webdocs/web_document_dataset_filtered_imgurldedup_nsfwfiltered_urldedup_texts_only/" ) PATH_WEB_DOCS_LOCAL = os.path.join(PATH_SAVE_DISK_TMP_FILES, "web_docs") PATH_SAVE_DISK_LINE_DEDUP_DOMAIN_TO_POSITIONS = os.path.join( PATH_SAVE_DISK_TMP_FILES, "line_dedup_domain_to_positions.json" ) PATH_SAVE_S3_LINE_DEDUP_DOMAIN_TO_POSITIONS = "s3://m4-datasets/webdocs/line_dedup_domain_to_positions.json" def get_domain_to_positions(): domain_to_positions = {} for idx_shard in tqdm(range(NUM_SHARDS)): path_subdataset = os.path.join(PATH_WEB_DOCS_LOCAL, str(idx_shard)) sub_ds = load_from_disk(path_subdataset) metadata_sub_ds = sub_ds["general_metadata"] domains = [urlparse(json.loads(meta)["url"]).netloc for meta in metadata_sub_ds] new_domain_to_pos = {} for idx, domain in enumerate(domains): new_domain_to_pos[domain] = new_domain_to_pos.get(domain, []) + [idx] for domain in new_domain_to_pos: if domain not in domain_to_positions: domain_to_positions[domain] = {} domain_to_positions[domain][str(idx_shard)] = new_domain_to_pos[domain] return domain_to_positions if __name__ == "__main__": if os.path.exists(PATH_SAVE_DISK_TMP_FILES): os.system(f"rm -r {PATH_SAVE_DISK_TMP_FILES}") os.system(f"mkdir {PATH_SAVE_DISK_TMP_FILES}") logger.info("Starting downloading the web document dataset (texts only)") command_sync_s3 = f"aws s3 sync {PATH_WEB_DOCS_S3} {PATH_WEB_DOCS_LOCAL}" os.system(command_sync_s3) os.system(command_sync_s3) os.system(command_sync_s3) logger.info("Finished downloading the web document dataset (texts only)") logger.info("Starting creating the dictionary to go from a domain name to positions in the web document dataset") domain_to_positions = get_domain_to_positions() logger.info("Finished creating the dictionary to go from a domain name to positions in the web document dataset") logger.info("Starting saving the domain to positions") with open(PATH_SAVE_DISK_LINE_DEDUP_DOMAIN_TO_POSITIONS, "w") as f: json.dump(domain_to_positions, f) command_sync_s3 = ( f"aws s3 cp {PATH_SAVE_DISK_LINE_DEDUP_DOMAIN_TO_POSITIONS} {PATH_SAVE_S3_LINE_DEDUP_DOMAIN_TO_POSITIONS}" ) os.system(command_sync_s3) logger.info("Finished saving the domain to positions") logger.info("Starting deleting the tmp files") os.system(f"rm -r {PATH_SAVE_DISK_TMP_FILES}") logger.info("Finished deleting the tmp files")