build_obelics/04_merge_web_docs_with_images.py (140 lines of code) (raw):

import argparse import logging import os from copy import deepcopy from multiprocessing import cpu_count import datasets from datasets import concatenate_datasets, load_from_disk from PIL import Image, ImageFile Image.MAX_IMAGE_PIXELS = None ImageFile.LOAD_TRUNCATED_IMAGES = True URL_BAN_WORDS = ["logo", "button", "icon", "plugin", "widget", "porn", "xxx", "sex"] logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(name)s - %(message)s", datefmt="%m/%d/%Y %H:%M:%S", ) logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) def get_args(): parser = argparse.ArgumentParser( description="Merge the web document dataset without images with the dataset of images." ) parser.add_argument( "idx_job", type=int, help="Index of the job (between 0 and 199).", ) parser.add_argument( "--path_web_document_dataset_without_images", type=str, default="s3://m4-datasets/webdocs/web_document_dataset_without_images/", help="Path of the web document dataset without the images.", ) parser.add_argument( "--path_image_dataset_1", type=str, default="s3://m4-datasets/webdocs/image_dataset/", help="Path of the dataset containing the images.", ) parser.add_argument( "--path_image_dataset_2", type=str, default="s3://m4-datasets/webdocs/image_dataset_2/", help="Path of the second dataset containing the images.", ) parser.add_argument( "--path_save_dir_web_document_dataset", type=str, default="s3://m4-datasets/webdocs/web_document_dataset/", help="Path to save the web document dataset with the images.", ) parser.add_argument( "--num_proc", type=int, default=cpu_count(), help="Number of processes to use for the multiprocessing.", ) args = parser.parse_args() return args def urls_to_images(web_document_dataset_without_images, image_dataset, map_url_idx, url_ban_words, num_proc): def retrieve_image(url): if url not in map_url_idx: return None if any([url_ban_word in url for url_ban_word in url_ban_words]): return None # Uncomment if one process seems silently killed without throwing any error in the `map` function. # It's rare, but approximately 1/100M pages contain many huge pictures that break things. # 2M in bytes was chosen by looking at the distribution of the length in bytes of pictures. # It would remove only 1/1000 picture. # if len(image_dataset[map_url_idx[url]]["image"]) > 2_000_000: # return None image = {"path": None, "bytes": image_dataset[map_url_idx[url]]["image"]} return image def func_urls_to_images_urls_in_images_col(example): # Uncomment if one process seems silently killed without throwing any error in the `map` function. # It's rare, but approximately 1/100M pages contain many huge pictures that break things. # num_images = len([1 for url in example["images"] if url in map_url_idx]) # if num_images > 50: # example["images"] = [None for url in example["images"]] # return example example["images"] = [retrieve_image(url) if url else None for url in example["images"]] return example logger.info("Starting replacing urls by images") new_features = deepcopy(web_document_dataset_without_images.features) new_features["images"] = datasets.Sequence(datasets.Image()) web_document_dataset = web_document_dataset_without_images.map( func_urls_to_images_urls_in_images_col, features=new_features, num_proc=num_proc, load_from_cache_file=False, ) logger.info("Finished replacing urls by images") return web_document_dataset if __name__ == "__main__": args = get_args() path_save_disk_tmp_files = f"/scratch/storage_hugo_{args.idx_job}/" if os.path.exists(path_save_disk_tmp_files): os.system(f"rm -r {path_save_disk_tmp_files}") os.system(f"mkdir {path_save_disk_tmp_files}") logger.info("Starting loading the previous web document dataset without the images") path_sync_s3 = os.path.join(args.path_web_document_dataset_without_images, str(args.idx_job)) path_save_disk_web_document_dataset_without_images = os.path.join( path_save_disk_tmp_files, "web_document_dataset_without_images" ) os.system(f"mkdir {path_save_disk_web_document_dataset_without_images}") command_sync_s3 = f"aws s3 sync {path_sync_s3} {path_save_disk_web_document_dataset_without_images}" os.system(command_sync_s3) os.system(command_sync_s3) os.system(command_sync_s3) web_document_dataset_without_images = load_from_disk(path_save_disk_web_document_dataset_without_images) logger.info("Finished loading the previous web document dataset without the images") logger.info("Starting loading the image datasets and the mapping") path_sync_s3 = os.path.join(args.path_image_dataset_1, str(args.idx_job)) path_save_disk_image_dataset_1 = os.path.join(path_save_disk_tmp_files, "image_dataset_1") os.system(f"mkdir {path_save_disk_image_dataset_1}") command_sync_s3 = f"aws s3 sync {path_sync_s3} {path_save_disk_image_dataset_1}" os.system(command_sync_s3) os.system(command_sync_s3) os.system(command_sync_s3) image_dataset_1 = load_from_disk(path_save_disk_image_dataset_1) path_sync_s3 = os.path.join(args.path_image_dataset_2, str(args.idx_job)) path_save_disk_image_dataset_2 = os.path.join(path_save_disk_tmp_files, "image_dataset_2") os.system(f"mkdir {path_save_disk_image_dataset_2}") command_sync_s3 = f"aws s3 sync {path_sync_s3} {path_save_disk_image_dataset_2}" os.system(command_sync_s3) os.system(command_sync_s3) os.system(command_sync_s3) image_dataset_2 = load_from_disk(path_save_disk_image_dataset_2) logger.info("Starting concatenating the image datasets") image_dataset = concatenate_datasets([image_dataset_1, image_dataset_2]) logger.info("Finished concatenating the image datasets") logger.info("Starting creating the mapping") map_url_idx = {url: idx for idx, url in enumerate(image_dataset["url"])} logger.info("Finished creating the mapping") logger.info("Finished loading the image datasets and the mapping") logger.info("Starting to merge the web document dataset without images and the dataset containing the images") web_document_dataset = urls_to_images( web_document_dataset_without_images, image_dataset, map_url_idx, URL_BAN_WORDS, args.num_proc ) logger.info("Finished to merge the web document dataset without images and the dataset containing the images") logger.info("Starting saving the web document dataset") path_save_disk_web_document_dataset = os.path.join(path_save_disk_tmp_files, "web_document_dataset") web_document_dataset.save_to_disk(path_save_disk_web_document_dataset, num_proc=args.num_proc) path_sync_s3 = os.path.join(args.path_save_dir_web_document_dataset, str(args.idx_job)) command_sync_s3 = f"aws s3 sync {path_save_disk_web_document_dataset} {path_sync_s3}" os.system(command_sync_s3) os.system(command_sync_s3) os.system(command_sync_s3) logger.info("Finished saving the web document dataset") logger.info("Starting deleting the tmp files") os.system(f"rm -r {path_save_disk_tmp_files}") logger.info("Finished deleting the tmp files")