de/synthetic.py:
import sqlite3
from pathlib import Path

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from faker import Faker
from PIL import Image

from .estimate import estimate_de
class DataGenerator:
def __init__(self, schema, seed=42):
self.schema = schema
self.seed = seed
def generate_table(self, num_rows):
raise NotImplementedError("Subclasses should implement this method.")
def delete_rows(self, table, edit_points, edit_size=10):
pieces = []
        # edit_points may be any sequence (the default is a tuple), so splat
        # the values instead of concatenating lists.
        for start, end in zip([0, *edit_points], [*edit_points, 1]):
start_idx = int(start * len(table))
end_idx = int(end * len(table))
if end == 1:
pieces.append(table.slice(start_idx, end_idx - start_idx))
else:
pieces.append(table.slice(start_idx, end_idx - start_idx - edit_size))
return pa.concat_tables(pieces).combine_chunks()
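    # Worked example for delete_rows above (illustrative numbers): with a
    # 100-row table, edit_points=(0.5,) and edit_size=10, the first segment
    # keeps rows 0..39 (the 10 rows just before the edit point are dropped)
    # and the last segment keeps rows 50..99, removing exactly 10 rows.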
def insert_rows(self, table, edit_points, edit_size=10):
pieces = []
        for start, end in zip([0, *edit_points], [*edit_points, 1]):
start_idx = int(start * len(table))
end_idx = int(end * len(table))
pieces.append(table.slice(start_idx, end_idx - start_idx))
if end != 1:
pieces.append(self.generate_table(edit_size))
return pa.concat_tables(pieces).combine_chunks()
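    # Worked example for insert_rows above (same illustrative numbers): the
    # first segment keeps rows 0..49, a freshly generated 10-row block is
    # spliced in at the midpoint, then rows 50..99 follow, for 110 rows total.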
def append_rows(self, table, ratio):
new_part = self.generate_table(int(ratio * len(table)))
return pa.concat_tables([table, new_part]).combine_chunks()
def update_rows(self, table, edit_points, columns, edit_size=10):
df = table.to_pandas()
edits = self.generate_table(len(edit_points) * edit_size)
edits_df = edits.to_pandas()
        for i, place in enumerate(edit_points):
            idx = int(place * len(df))
            for column in columns:
                for j in range(edit_size):
                    edit_idx = i * edit_size + j
                    # Offset the target row by j; without it every pass would
                    # overwrite the same cell and only one row per edit point
                    # would actually change.
                    df.at[idx + j, column] = edits_df.at[edit_idx, column]
return pa.Table.from_pandas(df)
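    # Note on update_rows above: pyarrow tables are immutable, so the table
    # round-trips through pandas; the DataFrame is mutated cell by cell and
    # then converted back with Table.from_pandas.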
def generate_synthetic_tables(
self,
size,
edit_points=(0.5,),
append_ratio=0.05,
update_columns=None,
edit_size=10,
):
fields = list(self.schema.keys())
table = self.generate_table(size)
deleted = self.delete_rows(table, edit_points, edit_size=edit_size)
inserted = self.insert_rows(table, edit_points, edit_size=edit_size)
appended = self.append_rows(table, append_ratio)
        updated = self.update_rows(
            table, edit_points, columns=fields, edit_size=edit_size
        )
assert len(table) == size
assert len(updated) == size
assert len(deleted) == size - edit_size * len(edit_points)
assert len(inserted) == size + edit_size * len(edit_points)
result = {
"deleted": deleted,
"inserted": inserted,
"appended": appended,
"updated": updated,
}
        for key, cols in (update_columns or {}).items():
            result[f"updated_{key}"] = self.update_rows(
                table, edit_points, columns=cols, edit_size=edit_size
            )
return table, result
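# A minimal usage sketch (the schema below is illustrative, not part of this
# module): generate_synthetic_tables returns the pristine table plus a dict
# of edited variants keyed by edit kind.
#
#   gen = FakeDataGenerator({"id": int, "name": str, "score": float})
#   original, variants = gen.generate_synthetic_tables(1_000)
#   sorted(variants)  # ['appended', 'deleted', 'inserted', 'updated']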
class FakeDataGenerator(DataGenerator):
    def __init__(self, schema, seed=42):
        super().__init__(schema, seed)
        self.fake = Faker()
        self.fake.random.seed(seed)
        # Seed NumPy as well; the numeric generators below rely on np.random,
        # so without this the seed parameter would only affect text columns.
        np.random.seed(seed)
def generate_data(self, dtype, num_samples):
if dtype in ("int", int):
return np.random.randint(0, 1_000_000, size=num_samples).tolist()
elif dtype in ("float", float):
return np.random.uniform(0, 1_000_000, size=num_samples).round(3).tolist()
elif dtype in ("str", str):
num_chars = np.random.randint(10, 200, size=num_samples)
return [self.fake.text(max_nb_chars=n_chars) for n_chars in num_chars]
elif dtype in ("largestr",):
num_chars = np.random.randint(100, 1000, size=num_samples)
return [self.fake.text(max_nb_chars=n_chars) for n_chars in num_chars]
        elif dtype in ("bool", bool):
return np.random.choice([True, False], size=num_samples).tolist()
elif isinstance(dtype, dict):
columns = [
self.generate_data(field_type, num_samples)
for field_type in dtype.values()
]
return [dict(zip(dtype.keys(), row)) for row in zip(*columns)]
        elif isinstance(dtype, list) and dtype:
            lengths = np.random.randint(0, 5, size=num_samples)
            values = self.generate_data(dtype[0], int(lengths.sum()))
            # `[0] + lengths` would broadcast an element-wise addition over
            # the ndarray, so build the slice offsets with concatenate.
            offsets = np.cumsum(np.concatenate([[0], lengths]))
            return [
                values[offset : offset + length]
                for offset, length in zip(offsets, lengths)
            ]
else:
raise ValueError("Unsupported data type: {}".format(dtype))
def generate_table(self, num_rows):
data = self.generate_data(self.schema, num_rows)
arr = pa.array(data)
table = pa.Table.from_struct_array(arr)
return table
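# Nested schemas recurse through generate_data: dicts become struct columns
# and single-element lists become variable-length list columns. For example,
# the hypothetical schema {"user": {"id": int, "tags": [str]}, "active": bool}
# yields a `user` column (a struct with an int field and a list-of-string
# field) and a boolean `active` column.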
# # TODO(kszucs)
# class WikiDataSampler(DataGenerator):
# pass
def write_parquet(path, table, **kwargs):
    pq.write_table(table, path, **kwargs)
    # Read the file straight back to verify a lossless round-trip before the
    # file is used in size comparisons.
    readback = pq.read_table(path)
    assert table.equals(readback)
def write_and_compare_parquet(
directory, original, alts, prefix, postfix, **parquet_options
):
results = []
    for compression in ["none", "zstd", "snappy"]:
        parquet_options["compression"] = (
            None if compression == "none" else compression
        )
a = directory / f"{prefix}-{compression}-original-{postfix}.parquet"
write_parquet(a, original, **parquet_options)
for name, table in alts.items():
b = directory / f"{prefix}-{compression}-{name}-{postfix}.parquet"
write_parquet(b, table, **parquet_options)
result = estimate_de([a, b])
results.append(
{"kind": postfix, "edit": name, "compression": compression, **result}
)
return results
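# A hedged sketch of how the writer above is typically driven (the directory,
# prefix, and postfix values are illustrative; the directory must exist):
#
#   gen = FakeDataGenerator({"id": int, "payload": str})
#   original, variants = gen.generate_synthetic_tables(10_000)
#   rows = write_and_compare_parquet(
#       Path("results"), original, variants, prefix="fake", postfix="v1"
#   )
#   # each row: {"kind": "v1", "edit": ..., "compression": ..., **estimate}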
def write_and_compare_json(directory, original, alts, prefix):
results = []
original_df = original.to_pandas()
for compression in ["none", "zstd"]:
comp = None if compression == "none" else compression
a = directory / f"{prefix}-{compression}-original.jsonlines"
original_df.to_json(a, orient="records", lines=True, compression=comp)
for name, table in alts.items():
b = directory / f"{prefix}-{compression}-{name}.jsonlines"
table.to_pandas().to_json(b, orient="records", lines=True, compression=comp)
result = estimate_de([a, b])
results.append(
{"kind": "json", "edit": name, "compression": compression, **result}
)
return results
def write_and_compare_sqlite(directory, original, alts, prefix):
    results = []
    original_df = original.to_pandas()
    # SQLite files are always written uncompressed; the single-element loop
    # keeps the result shape consistent with the other writers.
    for compression in ["none"]:
        a = directory / f"{prefix}-{compression}-original.sqlite"
        con = sqlite3.connect(a)
        original_df.to_sql("table", con, if_exists="replace", index=False)
        con.close()
        for name, table in alts.items():
            b = directory / f"{prefix}-{compression}-{name}.sqlite"
            con = sqlite3.connect(b)
            table.to_pandas().to_sql("table", con, if_exists="replace", index=False)
            con.close()
            result = estimate_de([a, b])
            results.append(
                {"kind": "sqlite", "edit": name, "compression": compression, **result}
            )
    return results
def convert_dedupe_images_to_png(directory):
directory = Path(directory)
for ppm in directory.iterdir():
if ppm.suffix == ".ppm":
png = ppm.with_suffix(".png")
with Image.open(ppm) as img:
img.save(png, "PNG")
ppm.unlink()
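
if __name__ == "__main__":
    # A minimal end-to-end sketch (illustrative schema and size, not part of
    # the original API), assuming estimate_de returns a dict of metrics for
    # the given file pair. Run as `python -m de.synthetic` so the relative
    # import of estimate_de resolves.
    import tempfile

    gen = FakeDataGenerator({"id": int, "label": str, "value": float})
    original, variants = gen.generate_synthetic_tables(1_000)
    with tempfile.TemporaryDirectory() as tmpdir:
        for row in write_and_compare_parquet(
            Path(tmpdir), original, variants, prefix="demo", postfix="table"
        ):
            print(row)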