# build_obelics/09_05_merge_domain_to_duplicated_texts_sharded.py
"""
srun --pty --cpus-per-task=96 bash -i
conda activate /fsx/m4/conda/shared-m4-2023-03-10
"""
import json
import logging
import os
from tqdm import tqdm
# Configure root logging once at module import so every logger shares the format.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
_LOG_DATE_FORMAT = "%m/%d/%Y %H:%M:%S"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)

# Module-level logger, explicitly pinned to INFO.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Number of shard sub-directories to merge (0 .. NUM_SHARDS-1 under the local sync dir).
NUM_SHARDS = 200

# Per-shard dictionaries mapping a domain to its duplicated texts, on S3 and
# the local scratch copy they are synced to.
PATH_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_S3 = "s3://m4-datasets/webdocs/line_dedup_domain_to_duplicated_texts_sharded/"
PATH_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_LOCAL = "/scratch/line_dedup_domain_to_duplicated_texts_sharded/"

# Merged (all-shard) dictionary: local output file and its S3 destination.
PATH_SAVE_DISK_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_FULL = "/scratch/line_dedup_domain_to_duplicated_texts.json"
PATH_SAVE_S3_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_FULL = "s3://m4-datasets/webdocs/line_dedup_domain_to_duplicated_texts.json"

# Reduced dictionary (only entries the line deduplication will actually remove):
# local output file and its S3 destination.
PATH_SAVE_DISK_NEW_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_FULL = "/scratch/new_line_dedup_domain_to_duplicated_texts.json"
PATH_SAVE_S3_NEW_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_FULL = "s3://m4-datasets/webdocs/new_line_dedup_domain_to_duplicated_texts.json"
if __name__ == "__main__":
logger.info("Starting downloading the dictionaries to go from a domain to the associated duplicated texts")
command_sync_s3 = (
"aws s3 sync"
f" {PATH_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_S3} {PATH_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_LOCAL}"
)
os.system(command_sync_s3)
os.system(command_sync_s3)
os.system(command_sync_s3)
logger.info("Finished downloading the dictionaries to go from a domain to the associated duplicated texts")
logger.info("Starting merging the sub dictionaries")
all_domain_to_duplicated_texts = []
for idx_shard in tqdm(range(NUM_SHARDS)):
with open(
os.path.join(
PATH_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_LOCAL,
str(idx_shard),
"line_dedup_domain_to_duplicated_texts.json",
)
) as f:
all_domain_to_duplicated_texts.append(json.load(f))
domain_to_duplicated_texts = {
k: v for sub_dict in tqdm(all_domain_to_duplicated_texts) for k, v in sub_dict.items()
}
logger.info("Finished merging the sub dictionaries")
logger.info("Starting saving the dictionary to go from a domain to the associated duplicated texts")
with open(PATH_SAVE_DISK_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_FULL, "w") as f:
json.dump(domain_to_duplicated_texts, f)
command_sync_s3 = (
"aws s3 cp"
f" {PATH_SAVE_DISK_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_FULL} {PATH_SAVE_S3_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_FULL}"
)
os.system(command_sync_s3)
logger.info("Finished saving the dictionary to go from a domain to the associated duplicated texts")
# Find the strategy
# data = {k: v for k, v in domain_to_duplicated_texts.items() if len(v) > 0}
# keys = list(data.keys())
# print([(idx, len(data[keys[idx]])) for idx in range(len(keys))])
# print("\n\n".join([k + f"\t{str(v)}" for k, v in {k: v for k, v in data[keys[5258]].items() if v > 2}.items()]))
logger.info(
"Starting making a smaller version of the dictionary, based on only what we will remove in the line"
" deduplication"
)
new_domain_to_duplicated_texts = {
k: {txt: counter for txt, counter in v.items() if counter > 2}
for k, v in tqdm(domain_to_duplicated_texts.items())
}
new_domain_to_duplicated_texts = {
k: {txt: counter for txt, counter in v.items() if "END_OF_DOCUMENT_TOKEN_TO_BE_REPLACED" not in txt}
for k, v in tqdm(new_domain_to_duplicated_texts.items())
}
new_domain_to_duplicated_texts = {k: v for k, v in new_domain_to_duplicated_texts.items() if len(v) > 0}
logger.info(
"Finished making a smaller version of the dictionary, based on only what we will remove in the line"
" deduplication"
)
logger.info("Starting saving the new dictionary to go from a domain to the associated duplicated texts")
with open(PATH_SAVE_DISK_NEW_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_FULL, "w") as f:
json.dump(new_domain_to_duplicated_texts, f)
command_sync_s3 = (
"aws s3 cp"
f" {PATH_SAVE_DISK_NEW_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_FULL} {PATH_SAVE_S3_NEW_LINE_DEDUP_DOMAIN_TO_DUPLICATED_TEXTS_FULL}"
)
os.system(command_sync_s3)
logger.info("Finished saving the new dictionary to go from a domain to the associated duplicated texts")