# misc/reference_datasets/multilingual/copy_raw_data.py
from time import sleep
from datatrove.executor import SlurmPipelineExecutor
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.writers import JsonlWriter


def adapter(self, data: dict, path: str, id_in_file: int | str):
    """
    Adapt a raw input record into the datatrove Document format.

    Args:
        data: a dictionary with the "raw" representation of the data
        path: file path or source for this sample
        id_in_file: its id in this particular file or source

    Returns: a dictionary with text, id, media and metadata fields
    """
return {
"text": data.pop(self.text_key, data.pop("content", "")),
"id": data.pop(self.id_key, data.pop("data-id", f"{path}/{id_in_file}")),
"media": data.pop("media", []),
"metadata": data.pop("metadata", {}) | data,
# remaining data goes into metadata
}
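
# Illustrative sketch (the record below is made up, not taken from the real data):
# with the default text_key="text" and id_key="id", a record like
#   {"content": "bonjour", "data-id": "doc-7", "language": "fr"}
# at position 7 of a hypothetical file "some_dump/000.jsonl.gz" would map to
#   {"text": "bonjour", "id": "doc-7", "media": [], "metadata": {"language": "fr"}}
# since any keys not explicitly popped are folded into "metadata".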


class CachedListReader(JsonlReader):
    """
    JsonlReader variant that builds each rank's file shard from a cached file list
    (instead of listing the data folder) and keeps only paths from `dump_to_proc`.
    """

def __init__(self,
data_folder,
dump_to_proc: str,
compression="infer",
limit: int = -1,
skip: int = 0,
file_progress: bool = False,
doc_progress: bool = False,
adapter = None,
text_key: str = "text",
id_key: str = "id",
default_metadata: dict = None,
recursive: bool = True,
glob_pattern: str | None = None,
shuffle_files: bool = False):
super().__init__(
data_folder,
None,
compression,
limit,
skip,
file_progress,
doc_progress,
adapter,
text_key,
id_key,
default_metadata,
recursive,
glob_pattern,
shuffle_files,
)
self.dump_to_proc = dump_to_proc

    def run(self, data=None, rank: int = 0, world_size: int = 1):
        """
        Get this rank's shard from the cached file list and sequentially read each file.

        Args:
            data: any existing data from previous pipeline stages
            rank: rank of the current task
            world_size: total number of tasks

        Yields: Document objects read from this rank's files
        """
from loguru import logger
if data:
yield from data
        files_shard = []
        pathi = 0
        with open("/path/to/base_proc_filelist.txt", "rt") as f:
            for path in f:
                # keep only paths belonging to the dump this job should process
                if path.split("/")[1] == self.dump_to_proc:
                    # round-robin assignment of the dump's files to tasks
                    if (pathi - rank) % world_size == 0:
                        files_shard.append(path.strip())
                    pathi += 1
        logger.info(f"Loaded {len(files_shard)} files for {rank=}, {world_size=}")
if len(files_shard) == 0:
if rank == 0:
raise RuntimeError(f"No files found on {self.data_folder.path}!")
# otherwise just a warning
logger.warning(f"No files found on {self.data_folder.path} for {rank=}")
for doc in self.read_files_shard(files_shard):
self.update_doc_stats(doc)
yield doc
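
    # Sharding sketch (numbers are illustrative): with world_size=3 and rank=1, the
    # filter above keeps the matching paths at indices 1, 4, 7, ... of the dump's
    # file list, so every file is read by exactly one task.
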
def read_file(self, filepath: str):
import orjson
from orjson import JSONDecodeError
from loguru import logger
with self.data_folder.open(filepath, "r", compression=self.compression) as f:
try:
for li, line in enumerate(f):
with self.track_time():
try:
document = self.get_document_from_dict(orjson.loads(line), filepath, li)
if not document:
continue
except (EOFError, JSONDecodeError) as e:
logger.warning(f"Error when reading `{filepath}`: {e}")
continue
yield document
except UnicodeDecodeError as e:
logger.warning(f"File `{filepath}` may be corrupted: raised UnicodeDecodeError ({e})")
except Exception as e:
if "Error -3 while decompressing data" in str(e):
logger.warning(f"CORRUPTED `{filepath}`: {e}")
else:
logger.warning(f"Unknwon: {e}")
for dump in """CC-MAIN-2014-42
CC-MAIN-2014-23
CC-MAIN-2014-41
CC-MAIN-2014-35
CC-MAIN-2014-15
CC-MAIN-2014-10
CC-MAIN-2013-48
CC-MAIN-2017-17
CC-MAIN-2017-13
CC-MAIN-2017-09
CC-MAIN-2015-18
CC-MAIN-2016-44
CC-MAIN-2014-52
CC-MAIN-2014-49
CC-MAIN-2015-22
CC-MAIN-2022-21
CC-MAIN-2023-23
CC-MAIN-2022-49
CC-MAIN-2017-04
CC-MAIN-2023-40
CC-MAIN-2023-14
CC-MAIN-2023-50
CC-MAIN-2021-43
CC-MAIN-2015-35
CC-MAIN-2016-50
CC-MAIN-2013-20
CC-MAIN-2015-48
CC-MAIN-2023-06
CC-MAIN-2015-11
CC-MAIN-2022-40
CC-MAIN-2015-32
CC-MAIN-2015-06
CC-MAIN-2021-31
CC-MAIN-2022-27
CC-MAIN-2021-04
CC-MAIN-2016-07
CC-MAIN-2017-43
CC-MAIN-2020-40
CC-MAIN-2016-30
CC-MAIN-2021-17
CC-MAIN-2015-27
CC-MAIN-2016-40
CC-MAIN-2021-39
CC-MAIN-2015-14
CC-MAIN-2022-05
CC-MAIN-2020-05
CC-MAIN-2017-34
CC-MAIN-2020-29
CC-MAIN-2017-26
CC-MAIN-2018-05
CC-MAIN-2018-09
CC-MAIN-2016-36
CC-MAIN-2017-22
CC-MAIN-2018-30
CC-MAIN-2020-16
CC-MAIN-2017-47
CC-MAIN-2018-51
CC-MAIN-2017-30
CC-MAIN-2019-35
CC-MAIN-2018-13
CC-MAIN-2019-43
CC-MAIN-2021-10
CC-MAIN-2017-39
CC-MAIN-2021-21
CC-MAIN-2022-33
CC-MAIN-2018-26
CC-MAIN-2020-45
CC-MAIN-2017-51
CC-MAIN-2019-09
CC-MAIN-2016-22
CC-MAIN-2021-49
CC-MAIN-2018-43
CC-MAIN-2018-17
CC-MAIN-2020-50
CC-MAIN-2021-25
CC-MAIN-2015-40
CC-MAIN-2020-24
CC-MAIN-2019-47
CC-MAIN-2024-10
CC-MAIN-2019-22
CC-MAIN-2019-04
CC-MAIN-2016-18
CC-MAIN-2019-30
CC-MAIN-2018-47
CC-MAIN-2019-39
CC-MAIN-2018-39
CC-MAIN-2019-18
CC-MAIN-2019-26
CC-MAIN-2020-34
CC-MAIN-2020-10
CC-MAIN-2019-51
CC-MAIN-2019-13
CC-MAIN-2018-22
CC-MAIN-2018-34
CC-MAIN-2024-18
CC-MAIN-2016-26""".splitlines():
    # one Slurm array job per dump: read that dump's files from the cached list and rewrite them as jsonl
    SlurmPipelineExecutor(
job_name=f"cp_{dump}",
pipeline=[
CachedListReader("/path/to/base_processing/non_english",
adapter=adapter, dump_to_proc=dump),
JsonlWriter(f"/path/to/raw_fw/{dump}")
],
tasks=5000,
mem_per_cpu_gb=4,
cpus_per_task=2,
logging_dir=f"/path/to/logs/multilingual/copy/raw_fw/{dump}",
partition="partition",
randomize_start_duration=3 * 60,
time="20:00:00",
max_array_launch_parallel=True
).run()