utils/find_corpus.py (436 lines of code) (raw):

#!/usr/bin/env python3
"""
Finds all opus datasets for a language pair and prints them to set config settings.

Usage:
    task find-corpus -- en ca
    task find-corpus -- en fr --importer opus
"""

import argparse
import logging
import re
import sys
from typing import Any, Iterable, Literal, NamedTuple, Optional, TypeVar, Union

import humanize
import requests
from mtdata.entry import Entry


class OpusDataset(NamedTuple):
    """One corpus entry as returned in the "corpora" list of the OPUS API."""

    # The name of this dataset, e.g. "CCAligned"
    corpus: str
    # This is a blank string at the time of this writing.
    documents: str
    # 'moses'
    preprocessing: str
    # The language tag.
    source: str
    # The language tag.
    target: str
    # The URL to the download
    url: str
    # For example "v1"
    version: str
    alignment_pairs: int
    id: int
    # Size in KiB
    size: int
    source_tokens: int
    target_tokens: int
    latest: Union[Literal["True"], Literal["False"]]

    def corpus_key(self) -> str:
        """Return the key used to reference this corpus in a config, e.g. "opus_CCAligned/v1"."""
        return f"opus_{self.corpus}/{self.version}"

    def website_url(self) -> str:
        """Return the URL of the human readable OPUS page for this corpus."""
        return f"https://opus.nlpl.eu/{self.corpus}/{self.source}&{self.target}/{self.version}/{self.corpus}"

    def humanize_size(self) -> str:
        """Return the size as a human readable string (the raw size is in KiB)."""
        return humanize.naturalsize(self.size * 1024)


def fetch_opus(source: str, target: str) -> list[OpusDataset]:
    """
    Query the OPUS API for the latest moses-preprocessed corpora of a language pair.

    Returns the datasets sorted by the number of alignment pairs, largest first.
    """
    # This API is documented: https://opus.nlpl.eu/opusapi/
    url = f"https://opus.nlpl.eu/opusapi/?source={source}&target={target}&preprocessing=moses&version=latest"

    datasets = requests.get(url).json()

    # Only pass along the fields OpusDataset declares, so that a field added to the API
    # response does not raise a TypeError. Missing fields still raise.
    known_fields = set(OpusDataset._fields)

    # Convert the response into a typed object that is sorted.
    datasets_typed = [
        OpusDataset(**{key: value for key, value in corpus_data.items() if key in known_fields})
        for corpus_data in datasets.get("corpora", [])
    ]
    return sorted(datasets_typed, key=lambda x: x.alignment_pairs or 0, reverse=True)


def get_opus(source: str, target: str, download_url: bool):
    """
    Print the OPUS datasets for a language pair as a table and as a YAML list.

    When download_url is True the table shows the direct download URL, otherwise
    it shows the OPUS website URL.
    """
    print("")
    print("┌──────────────────────────────┐")
    print("│ OPUS - https://opus.nlpl.eu/ │")
    print("└──────────────────────────────┘")

    datasets = fetch_opus(source, target)

    header = ["Dataset", "Code", "Sentences", "Size", "URL"]
    # Datasets without any alignment pairs are left out of the table.
    rows = [
        [
            dataset.corpus,
            dataset.corpus_key(),
            dataset.alignment_pairs,
            dataset.humanize_size(),
            dataset.url if download_url else dataset.website_url(),
        ]
        for dataset in datasets
        if dataset.alignment_pairs
    ]
    print_table([header, *rows])

    names = [dataset.corpus_key() for dataset in datasets]
    print_yaml(names, exclude=["OPUS100v", "WMT-News"])


def fetch_sacrebleu(source: str, target: str) -> dict[str, Any]:
    """
    Return the sacrebleu datasets that list the language pair in either direction.

    The result maps the sacrebleu dataset name to its entry.
    """
    # Imported lazily so that the other importers work without sacrebleu installed.
    import sacrebleu

    forward = f"{source}-{target}"
    backward = f"{target}-{source}"
    return {
        name: entry
        for name, entry in sacrebleu.DATASETS.items()
        if forward in entry.langpairs or backward in entry.langpairs
    }


