build_obelics/06_01_create_set_image_urls_in_webdocs.py
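# Sharded job: builds the Counter (multiset) of image URLs referenced in one shard of the
# filtered web document dataset, pickles it, and uploads the result to S3.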
import argparse
import json
import logging
import os
import pickle
from collections import Counter
from multiprocessing import cpu_count

from datasets import load_from_disk


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def get_args():
    parser = argparse.ArgumentParser(description="Create the set of image URLs in the web document dataset.")
    parser.add_argument(
        "idx_job",
        type=int,
        help="Index of the job (between 0 and 199).",
    )
    parser.add_argument(
        "--path_web_document_dataset_filtered",
        type=str,
        default="s3://m4-datasets/webdocs/web_document_dataset_filtered/",
        help="Path of the filtered web document dataset.",
    )
    parser.add_argument(
        "--path_save_image_urls_in_web_document_dataset_filtered",
        type=str,
        default="s3://m4-datasets/webdocs/image_urls_in_web_document_dataset_filtered/",
        help="Path to save the image URLs found in the filtered web document dataset.",
    )
    parser.add_argument(
        "--num_proc",
        type=int,
        default=cpu_count(),
        help="Number of processes to use for multiprocessing.",
    )
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    args = get_args()

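    # Per-job scratch space on local disk; wiped and recreated at the start of every run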
    path_save_disk_tmp_files = f"/scratch/storage_hugo_{args.idx_job}/"
    if os.path.exists(path_save_disk_tmp_files):
        os.system(f"rm -r {path_save_disk_tmp_files}")
    os.system(f"mkdir {path_save_disk_tmp_files}")

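    # Download this job's shard of the filtered web document dataset from S3 to local disk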
    logger.info("Starting to load the filtered web document dataset")
    path_sync_s3 = os.path.join(args.path_web_document_dataset_filtered, str(args.idx_job))
    path_save_disk_web_document_dataset_filtered = os.path.join(
        path_save_disk_tmp_files, "web_document_dataset_filtered"
    )
    os.system(f"mkdir {path_save_disk_web_document_dataset_filtered}")
    command_sync_s3 = f"aws s3 sync {path_sync_s3} {path_save_disk_web_document_dataset_filtered}"
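    # `aws s3 sync` only copies missing or changed files, so repeating the same command
    # acts as a simple retry against transient S3 failures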
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    web_document_dataset_filtered = load_from_disk(path_save_disk_web_document_dataset_filtered)
    logger.info("Finished loading the filtered web document dataset")

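    # Only the "metadata" column is needed: for each document it stores a JSON-encoded list
    # whose non-empty elements describe an image and carry its "src" URL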
    logger.info("Starting to build the set of image URLs in the filtered web document dataset")
    web_document_dataset_filtered = web_document_dataset_filtered.remove_columns(
        [c_n for c_n in web_document_dataset_filtered.column_names if c_n != "metadata"]
    )
    metadata = web_document_dataset_filtered["metadata"]
    logger.info("Step 1 done")
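    # Parse each JSON-encoded metadata entry and collect the "src" field of its non-empty elements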
    metadata = [[el["src"] for el in json.loads(md) if el] for md in metadata]
    logger.info("Step 2 done")
    metadata = [sub_el for el in metadata for sub_el in el]
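    # The Counter maps each distinct image URL to its number of occurrences across the shard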
    metadata = Counter(metadata)
    logger.info("Finished building the set of image URLs in the filtered web document dataset")

    logger.info("Starting to save the set of image URLs in the filtered web document dataset")
    path_save_disk_image_urls_in_web_document_dataset_filtered = os.path.join(
        path_save_disk_tmp_files, "image_urls_in_web_document_dataset_filtered.pickle"
    )
    with open(path_save_disk_image_urls_in_web_document_dataset_filtered, "wb") as f:
        pickle.dump(metadata, f, pickle.HIGHEST_PROTOCOL)
    path_sync_s3 = os.path.join(
        args.path_save_image_urls_in_web_document_dataset_filtered,
        str(args.idx_job),
        "image_urls_in_web_document_dataset_filtered.pickle",
    )
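    # Upload the pickle to S3; the copy is repeated, presumably as a crude guard against transient failures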
    command_sync_s3 = f"aws s3 cp {path_save_disk_image_urls_in_web_document_dataset_filtered} {path_sync_s3}"
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    logger.info("Finished saving the set of image URLs in the filtered web document dataset")

    logger.info("Starting to delete the tmp files")
    os.system(f"rm -r {path_save_disk_tmp_files}")
    logger.info("Finished deleting the tmp files")