# build_obelics/06_03_remove_image_duplicates.py
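"""Remove over-duplicated images from one shard of the filtered web document
dataset.

For the shard selected by `idx_job`, download the shard and the pickled set of
over-duplicated image URLs from S3, remove every image whose URL appears in
that set (deleting the same indices from the document's aligned `texts`,
`images` and `metadata` lists), and upload the deduplicated shard back to S3.
"""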
import argparse
import json
import logging
import os
import pickle
from datasets import load_from_disk
from PIL import Image, ImageFile

# Disable Pillow's decompression-bomb limit and tolerate truncated image
# files, so decoding arbitrary web images does not raise mid-job.
Image.MAX_IMAGE_PIXELS = None
ImageFile.LOAD_TRUNCATED_IMAGES = True

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def get_args():
    parser = argparse.ArgumentParser(description="Remove the images that are too duplicated.")
    parser.add_argument(
        "idx_job",
        type=int,
        help="Index of the job (between 0 and 199).",
    )
    parser.add_argument(
        "--path_web_document_dataset_filtered",
        type=str,
        default="s3://m4-datasets/webdocs/web_document_dataset_filtered/",
        help="Path of the filtered web document dataset.",
    )
    parser.add_argument(
        "--path_tot_image_urls_in_web_document_dataset_filtered_too_duplicated",
        type=str,
        default="s3://m4-datasets/webdocs/tot_image_urls_in_web_document_dataset_filtered_too_duplicated.pickle",
        help="Path of the pickle file containing the image URLs to remove.",
    )
    parser.add_argument(
        "--path_save_web_document_dataset_filtered_imgurldedup",
        type=str,
        default="s3://m4-datasets/webdocs/web_document_dataset_filtered_imgurldedup/",
        help="Path under which to save the filtered web document dataset after image URL deduplication.",
    )
    parser.add_argument(
        "--num_proc",
        type=int,
        default=48,
        help="Number of processes to use for multiprocessing.",
    )
    args = parser.parse_args()
    return args
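
# Example invocation for shard 0 (hypothetical; keeps the S3 defaults above):
#   python build_obelics/06_03_remove_image_duplicates.py 0 --num_proc 48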


class ImageURLDeduplication:
    """Map-style callable that removes over-duplicated images from a single
    web document."""

    def __init__(self, path_image_urls_to_remove):
        self.path_image_urls_to_remove = path_image_urls_to_remove
        with open(path_image_urls_to_remove, "rb") as f:
            self.image_urls_to_remove = set(pickle.load(f))

    def __call__(self, web_document):
        metadata = json.loads(web_document["metadata"])
        # Entries with non-null metadata are images; mark the ones whose
        # source URL is in the set of over-duplicated URLs.
        indices_to_remove = {
            ind
            for ind, meta in enumerate(metadata)
            if (meta is not None) and (meta["src"] in self.image_urls_to_remove)
        }
        if indices_to_remove:
            # `texts`, `images` and `metadata` are aligned lists: drop the
            # same indices from all three to keep them in sync.
            web_document["texts"] = [
                el for ind, el in enumerate(web_document["texts"]) if ind not in indices_to_remove
            ]
            web_document["images"] = [
                el for ind, el in enumerate(web_document["images"]) if ind not in indices_to_remove
            ]
            web_document["metadata"] = json.dumps(
                [el for ind, el in enumerate(metadata) if ind not in indices_to_remove]
            )
        return web_document

    def __reduce__(self):
        # When `datasets.map(..., num_proc=...)` pickles this callable for its
        # worker processes, rebuild it from the pickle path rather than
        # serializing the large in-memory set of URLs.
        return self.__class__, (self.path_image_urls_to_remove,)
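
# A toy illustration of the transformation (hypothetical data, not from the
# pipeline): with image_urls_to_remove = {"http://a/1.jpg"} and a document
# whose metadata is '[null, {"src": "http://a/1.jpg"}]', index 1 is dropped
# from "texts", "images" and "metadata", keeping only the text node at index 0.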


if __name__ == "__main__":
    args = get_args()

    # Work in a job-specific scratch directory, recreated at startup and
    # deleted at the end of the job.
    path_save_disk_tmp_files = f"/scratch/storage_hugo_{args.idx_job}/"
    if os.path.exists(path_save_disk_tmp_files):
        os.system(f"rm -r {path_save_disk_tmp_files}")
    os.system(f"mkdir {path_save_disk_tmp_files}")

    logger.info("Starting loading the filtered web document dataset and the set of image URLs to remove")
    path_sync_s3 = os.path.join(args.path_web_document_dataset_filtered, str(args.idx_job))
    path_save_disk_web_document_dataset_filtered = os.path.join(
        path_save_disk_tmp_files, "web_document_dataset_filtered"
    )
    os.system(f"mkdir {path_save_disk_web_document_dataset_filtered}")
    command_sync_s3 = f"aws s3 sync {path_sync_s3} {path_save_disk_web_document_dataset_filtered}"
    # `aws s3 sync` is idempotent, so repeating it acts as a crude retry
    # against transient S3 failures.
    for _ in range(3):
        os.system(command_sync_s3)
    web_document_dataset_filtered = load_from_disk(path_save_disk_web_document_dataset_filtered)

    path_sync_s3 = args.path_tot_image_urls_in_web_document_dataset_filtered_too_duplicated
    path_save_disk_image_urls_to_remove = os.path.join(path_save_disk_tmp_files, "image_urls_to_remove.pickle")
    command_sync_s3 = f"aws s3 cp {path_sync_s3} {path_save_disk_image_urls_to_remove}"
    for _ in range(3):
        os.system(command_sync_s3)
    logger.info("Finished loading the filtered web document dataset and the set of image URLs to remove")
logger.info("Starting removing the images too deduplicated")
image_url_deduplication = ImageURLDeduplication(path_image_urls_to_remove=path_save_disk_image_urls_to_remove)
web_document_dataset_filtered_imgurldedup = web_document_dataset_filtered.map(
image_url_deduplication,
num_proc=args.num_proc,
)
logger.info("Finished removing the images too deduplicated")
logger.info("Starting saving the web document dataset filtered with the deduplication of image urls")
path_save_disk_web_document_dataset_filtered_imgurldedup = os.path.join(
path_save_disk_tmp_files, "web_document_dataset_filtered_imgurldedup"
)
web_document_dataset_filtered_imgurldedup.save_to_disk(
path_save_disk_web_document_dataset_filtered_imgurldedup, num_proc=args.num_proc
)
path_sync_s3 = os.path.join(args.path_save_web_document_dataset_filtered_imgurldedup, str(args.idx_job))
command_sync_s3 = f"aws s3 sync {path_save_disk_web_document_dataset_filtered_imgurldedup} {path_sync_s3}"
os.system(command_sync_s3)
os.system(command_sync_s3)
os.system(command_sync_s3)
logger.info("Finished saving the web document dataset filtered with the deduplication of image urls")
logger.info("Starting deleting the tmp files")
os.system(f"rm -r {path_save_disk_tmp_files}")
logger.info("Finished deleting the tmp files")