# bigquery_etl/cli/dryrun.py
"""bigquery-etl CLI dryrun command."""
import fnmatch
import glob
import os
import re
import sys
from functools import partial
from multiprocessing.pool import Pool
from typing import List, Set, Tuple
import rich_click as click
from ..cli.utils import is_authenticated
from ..config import ConfigLoader
from ..dryrun import DryRun, get_credentials, get_id_token
@click.command(
    help="""Dry run SQL.

    Uses the dryrun Cloud Function by default, which only has access to shared-prod.
    To dry run queries accessing tables in another project, set
    `--use-cloud-function=false` and ensure that the command line has access to a
    GCP service account.

    Examples:

    ./bqetl dryrun sql/moz-fx-data-shared-prod/telemetry_derived/

    # Dry run SQL with tables that are not in shared prod
    ./bqetl dryrun --use-cloud-function=false sql/moz-fx-data-marketing-prod/
    """,
)
@click.argument(
    "paths",
    nargs=-1,
    type=click.Path(file_okay=True),
)
@click.option(
    "--use_cloud_function",
    "--use-cloud-function",
    help=(
        "Use the Cloud Function for dry running SQL, if set to `True`. "
        "The Cloud Function can only access tables in shared-prod. "
        "If set to `False`, use active GCP credentials for the dry run."
    ),
    type=bool,
    default=True,
)
@click.option(
    "--validate_schemas",
    "--validate-schemas",
    help=(
        "Require the dry run schema to match the destination table "
        "and the schema file, if present."
    ),
    is_flag=True,
    default=False,
)
@click.option(
    "--respect-skip/--ignore-skip",
    help="Respect or ignore query skip configuration. Default is --respect-skip.",
    default=True,
)
@click.option(
    "--project",
    help="GCP project to perform the dry run in when --use_cloud_function=False.",
    default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"),
)
def dryrun(
    paths: List[str],
    use_cloud_function: bool,
    validate_schemas: bool,
    respect_skip: bool,
    project: str,
):
    """Perform a dry run."""
    file_names = (
        "query.sql",
        "view.sql",
        "part*.sql",
        "init.sql",
        "materialized_view.sql",
    )
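    # fnmatch.translate turns each filename glob into a regex fragment;
    # joining the fragments with "|" yields a single pattern that matches
    # any dry-runnable SQL file name (e.g. "part*.sql" matches "part1.sql").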
    file_re = re.compile("|".join(map(fnmatch.translate, file_names)))

    sql_files: Set[str] = set()
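    # Expand each path argument: directories are searched recursively for
    # matching SQL files; a single file must match one of the known names.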
    for path in paths:
        if os.path.isdir(path):
            sql_files |= {
                sql_file
                for pattern in file_names
                for sql_file in glob.glob(f"{path}/**/{pattern}", recursive=True)
            }
        elif os.path.isfile(path):
            if file_re.fullmatch(os.path.basename(path)):
                sql_files.add(path)
        else:
            click.echo(f"Invalid path {path}", err=True)
            sys.exit(1)
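    # Unless --ignore-skip was passed, drop any files listed in the
    # dry run skip configuration.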
    if respect_skip:
        sql_files -= DryRun.skipped_files()

    if not sql_files:
        click.echo("Skipping dry run because no queries matched")
        return

    if not use_cloud_function and not is_authenticated():
        click.echo(
            "Not authenticated to GCP. Run `gcloud auth login --update-adc` to login."
        )
        sys.exit(1)
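    # Fetch credentials and an ID token once in the parent process so the
    # pool workers below can reuse them instead of re-authenticating.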
    credentials = get_credentials()
    id_token = get_id_token(credentials=credentials)

    sql_file_valid = partial(
        _sql_file_valid,
        use_cloud_function,
        respect_skip,
        validate_schemas,
        credentials=credentials,
        id_token=id_token,
    )
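    # Dry runs are network-bound API calls, so validate up to 8 files in
    # parallel worker processes.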
    with Pool(8) as p:
        result = p.map(sql_file_valid, sql_files, chunksize=1)
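    # Each result is a (success, path) tuple; collect the paths that failed.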
    failures = sorted(r[1] for r in result if not r[0])
    if failures:
        click.echo(
            f"Failed to validate {len(failures)} queries (see above for error messages):",
            err=True,
        )
        click.echo("\n".join(failures), err=True)
        sys.exit(1)


def _sql_file_valid(
    use_cloud_function, respect_skip, validate_schemas, sqlfile, credentials, id_token
) -> Tuple[bool, str]:
    """Dry run the SQL file and return a (success, path) tuple."""
    result = DryRun(
        sqlfile,
        use_cloud_function=use_cloud_function,
        credentials=credentials,
        respect_skip=respect_skip,
        id_token=id_token,
    )
    if validate_schemas:
        try:
            success = result.validate_schema()
        except Exception as e:  # validate_schema raises base exceptions
            click.echo(e, err=True)
            success = False
        return success, sqlfile

    return result.is_valid(), sqlfile
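

# Illustrative invocations (a sketch based on the option declarations above;
# the exact dataset paths are taken from the command help or are hypothetical):
#
#   # Dry run everything under a dataset directory in shared-prod:
#   ./bqetl dryrun sql/moz-fx-data-shared-prod/telemetry_derived/
#
#   # Also require dry run schemas to match destination tables:
#   ./bqetl dryrun --validate-schemas sql/moz-fx-data-shared-prod/telemetry_derived/
#
#   # Dry run against another project using local GCP credentials,
#   # ignoring the skip configuration:
#   ./bqetl dryrun --use-cloud-function=false --ignore-skip \
#       sql/moz-fx-data-marketing-prod/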