build_obelics/02_extract_html_get_image_urls.py

import argparse
import logging
import os
from multiprocessing import cpu_count

from datasets import load_from_disk

from obelics.processors import (
    DOMTreeSimplificator,
    HtmlExtractor,
    PreExtractionSimplificator,
    WebDocumentExtractor,
)


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def get_args():
    parser = argparse.ArgumentParser(
        description="Extract html from warc files, simplify them, get the urls of the images."
    )
    parser.add_argument(
        "idx_job",
        type=int,
        help="Index of the job (between 0 and 199).",
    )
    parser.add_argument(
        "--path_warc_dataset",
        type=str,
        default="s3://m4-datasets/webdocs/warc_dataset/",
        help="Path of the dataset containing the warc files to retrieve the html.",
    )
    parser.add_argument(
        "--path_save_file_image_urls",
        type=str,
        default="/scratch/storage_hugo/image_urls.txt",
        help="The file to save the urls of all images.",
    )
    parser.add_argument(
        "--path_save_dir_html_dataset",
        type=str,
        default="s3://m4-datasets/webdocs/html_dataset/",
        help="The directory to save the html dataset.",
    )
    parser.add_argument(
        "--num_proc",
        type=int,
        default=cpu_count(),
        help="Number of processes to use for the multiprocessing.",
    )
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    args = get_args()

    # Start from a clean scratch directory for the temporary files of this job
    path_save_tmp_files = "/scratch/storage_hugo/"
    if os.path.exists(path_save_tmp_files):
        os.system(f"rm -r {path_save_tmp_files}")
    os.system(f"mkdir {path_save_tmp_files}")

    logger.info("Starting loading the warc or previous html dataset")
    path_sync_s3 = os.path.join(args.path_warc_dataset, str(args.idx_job))
    path_save_disk_input = f"/scratch/storage_hugo/warc_dataset_{args.idx_job}"
    if os.path.exists(path_save_disk_input):
        os.system(f"rm -r {path_save_disk_input}")
    os.system(f"mkdir {path_save_disk_input}")

    # The sync command is repeated so that files missed by a failed attempt are retried
    command_sync_s3 = f"aws s3 sync {path_sync_s3} {path_save_disk_input}"
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    os.system(command_sync_s3)

    warc_dataset = load_from_disk(path_save_disk_input)
    # If the dataset was not processed by a previous run, add the columns that the html extraction fills in
    if ("html" not in warc_dataset.column_names) and ("html_error" not in warc_dataset.column_names):
        warc_dataset = warc_dataset.add_column("html", [""] * len(warc_dataset))
        warc_dataset = warc_dataset.add_column("html_error", [""] * len(warc_dataset))
    logger.info("Finished loading the warc or previous html dataset")

    # Extract the html from the warc records
    html_extractor = HtmlExtractor()
    logger.info("Starting retrieving the html")
    html_dataset = warc_dataset.map(html_extractor, num_proc=args.num_proc)
    logger.info("Finished retrieving the html")

    logger.info("Starting computing the success rate for the html extraction")
    num_successes = len([1 for el in html_dataset["html_error"] if not el])
    logger.info(
        f"Success rate for the html extraction: {num_successes} /"
        f" {len(html_dataset)} ({num_successes / len(html_dataset) * 100}%)"
    )
    logger.info("Finished computing the success rate for the html extraction")

    dom_tree_simplificator = DOMTreeSimplificator(
        strip_multiple_linebreaks=True,
        strip_multiple_spaces=True,
        remove_html_comments=True,
        replace_line_break_tags=True,
        unwrap_tags=True,
        strip_tags=True,
        strip_special_divs=True,
        remove_dates=True,
        remove_empty_leaves=True,
        unnest_nodes=True,
        remake_tree=True,
    )
    pre_extraction_simplificator = PreExtractionSimplificator(
        only_text_image_nodes=True,
        format_texts=True,
        merge_consecutive_text_nodes=True,
    )
    # Only the html simplification and image-url extraction steps are used in this script,
    # so the arguments for the later stages (image downloading, sharding) are left as None
    web_document_extractor = WebDocumentExtractor(
        html_dataset=html_dataset,
        dom_tree_simplificator=dom_tree_simplificator,
        pre_extraction_simplificator=pre_extraction_simplificator,
        path_save_dir_dataset=None,
        num_proc=args.num_proc,
        path_save_file_image_urls=args.path_save_file_image_urls,
        path_save_dir_downloaded_images=None,
        thread_count=None,
        number_sample_per_shard=None,
        image_size=None,
        resize_mode=None,
        path_save_dir_tmp_datasets_images=None,
        path_save_dir_dataset_images=None,
        path_save_file_map_url_idx=None,
        num_proc_urls_to_images=None,
        path_save_dir_sharded_dataset=None,
        shard_size=None,
    )
    web_document_extractor.html_to_web_documents()
    html_dataset = web_document_extractor.dataset
    web_document_extractor.get_image_urls()

    # Upload the file of image urls to S3
    path_sync_s3 = os.path.join("s3://m4-datasets/webdocs/image_urls/", str(args.idx_job), "image_urls.txt")
    command_sync_s3 = f"aws s3 cp {args.path_save_file_image_urls} {path_sync_s3}"
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    os.system(command_sync_s3)

    logger.info("Starting saving the html dataset")
    path_save_disk_output = f"/scratch/storage_hugo/html_dataset_{args.idx_job}"
    if os.path.exists(path_save_disk_output):
        os.system(f"rm -r {path_save_disk_output}")
    html_dataset.save_to_disk(path_save_disk_output)

    path_sync_s3 = os.path.join(args.path_save_dir_html_dataset, str(args.idx_job))
    command_sync_s3 = f"aws s3 sync {path_save_disk_output} {path_sync_s3}"
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    os.system(command_sync_s3)
    logger.info("Finished saving the html dataset")

    logger.info("Starting deleting the tmp files")
    os.system(f"rm -r {path_save_tmp_files}")
    logger.info("Finished deleting the tmp files")