# pipeline/data/parallel_downloaders.py
"""
Parallel (bilingual) translation dataset downloaders for various external resources like OPUS, mtdata etc.
"""
import shutil
import subprocess
import tarfile
from enum import Enum
from pathlib import Path
import zipfile
from pipeline.common.command_runner import run_command
from pipeline.common.downloads import stream_download_to_file, compress_file, DownloadException
from pipeline.common.logging import get_logger
logger = get_logger(__file__)


class Downloader(Enum):
    opus = "opus"
    mtdata = "mtdata"
    sacrebleu = "sacrebleu"
    flores = "flores"
    url = "url"


def opus(src: str, trg: str, dataset: str, output_prefix: Path):
    """
    Download a dataset from OPUS
    https://opus.nlpl.eu/
    """
    logger.info("Downloading opus corpus")
    name = dataset.split("/")[0]
    # Sanitize the dataset name so it can be used as a directory name.
    name_and_version = "".join(c if c.isalnum() or c in "-_ " else "_" for c in dataset)
    tmp_dir = output_prefix.parent / "opus" / name_and_version
    tmp_dir.mkdir(parents=True, exist_ok=True)

    archive_path = tmp_dir / f"{name}.txt.zip"

    def download_opus(pair):
        url = f"https://object.pouta.csc.fi/OPUS-{dataset}/moses/{pair}.txt.zip"
        logger.info(f"Downloading corpus for {pair} {url} to {archive_path}")
        stream_download_to_file(url, archive_path)

    try:
        pair = f"{src}-{trg}"
        download_opus(pair)
    except DownloadException:
        # OPUS publishes each corpus under a single language-pair direction,
        # so retry with the pair reversed.
        logger.info("Download error, trying the opposite direction")
        pair = f"{trg}-{src}"
        download_opus(pair)

    logger.info("Extracting the archive")
    with zipfile.ZipFile(archive_path, "r") as zip_ref:
        zip_ref.extractall(tmp_dir)

    logger.info("Compressing output files")
    for lang in (src, trg):
        file_path = tmp_dir / f"{name}.{pair}.{lang}"
        compressed_path = compress_file(file_path, keep_original=False, compression="zst")
        output_path = output_prefix.with_suffix(f".{lang}.zst")
        compressed_path.rename(output_path)

    shutil.rmtree(tmp_dir)
    logger.info("Done: Downloading opus corpus")


def mtdata(src: str, trg: str, dataset: str, output_prefix: Path):
    """
    Download a dataset using mtdata
    https://github.com/thammegowda/mtdata
    """
    logger.info("Downloading mtdata corpus")
    # Import lazily: mtdata is only needed by this downloader.
    from mtdata.iso import iso3_code

    tmp_dir = output_prefix.parent / "mtdata" / dataset
    tmp_dir.mkdir(parents=True, exist_ok=True)

    run_command(["mtdata", "get", "-l", f"{src}-{trg}", "-tr", dataset, "-o", str(tmp_dir)])

    for file in tmp_dir.rglob("*"):
        logger.info(file)

    for lang in (src, trg):
        # mtdata names the downloaded files with ISO 639-3 codes.
        iso = iso3_code(lang, fail_error=True)
        file = tmp_dir / "train-parts" / f"{dataset}.{iso}"
        compressed_path = compress_file(file, keep_original=False, compression="zst")
        compressed_path.rename(output_prefix.with_suffix(f".{lang}.zst"))

    shutil.rmtree(tmp_dir)
    logger.info("Done: Downloading mtdata corpus")


def url(src: str, trg: str, url: str, output_prefix: Path):
    """
    Download a dataset from an HTTP URL. The URL must contain a "[LANG]"
    placeholder, which is substituted with each language code.
    """
    logger.info("Downloading corpus from a url")
    for lang in (src, trg):
        file = url.replace("[LANG]", lang)
        dest = output_prefix.with_suffix(f".{lang}.zst")
        logger.info(f"{lang} destination: {dest}")
        stream_download_to_file(file, dest)
    logger.info("Done: Downloading corpus from a url")


def sacrebleu(src: str, trg: str, dataset: str, output_prefix: Path):
    """
    Download an evaluation dataset using SacreBLEU
    https://github.com/mjpost/sacrebleu
    """
    logger.info("Downloading sacrebleu corpus")

    def try_download(src_lang, trg_lang):
        try:
            # "--echo src" emits the source side of the test set and
            # "--echo ref" the reference side, so the sides must follow the
            # direction currently being attempted (src_lang/trg_lang, not the
            # outer src/trg, or a reversed download would swap the files).
            for lang, target in ((src_lang, "src"), (trg_lang, "ref")):
                output = str(
                    run_command(
                        [
                            "sacrebleu",
                            "--test-set",
                            dataset,
                            "--language-pair",
                            f"{src_lang}-{trg_lang}",
                            "--echo",
                            target,
                        ],
                        capture=True,
                    )
                )
                output_file = output_prefix.with_suffix(f".{lang}")
                with open(output_file, "w") as f:
                    f.write(output)
                compress_file(output_file, keep_original=False, compression="zst")
            return True
        except subprocess.CalledProcessError:
            return False

    # Try the original direction first.
    success = try_download(src, trg)
    if not success:
        logger.info("The first import failed; trying again with the language pair reversed.")
        # Try the reversed direction.
        if not try_download(trg, src):
            raise RuntimeError("Both attempts to download the dataset failed.")

    logger.info("Done: Downloading sacrebleu corpus")


def flores(src: str, trg: str, dataset: str, output_prefix: Path):
    """
    Download the Flores 101 evaluation dataset
    https://github.com/facebookresearch/flores/blob/main/previous_releases/flores101/README.md
    """

    def flores_code(lang_code):
        # Flores 101 uses its own codes to distinguish simplified and
        # traditional Chinese; everything else resolves to an ISO 639-3 code.
        if lang_code in ["zh", "zh-Hans"]:
            return "zho_simpl"
        elif lang_code == "zh-Hant":
            return "zho_trad"
        else:
            # Import lazily and resolve the ISO 639-3 code using mtdata.
            from mtdata.iso import iso3_code

            return iso3_code(lang_code, fail_error=True)

    logger.info("Downloading flores corpus")
    tmp_dir = output_prefix.parent / "flores" / dataset
    tmp_dir.mkdir(parents=True, exist_ok=True)

    archive_path = tmp_dir / "flores101_dataset.tar.gz"
    dataset_url = "https://dl.fbaipublicfiles.com/flores101/dataset/flores101_dataset.tar.gz"
    stream_download_to_file(dataset_url, archive_path)

    with tarfile.open(archive_path, "r:gz") as tar:
        tar.extractall(path=tmp_dir)

    for lang in (src, trg):
        code = flores_code(lang)
        # The archive contains one file per language and split.
        file = tmp_dir / "flores101_dataset" / dataset / f"{code}.{dataset}"
        compressed_path = compress_file(file, keep_original=False, compression="zst")
        compressed_path.rename(output_prefix.with_suffix(f".{lang}.zst"))

    shutil.rmtree(tmp_dir)
    logger.info("Done: Downloading flores corpus")


mapping = {
    Downloader.opus: opus,
    Downloader.sacrebleu: sacrebleu,
    Downloader.flores: flores,
    Downloader.url: url,
    Downloader.mtdata: mtdata,
}
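
# Note: for Downloader.url the "dataset" argument passed through download()
# below is the URL template itself (containing the "[LANG]" placeholder); it
# lands in the url() function's third positional parameter.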


def download(
    downloader: Downloader, src: str, trg: str, dataset: str, output_prefix: Path
) -> None:
    """
    Download a parallel dataset using :downloader

    :param downloader: downloader type (opus, mtdata etc.)
    :param src: source language code
    :param trg: target language code
    :param dataset: unsanitized dataset name e.g. wikimedia/v20230407 (for OPUS)
    :param output_prefix: output files prefix.
        Outputs two compressed files <output_prefix>.<src|trg>.zst
    """
    logger.info(f"importer: {downloader}")
    logger.info(f"src: {src}")
    logger.info(f"trg: {trg}")
    logger.info(f"dataset: {dataset}")
    logger.info(f"output_prefix: {output_prefix}")

    output_prefix.parent.mkdir(parents=True, exist_ok=True)
    mapping[downloader](src, trg, dataset, output_prefix)
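

# Example usage (a sketch; the language codes and output path are hypothetical,
# the dataset name is the one from the docstring above):
#
#     from pathlib import Path
#
#     download(
#         downloader=Downloader.opus,
#         src="en",
#         trg="de",
#         dataset="wikimedia/v20230407",
#         output_prefix=Path("data/corpus/wikimedia_v20230407"),
#     )
#     # -> data/corpus/wikimedia_v20230407.en.zst
#     # -> data/corpus/wikimedia_v20230407.de.zst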