def get_sacrebleu(source: str, target: str):
    """Print the sacrebleu datasets for a language pair as a table and as a YAML list."""
    datasets_dict = fetch_sacrebleu(source, target)

    print("")
    print("┌─────────────────────────────────────────────────┐")
    print("│ sacrebleu - https://github.com/mjpost/sacrebleu │")
    print("└─────────────────────────────────────────────────┘")

    rows = [
        [name, dataset.description, ", ".join(dataset.data)]
        for name, dataset in datasets_dict.items()
    ]
    print_table([["Dataset", "Description", "URLs"], *rows])
    print_yaml([f"sacrebleu_{name}" for name in datasets_dict])


def get_size(tags: list[str]) -> str:
    """
    Extract the lowercased size category (e.g. "1m<n<10m") from Hugging Face dataset tags.

    Returns an empty string when there is no "size_categories:" tag, or when the
    category is "unknown".
    """
    prefix = "size_categories:"
    size_tag = next((tag for tag in tags if tag.startswith(prefix)), None)
    if not size_tag:
        return ""

    # Lowercase the text since it's not consistent.
    size = size_tag.removeprefix(prefix).lower()

    # The prefix must be stripped before comparing, otherwise "unknown" is never matched.
    return "" if size == "unknown" else size


def get_language_count(tags: list[str]):
    """Count the tags that start with "language:"."""
    return sum(1 for tag in tags if tag.startswith("language:"))


# Maps a size category (as returned by get_size) to a rank used for sorting,
# where a larger rank means a larger dataset.
HF_DATASET_SIZES = {
    "": 0,
    "unknown": 0,
    "n<1k": 1,
    "1k<n<10k": 2,
    "10k<100k": 3,
    "10k<n<100k": 3,
    "100k<n<1m": 4,
    "1m<n<10m": 5,
    "10m<n<100m": 6,
    "100m<n<1b": 7,
    "1b<n<10b": 8,
    "10b<n<100b": 9,
    "100b<n<1t": 10,
}


def _fetch_huggingface_datasets(**filter_kwargs: Any) -> list[Any]:
    """
    List the Hugging Face datasets matching a DatasetFilter built from filter_kwargs.

    The result is ordered by size category (largest first), and then by downloads
    (most downloaded first).
    """
    # Imported lazily so that the other importers work without huggingface_hub installed.
    from huggingface_hub import DatasetFilter, HfApi

    datasets = list(HfApi().list_datasets(filter=DatasetFilter(**filter_kwargs)))

    # A single stable sort on a (size, downloads) key gives the same order as sorting
    # by downloads and then by size. Missing tags or downloads sort as the smallest.
    datasets.sort(
        key=lambda dataset: (
            -HF_DATASET_SIZES.get(get_size(dataset.tags or []), 0),
            -(dataset.downloads or 0),
        )
    )
    return datasets


def _print_huggingface_table(datasets: Iterable[Any]) -> None:
    """Print the ID, size, and downloads of every dataset accepted by is_useful_dataset."""
    rows = [
        [
            f"https://huggingface.co/datasets/{dataset.id}",
            get_size(dataset.tags or []),
            dataset.downloads,
        ]
        for dataset in datasets
        if is_useful_dataset(dataset)
    ]
    print_table([["ID", "Size", "Downloads"], *rows])


def get_huggingface_monolingual(language: str):
    """
    Prints the monolingual Hugging Face datasets for a language, ordered by size and
    then by downloads. Speech recognition datasets are left out.
    """
    datasets = _fetch_huggingface_datasets(language=language, multilinguality="monolingual")

    print("")
    print("┌─────────────────────────────────────────────────┐")
    print("│ huggingface monolingual data │")
    print("└─────────────────────────────────────────────────┘")
    _print_huggingface_table(datasets)


def get_huggingface_parallel(source: str, target: str):
    """
    Prints the Hugging Face datasets tagged with both languages, ordered by size and
    then by downloads. Speech recognition datasets are left out.
    """
    datasets = _fetch_huggingface_datasets(language=[source, target])

    print("")
    print(
        "┌────────────────────────────────────────────────────────────────────────────────────────────────────┐"
    )
    print(
        f"│ huggingface parallel data https://huggingface.co/datasets?language=language:{source},language:{target}"
    )
    print(
        "└────────────────────────────────────────────────────────────────────────────────────────────────────┘"
    )
    _print_huggingface_table(datasets)


