"""
This file contains the code used to process and create the
FineWeb 2 dataset (https://huggingface.co/datasets/HuggingFaceFW/fineweb-2)
1. we took the non-English data we collected during the creation of FineWeb English as a starting point (first pipeline)
2. we then applied GlotLID for language identification, covering a large number of languages
3. we filtered each language based on the language score
4. we then ran deduplication per language
5. we applied a filtering pipeline per language
6. we applied some finishing touches such as PII removal, ftfy, etc.
"""
from functools import partial
import yaml
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupCluster, MinhashDedupFilter, MinhashDedupSignature
from datatrove.pipeline.dedup.minhash import MinhashConfig, MinhashDedupBuckets
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.filters import (
FineWebQualityFilter,
GopherQualityFilter,
GopherRepetitionFilter,
LanguageFilter,
URLFilter, LambdaFilter,
)
from datatrove.pipeline.formatters import PIIFormatter, FTFYFormatter, SymbolLinesFormatter
from datatrove.pipeline.readers import JsonlReader, WarcReader
from datatrove.pipeline.writers.jsonl import JsonlWriter
from datatrove.utils.hashing import HashConfig
"""
1. The non-English data from this pipeline was the starting point
"""
DUMP_TO_PROCESS = "CC-MAIN-2023-50" # example
MAIN_OUTPUT_PATH = "s3://some_s3_bucket"
BASE_OUTPUT_PATH = f"{MAIN_OUTPUT_PATH}/base_processing"
LOGS_PATH = "/fsx/guilherme/logs/fineweb-2"
fineweb_english_executor = SlurmPipelineExecutor(
job_name=f"cc_{DUMP_TO_PROCESS}",
pipeline=[
WarcReader(
f"s3://commoncrawl/crawl-data/{DUMP_TO_PROCESS}/segments/",
glob_pattern="*/warc/*", # we want the warc files
default_metadata={"dump": DUMP_TO_PROCESS},
),
URLFilter(exclusion_writer=JsonlWriter(f"{BASE_OUTPUT_PATH}/removed/1_url/{DUMP_TO_PROCESS}")),
Trafilatura(favour_precision=True),
LanguageFilter(
exclusion_writer=JsonlWriter(
# THIS is the data we care about
f"{BASE_OUTPUT_PATH}/2_non_english/{DUMP_TO_PROCESS}",
)
),
# ... other steps from FineWeb english
JsonlWriter(f"{BASE_OUTPUT_PATH}/english/{DUMP_TO_PROCESS}"),
],
tasks=8000,
time="10:00:00",
logging_dir=f"{LOGS_PATH}/base_processing/{DUMP_TO_PROCESS}",
randomize_start_duration=180, # don't hit the bucket all at once with the list requests
mem_per_cpu_gb=2,
partition="hopper-cpu",
)
fineweb_english_executor.run()
"""
2. We then applied GlotLID (we actually applied it to all dumps)
"""
GLOTLID_OUTPUT_PATH = f"{MAIN_OUTPUT_PATH}/glotlid"
for dump in [
"CC-MAIN-2023-50",
# ...
]:
SlurmPipelineExecutor(
job_name=f"glotlid_{dump}",
pipeline=[
JsonlReader(f"{BASE_OUTPUT_PATH}/2_non_english/{DUMP_TO_PROCESS}"),
# we keep annotations of alternative labels that are classified above 0.01
# backend glotlid instead of ft176
LanguageFilter(backend="glotlid", label_only=True, keep_top_pairs_threshold=0.01),
# save in "language_script/dump"
JsonlWriter(GLOTLID_OUTPUT_PATH,
output_filename="${language}_${language_script}/" + dump + "/${rank}.jsonl.gz")
],
tasks=1000,
# workers=50,
mem_per_cpu_gb=4,
logging_dir=f"{LOGS_PATH}/glotlid/{dump}",
partition="hopper-cpu",
randomize_start_duration=5 * 60,
time="10:00:00",
).run()
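# For reference, each line written above is a regular datatrove JSONL document. A record in
# ${language}_${language_script}/<dump>/<rank>.jsonl.gz roughly looks like the sketch below
# (structure follows the default JsonlWriter adapter; values are illustrative, and the metadata
# also carries the alternative-label annotations kept above the 0.01 threshold):
# {
#   "text": "...",
#   "id": "<urn:uuid:...>",
#   "metadata": {
#     "dump": "CC-MAIN-2023-50",
#     "url": "https://example.com/page",
#     "language": "por",
#     "language_script": "Latn",
#     "language_score": 0.98
#   }
# }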
"""
From this point on, processing is PER LANGUAGE
"""
for lang_script in ["por_Latn", "swh_Latn", "tha_Thai", "ces_Latn"]:  # , ...
# we will save all data here
LANGUAGE_OUTPUT_PATH = f"{MAIN_OUTPUT_PATH}/language/{lang_script}"
# load the specific thresholds, stopwords etc for this language
# we will soon share more details about how we computed them
with open(f"configs/{lang_script}.yml") as f:
filter_config = yaml.safe_load(f)
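# Illustrative sketch of what a per-language config file might contain. The keys are the ones
# read further down in this script; the values below are made-up placeholders, not the actual
# per-language thresholds:
#
# language_score: 0.65
# dup_line_frac: 0.3
# top_n_grams: [[2, 0.2], [3, 0.18], [4, 0.16]]
# dup_n_grams: [[5, 0.15], [6, 0.14], [7, 0.13], [8, 0.12], [9, 0.11], [10, 0.10]]
# line_punct_thr: 0.12
# new_line_ratio: 0.3
# max_avg_word_length: 10
# min_avg_word_length: 3
# max_non_alpha_words_ratio: 0.8
# stopwords: ["de", "que", "não", "uma", "com"]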
def above_lang_threshold(doc, threshold):
return doc.metadata["language_score"] >= threshold
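# LambdaFilter calls filter_function(doc) for every document; partial() binds this language's
# threshold so the same helper can be reused across languages without relying on late binding
# of the loop variable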
"""
3. Enforce a minimum language_score (which changes per language)
"""
lang_score_filtering = SlurmPipelineExecutor(
job_name=f"lang_filter_{lang_script}",
pipeline=[
JsonlReader(f"{GLOTLID_OUTPUT_PATH}/{lang_script}"), # all the data for this language
LambdaFilter(
# we only keep documents with language score above the threshold
filter_function=partial(above_lang_threshold, threshold=filter_config["language_score"]),
exclusion_writer=JsonlWriter(
f"{LANGUAGE_OUTPUT_PATH}/language_filtering/removed")
),
JsonlWriter(f"{LANGUAGE_OUTPUT_PATH}/language_filtering/output")
],
tasks=1000,
mem_per_cpu_gb=4,
logging_dir=f"{LOGS_PATH}/language_filtering/{lang_script}",
partition="hopper-cpu",
randomize_start_duration=5 * 60,
time="10:00:00",
)
"""
4. we then applied minhash deduplication to each language
"""
# you can also change ngrams or the number of buckets and their size here
minhash_config = MinhashConfig(
hash_config=HashConfig(
hash_fc="xxhash",
precision=64, # better precision -> fewer false positives (collisions)
),
num_buckets=14,
hashes_per_bucket=8,
n_grams=5,
)
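# With this banding scheme, two documents with (word 5-gram) Jaccard similarity s are flagged as
# duplicates with probability ~= 1 - (1 - s**hashes_per_bucket)**num_buckets = 1 - (1 - s**8)**14,
# the usual MinHash-LSH S-curve. Quick illustrative check (not part of the pipeline):
#   s = 0.75  ->  1 - (1 - 0.75**8)**14 ~= 0.77
#   s = 0.50  ->  1 - (1 - 0.50**8)**14 ~= 0.05
# so near-duplicates above ~0.75 similarity are very likely caught, while pairs around 0.5
# similarity are rarely flagged.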
S3_MINHASH_BASE_PATH = f"{LANGUAGE_OUTPUT_PATH}/minhash"
MINHASH_LOGS_FOLDER = f"{LOGS_PATH}/minhash/{lang_script}"
TOTAL_TASKS = 1000
# this is the original data that we want to deduplicate
INPUT_READER = JsonlReader(
f"{LANGUAGE_OUTPUT_PATH}/language_filtering/output"
) # this is the output from the language threshold filtering
# stage 1 computes minhash signatures for each task (each task gets a set of files)
stage1 = SlurmPipelineExecutor(
job_name=f"mh1_{lang_script}",
pipeline=[
INPUT_READER,
MinhashDedupSignature(
output_folder=f"{S3_MINHASH_BASE_PATH}/signatures", config=minhash_config,
language=lang_script # [!] THIS IS IMPORTANT: we need this to know which word tokenizer to use to split
# into words and ngrams
),
],
tasks=TOTAL_TASKS,
time="5:00:00",
partition="hopper-cpu",
logging_dir=f"{MINHASH_LOGS_FOLDER}/signatures",
randomize_start_duration=180,
depends=lang_score_filtering, # only start after the first one completes
)
stage2 = SlurmPipelineExecutor(
job_name=f"mh2_{lang_script}",
pipeline=[
MinhashDedupBuckets(
input_folder=f"{S3_MINHASH_BASE_PATH}/signatures",
output_folder=f"{S3_MINHASH_BASE_PATH}/buckets",
config=MinhashConfig(hash_config=minhash_config.hash_config),
),
],
tasks=minhash_config.num_buckets * 50, # the code supports parallelizing each bucket. here we run 50
# workers per bucket
randomize_start_duration=180,
logging_dir=f"{MINHASH_LOGS_FOLDER}/buckets",
partition="hopper-cpu",
time="02:00:00",
mem_per_cpu_gb=4,
cpus_per_task=3,  # you can also run more (smaller) tasks if you do not have a lot of memory
depends=stage1,
)
# this is the slowest step. If needed, you can use the rust version in datatrove/tools/fast_mh3
stage3 = SlurmPipelineExecutor(
job_name=f"mh3_{lang_script}",
pipeline=[
MinhashDedupCluster(
input_folder=f"{S3_MINHASH_BASE_PATH}/buckets",
output_folder=f"{S3_MINHASH_BASE_PATH}/remove_ids",
config=minhash_config,
save_cluster_size=True # this option allows us to later upsample data based on cluster sizes
),
],
tasks=1, # this step runs on a single task
logging_dir=f"{MINHASH_LOGS_FOLDER}/clusters",
partition="hopper-cpu",
time="30:00:00", # and can also be quite slow. Usually not this slow though
mem_per_cpu_gb=25,
cpus_per_task=8, # if you dedup a full dump, you do need a lot of memory for this one
depends=stage2,
)
stage4 = SlurmPipelineExecutor(
job_name=f"mh4_{lang_script}",
pipeline=[
# we must read the original input in the exact same way (same nb of tasks etc) before and after
# dedup: duplicates are identified by task rank + document position in the stream, not by document id
INPUT_READER,
MinhashDedupFilter(input_folder=f"{S3_MINHASH_BASE_PATH}/remove_ids"),
JsonlWriter(f"{S3_MINHASH_BASE_PATH}/output"),
],
tasks=TOTAL_TASKS,
logging_dir=f"{MINHASH_LOGS_FOLDER}/filter",
partition="hopper-cpu",
time="5:00:00",
mem_per_cpu_gb=4,
depends=stage3,
)
# launch dedup pipelines
stage4.run()
"""
5. language specific filtering pipeline
"""
FILTERING_OUTPUT_PATH = f"{LANGUAGE_OUTPUT_PATH}/filtering"
filtering_executor = SlurmPipelineExecutor(
job_name=f"filter_{lang_script}",
pipeline=[
# read minhashed data
JsonlReader(f"{S3_MINHASH_BASE_PATH}/output"),
# gopher repetition filter
GopherRepetitionFilter(
language=lang_script, # [!] THIS IS IMPORTANT: we need this to know which word tokenizer to use to split
# into words and ngrams
# we disable these: trafilatura already strips most paragraph structure, and the
# dup_line_char_frac check is handled with a different threshold (char_duplicates_ratio)
# in the fineweb quality filter below
dup_para_frac=0,
dup_line_char_frac=0,
dup_para_char_frac=0,
dup_line_frac=filter_config['dup_line_frac'],
top_n_grams=filter_config["top_n_grams"],
dup_n_grams=filter_config["dup_n_grams"],
exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/goph_rep/")
),
# fineweb quality
FineWebQualityFilter(
language=lang_script,
short_line_thr=999, # we disable this filter
char_duplicates_ratio=0.1, # we changed this from 0.01 in fineweb english
line_punct_thr=filter_config["line_punct_thr"],
new_line_ratio=filter_config['new_line_ratio'],
exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/fw_qual/")
),
# gopher quality filter
GopherQualityFilter(
language=lang_script,
max_avg_word_length=filter_config['max_avg_word_length'],
min_avg_word_length=filter_config['min_avg_word_length'],
stop_words=filter_config['stopwords'],
max_non_alpha_words_ratio=filter_config['max_non_alpha_words_ratio'],
min_stop_words=2,
exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/goph_qual/")
),
# we do not apply the C4 filters
JsonlWriter(f"{FILTERING_OUTPUT_PATH}/output"),
],
tasks=TOTAL_TASKS,
logging_dir=f"{LOGS_PATH}/filtering/{lang_script}",
partition="hopper-cpu",
time="5:00:00",
mem_per_cpu_gb=4,
depends=stage4,  # only start once the deduplicated output has been written
)
filtering_executor.run()
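# A minimal sketch (illustrative, not part of the production pipeline) of how the same kind of
# filter can be dry-run locally on a small sample to eyeball what a given config removes before
# launching the full Slurm jobs; the /tmp paths and the 1000-document limit are arbitrary choices:
#
# from datatrove.executor import LocalPipelineExecutor
# LocalPipelineExecutor(
#     pipeline=[
#         JsonlReader(f"{S3_MINHASH_BASE_PATH}/output", limit=1000),
#         GopherRepetitionFilter(language=lang_script, dup_line_frac=filter_config['dup_line_frac'],
#                                exclusion_writer=JsonlWriter("/tmp/filter_preview/removed")),
#         JsonlWriter("/tmp/filter_preview/kept"),
#     ],
#     tasks=1,
#     logging_dir="/tmp/filter_preview/logs",
# ).run()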
"""
6. final touches
"""
SlurmPipelineExecutor(
job_name=f"final_touches_{lang_script}",
pipeline=[
JsonlReader(f"{FILTERING_OUTPUT_PATH}/output"),
FTFYFormatter(), # fix encoding issues. Important in a multilingual setting
PIIFormatter(), # remove PII
SymbolLinesFormatter(symbols_to_remove=["|"], replace_char="\n"), # fix trafilatura table artifacts
JsonlWriter(f"{LANGUAGE_OUTPUT_PATH}/final_touches"),
],
tasks=TOTAL_TASKS,
logging_dir=f"{LOGS_PATH}/final_touches/{lang_script}",
partition="hopper-cpu",
time="5:00:00",
mem_per_cpu_gb=4,
depends=filtering_executor,  # only start once the language-specific filtering has finished
).run()
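# After the final touches, the per-language outputs correspond to the language configs published
# on the Hub. Illustrative usage (the exact config names are listed on the dataset card at
# https://huggingface.co/datasets/HuggingFaceFW/fineweb-2):
#
# from datasets import load_dataset
# ds = load_dataset("HuggingFaceFW/fineweb-2", name="por_Latn", split="train", streaming=True)
# print(next(iter(ds))["text"])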