misc/reference_datasets/monolingual/zh/download_mapcc.py

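"""Download the Chinese (zh_cc) split of MAP-CC and repack it as jsonl.gz shards.

The m-a-p/MAP-CC dataset on the Hugging Face Hub ships zh_cc.jsonl.gz as
multiple raw parts. The pipeline below downloads the parts in parallel across
Slurm tasks, re-assembles them into a single gzip stream on rank 0, and
rewrites the documents with JsonlWriter.
"""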
import gzip

import fsspec
import orjson
from datatrove.data import Document
from datatrove.executor import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.writers import JsonlWriter


class ConcatenatedFileStream:
    """Read-only binary stream that chains several files into one.

    Used to re-assemble the zh_cc.jsonl.gz parts into a single byte stream
    before gzip decompression.
    """

    def __init__(self, filepaths):
        self.filepaths = filepaths
        self.file_index = 0
        self.current_file = None
        self._open_next_file()

    def _open_next_file(self):
        if self.current_file:
            self.current_file.close()
        if self.file_index < len(self.filepaths):
            print(f"opening {self.filepaths[self.file_index]}")
            self.current_file = fsspec.open(self.filepaths[self.file_index], mode="rb").open()
            self.file_index += 1
        else:
            self.current_file = None

    def read(self, size=-1):
        result = b""
        while size != 0:
            if self.current_file is None:
                break  # no more files to read from
            chunk = self.current_file.read(size)
            if not chunk:
                # end of the current file: move on to the next one
                self._open_next_file()
            else:
                result += chunk
                if size > 0:
                    size -= len(chunk)
        return result

    def close(self):
        if self.current_file:
            self.current_file.close()


class JsonlPartReader(JsonlReader):
    def __init__(
        self,
        data_folder,
        adapter=None,
        text_key: str = "text",
        id_key: str = "id",
        default_metadata: dict = None,
        recursive: bool = True,
        glob_pattern: str | None = None,
    ):
        super().__init__(
            data_folder,
            adapter=adapter,
            text_key=text_key,
            id_key=id_key,
            default_metadata=default_metadata,
            recursive=recursive,
            glob_pattern=glob_pattern,
        )

    def read_files_shard(self, shard: list[str]):
        """Reads a list of files and yields Documents.

        Args:
            shard: a list of file paths

        Returns:
            generator of Document
        """
        from tqdm import tqdm

        li = 0
        skipped = 0
        with (
            tqdm(
                total=self.limit if self.limit != -1 else None,
                desc="Document progress",
                unit="doc",
                disable=not self.doc_progress,
            ) as doc_pbar,
            tqdm(total=len(shard), desc="File progress", unit="file", disable=not self.file_progress) as file_pbar,
        ):
            for i, filepath in enumerate(shard):
                self.stat_update("input_files")
                di = 0
                for di, document in enumerate(self.read_file(filepath)):
                    if skipped < self.skip:
                        skipped += 1
                        continue
                    if self.limit != -1 and li >= self.limit:
                        break
                    yield document
                    doc_pbar.update()
                    li += 1
                file_pbar.update()
                self.stat_update("documents", value=di, unit="input_file")
                if self.limit != -1 and li >= self.limit:
                    break


def open_concatenated_gzip_files(filepaths):
    # Chain the raw parts into a single binary stream...
    concatenated_stream = ConcatenatedFileStream(filepaths)
    # ...and wrap it with gzip to decompress on the fly
    gzip_stream = gzip.GzipFile(fileobj=concatenated_stream, mode="r")
    return gzip_stream


class ExtractMapccStep(PipelineStep):
    """Pipeline step to extract MAP-CC Chinese data.

    Re-assembles the downloaded zh_cc.jsonl.gz parts into a single gzip
    stream and yields one Document per JSON line.
    """

    def run(self, data, rank: int = 0, world_size: int = 1):
        # The parts only make sense as one ordered archive, so a single task
        # performs the extraction
        if rank != 0:
            return
        filepaths = sorted(data)
        with open_concatenated_gzip_files(filepaths) as f:
            for li, line in enumerate(f):
                record = orjson.loads(line)
                # Assumes MAP-CC records keep their text under the "text" key;
                # everything else is preserved as metadata
                yield Document(text=record.pop("text"), id=f"mapcc/{li}", metadata=record)


class CollectMapccStep(PipelineStep):
    """Pipeline step to download the MAP-CC Chinese split.

    Each task downloads its share of the zh_cc.jsonl.gz parts from the
    m-a-p/MAP-CC repository on the Hugging Face Hub and passes the full,
    sorted list of local part paths on to the extraction step.
    """

    def run(self, data, rank: int = 0, world_size: int = 1):
        import os

        from tqdm import tqdm

        from datatrove.io import get_datafolder

        output_path = "/path/to/ref-datasets/mapcc"
        # Create the download directory if it doesn't exist
        os.makedirs(output_path, exist_ok=True)

        # Open the dataset repository through fsspec's Hugging Face protocol
        df = get_datafolder("hf://datasets/m-a-p/MAP-CC")

        # The Chinese CC split is shipped as multiple zh_cc.jsonl.gz parts
        files_to_download = df.list_files(recursive=True, glob_pattern="zh_cc.jsonl.gz*")

        for file in files_to_download[rank::world_size]:
            print(f"Downloading {file}")
            output_file = f"{output_path}/{os.path.basename(file)}"
            # Get the file size for the progress bar
            file_size = df.info(file)["size"]
            # Open the remote file in binary mode and stream chunks straight to disk
            with df.open(file, "rb") as source, open(output_file, "wb") as dest:
                with tqdm(total=file_size, unit="B", unit_scale=True, desc=os.path.basename(file)) as pbar:
                    while True:
                        chunk = source.read(8192)
                        if not chunk:
                            break
                        dest.write(chunk)
                        pbar.update(len(chunk))

        # Hand every part (not just this rank's shard) to the next step:
        # extraction runs on rank 0 only and expects all parts to be present
        return sorted(f"{output_path}/{os.path.basename(file)}" for file in files_to_download)


if __name__ == "__main__":
    SlurmPipelineExecutor(
        job_name="mapcc-collect",
        pipeline=[
            CollectMapccStep(),
            ExtractMapccStep(),
            JsonlWriter(
                "/path/to/ref-datasets/monolingual/zh/mapcc",
                output_filename="${rank}.jsonl.gz",
                max_file_size=2 * 2**30,
            ),
        ],
        logging_dir="/path/to/logs/dataset_download_logs/zh/mapcc-collect",
        randomize_start_duration=3 * 60,
        tasks=100,
        mem_per_cpu_gb=2,
        cpus_per_task=1,
        workers=10,
        partition="partition",
        time="11:59:59",
    ).run()