de/cli.py
import json
import functools
from pathlib import Path
import sys
import tempfile
import click
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm.contrib.concurrent import process_map
from rich.console import Console
from rich.table import Table
from humanize import naturalsize
import plotly.graph_objects as go
from .estimate import estimate_de, estimate_xtool
from .fileutils import (
rewrite_to_parquet,
rewrite_to_jsonlines,
rewrite_to_sqlite,
checkout_file_revisions,
get_page_chunk_sizes,
)
from .synthetic import (
FakeDataGenerator,
write_and_compare_parquet,
write_and_compare_json,
write_and_compare_sqlite,
convert_dedupe_images_to_png,
)
def pyarrow_has_cdc():
    # check that pyarrow is compiled with CDC support
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir = Path(temp_dir)
table = pa.table({"id": [1, 2, 3, 4, 5]})
try:
pq.write_table(
table, temp_dir / "test.parquet", use_content_defined_chunking=True
)
except TypeError:
return False
return True
def pretty_print_stats(results):
# dump the results to the console as a rich formatted table
console = Console()
table = Table(show_header=True, header_style="bold magenta")
table.add_column("Title")
table.add_column("Total Size", justify="right")
table.add_column("Chunk Size", justify="right")
table.add_column("Compressed Chunk Size", justify="right")
table.add_column("Dedup Ratio", justify="right")
table.add_column("Compressed Dedup Ratio", justify="right")
table.add_column("Transmitted XTool Bytes", justify="right")
    for row in results:
        table.add_row(
            row["title"],
            naturalsize(row["total_len"], binary=True),
            naturalsize(row["chunk_bytes"], binary=True),
            naturalsize(row["compressed_chunk_bytes"], binary=True),
            "{:.0%}".format(row["chunk_bytes"] / row["total_len"]),
            "{:.0%}".format(row["compressed_chunk_bytes"] / row["total_len"]),
            naturalsize(row["transmitted_xtool_bytes"], binary=True)
            if "transmitted_xtool_bytes" in row
            else "",
        )
console.print(table)
@click.group()
def cli():
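    """Estimate deduplication ratios for Parquet files written with and without content-defined chunking (CDC)."""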
if not pyarrow_has_cdc():
click.echo("PyArrow is not compiled with CDC support.", err=True)
sys.exit(1)
@cli.command()
@click.argument("schema", default='{"a": "int", "b": "str", "c": ["int"]}', type=str)
@click.option(
"--target-dir",
"-d",
help="Directory to store the files at",
type=click.Path(file_okay=False, writable=True),
required=True,
default="synthetic",
)
@click.option(
    "--size", "-s", default=1, help="Number of millions of records to generate"
)
@click.option(
"--num-edits", "-e", default=10, help="Number of changes to make in the data"
)
@click.option(
"--edit-size", default=10, help="Number of rows to change in each edit", type=int
)
@click.option("--with-json", is_flag=True, help="Also calculate JSONLines stats")
@click.option("--with-sqlite", is_flag=True, help="Also calculate SQLite stats")
@click.option("--use-dictionary", is_flag=True, help="Use parquet dictionary encoding")
@click.option(
"--cdc-min-size", default=256, help="Minimum CDC chunk size in KiB", type=int
)
@click.option(
"--cdc-max-size", default=1024, help="Maximum CDC chunk size in KiB", type=int
)
@click.option("--cdc-norm-level", default=0, help="CDC normalization level", type=int)
def synthetic(
schema,
size,
num_edits,
edit_size,
target_dir,
use_dictionary,
with_json,
with_sqlite,
cdc_min_size,
cdc_max_size,
cdc_norm_level,
):
"""Generate synthetic data and compare the deduplication ratios.

    \b
    de synthetic -s 1 -e 1 '{"a": "int"}'
    de synthetic -s 1 -e 2 '{"a": "int"}'
    de synthetic -s 4 -e 1 '{"a": "int"}'
    de synthetic -s 4 -e 2 '{"a": "int"}'
    de synthetic -s 1 -e 1 '{"a": "int", "b": "str", "c": ["int"]}'
    de synthetic -s 1 -e 2 '{"a": "int", "b": "str", "c": ["int"]}'
    de synthetic -s 4 -e 1 '{"a": "int", "b": "str", "c": ["int"]}'
    de synthetic -s 4 -e 2 '{"a": "int", "b": "str", "c": ["int"]}'
    de render-readme README.md.jinja2
    """
directory = Path(target_dir)
directory.mkdir(exist_ok=True)
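    # spread the edit points evenly over the table, leaving half an interval of margin at both ends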
edit_points = np.linspace(0.5 / num_edits, 1 - 0.5 / num_edits, num_edits)
schema = json.loads(schema)
gen = FakeDataGenerator(schema, seed=42)
original, tables = gen.generate_synthetic_tables(
size=size * 2**20,
edit_size=edit_size,
edit_points=list(edit_points),
append_ratio=0.05,
update_columns={k: [k] for k in schema.keys()},
)
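    # file name prefix encoding the record count, column count and number of edits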
prefix = f"s{size}c{len(schema)}e{num_edits}"
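    # CDC options passed to the Parquet writer; CLI sizes are given in KiB and converted to bytes here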
cdc_params = {
"min_chunk_size": cdc_min_size * 1024,
"max_chunk_size": cdc_max_size * 1024,
"norm_level": cdc_norm_level,
}
print("Writing Parquet files without CDC")
results = write_and_compare_parquet(
directory,
original,
tables,
prefix=prefix,
postfix="nocdc",
use_content_defined_chunking=False,
use_dictionary=use_dictionary,
)
print("Writing Parquet files with CDC")
results += write_and_compare_parquet(
directory,
original,
tables,
prefix=prefix,
postfix="cdc",
use_content_defined_chunking=cdc_params,
use_dictionary=use_dictionary,
# data_page_size=100 * 1024 * 1024,
)
if with_json:
print("Writing JSONLines files")
results += write_and_compare_json(directory, original, tables, prefix=prefix)
if with_sqlite:
print("Writing SQLite files")
results += write_and_compare_sqlite(directory, original, tables, prefix=prefix)
convert_dedupe_images_to_png(directory)
for row in results:
row["title"] = (
f"{row['edit'].capitalize()} / {row['compression']} / {row['kind']}"
)
results = sorted(results, key=lambda x: x["title"])
pretty_print_stats(results)
@cli.command()
@click.argument("files", nargs=-1, type=click.Path(exists=True))
@click.option(
"--target-dir",
"-d",
help="Directory to store the revisions",
type=click.Path(file_okay=False, writable=True),
required=True,
)
def revisions(files, target_dir):
"""Checkout all revisions of the given files and calculate the deduplication ratio."""
    target_dir = Path(target_dir)
target_dir.mkdir(exist_ok=True)
for file_path in files:
checkout_file_revisions(file_path, target_dir=target_dir)
@cli.command()
@click.argument("directory", type=click.Path(exists=True, file_okay=False))
@click.option("--with-json", is_flag=True, help="Also calculate JSONLines stats")
@click.option("--with-sqlite", is_flag=True, help="Also calculate SQLite stats")
@click.option("--skip-zstd", is_flag=True, help="Skip ZSTD rewrite")
@click.option("--skip-snappy", is_flag=True, help="Skip Snappy rewrite")
@click.option("--skip-rewrite", is_flag=True, help="Skip file rewriting")
@click.option("--skip-json-rewrite", is_flag=True, help="Skip JSON rewrite")
@click.option("--skip-sqlite-rewrite", is_flag=True, help="Skip SQLite rewrite")
@click.option("--skip-parquet-rewrite", is_flag=True, help="Skip Parquet rewrite")
@click.option(
"--disable-dictionary", is_flag=True, help="Disallow parquet dictionary encoding"
)
@click.option(
"--cdc-min-size", default=256, help="Minimum CDC chunk size in KiB", type=int
)
@click.option(
"--cdc-max-size", default=1024, help="Maximum CDC chunk size in KiB", type=int
)
@click.option(
"--data-page-size",
default=1024 * 1024,
help="Parquet data page size in bytes",
type=int,
)
@click.option("--cdc-norm-level", default=0, help="CDC normalization level", type=int)
@click.option(
"--max-processes",
"-p",
default=None,
type=int,
help="Maximum number of processes to use",
)
def stats(
directory,
with_json,
with_sqlite,
skip_zstd,
skip_snappy,
skip_rewrite,
skip_json_rewrite,
skip_sqlite_rewrite,
skip_parquet_rewrite,
disable_dictionary,
cdc_min_size,
cdc_max_size,
cdc_norm_level,
data_page_size,
max_processes,
):
    """Go over all the Parquet files in the directory, rewrite them with CDC
    enabled and compare the deduplication ratios of the files written without
    and with CDC."""
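    # skip Parquet files that already carry the CDC marker in their name (outputs of a previous run)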
files = [
path for path in Path(directory).rglob("*.parquet") if "cdc" not in path.name
]
json_files = [path.with_name(path.stem + ".jsonlines") for path in files]
sqlite_files = [path.with_name(path.stem + ".sqlite") for path in files]
cdc_zstd_files = [path.with_name(path.stem + "-zstd-cdc.parquet") for path in files]
cdc_snappy_files = [
path.with_name(path.stem + "-snappy-cdc.parquet") for path in files
]
if with_json and not (skip_rewrite or skip_json_rewrite):
print("Writing JSONLines files")
process_map(rewrite_to_jsonlines, files, json_files)
if with_sqlite and not (skip_rewrite or skip_sqlite_rewrite):
print("Writing SQLite files")
process_map(rewrite_to_sqlite, files, sqlite_files)
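    # shared Parquet writer options; CDC chunk sizes come from the CLI in KiB and are converted to bytes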
kwargs = {
"use_content_defined_chunking": {
"min_chunk_size": cdc_min_size * 1024,
"max_chunk_size": cdc_max_size * 1024,
"norm_level": cdc_norm_level,
},
"use_dictionary": not disable_dictionary,
"data_page_size": data_page_size,
}
if not (skip_rewrite or skip_parquet_rewrite or skip_zstd):
print("Writing CDC Parquet files with ZSTD compression")
if max_processes == 1:
for src_path, dst_path in zip(files, cdc_zstd_files):
                rewrite_to_parquet(src_path, dst_path, compression="zstd", **kwargs)
else:
process_map(
functools.partial(rewrite_to_parquet, compression="zstd", **kwargs),
files,
cdc_zstd_files,
max_workers=max_processes,
)
if not (skip_rewrite or skip_parquet_rewrite or skip_snappy):
print("Writing CDC Parquet files with Snappy compression")
if max_processes == 1:
for src_path, dst_path in zip(files, cdc_snappy_files):
rewrite_to_parquet(src_path, dst_path, compression="snappy", **kwargs)
else:
process_map(
functools.partial(rewrite_to_parquet, compression="snappy", **kwargs),
files,
cdc_snappy_files,
max_workers=max_processes,
)
column_titles = [
"Total Bytes",
"Chunk Bytes",
"Compressed Chunk Bytes",
"Transmitted XTool Bytes",
]
inputs = {}
if with_json:
inputs["JSONLines"] = json_files
if with_sqlite:
inputs["SQLite"] = sqlite_files
inputs["Parquet"] = files
if not skip_zstd:
inputs["CDC ZSTD"] = cdc_zstd_files
if not skip_snappy:
inputs["CDC Snappy"] = cdc_snappy_files
results = []
for title, paths in inputs.items():
print(f"Estimating deduplication for {title}")
results.append({"title": title, **estimate_de(paths), **estimate_xtool(paths)})
pretty_print_stats(results)
# plot the results using plotly with bars grouped by metric
y_keys = [
"total_len",
"chunk_bytes",
"compressed_chunk_bytes",
"transmitted_xtool_bytes",
]
    fig = go.Figure(
        data=[
            go.Bar(
                name=column_title,
                x=[r["title"] for r in results],
                y=[r[y_key] for r in results],
            )
            for column_title, y_key in zip(column_titles, y_keys)
        ]
    )
fig.update_layout(barmode="group", yaxis=dict(tickformat=".2s", title="Bytes"))
fig.show()
@cli.command()
@click.argument("files", nargs=-1, type=click.Path(exists=True))
def dedup(files):
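    """Print DE and XTool deduplication ratios for the given files."""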
result_de = estimate_de(files)
result_xtool = estimate_xtool(files)
dedup_ratio = result_de["chunk_bytes"] / result_de["total_len"]
print(
f"Deduplication ratio: {dedup_ratio:.2%} ({naturalsize(result_de['chunk_bytes'])} / {naturalsize(result_de['total_len'])})"
)
print(
f"XTool deduplication ratio: {result_xtool['transmitted_xtool_bytes'] / result_de['total_len']:.2%} ({naturalsize(result_xtool['transmitted_xtool_bytes'])} / {naturalsize(result_de['total_len'])})"
)
@cli.command()
@click.argument("files", nargs=-1, type=click.Path(exists=True))
def rewrite(files):
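    """Rewrite the given Parquet files with content-defined chunking enabled."""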
for path in files:
path = Path(path)
out = path.with_name(path.stem + "-dedup.parquet")
rewrite_to_parquet(path, out, use_content_defined_chunking=True)
@cli.command()
@click.argument("template", type=click.Path(exists=True))
def render_readme(template):
    """Render the given Jinja2 template and write the output next to it, dropping the final suffix."""
from jinja2 import Template
readme = Path(template)
content = Template(readme.read_text()).render()
readme.with_suffix("").write_text(content)
@cli.command()
@click.argument("patterns", nargs=-1, type=str)
def page_chunks(patterns):
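    """Plot the distribution of uncompressed data page sizes for the given files or glob patterns."""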
paths = []
for pattern in patterns:
if "*" in pattern:
paths.extend(Path().rglob(pattern))
else:
paths.append(Path(pattern))
uncompressed_bytes, num_values = zip(*get_page_chunk_sizes(paths))
fig = go.Figure()
fig.add_trace(
go.Histogram(
x=uncompressed_bytes,
nbinsx=100,
name="Uncompressed Page Sizes",
marker_color="blue",
opacity=0.75,
)
)
fig.update_layout(
title="Distribution of Uncompressed Page Sizes",
        xaxis_title="Uncompressed Page Size (bytes)",
yaxis_title="Frequency",
barmode="overlay",
)
fig.update_xaxes(tickformat=".2s")
fig.show()
def calculate_parameter_impact(
path, directory, param_name, param_values, param_default
):
    """Calculate the impact of a parameter on the deduplication ratio by
    rewriting the file with different values of the parameter and comparing
    the deduplication ratios."""
directory.mkdir(exist_ok=True)
table = pq.read_table(path)
default_path = directory / f"{param_name}={param_default}.parquet"
if not default_path.exists():
pq.write_table(
table,
default_path,
use_content_defined_chunking=True,
**{param_name: param_default},
)
files = [default_path]
results = {}
    for value in param_values:
        variant_path = directory / f"{param_name}={value}.parquet"
        files.append(variant_path)
        if not variant_path.exists():
            pq.write_table(
                table,
                variant_path,
                use_content_defined_chunking=True,
                **{param_name: value},
            )
        de_result = estimate_de([default_path, variant_path])
        xtool_result = estimate_xtool([default_path, variant_path])
result = {
"total_len": de_result["total_len"],
"chunk_bytes": de_result["chunk_bytes"],
"compressed_chunk_bytes": de_result["compressed_chunk_bytes"],
"transmitted_xtool_bytes": xtool_result["transmitted_xtool_bytes"],
"dedup_ratio": de_result["chunk_bytes"] / de_result["total_len"],
"xtool_dedup_ratio": (
xtool_result["transmitted_xtool_bytes"] / de_result["total_len"]
),
}
results[value] = result
overall_de_result = estimate_de(files)
overall_xtool_result = estimate_xtool(files)
overall_result = {
"total_len": overall_de_result["total_len"],
"chunk_bytes": overall_de_result["chunk_bytes"],
"compressed_chunk_bytes": overall_de_result["compressed_chunk_bytes"],
"transmitted_xtool_bytes": overall_xtool_result["transmitted_xtool_bytes"],
"dedup_ratio": overall_de_result["chunk_bytes"]
/ overall_de_result["total_len"],
"xtool_dedup_ratio": (
overall_xtool_result["transmitted_xtool_bytes"]
/ overall_de_result["total_len"]
),
}
return results, overall_result
@cli.command()
@click.argument("file", type=Path)
@click.argument("directory", type=Path)
@click.option(
"--row-group-size",
is_flag=True,
help="Calculate the impact of row group size on deduplication ratio",
)
@click.option(
"--data-page-size",
is_flag=True,
help="Calculate the impact of max page size on deduplication ratio",
)
def param_impact(file, directory, row_group_size, data_page_size):
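    """Measure how a single Parquet writer parameter affects the deduplication ratio."""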
if row_group_size:
param_name = "row_group_size"
param_default = 2**20
param_values = [2**i for i in range(10, 22)]
elif data_page_size:
param_name = "data_page_size"
param_default = 2**20
param_values = [2**i for i in range(15, 23)]
else:
        print("Please specify either --row-group-size or --data-page-size")
sys.exit(1)
results, overall_result = calculate_parameter_impact(
file, directory, param_name, param_values, param_default
)
for param_value, result in results.items():
print(
f"{param_name}: {param_value}\n"
f"Deduplication ratio: {result['dedup_ratio']:.2%} ({naturalsize(result['chunk_bytes'])} / {naturalsize(result['total_len'])})\n"
f"XTool deduplication ratio: {result['xtool_dedup_ratio']:.2%} ({naturalsize(result['transmitted_xtool_bytes'])} / {naturalsize(result['total_len'])})\n"
)
    print(f"Overall results over {len(results) + 1} files (including the default):")
print(
f"Overall deduplication ratio: {overall_result['dedup_ratio']:.2%} ({naturalsize(overall_result['chunk_bytes'])} / {naturalsize(overall_result['total_len'])})\n"
f"XTool overall deduplication ratio: {overall_result['xtool_dedup_ratio']:.2%} ({naturalsize(overall_result['transmitted_xtool_bytes'])} / {naturalsize(overall_result['total_len'])})\n"
)
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=param_values,
y=[result["dedup_ratio"] for result in results.values()],
mode="lines+markers",
name="DE Dedup Ratio",
marker=dict(symbol="circle"),
)
)
fig.add_trace(
go.Scatter(
x=param_values,
y=[result["xtool_dedup_ratio"] for result in results.values()],
mode="lines+markers",
name="XTool Dedup Ratio",
marker=dict(symbol="square"),
)
)
fig.update_layout(
title="Deduplication Ratios vs " + param_name,
xaxis=dict(title=param_name, type="log", dtick=1, tickformat=".2s"),
yaxis=dict(title="Deduplication Ratio", tickformat=".2%"),
legend=dict(title="Metric"),
template="plotly_white",
)
fig.show()