# build_obelics/01_download_warc.py
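"""Download the WARC records referenced by one shard of the Common Crawl
pointers dataset and push the result back to S3.

Meant to be launched as one of 200 array jobs (`idx_job` selects the shard):
each job syncs its shard locally, downloads the WARC payloads with
`WarcDownloader`, and uploads the augmented dataset.
"""
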
import argparse
import logging
import os
from multiprocessing import cpu_count

from datasets import Features, Value, load_from_disk

from obelics.processors import WarcDownloader

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def get_args():
    parser = argparse.ArgumentParser(description="Download WARC files from Common Crawl pointers.")
    parser.add_argument(
        "idx_job",
        type=int,
        help="Index of the job (between 0 and 199).",
    )
    parser.add_argument(
        "--path_metadata_dataset",
        type=str,
        default="s3://m4-datasets/webdocs/pointers_cc_dataset/",
        help="Path of the dataset containing the metadata used to retrieve the WARC files.",
    )
    parser.add_argument(
        "--path_save_dir_warc_dataset",
        type=str,
        default="s3://m4-datasets/webdocs/warc_dataset/",
        help="The directory where the WARC dataset is saved.",
    )
    parser.add_argument(
        "--num_proc",
        type=int,
        default=4 * cpu_count(),
        help="Number of processes to use for multiprocessing.",
    )
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    args = get_args()
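
    # Work in a job-local scratch directory, wiping any leftovers from a previous run.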
    path_save_tmp_files = "/scratch/storage_hugo/"
    if os.path.exists(path_save_tmp_files):
        os.system(f"rm -r {path_save_tmp_files}")
    os.system(f"mkdir {path_save_tmp_files}")
logger.info("Starting loading the metadata or previous warc dataset")
path_sync_s3 = os.path.join(args.path_metadata_dataset, str(args.idx_job))
path_save_disk_input = "/scratch/storage_hugo/pointers_cc_ds/"
os.system(f"mkdir {path_save_disk_input}")
command_sync_s3 = f"aws s3 sync {path_sync_s3} {path_save_disk_input}"
os.system(command_sync_s3)
os.system(command_sync_s3)
os.system(command_sync_s3)
metadata_dataset = load_from_disk(path_save_disk_input)
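
    # If this shard has no columns from a previous partial run, add empty
    # placeholders so the map below has `warc` and `warc_error` to fill in.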
if ("warc" not in metadata_dataset.column_names) and ("warc_error" not in metadata_dataset.column_names):
metadata_dataset = metadata_dataset.add_column("warc", [b""] * len(metadata_dataset))
metadata_dataset = metadata_dataset.add_column("warc_error", [""] * len(metadata_dataset))
logger.info("Finished loading the metadata or previous warc dataset")

    warc_downloader = WarcDownloader()

    logger.info("Starting downloading the warc files")
    warc_dataset = metadata_dataset.map(
        warc_downloader,
        num_proc=args.num_proc,
        features=Features(
            {
                **metadata_dataset.features,
                "warc": Value("binary"),
                "warc_error": Value("string"),
            }
        ),
    )
    logger.info("Finished downloading the warc files")

    logger.info("Starting saving the warc dataset")
    path_save_disk_output = "/scratch/storage_hugo/warc_ds"
    warc_dataset.save_to_disk(path_save_disk_output)
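
    # Upload the shard back to S3, again issuing the sync three times as a retry.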
    path_sync_s3 = os.path.join(args.path_save_dir_warc_dataset, str(args.idx_job))
    command_sync_s3 = f"aws s3 sync {path_save_disk_output} {path_sync_s3}"
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    logger.info("Finished saving the warc dataset")
logger.info("Starting computing the success rate")
    num_successes = sum(1 for el in warc_dataset["warc_error"] if not el)
    logger.info(f"Success rate: {num_successes} / {len(warc_dataset)} ({num_successes / len(warc_dataset) * 100:.2f}%)")
logger.info("Finished computing the success rate")
logger.info("Starting deleting the tmp files")
os.system(f"rm -r {path_save_tmp_files}")
logger.info("Finished deleting the tmp files")