def is_useful_dataset(dataset: Any) -> bool:
    """Determines if a dataset is useful or not."""
    return "task_categories:automatic-speech-recognition" not in (dataset.tags or [])


def get_huggingface_any(language: str):
    """
    Prints every Hugging Face dataset tagged with the language (monolingual or not),
    ordered by size and then by downloads. Speech recognition datasets are left out.
    """
    datasets = _fetch_huggingface_datasets(language=language)

    print("")
    print("┌─────────────────────────────────────────────────────────────────────────────┐")
    print(f"│ huggingface any data https://huggingface.co/datasets?language=language:{language}")
    print("└─────────────────────────────────────────────────────────────────────────────┘")
    _print_huggingface_table(datasets)


def get_remote_file_size(
    url: str, display_not_200: bool = True
) -> tuple[Optional[int], Optional[str]]:
    """
    Look up the size of a remote file from its Content-Length header.

    Returns a tuple of (size in bytes, human readable size). Returns (None, None) when
    the HEAD request is not successful, or when a request error occurs. Failed HEAD
    requests are reported on stdout unless display_not_200 is False.
    """
    try:
        response = requests.head(url, timeout=1, allow_redirects=True)

        if not response.ok:
            if display_not_200:
                print(f"Failed to retrieve file information for: {url}")
                print(f"Status code: {response.status_code}")
            return None, None

        if "Content-Length" in response.headers:
            int_size = int(response.headers["Content-Length"])
            return int_size, humanize.naturalsize(int_size)

        # Sometimes when the HEAD does not have the Content-Length, the GET response does.
        # The body is streamed and never read; the context manager guarantees that the
        # connection is released even if parsing the header fails.
        with requests.get(url, timeout=1, allow_redirects=True, stream=True) as get_response:
            int_size = int(get_response.headers.get("Content-Length", 0))
        return int_size, humanize.naturalsize(int_size)

    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return None, None


T = TypeVar("T")


def fetch_mtdata(source: str, target: str) -> dict[str, Entry]:
    """
    Returns a dict that maps the corpus key to the mtdata entry.

    Entries whose key contains "opus", "newstest", or "unv1" (case-insensitive) are
    left out. Note that this disables logging for the rest of the process.
    """
    # mtdata outputs debug logs
    logging.disable(logging.CRITICAL)

    from mtdata.entry import BCP47Tag
    from mtdata.index import get_entries
    from mtdata.iso import iso3_code

    source_tricode = iso3_code(source, fail_error=True)
    target_tricode = iso3_code(target, fail_error=True)
    entries = sorted(
        get_entries((BCP47Tag(source_tricode), BCP47Tag(target_tricode)), None, None, True),
        key=lambda entry: entry.did.group,
    )

    def get_corpus_key(entry):
        return (
            f"mtdata_{entry.did.group}-{entry.did.name}-{entry.did.version}-{entry.did.lang_str}"
        )

    excludes = ["opus", "newstest", "unv1"]  # lowercase excludes.

    def is_excluded(corpus_key: str) -> bool:
        lowered_key = corpus_key.lower()
        return any(exclude in lowered_key for exclude in excludes)

    # Entries sharing a corpus key collapse into one, where the last entry wins.
    keyed_entries = {get_corpus_key(entry): entry for entry in entries}

    # Filter out the excluded entries.
    return {
        corpus_key: entry
        for corpus_key, entry in keyed_entries.items()
        if not is_excluded(corpus_key)
    }


def get_mtdata(source: str, target: str):
    """Print the mtdata datasets for a language pair as a table and as a YAML list."""
    entries = fetch_mtdata(source, target)

    print("")
    print("┌────────────────────────────────────────────────┐")
    print("│ mtdata - https://github.com/thammegowda/mtdata │")
    print("└────────────────────────────────────────────────┘")

    # A "Size" column based on get_remote_file_size(entry.url) is intentionally left
    # disabled here.
    rows = [[corpus_key, entry.url] for corpus_key, entry in entries.items()]
    print_table([["Dataset", "URL"], *rows])

    print_yaml(entries.keys())


