build_obelics/08_02_urldedup.py:
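Shard-level URL deduplication for the OBELICS web-document build: the script downloads a precomputed map of duplicated URLs from S3, syncs one shard of the filtered web-document dataset, keeps only the canonical copy of each duplicated URL (and, as a bonus, drops documents with no images left), then saves the deduplicated shard back to S3.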

import json
import logging
import os
import sys

from datasets import load_from_disk
from PIL import Image, ImageFile


# Useful to avoid DecompressionBombError and truncated-image errors
Image.MAX_IMAGE_PIXELS = None
ImageFile.LOAD_TRUNCATED_IMAGES = True

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

MAX_NUM_RETRIES_SYNC = 3

IDX_JOB = sys.argv[1]
PATH_SAVE_DISK_TMP_FILES = f"/scratch/storage_hugo_{IDX_JOB}/"

PATH_DUP_URLS_S3 = "s3://m4-datasets/webdocs/dup_urls.json"
PATH_DUP_URLS_LOCAL = os.path.join(PATH_SAVE_DISK_TMP_FILES, "dup_urls.json")

PATH_WEB_DOCS_S3 = f"s3://m4-datasets/webdocs/web_document_dataset_filtered_imgurldedup_nsfwfiltered/{IDX_JOB}/"
PATH_WEB_DOCS_LOCAL = os.path.join(PATH_SAVE_DISK_TMP_FILES, "web_docs")

NUM_PROC = 10

PATH_SAVE_DISK_WEB_DOCS_URL_DEDUP = os.path.join(
    PATH_SAVE_DISK_TMP_FILES, "web_document_dataset_filtered_imgurldedup_nsfwfiltered_urldedup"
)
PATH_SAVE_S3_WEB_DOCS_URL_DEDUP = os.path.join(
    "s3://m4-datasets/webdocs/web_document_dataset_filtered_imgurldedup_nsfwfiltered_urldedup", str(IDX_JOB)
)


# Callable filter for datasets.filter: keeps a document only if it is the
# canonical copy of its URL and still contains at least one image
class URLDeduplication:
    def __init__(self, path_dup_urls):
        self.path_dup_urls = path_dup_urls
        with open(path_dup_urls) as f:
            self.dup_urls = json.load(f)

    def __call__(self, example):
        general_metadata = json.loads(example["general_metadata"])
        url, warc_filename = general_metadata["url"], general_metadata["warc_filename"]
        if url in self.dup_urls:
            if warc_filename != self.dup_urls[url]:
                return False
        # Bonus: removes documents without any images
        metadata = [meta for meta in json.loads(example["metadata"]) if meta]
        if not metadata:
            return False
        return True

    def __reduce__(self):
        # Pickle by path rather than by value, so each filter worker reloads
        # the (potentially large) duplicate-URL map itself
        return self.__class__, (self.path_dup_urls,)


if __name__ == "__main__":
    # Start from a clean scratch directory
    if os.path.exists(PATH_SAVE_DISK_TMP_FILES):
        os.system(f"rm -r {PATH_SAVE_DISK_TMP_FILES}")
    os.system(f"mkdir {PATH_SAVE_DISK_TMP_FILES}")

    logger.info("Starting downloading the set of duplicated urls")
    command_sync_s3 = f"aws s3 cp {PATH_DUP_URLS_S3} {PATH_DUP_URLS_LOCAL}"
    os.system(command_sync_s3)
    logger.info("Finished downloading the set of duplicated urls")

    logger.info("Starting loading the web docs")
    command_sync_s3 = f"aws s3 sync {PATH_WEB_DOCS_S3} {PATH_WEB_DOCS_LOCAL}"
    for _ in range(MAX_NUM_RETRIES_SYNC):
        os.system(command_sync_s3)
    web_docs_dataset = load_from_disk(PATH_WEB_DOCS_LOCAL)
    num_docs_before_filtering = web_docs_dataset.num_rows
    logger.info("Finished loading the web docs")

    logger.info("Starting deduplicating documents on URLs")
    url_deduplication = URLDeduplication(path_dup_urls=PATH_DUP_URLS_LOCAL)
    web_docs_dataset = web_docs_dataset.filter(url_deduplication, num_proc=NUM_PROC)
    logger.info("Finished deduplicating documents on URLs")

    logger.info("Starting saving the web document dataset after the URL deduplication")
    web_docs_dataset.save_to_disk(PATH_SAVE_DISK_WEB_DOCS_URL_DEDUP, num_proc=NUM_PROC)
    command_sync_s3 = f"aws s3 sync {PATH_SAVE_DISK_WEB_DOCS_URL_DEDUP} {PATH_SAVE_S3_WEB_DOCS_URL_DEDUP}"
    for _ in range(MAX_NUM_RETRIES_SYNC):
        os.system(command_sync_s3)
    logger.info("Finished saving the web document dataset after the URL deduplication")

    logger.info(
        "Number of documents in the web document dataset before the URL deduplication (and the removal of documents"
        f" without images): {num_docs_before_filtering}"
    )
    logger.info(
        "Number of documents in the web document dataset after the URL deduplication (and the removal of documents"
        f" without images): {web_docs_dataset.num_rows}"
    )

    logger.info("Starting deleting the tmp files")
    os.system(f"rm -r {PATH_SAVE_DISK_TMP_FILES}")
    logger.info("Finished deleting the tmp files")
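
A minimal sanity-check sketch of the filter, runnable in the same module as the URLDeduplication class above. The shape of dup_urls.json (each duplicated URL mapping to the single warc_filename whose copy is kept) is inferred from __call__; the toy file and records below are hypothetical.

import json
import os
import tempfile

# Hypothetical toy duplicate map, following the shape __call__ expects:
# duplicated URL -> warc_filename of the one copy to keep
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump({"http://example.com/a": "crawl-1.warc.gz"}, f)
    path_toy_dup_urls = f.name

dedup = URLDeduplication(path_dup_urls=path_toy_dup_urls)

canonical_copy = {
    "general_metadata": json.dumps({"url": "http://example.com/a", "warc_filename": "crawl-1.warc.gz"}),
    "metadata": json.dumps([{"src": "http://example.com/img.jpg"}]),  # at least one image
}
other_copy = {
    "general_metadata": json.dumps({"url": "http://example.com/a", "warc_filename": "crawl-2.warc.gz"}),
    "metadata": json.dumps([{"src": "http://example.com/img.jpg"}]),
}
no_images = {
    "general_metadata": json.dumps({"url": "http://example.com/b", "warc_filename": "crawl-1.warc.gz"}),
    "metadata": json.dumps([None]),  # no image metadata left
}

assert dedup(canonical_copy)  # canonical copy of the URL is kept
assert not dedup(other_copy)  # non-canonical duplicate is dropped
assert not dedup(no_images)   # image-less document is dropped

os.remove(path_toy_dup_urls)

For the real pipeline step, the script takes a single positional shard index, e.g. "python build_obelics/08_02_urldedup.py 3", presumably launched once per shard by a job array; the s3://m4-datasets/... buckets, the aws CLI, and the /scratch prefix are specific to the original infrastructure.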