de/fileutils.py (83 lines of code) (raw):

import json
import os
import sqlite3
import subprocess
from contextlib import closing
from pathlib import Path

import pyarrow.parquet as pq


def rewrite_to_parquet(src_path, dest_path, block_size=1024 * 1024, **kwargs):
    """
    Reads a Parquet file in batches and writes them out to another file.

    After writing, both files are reopened and their row counts and Arrow
    schemas are compared.

    :param src_path: Path to the source Parquet file.
    :param dest_path: Path to the destination Parquet file.
    :param block_size: Maximum number of rows per batch read from the source
        (passed to ``ParquetFile.iter_batches`` as ``batch_size``).
    :param kwargs: Extra keyword arguments forwarded to ``pq.ParquetWriter``.
    :raises AssertionError: If the rewritten file differs from the source in
        row count or schema.
    """
    src_path = Path(src_path)
    dest_path = Path(dest_path)

    with pq.ParquetFile(src_path) as src:
        schema = src.schema.to_arrow_schema()
        # The context manager closes the writer (flushing the footer) even if
        # a batch fails to write.
        with pq.ParquetWriter(dest_path, schema, **kwargs) as writer:
            for batch in src.iter_batches(batch_size=block_size):
                writer.write(batch, row_group_size=1024 * 1024)

    # Verify the rewrite; explicit raises so the checks survive `python -O`.
    with pq.ParquetFile(src_path) as src, pq.ParquetFile(dest_path) as dst:
        src_metadata = src.metadata
        dst_metadata = dst.metadata
        if src_metadata.num_rows != dst_metadata.num_rows:
            raise AssertionError(
                f"Row count mismatch: {src_metadata.num_rows} in {src_path}, "
                f"{dst_metadata.num_rows} in {dest_path}"
            )
        if (
            src_metadata.schema.to_arrow_schema()
            != dst_metadata.schema.to_arrow_schema()
        ):
            raise AssertionError(
                f"Schema mismatch between {src_path} and {dest_path}"
            )


def rewrite_to_jsonlines(src, dest, **kwargs):
    """
    Reads a Parquet file and writes it out as JSON Lines (one record per line).

    The whole table is loaded into memory via pandas.

    :param src: Path to the source Parquet file.
    :param dest: Path to the destination JSON Lines file.
    :param kwargs: Extra keyword arguments forwarded to ``DataFrame.to_json``.
    """
    table = pq.read_table(src)
    table.to_pandas().to_json(dest, orient="records", lines=True, **kwargs)


def rewrite_to_sqlite(src, dest, **kwargs):
    """
    Reads a Parquet file and writes it out to a SQLite database.

    The table is named after the stem of ``dest`` and replaces any existing
    table with that name.

    :param src: Path to the source Parquet file.
    :param dest: Path to the destination SQLite database.
    :param kwargs: Extra keyword arguments forwarded to ``DataFrame.to_sql``.
    """
    dest = Path(dest)
    table = pq.read_table(src)
    # pandas needs a DB connection (not a filesystem path) as `con`.
    with closing(sqlite3.connect(dest)) as conn:
        table.to_pandas().to_sql(dest.stem, conn, if_exists="replace", **kwargs)
        conn.commit()


def checkout_file_revisions(file_path, target_dir) -> list[str]:
    """
    Checks out every revision of the given file into ``target_dir``.

    Each revision is written as ``<stem>-<short hash><suffix>`` directly
    inside ``target_dir``.

    NOTE: ``git checkout <rev> -- <path>`` also updates the repository index
    for that path, so the source repository may show staged changes afterwards.

    :param file_path: Path to a file inside a git repository.
    :param target_dir: Directory that receives the checked-out revisions.
    :return: Short commit hashes of all revisions of the file, in the order
        reported by ``git log``.
    :raises RuntimeError: If listing the revisions or checking one out fails.
    """
    file_path = Path(file_path)
    # Resolve only the parent so a symlinked file keeps its tracked name while
    # the directory matches the real path git reports for the toplevel.
    file_path = file_path.parent.resolve() / file_path.name
    # Absolute path: git interprets --work-tree relative to the `-C` directory,
    # whereas the rename below is relative to the process cwd.
    target_dir = Path(target_dir).resolve()

    # `cwd=` avoids changing the working directory of the whole process.
    toplevel = subprocess.check_output(
        ["git", "rev-parse", "--show-toplevel"],
        cwd=file_path.parent,
        text=True,
    )
    git_dir = Path(toplevel.strip()).resolve()
    git_file = file_path.relative_to(git_dir)
    git_cmd = ["git", "-C", str(git_dir)]

    try:
        command = git_cmd + [
            "log",
            "--pretty=format:%h",
            "--follow",
            "--diff-filter=d",
            "--",
            str(git_file),
        ]
        output = subprocess.check_output(command, text=True)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Failed to retrieve revisions for {git_file}") from e

    # Skip blank lines so an empty log yields no revisions instead of [""].
    revisions = [line.strip() for line in output.splitlines() if line.strip()]
    print(f"{git_file} has {len(revisions)} revisions")

    target_dir.mkdir(parents=True, exist_ok=True)
    for rev in revisions:
        print("Checking out", rev)
        command = git_cmd + [
            f"--work-tree={target_dir}",
            "checkout",
            rev,
            "--",
            str(git_file),
        ]
        try:
            subprocess.run(command, check=True)
        except subprocess.CalledProcessError as e:
            raise RuntimeError(
                f"Failed to checkout {file_path} at revision {rev}"
            ) from e
        # rename the file to include the commit hash
        new_file = target_dir / f"{file_path.stem}-{rev}{file_path.suffix}"
        os.replace(target_dir / git_file, new_file)

    return revisions


def get_page_chunk_sizes(paths):
    """
    Yields the size of every data page in the given Parquet files.

    Runs the external ``parquet-layout`` command on each path and parses its
    JSON output.

    :param paths: Iterable of paths to Parquet files.
    :return: Generator of ``(uncompressed_bytes, num_values)`` tuples, one per
        page whose ``page_type`` starts with ``"data"``.
    """
    for path in paths:
        # get the result of parquet-layout command
        output = subprocess.check_output(["parquet-layout", path], text=True)
        meta = json.loads(output)
        for row_group in meta["row_groups"]:
            for column in row_group["columns"]:
                for page in column["pages"]:
                    if not page["page_type"].startswith("data"):
                        continue
                    yield page["uncompressed_bytes"], page["num_values"]