bigquery_etl/cli/query.py (1,961 lines of code)

"""bigquery-etl CLI query command."""

import concurrent.futures
import copy
import datetime
import json
import logging
import multiprocessing
import os
import re
import string
import subprocess
import sys
import tempfile
from concurrent import futures
from datetime import date, timedelta
from functools import partial
from glob import glob
from multiprocessing.pool import Pool, ThreadPool
from pathlib import Path
from traceback import print_exc
from typing import Optional

import rich_click as click
import sqlparse
import yaml
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

from ..backfill.date_range import BackfillDateRange, get_backfill_partition
from ..backfill.utils import QUALIFIED_TABLE_NAME_RE, qualified_table_name_matching
from ..cli import check
from ..cli.format import format
from ..cli.utils import (
    billing_project_option,
    is_authenticated,
    is_valid_project,
    no_dryrun_option,
    parallelism_option,
    paths_matching_name_pattern,
    project_id_option,
    respect_dryrun_skip_option,
    sql_dir_option,
    temp_dataset_option,
    use_cloud_function_option,
)
from ..config import ConfigLoader
from ..dependency import get_dependency_graph
from ..deploy import (
    FailedDeployException,
    SkippedDeployException,
    SkippedExternalDataException,
    deploy_table,
)
from ..dryrun import DryRun, get_credentials, get_id_token
from ..format_sql.format import skip_format
from ..format_sql.formatter import reformat
from ..metadata import validate_metadata
from ..metadata.parse_metadata import (
    METADATA_FILE,
    BigQueryMetadata,
    ClusteringMetadata,
    DatasetMetadata,
    Metadata,
    PartitionMetadata,
    PartitionType,
)
from ..query_scheduling.dag_collection import DagCollection
from ..query_scheduling.generate_airflow_dags import get_dags
from ..schema import SCHEMA_FILE, Schema
from ..util import extract_from_query_path
from ..util.bigquery_id import sql_table_id
from ..util.common import random_str
from ..util.common import render as render_template
from ..util.parallel_topological_sorter import ParallelTopologicalSorter
from .dryrun import dryrun
from .generate import generate_all

QUERY_NAME_RE = re.compile(r"(?P<dataset>[a-zA-Z0-9_]+)\.(?P<name>[a-zA-Z0-9_]+)")
VERSION_RE = re.compile(r"_v[0-9]+")
DESTINATION_TABLE_RE = re.compile(r"^[a-zA-Z0-9_$]{0,1024}$")
DEFAULT_DAG_NAME = "bqetl_default"
DEFAULT_INIT_PARALLELISM = 10
DEFAULT_CHECKS_FILE_NAME = "checks.sql"
VIEW_FILE = "view.sql"
MATERIALIZED_VIEW = "materialized_view.sql"
NBR_DAYS_RETAINED = 775


@click.group(help="Commands for managing queries.")
@click.pass_context
def query(ctx):
    """Create the CLI group for the query command."""
    # create temporary directory generated content is written to
    # the directory will be deleted automatically after the command exits
    ctx.ensure_object(dict)
    ctx.obj["TMP_DIR"] = ctx.with_resource(tempfile.TemporaryDirectory())


@query.command(
    help="""Create a new query with name <dataset>.<query_name>,
    for example: telemetry_derived.active_profiles.

    Use the `--project_id` option to change the project the query is added to;
    default is `moz-fx-data-shared-prod`. Views are automatically generated
    in the publicly facing dataset.

    Examples:

    \b
    ./bqetl query create telemetry_derived.deviations_v1 \\
        --owner=example@mozilla.com

    \b
    # The query version gets autocompleted to v1. Queries are created in the
    # _derived dataset and accompanying views in the public dataset.
./bqetl query create telemetry.deviations --owner=example@mozilla.com """, ) @click.argument("name") @sql_dir_option @project_id_option( ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod") ) @click.option( "--owner", "-o", help="Owner of the query (email address)", default="example@mozilla.com", ) @click.option( "--dag", "-d", help=( f"Name of the DAG the query should be scheduled under." "If there is no DAG name specified, the query is" f"scheduled by default in DAG {DEFAULT_DAG_NAME}." "To skip the automated scheduling use --no_schedule." "To see available DAGs run `bqetl dag info`." "To create a new DAG run `bqetl dag create`." ), default=DEFAULT_DAG_NAME, ) @click.option( "--no_schedule", "--no-schedule", help=( "Using this option creates the query without scheduling information." " Use `bqetl query schedule` to add it manually if required." ), default=False, is_flag=True, ) @click.pass_context def create(ctx, name, sql_dir, project_id, owner, dag, no_schedule): """CLI command for creating a new query.""" # create directory structure for query try: match = QUERY_NAME_RE.match(name) name = match.group("name") dataset = match.group("dataset") version = "_" + name.split("_")[-1] if not VERSION_RE.match(version): version = "_v1" else: name = "_".join(name.split("_")[:-1]) except AttributeError: click.echo( "New queries must be named like:" + " <dataset>.<table> or <dataset>.<table>_v[n]" ) sys.exit(1) derived_path = None view_path = None path = Path(sql_dir) if dataset.endswith("_derived"): # create directory for this table derived_path = path / project_id / dataset / (name + version) derived_path.mkdir(parents=True) # create a directory for the corresponding view view_path = path / project_id / dataset.replace("_derived", "") / name # new versions of existing tables may already have a view view_path.mkdir(parents=True, exist_ok=True) else: # check if there is a corresponding derived dataset if (path / project_id / (dataset + "_derived")).exists(): derived_path = path / project_id / (dataset + "_derived") / (name + version) derived_path.mkdir(parents=True) view_path = path / project_id / dataset / name view_path.mkdir(parents=True) dataset = dataset + "_derived" else: # some dataset that is not specified as _derived # don't automatically create views derived_path = path / project_id / dataset / (name + version) derived_path.mkdir(parents=True) click.echo(f"Created query in {derived_path}") if view_path and not (view_file := view_path / "view.sql").exists(): # Don't overwrite the view_file if it already exists click.echo(f"Created corresponding view in {view_path}") view_dataset = dataset.replace("_derived", "") view_file.write_text( reformat( f"""CREATE OR REPLACE VIEW `{project_id}.{view_dataset}.{name}` AS SELECT * FROM `{project_id}.{dataset}.{name}{version}`""" ) + "\n" ) # create query.sql file query_file = derived_path / "query.sql" query_file.write_text( reformat( f"""-- Query for {dataset}.{name}{version} -- For more information on writing queries see: -- https://docs.telemetry.mozilla.org/cookbooks/bigquery/querying.html SELECT * FROM table WHERE submission_date = @submission_date""" ) + "\n" ) # create default metadata.yaml metadata_file = derived_path / "metadata.yaml" metadata = Metadata( friendly_name=string.capwords(name.replace("_", " ")), description="Please provide a description for the query", owners=[owner], labels={"incremental": True}, bigquery=BigQueryMetadata( time_partitioning=PartitionMetadata(field="", type=PartitionType.DAY), 
clustering=ClusteringMetadata(fields=[]), ), require_column_descriptions=True, ) metadata.write(metadata_file) dataset_metadata_file = derived_path.parent / "dataset_metadata.yaml" if not dataset_metadata_file.exists(): dataset_name = str(dataset_metadata_file.parent.name) dataset_metadata = DatasetMetadata( friendly_name=string.capwords(dataset_name.replace("_", " ")), description="Please provide a description for the dataset", dataset_base_acl="derived", user_facing=False, ) dataset_metadata.write(dataset_metadata_file) click.echo(f"Created dataset metadata in {dataset_metadata_file}") if view_path: dataset_metadata_file = view_path.parent / "dataset_metadata.yaml" if not dataset_metadata_file.exists(): dataset_name = str(dataset_metadata_file.parent.name) dataset_metadata = DatasetMetadata( friendly_name=string.capwords(dataset_name.replace("_", " ")), description="Please provide a description for the dataset", dataset_base_acl="view", user_facing=True, ) dataset_metadata.write(dataset_metadata_file) click.echo(f"Created dataset metadata in {dataset_metadata_file}") if no_schedule: click.echo( click.style( "WARNING: This query has been created without " "scheduling information. Use `bqetl query schedule`" " to manually add it to a DAG or " "`bqetl query create --help` for more options.", fg="yellow", ) ) else: ctx.invoke(schedule, name=derived_path, dag=dag) @query.command( help="""Schedule an existing query Examples: \b ./bqetl query schedule telemetry_derived.deviations_v1 \\ --dag=bqetl_deviations \b # Set a specific name for the task ./bqetl query schedule telemetry_derived.deviations_v1 \\ --dag=bqetl_deviations \\ --task-name=deviations """, ) @click.argument("name") @sql_dir_option @project_id_option() @click.option( "--dag", "-d", help=( "Name of the DAG the query should be scheduled under. " "To see available DAGs run `bqetl dag info`. " "To create a new DAG run `bqetl dag create`." ), ) @click.option( "--depends_on_past", "--depends-on-past", help="Only execute query if previous scheduled run succeeded.", default=False, type=bool, ) @click.option( "--task_name", "--task-name", help=( "Custom name for the Airflow task. By default the task name is a " "combination of the dataset and table name." ), ) def schedule(name, sql_dir, project_id, dag, depends_on_past, task_name): """CLI command for scheduling a query.""" query_files = paths_matching_name_pattern(name, sql_dir, project_id) if query_files == []: click.echo(f"Name doesn't refer to any queries: {name}", err=True) sys.exit(1) sql_dir = Path(sql_dir) dags = DagCollection.from_file(sql_dir.parent / "dags.yaml") for query_file in query_files: try: metadata = Metadata.of_query_file(query_file) except FileNotFoundError: click.echo(f"Cannot schedule {query_file}. No metadata.yaml found.") continue if dag: # check if DAG already exists existing_dag = dags.dag_by_name(dag) if not existing_dag: click.echo( ( f"DAG {dag} does not exist. " "To see available DAGs run `bqetl dag info`. " "To create a new DAG run `bqetl dag create`." ), err=True, ) sys.exit(1) # write scheduling information to metadata file metadata.scheduling = {} metadata.scheduling["dag_name"] = dag if depends_on_past: metadata.scheduling["depends_on_past"] = depends_on_past if task_name: metadata.scheduling["task_name"] = task_name metadata.write(query_file.parent / METADATA_FILE) logging.info( f"Updated {query_file.parent / METADATA_FILE} with scheduling" " information. 
For more information about scheduling queries see: " "https://github.com/mozilla/bigquery-etl#scheduling-queries-in-airflow" ) # update dags since new task has been added dags = get_dags(None, sql_dir.parent / "dags.yaml", sql_dir=sql_dir) else: dags = get_dags(None, sql_dir.parent / "dags.yaml", sql_dir=sql_dir) if metadata.scheduling == {}: click.echo(f"No scheduling information for: {query_file}", err=True) sys.exit(1) @query.command( help="""Get information about all or specific queries. Examples: \b # Get info for specific queries ./bqetl query info telemetry_derived.* \b # Get cost and last update timestamp information ./bqetl query info telemetry_derived.clients_daily_v6 \\ --cost --last_updated """, ) @click.argument("name", required=False) @sql_dir_option @project_id_option() @click.pass_context def info(ctx, name, sql_dir, project_id): """Return information about all or specific queries.""" if name is None: name = "*.*" query_files = paths_matching_name_pattern(name, sql_dir, project_id) if query_files == []: # run SQL generators if no matching query has been found ctx.invoke( generate_all, output_dir=ctx.obj["TMP_DIR"], ignore=["derived_view_schemas", "stable_views"], ) query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id) if query_files == []: raise click.ClickException(f"No queries matching `{name}` were found.") for query_file in query_files: query_file_path = Path(query_file) table = query_file_path.parent.name dataset = query_file_path.parent.parent.name project = query_file_path.parent.parent.parent.name try: metadata = Metadata.of_query_file(query_file) except FileNotFoundError: metadata = None click.secho(f"{project}.{dataset}.{table}", bold=True) click.echo(f"path: {query_file}") if metadata is None: click.echo("No metadata") else: click.echo(f"description: {metadata.description}") click.echo(f"owners: {metadata.owners}") if metadata.scheduling == {}: click.echo("scheduling: not scheduled") else: click.echo("scheduling:") click.echo(f" dag_name: {metadata.scheduling['dag_name']}") # TODO: Add costs and last_updated info click.echo("") def _parse_parameter(parameter: str, param_date: str) -> str: # TODO: Parse more complex parameters such as macro_ds_add param_name, param_type, param_value = parameter.split(":") if param_type == "DATE" and param_value != "{{ds}}": raise ValueError(f"Unable to parse parameter {parameter}") return f"--parameter={parameter.replace('{{ds}}', param_date)}" def _backfill_query( query_file_path, project_id, date_partition_parameter, date_partition_offset, max_rows, dry_run, scheduling_parameters, args, partitioning_type, backfill_date, destination_table, run_checks, checks_file_name, billing_project, ): """Run a query backfill for a specific date.""" project, dataset, table = extract_from_query_path(query_file_path) if destination_table is None: destination_table = f"{project}.{dataset}.{table}" # For partitioned tables, get the partition to write to the correct destination: if partitioning_type is not None: if ( partition := get_backfill_partition( backfill_date, date_partition_parameter, date_partition_offset, partitioning_type, ) ) is not None: destination_table = f"{destination_table}${partition}" if not QUALIFIED_TABLE_NAME_RE.match(destination_table): click.echo("Destination table must be named like: <project>.<dataset>.<table>") sys.exit(1) backfill_date_str = backfill_date.strftime("%Y-%m-%d") query_parameters = [ _parse_parameter(param, backfill_date_str) for param in scheduling_parameters ] if 
date_partition_parameter is not None:
        offset_param = backfill_date + timedelta(days=date_partition_offset)
        query_parameters.append(
            f"--parameter={date_partition_parameter}:DATE:{offset_param.strftime('%Y-%m-%d')}"
        )

    arguments = (
        [
            "query",
            "--use_legacy_sql=false",
            "--replace",
            f"--max_rows={max_rows}",
            f"--project_id={project_id}",
            "--format=none",
        ]
        + args
        + query_parameters
    )
    if dry_run:
        arguments += ["--dry_run"]

    click.echo(
        f"Backfill run: {backfill_date_str} "
        f"Destination_table: {destination_table} "
        f"Scheduling Parameters: {query_parameters}"
    )

    _run_query(
        [query_file_path],
        project_id=project_id,
        dataset_id=dataset,
        destination_table=destination_table,
        public_project_id=ConfigLoader.get(
            "default", "public_project", fallback="mozilla-public-data"
        ),
        query_arguments=arguments,
        billing_project=billing_project,
    )

    # Run checks on the query
    checks_file = query_file_path.parent / checks_file_name
    if run_checks and checks_file.exists():
        table_name = checks_file.parent.name
        # query_args have things like format, which we don't want to push
        # to the check; so we just take the query parameters
        check_args = [qa for qa in arguments if qa.startswith("--parameter")]
        check._run_check(
            checks_file=checks_file,
            project_id=project_id,
            dataset_id=dataset,
            table=table_name,
            query_arguments=check_args,
            dry_run=dry_run,
        )

    return True


@query.command(
    help="""Run a backfill for a query. Additional parameters will get passed to bq.

    Examples:

    \b
    # Backfill for specific date range
    # second comment line
    ./bqetl query backfill telemetry_derived.ssl_ratios_v1 \\
        --start_date=2021-03-01 \\
        --end_date=2021-03-31

    \b
    # Dryrun backfill for specific date range and exclude date
    ./bqetl query backfill telemetry_derived.ssl_ratios_v1 \\
        --start_date=2021-03-01 \\
        --end_date=2021-03-31 \\
        --exclude=2021-03-03 \\
        --dry_run
    """,
    context_settings=dict(
        ignore_unknown_options=True,
        allow_extra_args=True,
    ),
)
@click.argument("name")
@sql_dir_option
@project_id_option(required=True)
@billing_project_option()
@click.option(
    "--start_date",
    "--start-date",
    "-s",
    help="First date to be backfilled",
    type=click.DateTime(formats=["%Y-%m-%d"]),
    required=True,
)
@click.option(
    "--end_date",
    "--end-date",
    "-e",
    help="Last date to be backfilled",
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=str(date.today()),
)
@click.option(
    "--exclude",
    "-x",
    multiple=True,
    help="Dates excluded from backfill. Date format: yyyy-mm-dd",
    default=[],
)
@click.option(
    "--dry_run/--no_dry_run", "--dry-run/--no-dry-run", help="Dry run the backfill"
)
@click.option(
    "--max_rows",
    "-n",
    type=int,
    default=100,
    help="How many rows to return in the result",
)
@click.option(
    "--parallelism",
    "-p",
    type=int,
    default=8,
    help="How many threads to run backfill in parallel",
)
@click.option(
    "--destination_table",
    "--destination-table",
    required=False,
    help=(
        "Destination table name results are written to. "
        + "If not set, determines destination table based on query."
    ),
)
@click.option(
    "--checks/--no-checks", help="Whether to run checks during backfill", default=False
)
@click.option(
    "--custom_query_path",
    "--custom-query-path",
    help="Name of a custom query to run the backfill. If not given, the process runs as usual.",
    default=None,
)
@click.option(
    "--checks_file_name",
    "--checks-file-name",
    help="Name of a custom data checks file to run after each partition backfill. E.g. custom_checks.sql.
Optional.", default=None, ) @click.option( "--scheduling_overrides", "--scheduling-overrides", required=False, type=str, default="{}", help=( "Pass overrides as a JSON string for scheduling sections: " "parameters and/or date_partition_parameter as needed." ), ) @click.option( "--override-retention-range-limit", required=False, type=bool, is_flag=True, help="True to allow running a backfill outside the retention policy limit.", default=False, ) @click.pass_context def backfill( ctx, name, sql_dir, project_id, billing_project, start_date, end_date, exclude, dry_run, max_rows, parallelism, destination_table, checks, checks_file_name, custom_query_path, scheduling_overrides, override_retention_range_limit, ): """Run a backfill.""" if not is_authenticated(): click.echo( "Authentication to GCP required. Run `gcloud auth login --update-adc` " "and check that the project is set correctly." ) sys.exit(1) # If override retention policy is False, and the start date is less than NBR_DAYS_RETAINED if ( not override_retention_range_limit and start_date.date() < date.today() - timedelta(days=NBR_DAYS_RETAINED) ): # Exit - cannot backfill due to risk of losing data click.echo( f"Cannot backfill more than {NBR_DAYS_RETAINED} days prior to current date due to retention policies" ) sys.exit(1) # If override retention policy is true, continue to run the backfill if override_retention_range_limit: click.echo("Over-riding retention limit - ensure data exists in source tables") if custom_query_path: query_files = paths_matching_name_pattern( custom_query_path, sql_dir, project_id ) else: query_files = paths_matching_name_pattern(name, sql_dir, project_id) if query_files == []: if custom_query_path: click.echo(f"Custom query file '{custom_query_path}' not found in {name}") sys.exit(1) # run SQL generators if no matching query has been found ctx.invoke( generate_all, output_dir=ctx.obj["TMP_DIR"], ignore=["derived_view_schemas", "stable_views", "country_code_lookup"], ) query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id) if query_files == []: raise click.ClickException(f"No queries matching `{name}` were found.") for query_file in query_files: query_file_path = Path(query_file) try: metadata = Metadata.of_query_file(str(query_file_path)) except FileNotFoundError: click.echo(f"Can't run backfill without metadata for {query_file_path}.") continue depends_on_past = metadata.scheduling.get("depends_on_past", False) # If date_partition_parameter isn't set it's assumed to be submission_date: # https://github.com/mozilla/telemetry-airflow/blob/dbc2782fa23a34ae8268e7788f9621089ac71def/utils/gcp.py#L194C48-L194C48 # adding copy logic for cleaner handling of overrides scheduling_metadata = metadata.scheduling.copy() scheduling_metadata.update(json.loads(scheduling_overrides)) date_partition_parameter = scheduling_metadata.get( "date_partition_parameter", "submission_date" ) scheduling_parameters = scheduling_metadata.get("parameters", []) date_partition_offset = scheduling_metadata.get("date_partition_offset", 0) partitioning_type = None if metadata.bigquery and metadata.bigquery.time_partitioning: partitioning_type = metadata.bigquery.time_partitioning.type date_range = BackfillDateRange( start_date.date(), end_date.date(), excludes=[date.fromisoformat(x) for x in exclude], range_type=partitioning_type or PartitionType.DAY, ) if depends_on_past and exclude: click.echo( f"Warning: depends_on_past = True for {query_file_path} but the" f"following dates will be excluded from the backfill: 
{exclude}" ) client = bigquery.Client(project=project_id) try: project, dataset, table = extract_from_query_path(query_file_path) client.get_table(f"{project}.{dataset}.{table}") except NotFound: ctx.invoke( initialize, name=query_file, dry_run=dry_run, billing_project=billing_project, ) backfill_query = partial( _backfill_query, query_file_path, project_id, date_partition_parameter, date_partition_offset, max_rows, dry_run, scheduling_parameters, ctx.args, partitioning_type, destination_table=destination_table, run_checks=checks, checks_file_name=checks_file_name or DEFAULT_CHECKS_FILE_NAME, billing_project=billing_project, ) if not depends_on_past and parallelism > 0: # run backfill for dates in parallel if depends_on_past is false failed_backfills = [] with futures.ThreadPoolExecutor(max_workers=parallelism) as executor: future_to_date = { executor.submit(backfill_query, backfill_date): backfill_date for backfill_date in date_range } for future in futures.as_completed(future_to_date): backfill_date = future_to_date[future] try: future.result() except Exception as e: # TODO: More specific exception(s) print(f"Encountered exception {e}: {backfill_date}.") failed_backfills.append(backfill_date) else: print(f"Completed processing: {backfill_date}.") if failed_backfills: raise RuntimeError( f"Backfill processing failed for the following backfill dates: {failed_backfills}" ) else: # if data depends on previous runs, then execute backfill sequentially for backfill_date in date_range: backfill_query(backfill_date) @query.command( help="""Run a query. Additional parameters will get passed to bq.<br /> If a destination_table is set, the query result will be written to BigQuery. Without a destination_table specified, the results are not stored.<br /> If the `name` is not found within the `sql/` folder bqetl assumes it hasn't been generated yet and will start the generating process for all `sql_generators/` files. This generation process will take some time and run dryrun calls against BigQuery but this is expected. <br /> Additional parameters (all parameters that are not specified in the Options) must come after the query-name. Otherwise the first parameter that is not an option is interpreted as the query-name and since it can't be found the generation process will start. Examples: \b # Run a query by name ./bqetl query run telemetry_derived.ssl_ratios_v1 \b # Run a query file ./bqetl query run /path/to/query.sql \b # Run a query and save the result to BigQuery ./bqetl query run telemetry_derived.ssl_ratios_v1 \ --project_id=moz-fx-data-shared-prod \ --dataset_id=telemetry_derived \ --destination_table=ssl_ratios_v1 """, context_settings=dict( ignore_unknown_options=True, allow_extra_args=True, ), ) @click.argument("name") @sql_dir_option @project_id_option() @billing_project_option() @click.option( "--public_project_id", "--public-project-id", default=ConfigLoader.get( "default", "public_project", fallback="mozilla-public-data" ), help="Project with publicly accessible data", ) @click.option( "--destination_table", "--destination-table", required=False, help=( "Destination table name results are written to. " + "If not set, the query result will not be written to BigQuery." ), ) @click.option( "--dataset_id", "--dataset-id", required=False, help=( "Destination dataset results are written to. " + "If not set, determines destination dataset based on query." 
), ) @click.pass_context def run( ctx, name, sql_dir, project_id, billing_project, public_project_id, destination_table, dataset_id, ): """Run a query.""" if not is_authenticated(): click.echo( "Authentication to GCP required. Run `gcloud auth login --update-adc` " "and check that the project is set correctly." ) sys.exit(1) query_files = paths_matching_name_pattern(name, sql_dir, project_id) if query_files == []: # run SQL generators if no matching query has been found ctx.invoke( generate_all, output_dir=ctx.obj["TMP_DIR"], ignore=["derived_view_schemas", "stable_views", "country_code_lookup"], ) query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id) if query_files == []: raise click.ClickException(f"No queries matching `{name}` were found.") _run_query( query_files, project_id, public_project_id, destination_table, dataset_id, ctx.args, billing_project=billing_project, ) def _run_query( query_files, project_id, public_project_id, destination_table, dataset_id, query_arguments, addl_templates: Optional[dict] = None, billing_project: Optional[str] = None, ): """Run a query. project_id is the default project to use with table/view/udf references in the query that do not have a project id qualifier. billing_project is the project to run the query in for the purposes of billing and slot reservation selection. This is project_id if billing_project is not set """ if billing_project is not None: query_arguments.append(f"--project_id={billing_project}") elif project_id is not None: query_arguments.append(f"--project_id={project_id}") if addl_templates is None: addl_templates = {} for query_file in query_files: use_public_table = False query_file = Path(query_file) try: metadata = Metadata.of_query_file(query_file) if metadata.is_public_bigquery(): if not validate_metadata.validate_public_data(metadata, query_file): sys.exit(1) # change the destination table to write results to the public dataset; # a view to the public table in the internal dataset is created # when CI runs if ( dataset_id is not None and destination_table is not None and re.match(DESTINATION_TABLE_RE, destination_table) ): destination_table = "{}:{}.{}".format( public_project_id, dataset_id, destination_table ) query_arguments.append( "--destination_table={}".format(destination_table) ) use_public_table = True else: print( "ERROR: Cannot run public dataset query. 
Parameters" " --destination_table=<table without dataset ID> and" " --dataset_id=<dataset> required" ) sys.exit(1) except yaml.YAMLError as e: logging.error(e) sys.exit(1) except FileNotFoundError: logging.warning("No metadata.yaml found for %s", query_file) if not use_public_table and destination_table is not None: # destination table was parsed by argparse, however if it wasn't modified to # point to a public table it needs to be passed as parameter for the query if re.match(QUALIFIED_TABLE_NAME_RE, destination_table): project, dataset, table = qualified_table_name_matching( destination_table ) destination_table = "{}:{}.{}".format(project, dataset, table) elif billing_project is not None: # add project and dataset to destination table if it isn't qualified if project_id is None or dataset_id is None: raise ValueError( "Cannot determine destination table without project_id and dataset_id" ) destination_table = "{}:{}.{}".format( project_id, dataset_id, destination_table ) query_arguments.append("--destination_table={}".format(destination_table)) if bool(list(filter(lambda x: x.startswith("--parameter"), query_arguments))): # need to do this as parameters are not supported with legacy sql query_arguments.append("--use_legacy_sql=False") # this assumed query command should always be passed inside query_arguments if "query" not in query_arguments: query_arguments = ["query"] + query_arguments query_text = render_template( query_file.name, template_folder=str(query_file.parent), templates_dir="", format=False, **addl_templates, ) # create a session, setting default project and dataset for the query # this is needed if the project the query is run in (billing_project) doesn't match the # project directory the query is in if billing_project is not None and billing_project != project_id: default_project, default_dataset, _ = extract_from_query_path(query_file) session_id = create_query_session( session_project=billing_project, default_project=project_id or default_project, default_dataset=dataset_id or default_dataset, ) query_arguments.append(f"--session_id={session_id}") # temp udfs cannot be used in a session when destination table is set if destination_table is not None and query_file.name != "script.sql": query_text = extract_and_run_temp_udfs( query_text=query_text, project_id=billing_project, session_id=session_id, ) # if billing_project is set, default dataset is set with the @@dataset_id variable instead elif dataset_id is not None: # dataset ID was parsed by argparse but needs to be passed as parameter # when running the query query_arguments.append(f"--dataset_id={dataset_id}") # write rendered query to a temporary file; # query string cannot be passed directly to bq as SQL comments will be interpreted as CLI arguments with tempfile.NamedTemporaryFile(mode="w+") as query_stream: query_stream.write(query_text) query_stream.seek(0) # run the query as shell command so that passed parameters can be used as is subprocess.check_call(["bq"] + query_arguments, stdin=query_stream) def create_query_session( session_project: str, default_project: Optional[str] = None, default_dataset: Optional[str] = None, ): """Create a bigquery session and return the session id. Optionally set the system variables @@dataset_project_id and @@dataset_id if project_id and dataset_id are given. This sets the default project_id or dataset_id for table/view/udf references that do not have a project or dataset qualifier. 
    :param session_project: Project to create the session in
    :param default_project: Optional project to use in queries for objects
        that do not have a project qualifier.
    :param default_dataset: Optional dataset to use in queries for objects
        that do not have a dataset qualifier.
    """
    query_parts = []
    if default_project is not None:
        query_parts.append(f"SET @@dataset_project_id = '{default_project}'")
    if default_dataset is not None:
        query_parts.append(f"SET @@dataset_id = '{default_dataset}'")

    if len(query_parts) == 0:
        # need to run a non-empty query
        session_query = "SELECT 1"
    else:
        session_query = ";\n".join(query_parts)

    client = bigquery.Client(project=session_project)
    job_config = bigquery.QueryJobConfig(
        create_session=True,
        use_legacy_sql=False,
    )
    job = client.query(session_query, job_config)
    job.result()

    if job.session_info is None:
        raise RuntimeError(f"Failed to get session id with job id {job.job_id}")

    return job.session_info.session_id


def extract_and_run_temp_udfs(query_text: str, project_id: str, session_id: str) -> str:
    """Create temp udfs in the session and return the query without udf definitions.

    Does not support dry run because the query will fail dry run if udfs aren't defined.
    """
    sql_statements = sqlparse.split(query_text)
    if len(sql_statements) == 1:
        return query_text

    client = bigquery.Client(project=project_id)
    job_config = bigquery.QueryJobConfig(
        use_legacy_sql=False,
        connection_properties=[bigquery.ConnectionProperty("session_id", session_id)],
    )

    # assume query files only have temp udfs as additional statements
    udf_def_statement = "\n".join(sql_statements[:-1])
    client.query_and_wait(udf_def_statement, job_config=job_config)

    return sql_statements[-1]


@query.command(
    help="""Run a multipart query.

    Examples:

    \b
    # Run a multipart query
    ./bqetl query run_multipart /path/to/query.sql
    """,
    context_settings=dict(
        ignore_unknown_options=True,
        allow_extra_args=True,
    ),
)
@click.argument(
    "query_dir",
    type=click.Path(file_okay=False),
)
@click.option(
    "--using",
    default="document_id",
    help="comma separated list of join columns to use when combining results",
)
@click.option(
    "--parallelism",
    default=4,
    type=int,
    help="Maximum number of queries to execute concurrently",
)
@click.option(
    "--dataset_id",
    "--dataset-id",
    help="Default dataset, if not specified all tables must be qualified with dataset",
)
@project_id_option()
@temp_dataset_option()
@click.option(
    "--destination_table",
    required=True,
    help="table where combined results will be written",
)
@click.option(
    "--time_partitioning_field",
    type=lambda f: bigquery.TimePartitioning(field=f),
    help="time partition field on the destination table",
)
@click.option(
    "--clustering_fields",
    type=lambda f: f.split(","),
    help="comma separated list of clustering fields on the destination table",
)
@click.option(
    "--dry_run",
    "--dry-run",
    is_flag=True,
    default=False,
    help="Print bytes that would be processed for each part and don't run queries",
)
@click.option(
    "--parameters",
    "--parameter",
    multiple=True,
    default=[],
    type=lambda p: bigquery.ScalarQueryParameter(*p.split(":", 2)),
    metavar="NAME:TYPE:VALUE",
    help="query parameter(s) to pass when running parts",
)
@click.option(
    "--priority",
    default="INTERACTIVE",
    type=click.Choice(["BATCH", "INTERACTIVE"]),
    help=(
        "Priority for BigQuery query jobs; BATCH priority will significantly slow "
        "down queries if reserved slots are not enabled for the billing project; "
        "defaults to INTERACTIVE"
    ),
)
@click.option(
    "--schema_update_options",
    "--schema_update_option",
    multiple=True,
    type=click.Choice(
[ bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION, bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION, # Airflow passes an empty string when the field addition date doesn't # match the run date. # See https://github.com/mozilla/telemetry-airflow/blob/ # e49fa7e6b3f5ec562dd248d257770c2303cf0cba/dags/utils/gcp.py#L515 "", ] ), default=[], help="Optional options for updating the schema.", ) def run_multipart( query_dir, using, parallelism, dataset_id, project_id, temp_dataset, destination_table, time_partitioning_field, clustering_fields, dry_run, parameters, priority, schema_update_options, ): """Run a multipart query.""" if dataset_id is not None and "." not in dataset_id and project_id is not None: dataset_id = f"{project_id}.{dataset_id}" if "." not in destination_table and dataset_id is not None: destination_table = f"{dataset_id}.{destination_table}" client = bigquery.Client(project_id) with ThreadPool(parallelism) as pool: parts = pool.starmap( _run_part, [ ( client, part, query_dir, temp_dataset, dataset_id, dry_run, parameters, priority, ) for part in sorted(next(os.walk(query_dir))[2]) if part.startswith("part") and part.endswith(".sql") ], chunksize=1, ) if not dry_run: total_bytes = sum(job.total_bytes_processed for _, job in parts) query = ( f"SELECT\n *\nFROM\n `{sql_table_id(parts[0][1].destination)}`" + "".join( f"\nFULL JOIN\n `{sql_table_id(job.destination)}`" f"\nUSING\n ({using})" for _, job in parts[1:] ) ) try: job = client.query( query=query, job_config=bigquery.QueryJobConfig( destination=destination_table, time_partitioning=time_partitioning_field, clustering_fields=clustering_fields, write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, use_legacy_sql=False, priority=priority, schema_update_options=schema_update_options, ), ) job.result() logging.info( f"Processed {job.total_bytes_processed:,d} bytes to combine results" ) total_bytes += job.total_bytes_processed logging.info(f"Processed {total_bytes:,d} bytes in total") finally: for _, job in parts: client.delete_table(sql_table_id(job.destination).split("$")[0]) logging.info(f"Deleted {len(parts)} temporary tables") def _run_part( client, part, query_dir, temp_dataset, dataset_id, dry_run, parameters, priority ): """Run a query part.""" with open(os.path.join(query_dir, part)) as sql_file: query = sql_file.read() job_config = bigquery.QueryJobConfig( destination=temp_dataset.temp_table(), default_dataset=dataset_id, use_legacy_sql=False, dry_run=dry_run, query_parameters=parameters, priority=priority, allow_large_results=True, ) job = client.query(query=query, job_config=job_config) if job.dry_run: logging.info(f"Would process {job.total_bytes_processed:,d} bytes for {part}") else: job.result() logging.info(f"Processed {job.total_bytes_processed:,d} bytes for {part}") return part, job @query.command( help="""Validate a query. Checks formatting, scheduling information and dry runs the query. 
Examples: ./bqetl query validate telemetry_derived.clients_daily_v6 \b # Validate query not in shared-prod ./bqetl query validate \\ --use_cloud_function=false \\ --project_id=moz-fx-data-marketing-prod \\ ga_derived.blogs_goals_v1 """, ) @click.argument("name", required=False) @sql_dir_option @project_id_option() @use_cloud_function_option @click.option( "--validate_schemas", "--validate-schemas", help="Require dry run schema to match destination table and file if present.", is_flag=True, default=False, ) @respect_dryrun_skip_option(default=False) @no_dryrun_option(default=False) @click.option("--skip_format_sql", "--skip-format-sql", is_flag=True, default=False) @click.pass_context def validate( ctx, name, sql_dir, project_id, use_cloud_function, validate_schemas, respect_dryrun_skip, no_dryrun, skip_format_sql, ): """Validate queries by dry running, formatting and checking scheduling configs.""" if name is None: name = "*.*" query_files = paths_matching_name_pattern(name, sql_dir, project_id) dataset_dirs = set() errors = [] for query in query_files: click.echo(f"Validating metadata for {query}") if not skip_format_sql: ctx.invoke(format, paths=[str(query)]) if not no_dryrun: ctx.invoke( dryrun, paths=[str(query)], use_cloud_function=use_cloud_function, project=project_id, validate_schemas=validate_schemas, respect_skip=respect_dryrun_skip, ) try: validate_metadata.validate(query.parent) except validate_metadata.MetadataValidationError as e: errors.append(str(e)) dataset_dirs.add(query.parent.parent) if no_dryrun: click.echo("Dry run skipped for query files.") for dataset_dir in dataset_dirs: try: validate_metadata.validate_datasets(dataset_dir) except validate_metadata.MetadataValidationError as e: errors.append(str(e)) if len(errors) > 0: click.echo( f"Failed to validate {len(errors)} metadata files (see above for error messages):" ) click.echo("\n".join(errors)) sys.exit(1) def _initialize_in_parallel( project, table, dataset, query_file, arguments, parallelism, sample_ids, addl_templates, billing_project, ): with ThreadPool(parallelism) as pool: # Process all sample_ids in parallel. pool.map( partial( _run_query, [query_file], project, None, table, dataset, addl_templates=addl_templates, billing_project=billing_project, ), [arguments + [f"--parameter=sample_id:INT64:{i}"] for i in sample_ids], ) @query.command( help="""Run a full backfill on the destination table for the query. Using this command will: - Create the table if it doesn't exist and run a full backfill. - Run a full backfill if the table exists and is empty. - Raise an exception if the table exists and has data, or if the table exists and the schema doesn't match the query. It supports `query.sql` files that use the is_init() pattern. To run in parallel per sample_id, include a @sample_id parameter in the query. 
Examples: - For init.sql files: ./bqetl query initialize telemetry_derived.ssl_ratios_v1 - For query.sql files and parallel run: ./bqetl query initialize sql/moz-fx-data-shared-prod/telemetry_derived/clients_first_seen_v2/query.sql """, ) @click.argument("name") @sql_dir_option @project_id_option() @billing_project_option() @click.option( "--dry_run/--no_dry_run", "--dry-run/--no-dry-run", help="Dry run the initialization", ) @parallelism_option(default=DEFAULT_INIT_PARALLELISM) @click.option( "--skip-existing", "--skip_existing", help="Skip initialization for existing artifacts, " "otherwise initialization is run for empty tables.", default=False, is_flag=True, ) @click.option( "--force/--noforce", help="Run the initialization even if the destination table contains data.", default=False, ) @click.pass_context def initialize( ctx, name, sql_dir, project_id, billing_project, dry_run, parallelism, skip_existing, force, ): """Create the destination table for the provided query.""" if not is_authenticated(): click.echo("Authentication required for creating tables.", err=True) sys.exit(1) if Path(name).exists(): # allow name to be a path query_files = [Path(name)] else: file_regex = re.compile( r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/" r"(?:query\.sql|init\.sql|materialized_view\.sql)$" ) query_files = paths_matching_name_pattern( name, sql_dir, project_id, file_regex=file_regex ) if not query_files: click.echo( f"Couldn't find directory matching `{name}`. Failed to initialize query.", err=True, ) sys.exit(1) def _initialize(query_file): project, dataset, destination_table = extract_from_query_path(query_file) client = bigquery.Client(project=project) full_table_id = f"{project}.{dataset}.{destination_table}" table = None sql_content = query_file.read_text() materialized_views = list( map( Path, glob(f"{query_file.parent}/**/materialized_view.sql", recursive=True), ) ) # check if the provided file can be initialized and whether existing ones should be skipped if "is_init()" in sql_content: try: table = client.get_table(full_table_id) if skip_existing: # table exists; skip initialization return if not force and table.num_rows > 0: raise click.ClickException( f"Table {full_table_id} already exists and contains data. The initialization process is terminated." " Use --force to overwrite the existing destination table." ) except NotFound: # continue with creating the table pass elif len(materialized_views) == 0: return try: # Enable initialization from query.sql files # Create the table by deploying the schema and metadata, then run the init. # This does not currently verify the accuracy of the schema or that it # matches the query. 
if "is_init()" in sql_content: if not table: ctx.invoke( update, name=full_table_id, sql_dir=sql_dir, project_id=project, update_downstream=False, is_init=True, ) ctx.invoke( deploy, name=full_table_id, sql_dir=sql_dir, project_id=project, force=True, respect_dryrun_skip=False, ) arguments = [ "query", "--use_legacy_sql=false", "--format=none", "--append_table", "--noreplace", ] if dry_run: arguments += ["--dry_run"] if "@sample_id" in sql_content: sample_ids = list(range(0, 100)) _initialize_in_parallel( project=project, table=full_table_id, dataset=dataset, query_file=query_file, arguments=arguments, parallelism=parallelism, sample_ids=sample_ids, addl_templates={ "is_init": lambda: True, }, billing_project=billing_project, ) else: _run_query( query_files=[query_file], project_id=project, public_project_id=None, destination_table=full_table_id, dataset_id=dataset, query_arguments=arguments, addl_templates={ "is_init": lambda: True, }, billing_project=billing_project, ) else: for file in materialized_views: with open(file) as init_file_stream: init_sql = init_file_stream.read() job_config = bigquery.QueryJobConfig( dry_run=dry_run, default_dataset=f"{project}.{dataset}", ) # only deploy materialized view if it doesn't exist # TODO: https://github.com/mozilla/bigquery-etl/issues/5804 try: materialized_view_table = client.get_table(full_table_id) # Best-effort check, don't fail if there's an error try: has_changes = materialized_view_has_changes( materialized_view_table.mview_query, init_sql ) except Exception as e: change_str = f"failed to compare changes: {e}" else: change_str = ( "sql changed" if has_changes else "sql not changed" ) click.echo( f"Skipping materialized view {full_table_id}, already exists, {change_str}" ) except NotFound: job = client.query(init_sql, job_config=job_config) if not dry_run: job.result() except Exception: print_exc() return query_file with ThreadPool(parallelism) as pool: failed_initializations = [r for r in pool.map(_initialize, query_files) if r] if len(failed_initializations) > 0: click.echo("The following tables could not be deployed:", err=True) for failed_deploy in failed_initializations: click.echo(failed_deploy, err=True) sys.exit(1) def materialized_view_has_changes(deployed_sql: str, file_sql: str) -> bool: """Return true if the sql in the materialized view file doesn't match the deployed sql.""" file_sql_formatted = sqlparse.format( re.sub( r"CREATE+(?:\s+OR\s+REPLACE)?\s+MATERIALIZED\s+VIEW.*?AS", "", file_sql, flags=re.DOTALL, ), strip_comments=True, strip_whitespace=True, ) deployed_sql_formatted = sqlparse.format( deployed_sql, strip_comments=True, strip_whitespace=True, ) return file_sql_formatted != deployed_sql_formatted @query.command( help="""Render a query Jinja template. Examples: ./bqetl query render telemetry_derived.ssl_ratios_v1 \\ --output-dir=/tmp """, context_settings=dict( ignore_unknown_options=True, allow_extra_args=True, ), ) @click.argument("name") @sql_dir_option @click.option( "--output-dir", "--output_dir", help="Output directory generated SQL is written to. 
" + "If not specified, rendered queries are printed to console.", type=click.Path(file_okay=False), required=False, ) @parallelism_option() def render(name, sql_dir, output_dir, parallelism): """Render a query Jinja template.""" if name is None: name = "*.*" query_files = paths_matching_name_pattern(name, sql_dir, project_id=None) resolved_sql_dir = Path(sql_dir).resolve() with Pool(parallelism) as p: p.map(partial(_render_query, output_dir, resolved_sql_dir), query_files) def _render_query(output_dir, resolved_sql_dir, query_file): table_name = query_file.parent.name dataset_id = query_file.parent.parent.name project_id = query_file.parent.parent.parent.name jinja_params = { "project_id": project_id, "dataset_id": dataset_id, "table_name": table_name, } rendered_sql = ( render_template( query_file.name, template_folder=query_file.parent, templates_dir="", format=False, **jinja_params, ) + "\n" ) if not any(s in str(query_file) for s in skip_format()): rendered_sql = reformat(rendered_sql, trailing_newline=True) if output_dir: output_file = output_dir / query_file.resolve().relative_to(resolved_sql_dir) output_file.parent.mkdir(parents=True, exist_ok=True) output_file.write_text(rendered_sql) else: click.echo(query_file) click.echo(rendered_sql) def _parse_partition_setting(partition_date): params = partition_date.split(":") if len(params) != 3: return None # Check date format try: datetime.datetime.strptime(params[2], "%Y-%m-%d").date() except ValueError: return None # Check column name if re.match(r"^\w+$", params[0]): return {params[0]: params[2]} def _validate_partition_date(ctx, param, partition_date): """Process the CLI parameter check_date and set the parameter for BigQuery.""" # Will be None if launched from Airflow. Also ctx.args is not populated at this stage. if partition_date: parsed = _parse_partition_setting(partition_date) if parsed is None: raise click.BadParameter("Format must be <column-name>::<yyyy-mm-dd>") return parsed return None def _parse_check_output(output: str) -> str: output = output.replace("\n", " ") if "ETL Data Check Failed:" in output: return f"ETL Data Check Failed:{output.split('ETL Data Check Failed:')[1]}" return output @query.group(help="Commands for managing query schemas.") def schema(): """Create the CLI group for the query schema command.""" pass @schema.command( help=""" Update the query schema based on the destination table schema and the query schema. If no schema.yaml file exists for a query, one will be created. Examples: ./bqetl query schema update telemetry_derived.clients_daily_v6 # Update schema including downstream dependencies (requires GCP) ./bqetl query schema update telemetry_derived.clients_daily_v6 --update-downstream """, ) @click.argument("name", nargs=-1) @sql_dir_option @click.option( "--project-id", "--project_id", help="GCP project ID", default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"), callback=is_valid_project, ) @click.option( "--update-downstream", "--update_downstream", help="Update downstream dependencies. 
GCP authentication required.",
    default=False,
    is_flag=True,
)
@click.option(
    "--tmp-dataset",
    "--tmp_dataset",
    help="GCP dataset for creating updated tables temporarily.",
    default="tmp",
)
@use_cloud_function_option
@respect_dryrun_skip_option(default=True)
@parallelism_option()
@click.option(
    "--is-init",
    "--is_init",
    help="Indicates whether the `is_init()` condition should be set to true or false.",
    is_flag=True,
    default=False,
)
def update(
    name,
    sql_dir,
    project_id,
    update_downstream,
    tmp_dataset,
    use_cloud_function,
    respect_dryrun_skip,
    parallelism,
    is_init,
):
    """CLI command for generating the query schema."""
    if not is_authenticated():
        click.echo(
            "Authentication to GCP required. Run `gcloud auth login --update-adc` "
            "and check that the project is set correctly."
        )
        sys.exit(1)

    query_files = paths_matching_name_pattern(
        name, sql_dir, project_id, files=["query.sql"]
    )

    # skip updating schemas that are not to be deployed
    query_files = [
        query_file
        for query_file in query_files
        if str(query_file)
        not in ConfigLoader.get("schema", "deploy", "skip", fallback=[])
    ]

    dependency_graph = get_dependency_graph([sql_dir], without_views=True)
    manager = multiprocessing.Manager()
    tmp_tables = manager.dict({})

    # order query files to make sure derived_from dependencies are resolved
    query_file_graph = {}
    for query_file in query_files:
        query_file_graph[query_file] = []
        try:
            metadata = Metadata.of_query_file(str(query_file))
            if metadata and metadata.schema and metadata.schema.derived_from:
                for derived_from in metadata.schema.derived_from:
                    parent_queries = [
                        query
                        for query in paths_matching_name_pattern(
                            ".".join(derived_from.table), sql_dir, project_id
                        )
                    ]
                    if len(parent_queries) > 0:
                        query_file_graph[query_file].append(parent_queries[0])
        except FileNotFoundError:
            query_file_graph[query_file] = []

    credentials = get_credentials()
    id_token = get_id_token(credentials=credentials)

    ts = ParallelTopologicalSorter(
        query_file_graph, parallelism=parallelism, with_follow_up=update_downstream
    )
    ts.map(
        partial(
            _update_query_schema_with_downstream,
            sql_dir,
            project_id,
            tmp_dataset,
            dependency_graph,
            tmp_tables,
            use_cloud_function,
            respect_dryrun_skip,
            update_downstream,
            is_init=is_init,
            credentials=credentials,
            id_token=id_token,
        )
    )

    if len(tmp_tables) > 0:
        client = bigquery.Client()
        # delete temporary tables
        for _, table in tmp_tables.items():
            client.delete_table(table, not_found_ok=True)


def _update_query_schema_with_downstream(
    sql_dir,
    project_id,
    tmp_dataset,
    dependency_graph,
    tmp_tables={},
    use_cloud_function=True,
    respect_dryrun_skip=True,
    update_downstream=False,
    query_file=None,
    follow_up_queue=None,
    is_init=False,
    credentials=None,
    id_token=None,
):
    try:
        changed = _update_query_schema(
            query_file,
            sql_dir,
            project_id,
            tmp_dataset,
            tmp_tables,
            use_cloud_function,
            respect_dryrun_skip,
            is_init,
            credentials,
            id_token,
        )

        if update_downstream:
            # update downstream dependencies
            if changed:
                if not is_authenticated():
                    click.echo(
                        "Cannot update downstream dependencies. "
                        "Authentication to GCP required. Run `gcloud auth login --update-adc` "
                        "and check that the project is set correctly."
) sys.exit(1) project, dataset, table = extract_from_query_path(query_file) identifier = f"{project}.{dataset}.{table}" tmp_identifier = f"{project}.{tmp_dataset}.{table}_{random_str(12)}" # create temporary table with updated schema if identifier not in tmp_tables: schema = Schema.from_schema_file(query_file.parent / SCHEMA_FILE) schema.deploy(tmp_identifier) tmp_tables[identifier] = tmp_identifier # get downstream dependencies that will be updated in the next iteration dependencies = [ p for k, refs in dependency_graph.items() for p in paths_matching_name_pattern( k, sql_dir, project_id, files=("query.sql",) ) if identifier in refs ] for d in dependencies: click.echo(f"Update downstream dependency schema for {d}") if follow_up_queue: follow_up_queue.put(d) except Exception: print_exc() def _update_query_schema( query_file, sql_dir, project_id, tmp_dataset, tmp_tables={}, use_cloud_function=True, respect_dryrun_skip=True, is_init=False, credentials=None, id_token=None, ): """ Update the schema of a specific query file. Return True if the schema changed, False if it is unchanged. """ if respect_dryrun_skip and str(query_file) in DryRun.skipped_files(): click.echo(f"{query_file} dry runs are skipped. Cannot update schemas.") return tmp_tables = copy.deepcopy(tmp_tables) query_file_path = Path(query_file) existing_schema_path = query_file_path.parent / SCHEMA_FILE project_name, dataset_name, table_name = extract_from_query_path(query_file_path) try: metadata = Metadata.of_query_file(str(query_file_path)) except FileNotFoundError: metadata = None click.echo(f"No metadata defined for {query_file_path}") # pull in updates from parent schemas if metadata and metadata.schema and metadata.schema.derived_from: for derived_from in metadata.schema.derived_from: parent_queries = [ query for query in paths_matching_name_pattern( ".".join(derived_from.table), sql_dir, project_id ) ] if len(parent_queries) == 0: click.echo( f"derived_from query {derived_from.table} does not exist.", err=True, ) else: parent_schema = Schema.from_schema_file( parent_queries[0].parent / SCHEMA_FILE ) parent_project, parent_dataset, parent_table = extract_from_query_path( parent_queries[0] ) parent_identifier = f"{parent_project}.{parent_dataset}.{parent_table}" if parent_identifier not in tmp_tables: tmp_parent_identifier = ( f"{parent_project}.{tmp_dataset}.{parent_table}_" + random_str(12) ) parent_schema.deploy(tmp_parent_identifier) tmp_tables[parent_identifier] = tmp_parent_identifier if existing_schema_path.is_file(): existing_schema = Schema.from_schema_file(existing_schema_path) else: existing_schema = Schema.empty() existing_schema.merge(parent_schema, exclude=derived_from.exclude) # use temporary table tmp_identifier = ( f"{project_name}.{tmp_dataset}.{table_name}_{random_str(12)}" ) existing_schema.deploy(tmp_identifier) tmp_tables[f"{project_name}.{dataset_name}.{table_name}"] = ( tmp_identifier ) existing_schema.to_yaml_file(existing_schema_path) # replace temporary table references sql_content = render_template( query_file_path.name, template_folder=str(query_file_path.parent), templates_dir="", format=False, **{"is_init": lambda: is_init}, ) for orig_table, tmp_table in tmp_tables.items(): table_parts = orig_table.split(".") for i in range(len(table_parts)): if ".".join(table_parts[i:]) in sql_content: sql_content = sql_content.replace(".".join(table_parts[i:]), tmp_table) break query_schema = None try: query_schema = Schema.from_query_file( query_file_path, content=sql_content, 
use_cloud_function=use_cloud_function, respect_skip=respect_dryrun_skip, sql_dir=sql_dir, credentials=credentials, id_token=id_token, ) except Exception: if not existing_schema_path.exists(): click.echo( click.style( f"Cannot automatically update {query_file_path}. " f"Please update {query_file_path / SCHEMA_FILE} manually.", fg="red", ), err=True, ) return # update bigquery metadata try: client = bigquery.Client(credentials=credentials) table = client.get_table(f"{project_name}.{dataset_name}.{table_name}") metadata_file_path = query_file_path.parent / METADATA_FILE if ( table.time_partitioning and metadata and ( metadata.bigquery is None or metadata.bigquery.time_partitioning is None ) ): metadata.set_bigquery_partitioning( field=table.time_partitioning.field, partition_type=table.time_partitioning.type_.lower(), required=table.time_partitioning.require_partition_filter, expiration_days=( table.time_partitioning.expiration_ms / 86400000.0 if table.time_partitioning.expiration_ms else None ), ) click.echo(f"Partitioning metadata added to {metadata_file_path}") if ( table.clustering_fields and metadata and (metadata.bigquery is None or metadata.bigquery.clustering is None) ): metadata.set_bigquery_clustering(table.clustering_fields) click.echo(f"Clustering metadata added to {metadata_file_path}") if metadata: metadata.write(metadata_file_path) except NotFound: click.echo( f"Destination table {project_name}.{dataset_name}.{table_name} " "does not exist in BigQuery. Run bqetl query schema deploy " "<dataset>.<table> to create the destination table." ) except FileNotFoundError: click.echo( f"No metadata file for {project_name}.{dataset_name}.{table_name}." " Skip schema update." ) return partitioned_by = None try: metadata = Metadata.of_query_file(query_file_path) if metadata.bigquery and metadata.bigquery.time_partitioning: partitioned_by = metadata.bigquery.time_partitioning.field except FileNotFoundError: pass table_schema = Schema.for_table( project_name, dataset_name, table_name, partitioned_by=partitioned_by, use_cloud_function=use_cloud_function, respect_skip=respect_dryrun_skip, credentials=credentials, id_token=id_token, ) changed = True if existing_schema_path.is_file(): existing_schema = Schema.from_schema_file(existing_schema_path) old_schema = copy.deepcopy(existing_schema) if table_schema: existing_schema.merge(table_schema) if query_schema: existing_schema.merge(query_schema) existing_schema.to_yaml_file(existing_schema_path) changed = not existing_schema.equal(old_schema) else: query_schema.merge(table_schema) query_schema.to_yaml_file(existing_schema_path) click.echo(f"Schema {existing_schema_path} updated.") return changed @schema.command( help="""Deploy the query schema. Examples: ./bqetl query schema deploy telemetry_derived.clients_daily_v6 """, ) @click.argument("name", nargs=-1) @sql_dir_option @click.option( "--project-id", "--project_id", help="GCP project ID", default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"), callback=is_valid_project, ) @click.option( "--force/--noforce", help="Deploy the schema file without validating that it matches the query", default=False, ) @use_cloud_function_option @respect_dryrun_skip_option(default=True) @click.option( "--skip-existing", "--skip_existing", help="Skip updating existing tables. 
" + "This option ensures that only new tables get deployed.", default=False, is_flag=True, ) @click.option( "--skip-external-data", "--skip_external_data", help="Skip publishing external data, such as Google Sheets.", default=False, is_flag=True, ) @click.option( "--destination_table", "--destination-table", required=False, help=( "Destination table name results are written to. " + "If not set, determines destination table based on query. " + "Must be fully qualified (project.dataset.table)." ), ) @parallelism_option() @click.pass_context def deploy( ctx, name, sql_dir, project_id, force, use_cloud_function, respect_dryrun_skip, skip_existing, skip_external_data, destination_table, parallelism, ): """CLI command for deploying destination table schemas.""" if not is_authenticated(): click.echo( "Authentication to GCP required. Run `gcloud auth login --update-adc` " "and check that the project is set correctly." ) sys.exit(1) query_files = paths_matching_name_pattern( name, sql_dir, project_id, ["query.*", "script.sql"] ) metadata_files = paths_matching_name_pattern( name, sql_dir, project_id, ["metadata.yaml"] ) if not query_files and not metadata_files: # run SQL generators if no matching query has been found ctx.invoke( generate_all, output_dir=ctx.obj["TMP_DIR"], ignore=["derived_view_schemas", "stable_views"], ) query_files = paths_matching_name_pattern( name, ctx.obj["TMP_DIR"], project_id, ["query.*", "script.sql"] ) metadata_files = paths_matching_name_pattern( name, ctx.obj["TMP_DIR"], project_id, ["metadata.yaml"] ) if not query_files and not metadata_files: raise click.ClickException(f"No queries matching `{name}` were found.") credentials = get_credentials() id_token = get_id_token(credentials=credentials) query_file_paths = [query_file.parent for query_file in query_files] metadata_files_without_query_file = [ metadata_file for metadata_file in metadata_files if metadata_file.parent not in query_file_paths and not any( file.suffix == ".sql" or file.name == "query.py" for file in metadata_file.parent.iterdir() if file.is_file() ) ] _deploy = partial( deploy_table, destination_table=destination_table, force=force, use_cloud_function=use_cloud_function, skip_existing=skip_existing, skip_external_data=skip_external_data, respect_dryrun_skip=respect_dryrun_skip, sql_dir=sql_dir, credentials=credentials, id_token=id_token, ) failed_deploys, skipped_deploys, external_deploys = [], [], [] with concurrent.futures.ThreadPoolExecutor(max_workers=parallelism) as executor: future_to_query = { executor.submit(_deploy, artifact_file): artifact_file for artifact_file in query_files + metadata_files_without_query_file if str(artifact_file) not in ConfigLoader.get("schema", "deploy", "skip", fallback=[]) } for future in futures.as_completed(future_to_query): artifact_file = future_to_query[future] try: future.result() except SkippedDeployException as e: print(f"Skipped deploy for {artifact_file}: ({e})") skipped_deploys.append(artifact_file) except FailedDeployException as e: print(f"Failed deploy for {artifact_file}: ({e})") failed_deploys.append(artifact_file) except SkippedExternalDataException as e: print(f"Skipping deploy for external data table {artifact_file}: ({e})") external_deploys.append(artifact_file) else: print(f"{artifact_file} successfully deployed!") if skipped_deploys: click.echo("The following deploys were skipped:") for skipped_deploy in skipped_deploys: click.echo(skipped_deploy) if external_deploys: click.echo("The following deploys of external data tables were skipped:") 
        for external_deploy in external_deploys:
            click.echo(external_deploy)

    if failed_deploys:
        click.echo("The following tables could not be deployed:")
        for failed_deploy in failed_deploys:
            click.echo(failed_deploy)
        sys.exit(1)


def _validate_schema_from_path(
    query_file_path,
    use_cloud_function=True,
    respect_dryrun_skip=True,
    credentials=None,
    id_token=None,
):
    """Dry run and validate a query schema from its path."""
    return (
        DryRun(
            query_file_path,
            use_cloud_function=use_cloud_function,
            respect_skip=respect_dryrun_skip,
            credentials=credentials,
            id_token=id_token,
        ).validate_schema(),
        query_file_path,
    )


@schema.command(
    help="""Validate the query schema.

    Examples:

    ./bqetl query schema validate telemetry_derived.clients_daily_v6
    """,
    name="validate",
)
@click.argument("name")
@sql_dir_option
@click.option(
    "--project-id",
    "--project_id",
    help="GCP project ID",
    default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"),
    callback=is_valid_project,
)
@use_cloud_function_option
@respect_dryrun_skip_option(default=True)
@click.pass_context
def validate_schema(
    ctx, name, sql_dir, project_id, use_cloud_function, respect_dryrun_skip
):
    """Validate the defined query schema with the query and the destination table."""
    query_files = paths_matching_name_pattern(name, sql_dir, project_id)
    if query_files == []:
        # run SQL generators if no matching query has been found
        ctx.invoke(
            generate_all,
            output_dir=ctx.obj["TMP_DIR"],
            ignore=["derived_view_schemas", "stable_views"],
        )
        query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id)
        if query_files == []:
            raise click.ClickException(f"No queries matching `{name}` were found.")

    credentials = get_credentials()
    id_token = get_id_token(credentials=credentials)

    _validate_schema = partial(
        _validate_schema_from_path,
        use_cloud_function=use_cloud_function,
        respect_dryrun_skip=respect_dryrun_skip,
        credentials=credentials,
        id_token=id_token,
    )

    with Pool(8) as p:
        result = p.map(_validate_schema, query_files, chunksize=1)

    all_valid = True
    for is_valid, query_file_path in result:
        if is_valid is False:
            if all_valid:
                click.echo("\nSchemas for the following queries are invalid:")
            all_valid = False
            click.echo(query_file_path)

    if not all_valid:
        sys.exit(1)
    else:
        click.echo("\nAll schemas are valid.")
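

# Illustrative sketch (not part of the original bigquery_etl module): the
# validate_schema command above binds its fixed options with functools.partial
# and fans the resulting callable out over query files with a multiprocessing
# Pool. The same pattern in isolation, using a hypothetical _toy_validate
# worker in place of _validate_schema_from_path (partial and Pool are already
# imported at the top of this module):
def _toy_validate(path, strict=True):
    """Stand-in worker; return (is_valid, path) like _validate_schema_from_path."""
    return (True, path)


if __name__ == "__main__":
    # bind the fixed keyword argument once, then map over the inputs in a pool
    _toy_check = partial(_toy_validate, strict=False)
    with Pool(8) as pool:
        toy_results = pool.map(_toy_check, ["a/query.sql", "b/query.sql"], chunksize=1)
    # toy_results == [(True, "a/query.sql"), (True, "b/query.sql")]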