#!/usr/bin/env python3
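"""Validate BigQuery schema evolution between two git revisions of
mozilla-pipeline-schemas by comparing compact, diff-friendly column listings.

Columns that exist in the base revision but not in the head revision are
reported as incompatible changes unless the document matches a glob in the
incompatibility allowlist.

Example invocation (subcommand name assumes click's default underscore-to-dash
naming for the local_validation command defined below):

    python3 validate_bigquery.py local-validation
"""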
import difflib
import json
import sys
from pathlib import Path
from shutil import copyfile
from typing import Tuple

import click
from git import Repo

BASE_DIR = Path("/app").resolve()


def compute_compact_columns(document):
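    """Flatten a BigQuery schema document into sorted "<path> <type>" lines.

    REPEATED fields get a ".[]" suffix and RECORD fields are expanded
    recursively, producing a compact representation that is easy to diff.
    """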
def traverse(prefix, columns):
res = []
for node in columns:
name = node["name"] + (".[]" if node["mode"] == "REPEATED" else "")
dtype = node["type"]
if dtype == "RECORD":
res += traverse(f"{prefix}.{name}", node["fields"])
else:
res += [f"{prefix}.{name} {dtype}"]
return res
res = traverse("root", document)
    return sorted(res)


def check_evolution(base, head, verbose=False):
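    """Return 1 if any column in the base listing is missing from head.

    Columns that only appear in head are logged for information but do not
    count as errors.
    """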
def nop(*args, **kwargs):
pass
log = print if verbose else nop
a, b = set(base), set(head)
is_error = 0
# error condition
base_only = a - b
if len(base_only) > 0:
log("items removed from the base")
log("\n".join([f"-{x}" for x in base_only]))
log("")
# set the status
        is_error = 1

    # informative only
    head_only = b - a
    if len(head_only) > 0:
        log("items added to the head")
log("\n".join([f"+{x}" for x in head_only]))
log("")
    return is_error


def copy_schemas(head: str, repository: Path, artifact: Path) -> Path:
"""Copy BigQuery schemas to a directory as an intermediary step for schema
evolution checks."""
src = Path(repository)
repo = Repo(repository)
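    # name_rev looks like "<sha> <ref name>"; use it as the directory name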
dst = Path(artifact) / repo.rev_parse(head).name_rev.replace(" ", "_")
dst.mkdir(parents=True, exist_ok=True)
schemas = sorted(src.glob("**/*.bq"))
if not schemas:
raise ValueError("no schemas found")
for path in schemas:
namespace = path.parts[-3]
doc = path.parts[-1]
qualified = f"{namespace}.{doc}"
click.echo(qualified)
copyfile(path, dst / qualified)
# also generate something easy to diff
cols = compute_compact_columns(json.loads(path.read_text()))
compact_filename = ".".join(qualified.split(".")[:-1]) + ".txt"
(dst / compact_filename).write_text("\n".join(cols))
    return dst


def checkout_copy_schemas_revisions(
head: str, base: str, repository: Path, artifact: Path
) -> Tuple[Path, Path]:
"""Checkout two revisions of the schema repository into the artifact
directory. This returns paths to the head and the base directories."""
repo = Repo(repository)
if repo.is_dirty():
raise ValueError("the repo is dirty, stash any changes and try again")
head_path = None
base_path = None
    # resolve HEAD to its closest symbolic reference (e.g. the current branch)
    # so it can be checked out again afterwards
    current_ref = repo.git.rev_parse("HEAD", abbrev_ref=True)
    # note: using --abbrev-ref on something like `generated-schemas~1` may
    # yield an empty string, so resolve head and base to full commit hashes
    head_rev = repo.rev_parse(head).hexsha
    base_rev = repo.rev_parse(base).hexsha
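    # check out each revision in turn and copy its schemas, restoring the
    # original ref afterwards even if copying fails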
try:
repo.git.checkout(head_rev)
head_path = copy_schemas(head_rev, repository, artifact)
repo.git.checkout(base_rev)
base_path = copy_schemas(base_rev, repository, artifact)
finally:
repo.git.checkout(current_ref)
    return head_path, base_path


def parse_incompatibility_allowlist(allowlist: Path) -> list:
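    """Read newline-delimited globs from the allowlist file, skipping blank
    lines and comments that start with "#"."""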
res = []
if not allowlist or not allowlist.exists():
return res
lines = [line.strip() for line in allowlist.read_text().split("\n")]
for line in lines:
if not line or line.startswith("#"):
continue
res.append(line)
    return res


@click.group()
def validate():
    """Click command group."""


@validate.command()
@click.option("--head", type=str, default="local-working-branch")
@click.option("--base", type=str, default="generated-schemas")
@click.option(
"--repository",
type=click.Path(exists=True, file_okay=False),
default=BASE_DIR / "mozilla-pipeline-schemas",
)
@click.option(
"--artifact",
type=click.Path(file_okay=False),
default=BASE_DIR / "validate_schema_evolution",
)
@click.option(
"--incompatibility-allowlist",
type=click.Path(dir_okay=False),
help="newline delimited globs of schemas with allowed schema incompatibilities",
default=BASE_DIR / "mozilla-schema-generator/incompatibility-allowlist",
)
def local_validation(head, base, repository, artifact, incompatibility_allowlist):
"""Validate schemas using a heuristic from the compact schemas."""
head_path, base_path = checkout_copy_schemas_revisions(
head, base, repository, artifact
)
is_error = 0
    # look at the compact schemas
    head_files = head_path.glob("*.txt")
    base_files = base_path.glob("*.txt")

    # also look at the exceptions
    allowed_incompatibility_base_files = []
    if incompatibility_allowlist:
        for glob in parse_incompatibility_allowlist(Path(incompatibility_allowlist)):
            allowed_incompatibility_base_files += list(base_path.glob(f"{glob}.txt"))
a = set([p.name for p in base_files])
b = set([p.name for p in head_files])
allowed_incompatibility = set([p.name for p in allowed_incompatibility_base_files])
    # Check that we're not removing any schemas. If there are exceptions, we
    # remove those documents from the base set before checking for evolution.
    if allowed_incompatibility:
        print("allowing incompatible changes in the following documents:")
        print("\n".join([f"\t{x}" for x in allowed_incompatibility]))

    is_error |= check_evolution((a - allowed_incompatibility), b, verbose=True)
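
    # diff each document that exists in both revisions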
for schema_name in a & b:
base = base_path / schema_name
head = head_path / schema_name
base_data = base.read_text().split("\n")
head_data = head.read_text().split("\n")
diff = "\n".join(
# control lines contain a newline at the end
[
line.strip()
for line in difflib.unified_diff(
base_data,
head_data,
fromfile=base.as_posix(),
tofile=head.as_posix(),
n=1,
)
]
)
if not diff:
# no difference detected
continue
# check if this is an error condition
print(diff + "\n")
err_code = check_evolution(base_data, head_data)
if err_code and schema_name in allowed_incompatibility:
print("found incompatible changes, but continuing")
continue
        is_error |= err_code

    if not is_error:
        click.echo("no incompatible changes detected")
    else:
        click.echo("found incompatible changes")
    sys.exit(is_error)


if __name__ == "__main__":
    validate()