build_obelics/08_01_prepare_urldedup.py
"""
srun --pty --cpus-per-task=96 bash -i
conda activate /fsx/m4/conda/shared-m4-2023-03-10
"""
import json
import os
from collections import Counter
from datasets import load_from_disk
from tqdm import tqdm

NUM_SHARDS = 200
PATH_WEB_DOCS_S3 = "s3://m4-datasets/webdocs/web_document_dataset_filtered_imgurldedup_nsfwfiltered_texts_only/"
PATH_WEB_DOCS_LOCAL = "/scratch/web_docs_texts_only/"
PATH_SAVE_DISK_DUP_URLS = "/scratch/dup_urls.json"
PATH_SAVE_S3_DUP_URLS = "s3://m4-datasets/webdocs/dup_urls.json"


def unroll_list(list_):
    return [sub_el for el in list_ for sub_el in el]
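# e.g. unroll_list([["a.warc.gz"], ["b.warc.gz", "c.warc.gz"]]) -> ["a.warc.gz", "b.warc.gz", "c.warc.gz"]
# (hypothetical filenames, for illustration only)

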
if __name__ == "__main__":
command_sync_s3 = f"aws s3 sync {PATH_WEB_DOCS_S3} {PATH_WEB_DOCS_LOCAL}"
os.system(command_sync_s3)
os.system(command_sync_s3)
os.system(command_sync_s3)
ds_shards = [
load_from_disk(os.path.join(PATH_WEB_DOCS_LOCAL, str(idx_shard))) for idx_shard in tqdm(range(NUM_SHARDS))
]
all_urls = []
for idx_shard in tqdm(range(NUM_SHARDS)):
all_urls.extend([json.loads(meta)["url"] for meta in ds_shards[idx_shard]["general_metadata"]])
print(f"Total number of documents: {len(all_urls)}")
# Total number of documents: 361_209_568

    dup_urls = Counter(all_urls)
    # The raw URL list and the loaded shards are no longer needed after counting
    del all_urls
    del ds_shards
dup_urls = {k: v for k, v in dup_urls.items() if v > 1}
    sum_dup_urls = sum(dup_urls.values())
print(f"{len(dup_urls)} URLs appear at least twice, for a total of {sum_dup_urls} documents")
# 80_236_663 URLs appear at least twice, for a total of 196_108_071 documents
print(f"We can then remove {sum_dup_urls - len(dup_urls)} documents")
# We can then remove 115_871_408 documents

    # The following block was in practice run as 200 separate jobs (one shard per job, 24 CPUs per job).
    # Each job produces its own dup_urls_to_warcfilename,
    # saved at /fsx/hugo/trash/dup_urls_to_warcfilename/dup_urls_to_warcfilename_{idx_shard}.json,
    # and these per-job files are merged afterwards
dup_urls_to_warcfilename = {}
for idx_shard in tqdm(range(NUM_SHARDS)):
ds_shard = load_from_disk(os.path.join(PATH_WEB_DOCS_LOCAL, str(idx_shard)), keep_in_memory=True)
urls_shard = [json.loads(meta)["url"] for meta in ds_shard["general_metadata"]]
for idx, url in enumerate(urls_shard):
if url in dup_urls:
dup_urls_to_warcfilename[url] = dup_urls_to_warcfilename.get(url, []) + [
json.loads(ds_shard[idx]["general_metadata"])["warc_filename"]
]
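    # A minimal sketch of how each of the 200 jobs would save its partial mapping
    # (hypothetical per-job script, not part of this file; idx_shard comes from the job index):
    #     with open(
    #         f"/fsx/hugo/trash/dup_urls_to_warcfilename/dup_urls_to_warcfilename_{idx_shard}.json", "w"
    #     ) as f:
    #         json.dump(dup_urls_to_warcfilename, f)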

    # We now start from the result of the previous step, which was run as multiple jobs:
    # the per-job dup_urls_to_warcfilename files have to be merged into a single mapping
all_dup_urls_to_warcfilename = []
for idx_shard in tqdm(range(NUM_SHARDS)):
with open(f"/fsx/hugo/trash/dup_urls_to_warcfilename/dup_urls_to_warcfilename_{idx_shard}.json") as f:
all_dup_urls_to_warcfilename.append(json.load(f))
dup_urls_to_warcfilename = {
url: unroll_list(
[
all_dup_urls_to_warcfilename[idx_shard][url]
for idx_shard in range(NUM_SHARDS)
if url in all_dup_urls_to_warcfilename[idx_shard]
]
)
for url in tqdm(dup_urls)
}
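    # At this point dup_urls_to_warcfilename maps every duplicated URL to the list of WARC
    # filenames of all documents sharing that URL, e.g. (hypothetical values)
    # {"http://example.com/page": ["CC-MAIN-...-00001.warc.gz", "CC-MAIN-...-00042.warc.gz"]}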
    # Checks: the first value should equal the number of duplicated URLs (80_236_663),
    # the second the total number of documents sharing a duplicated URL (196_108_071)
    print(len(dup_urls_to_warcfilename))
    print(sum(len(warc_filenames) for warc_filenames in dup_urls_to_warcfilename.values()))

    # Within each group of documents sharing the same URL, we keep only the most recent one;
    # all the others will be removed
dup_urls_to_warcfilename = {
url: sorted(warc_filenames)[-1] for url, warc_filenames in tqdm(dup_urls_to_warcfilename.items())
}
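    # Picking sorted(warc_filenames)[-1] as "the most recent" relies on the WARC filenames
    # sorting chronologically, which holds for Common Crawl's naming where the crawl dates
    # appear in the filename (assumption about the metadata, not checked here)
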
with open(PATH_SAVE_DISK_DUP_URLS, "w") as f:
json.dump(dup_urls_to_warcfilename, f)
    command_save_s3 = f"aws s3 cp {PATH_SAVE_DISK_DUP_URLS} {PATH_SAVE_S3_DUP_URLS}"
    os.system(command_save_s3)