# obelics/callers/download_warc.py (63 lines of code) (raw):
import argparse
import logging
from multiprocessing import cpu_count
from datasets import Features, Value, load_from_disk
from obelics.processors import WarcDownloader
# Root logging configuration for the script: INFO level, with each record
# rendered as "<timestamp> - <level> - <logger name> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
    level=logging.INFO,
)

# Module-level logger, explicitly pinned to INFO as well.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def get_args(argv=None):
    """Parse the command-line options of the warc download script.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line. When None (the default), argparse falls
            back to `sys.argv[1:]`, so existing `get_args()` callers behave
            exactly as before.

    Returns:
        An `argparse.Namespace` with the attributes `path_metadata_dataset`,
        `path_save_dir_warc_dataset` and `num_proc`.

    Raises:
        SystemExit: Raised by argparse on unknown or malformed arguments.
    """
    parser = argparse.ArgumentParser(description="Download warc files from Common Crawl pointers.")
    parser.add_argument(
        "--path_metadata_dataset",
        type=str,
        default="./large_files/metadata_dataset_10000",
        help="Path of the dataset containing the metadata to retrieve the warc files.",
    )
    parser.add_argument(
        "--path_save_dir_warc_dataset",
        type=str,
        default="./large_files/warc_dataset_10000",
        help="The directory to save the warc dataset.",
    )
    parser.add_argument(
        "--num_proc",
        type=int,
        # Default to one worker per CPU reported by multiprocessing.
        default=cpu_count(),
        help="Number of processes to use for the multiprocessing.",
    )
    # Passing None keeps argparse's default of reading sys.argv[1:].
    return parser.parse_args(argv)
def format_success_rate(num_successes, num_examples):
    """Build the log message summarising how many examples have no warc error.

    Args:
        num_successes: Number of examples whose `warc_error` value is empty.
        num_examples: Total number of examples in the dataset.

    Returns:
        The success-rate message. For an empty dataset the percentage is
        undefined, so it is reported as "n/a" instead of dividing by zero.
    """
    if num_examples == 0:
        return f"Success rate: {num_successes} / {num_examples} (n/a, empty dataset)"
    return f"Success rate: {num_successes} / {num_examples} ({num_successes / num_examples * 100}%)"


def main():
    """Load the metadata dataset, map `WarcDownloader` over it and save the result.

    The input may be a plain metadata dataset or a previously produced warc
    dataset. Any of the `warc` / `warc_error` columns that is missing is
    created with empty values before mapping. After saving, the number of
    examples with an empty `warc_error` is logged as the success rate.
    """
    args = get_args()

    logger.info("Starting loading the metadata or previous warc dataset")
    metadata_dataset = load_from_disk(args.path_metadata_dataset)
    # Add each missing column on its own, so a dataset that carries only one
    # of the two still ends up with both before the mapping step.
    for column_name, empty_value in (("warc", b""), ("warc_error", "")):
        if column_name not in metadata_dataset.column_names:
            metadata_dataset = metadata_dataset.add_column(column_name, [empty_value] * len(metadata_dataset))
    logger.info("Finished loading the metadata or previous warc dataset")

    warc_downloader = WarcDownloader()

    logger.info("Starting downloading the warc files")
    warc_dataset = metadata_dataset.map(
        warc_downloader,
        num_proc=args.num_proc,
        # Output schema: the input features plus the two warc columns.
        features=Features(
            {
                **metadata_dataset.features,
                "warc": Value("binary"),
                "warc_error": Value("string"),
            }
        ),
    )
    logger.info("Finished downloading the warc files")

    logger.info("Starting saving the warc dataset")
    warc_dataset.save_to_disk(args.path_save_dir_warc_dataset)
    logger.info("Finished saving the warc dataset")

    logger.info("Starting computing the success rate")
    # An empty (falsy) `warc_error` value is counted as a success.
    num_successes = sum(1 for warc_error in warc_dataset["warc_error"] if not warc_error)
    logger.info(format_success_rate(num_successes, len(warc_dataset)))
    logger.info("Finished computing the success rate")


if __name__ == "__main__":
    main()