class MonoDataset(NamedTuple):
    """A monolingual dataset. The size and line count are None when they are not known."""

    name: str
    url: str
    size: Optional[int]
    display_size: Optional[str]
    lines_num: Optional[int]


# Matches one file row of the news-crawl directory listing.
#
# Example row: (indentation and newlines added)
# <tr>
#   <td valign="top"><img src="/icons/compressed.gif" alt="[   ]"></td>
#   <td><a href="news.2013.en.shuffled.deduped.gz">news.2013.en.shuffled.deduped.gz</a></td>
#   <td align="right">2019-01-14 10:23  </td>
#   <td align="right">1.2G</td>
#   <td>&nbsp;</td>
# </tr>
_NEWS_CRAWL_ROW_RE = re.compile(
    r"""
    # Match the file name year.
    # >news.2008.en.shuffled.deduped.gz<
    #       ^^^^
    >news\.(\d+)\.\w+\.shuffled\.deduped\.gz<
    [^\n]*
    # Match the file size and unit.
    # <td align="right">176M</td>
    #                   ^^^^
    <td\ align="right">
    ([\d\.]+)(\w+)
    </td>
    """,
    re.VERBOSE,
)

# The multipliers for the size units of the news-crawl listing. Any other unit is
# treated as a plain number.
_NEWS_CRAWL_UNIT_MULTIPLIERS = {
    "K": 1_000,
    "M": 1_000_000,
    "G": 1_000_000_000,
}


def fetch_news_crawl(lang: str) -> list[MonoDataset]:
    """
    Scrape the news-crawl directory listing of a language for its yearly datasets.

    Returns an empty list, and prints a message, when the listing can't be retrieved or
    when no rows match.
    """
    base_url = f"https://data.statmt.org/news-crawl/{lang}/"
    response = requests.get(base_url, allow_redirects=True)

    if not response.ok:
        print("No newscrawl data was available for", lang)
        return []

    matches = _NEWS_CRAWL_ROW_RE.findall(response.text)
    if not matches:
        print("The regex could not find newscrawl datasets for", lang)
        return []

    datasets = []
    for year, size_number, size_unit in matches:
        multiplier = _NEWS_CRAWL_UNIT_MULTIPLIERS.get(size_unit, 1)
        name = f"news-crawl_news.{year}"
        url = f"https://data.statmt.org/news-crawl/{lang}/news.{year}.{lang}.shuffled.deduped.gz"
        size = int(float(size_number) * multiplier)
        datasets.append(MonoDataset(name, url, size, f"{size_number}{size_unit}", None))
    return datasets


def get_news_crawl(source: str, target: str):
    """Print the news-crawl datasets of both languages as tables and as YAML lists."""
    for lang in (source, target):
        datasets = fetch_news_crawl(lang)

        print("")
        print("┌─────────────────────────────────────────────────────────────────────┐")
        print(f"│ news-crawl ({lang}) - https://github.com/data.statmt.org/news-crawl │")
        print("└─────────────────────────────────────────────────────────────────────┘")

        rows = [[dataset.name, dataset.url, dataset.display_size] for dataset in datasets]
        print_table([["Dataset", "URL", "Size"], *rows])

        print_yaml([dataset.name for dataset in datasets])


def fetch_hplt(lang: str, prefixes=("08", "09")) -> list[MonoDataset]:
    """
    Find the filtered HPLT monolingual shards that are available for a language.

    For every prefix, shards 1 through 5 are probed by requesting their ".count.txt"
    file. Each shard whose count file is available is returned as a dataset named
    "url_<shard url>", with the content of the count file as its number of lines.
    """
    all_datasets = []
    for threshold in prefixes:
        for shard_id in range(1, 6):
            base_url = f"https://storage.googleapis.com/releng-translations-dev/data/mono-hplt/{threshold}/hplt_filtered_{lang}_{shard_id}.count.txt"
            response = requests.get(base_url, allow_redirects=True)

            if response.ok:
                lines_number = int(response.content)
                url = f"https://storage.googleapis.com/releng-translations-dev/data/mono-hplt/{threshold}/hplt_filtered_{lang}_{shard_id}.txt.zst"
                all_datasets.append(MonoDataset(f"url_{url}", url, None, None, lines_number))

    return all_datasets


