misc/reference_datasets/multilingual/download_mc4.py:

from datatrove.executor import SlurmPipelineExecutor
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.writers import JsonlWriter


def adapter(self, data: dict, path: str, id_in_file: int | str):
    """
    Adapt a raw mC4 record into the datatrove Document format.

    Skips validation shards and records the language code, parsed from the
    file name (e.g. "c4-af.tfrecord-....json.gz" -> "af"), in the document
    metadata.

    Args:
        data: a dictionary with the "raw" representation of the data
        path: file path or source for this sample
        id_in_file: its id in this particular file or source

    Returns: a dictionary with text, id, media and metadata fields
    """
    # local import keeps the function self-contained when it is serialized
    # and shipped to the Slurm workers
    import os.path

    # returning an empty dict drops the sample: only the train split is kept
    if "validation." in path:
        return {}
    return {
        "text": data.pop(self.text_key, ""),
        "id": data.pop(self.id_key, f"{path}/{id_in_file}"),
        "media": data.pop("media", []),
        # file names look like "c4-<lang>.tfrecord-...": take the part after the dash
        "metadata": {"language": os.path.basename(path).split(".")[0].split("-")[1]}
        | data.pop("metadata", {})
        | data,  # remaining fields go into metadata
    }


SlurmPipelineExecutor(
    job_name="mc4",
    pipeline=[
        JsonlReader(
            "hf://datasets/allenai/c4/multilingual",
            glob_pattern="c4-*.*.json.gz",
            adapter=adapter,
        ),
        # group the output by language, using the "language" metadata field set above
        JsonlWriter("/path/to/ref-datasets/mc4", output_filename="${language}" + "/${rank}.jsonl.gz"),
    ],
    tasks=300,
    # workers=50,
    mem_per_cpu_gb=4,
    logging_dir="/path/to/logs/multilingual/copy/mc4",
    partition="partition",
    randomize_start_duration=10 * 60,  # spread job starts over 10 minutes
    time="20:00:00",
).run()
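
# ---------------------------------------------------------------------------
# A minimal local sanity check for the adapter above (a sketch, not part of
# the original pipeline). It assumes datatrove binds custom adapters to the
# reader instance (e.g. via types.MethodType), which is how `self.text_key`
# and `self.id_key` resolve at runtime; `_FakeReader` and the sample record
# below are made up purely for illustration.
from types import MethodType


class _FakeReader:
    # mirrors the two reader attributes the adapter reads
    text_key = "text"
    id_key = "id"


_check = MethodType(adapter, _FakeReader())

_doc = _check(
    {"text": "hallo wêreld", "timestamp": "2020-01-01T00:00:00Z"},
    "c4-af.tfrecord-00000-of-00064.json.gz",
    0,
)
assert _doc["metadata"]["language"] == "af"  # language parsed from the file name
assert _doc["metadata"]["timestamp"] == "2020-01-01T00:00:00Z"  # leftover fields land in metadata

# validation shards are dropped entirely
assert _check({"text": "x"}, "c4-af-validation.tfrecord-00000-of-00004.json.gz", 0) == {}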