misc/reference_datasets/multilingual/download_cc-100.py:
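"""Convert the CC-100 dump (one <lang>.txt.xz file per language) into datatrove
JSONL shards, using one Slurm task per language."""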
from datatrove.executor import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep


class CC100Reader(PipelineStep):
def run(self, data=None, rank: int = 0, world_size: int = 1):
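        # the CC-100 language codes; rank i of the job processes languages[i]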
languages = [
"af",
"am",
"ar",
"as",
"az",
"be",
"bg",
"bn",
"bn_rom",
"br",
"bs",
"ca",
"cs",
"cy",
"da",
"de",
"el",
"en",
"eo",
"es",
"et",
"eu",
"fa",
"ff",
"fi",
"fr",
"fy",
"ga",
"gd",
"gl",
"gn",
"gu",
"ha",
"he",
"hi",
"hi_rom",
"hr",
"ht",
"hu",
"hy",
"id",
"ig",
"is",
"it",
"ja",
"jv",
"ka",
"kk",
"km",
"kn",
"ko",
"ku",
"ky",
"la",
"lg",
"li",
"ln",
"lo",
"lt",
"lv",
"mg",
"mk",
"ml",
"mn",
"mr",
"ms",
"my",
"my_zaw",
"ne",
"nl",
"no",
"ns",
"om",
"or",
"pa",
"pl",
"ps",
"pt",
"qu",
"rm",
"ro",
"ru",
"sa",
"si",
"sc",
"sd",
"sk",
"sl",
"so",
"sq",
"sr",
"ss",
"su",
"sv",
"sw",
"ta",
"ta_rom",
"te",
"te_rom",
"th",
"tl",
"tn",
"tr",
"ug",
"uk",
"ur",
"ur_rom",
"uz",
"vi",
"wo",
"xh",
"yi",
"yo",
"zh-Hans",
"zh-Hant",
"zu"
]
        from fsspec import open as fsspec_open
        from loguru import logger

        from datatrove.data import Document
        from datatrove.pipeline.writers import JsonlWriter

        def get_doc_texts(file):
            # each CC-100 file is a single xz-compressed text stream with
            # documents separated by blank lines
            with fsspec_open(file, mode="rt", compression="xz") as f:
                lines = []
                for line in f:
                    if line == "\n":
                        # a blank line ends the current document
                        yield "".join(lines).strip()
                        lines = []
                    lines.append(line)
                if lines:
                    # flush the last document if the file does not end with a blank line
                    yield "".join(lines).strip()

        # extra ranks (tasks beyond the number of languages) have nothing to do
        if rank >= len(languages):
            return

        lang = languages[rank]
        logger.info(f"Processing \"{lang}\"")
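        # stream the documents and write them as JSONL shards, capping each output file at 200 MiB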
with JsonlWriter(f"/path/to/ref-datasets/cc-100/{lang.lower()}", max_file_size=200 * 2**20) as writer:
for doci, doctext in enumerate(get_doc_texts(f"/path/to/data/cc-100/{lang}.txt.xz")):
doc = Document(
text=doctext,
id=f"cc-100/{lang}/{doci}",
metadata={
"lang": lang
}
)
writer.write(doc)
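

# one Slurm task per language: 120 tasks comfortably cover the 116 language codes above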
SlurmPipelineExecutor(
job_name="cc100",
pipeline=[
CC100Reader(),
],
tasks=120,
mem_per_cpu_gb=4,
cpus_per_task=4,
logging_dir="/path/to/logs/multilingual/copy/cc-100",
partition="hopper-cpu",
time="20:00:00"
).run()