def get_hplt_mono(source: str, target: str):
    """Print the HPLT monolingual shards of both languages as tables and as YAML lists."""
    for lang in (source, target):
        datasets = fetch_hplt(lang)

        print("")
        print("┌────────────────┐")
        print(f"│ hplt-mono ({lang}) │")
        print("└────────────────┘")

        # The dataset name already contains the URL, so the URL gets no column of its own.
        rows = [[dataset.name, dataset.lines_num] for dataset in datasets]
        print_table([["Dataset", "Lines"], *rows])

        print_yaml([dataset.name for dataset in datasets])


def print_yaml(names: Iterable[str], exclude: Iterable[str] = ()):
    """
    Print the names as a sorted and de-duplicated YAML list.

    Names that contain any of the exclude strings (case-insensitive) are left out.
    """
    # Lowercase the excludes once up front. This also makes a one-shot iterable safe
    # to check against every name.
    lowered_excludes = [excluded.lower() for excluded in exclude]

    cleaned = {
        name
        for name in names
        if not any(excluded in name.lower() for excluded in lowered_excludes)
    }

    print("\nYAML:")
    if not cleaned:
        print("(no datasets)\n")
    else:
        print("\n".join(sorted(f" - {name}" for name in cleaned)))


def print_table(table: list[list[Any]]):
    """
    Nicely print a table, the first row is the header
    """

    # Compute the column lengths.
    column_lengths = [max(len(str(datum)) for datum in column) for column in zip(*table)]

    print("")
    for index, row in enumerate(table):
        # Print the row.
        for datum, max_len in zip(row, column_lengths):
            print(str(datum).ljust(max_len), end=" ")
        print("")

        # Print a separator between the header and the rest of the table.
        if index == 0:
            for length in column_lengths:
                print("".ljust(length, "─"), end=" ")
            print("")

    # Only the header is present.
    if len(table) == 1:
        print("(no datasets)")


def main(args_list: Optional[list[str]] = None) -> None:
    """
    Parse the command line arguments and run the requested importers.

    When args_list is None, argparse reads the arguments from sys.argv. The process
    exits with status 1 when a language is missing or when the importer is not valid.
    """
    importers = [
        "opus",
        "sacrebleu",
        "mtdata",
        "huggingface_mono",
        "huggingface_parallel",
        "huggingface_any",
        "news-crawl",
        "hplt-mono",
    ]
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawTextHelpFormatter,  # Preserves whitespace in the help text.
    )
    parser.add_argument("source", type=str, nargs="?", help="Source language code")
    parser.add_argument("target", type=str, nargs="?", help="Target language code")
    parser.add_argument(
        "--importer",
        type=str,
        help=f"The importer to use: {', '.join(importers)}",
    )
    parser.add_argument(
        "--download_url",
        action="store_true",
        default=False,
        help="Show the download url if available.",
    )

    args = parser.parse_args(args_list)

    if not args.source or not args.target:
        parser.print_help()
        sys.exit(1)

    if args.importer and args.importer not in importers:
        print(f'"{args.importer}" is not a valid importer.')
        sys.exit(1)

    # The monolingual importers take a single language, use the non-English one.
    mono_language = args.target if args.source == "en" else args.source

    if args.importer == "opus" or not args.importer:
        get_opus(args.source, args.target, args.download_url)

    if args.importer == "sacrebleu" or not args.importer:
        get_sacrebleu(args.source, args.target)

    if args.importer == "mtdata" or not args.importer:
        get_mtdata(args.source, args.target)

    if args.importer == "huggingface_mono" or not args.importer:
        get_huggingface_monolingual(mono_language)

    if args.importer == "huggingface_parallel" or not args.importer:
        get_huggingface_parallel(args.source, args.target)

    if args.importer == "huggingface_any" or not args.importer:
        get_huggingface_any(mono_language)

    if args.importer == "news-crawl" or not args.importer:
        get_news_crawl(args.source, args.target)

    # "hplt-mono" was accepted as a valid importer without ever being run. It only runs
    # when explicitly requested, so that the output of the default run doesn't change.
    if args.importer == "hplt-mono":
        get_hplt_mono(args.source, args.target)


if __name__ == "__main__":
    main()