backfill/2023-09-26-initialize-clients_first_seen_v2/bigquery_etl_cli_query.py (1,808 lines of code) (raw):

"""bigquery-etl CLI query command.""" import copy import datetime import logging import multiprocessing import os import re import string import subprocess import sys import tempfile import typing from datetime import date, timedelta from functools import partial from multiprocessing.pool import Pool, ThreadPool from pathlib import Path from tempfile import NamedTemporaryFile from timeit import default_timer from traceback import print_exc import click import yaml from dateutil.rrule import MONTHLY, rrule from google.cloud import bigquery from google.cloud.exceptions import NotFound, PreconditionFailed from ..backfill.utils import QUALIFIED_TABLE_NAME_RE, qualified_table_name_matching from ..cli.format import format from ..cli.utils import ( is_authenticated, is_valid_project, no_dryrun_option, parallelism_option, paths_matching_name_pattern, project_id_option, respect_dryrun_skip_option, sql_dir_option, temp_dataset_option, use_cloud_function_option, ) from ..config import ConfigLoader from ..dependency import get_dependency_graph from ..dryrun import DryRun from ..format_sql.format import skip_format from ..format_sql.formatter import reformat from ..metadata import validate_metadata from ..metadata.parse_metadata import ( METADATA_FILE, BigQueryMetadata, ClusteringMetadata, DatasetMetadata, ExternalDataFormat, Metadata, PartitionMetadata, PartitionType, ) from ..query_scheduling.dag_collection import DagCollection from ..query_scheduling.generate_airflow_dags import get_dags from ..schema import SCHEMA_FILE, Schema from ..util import extract_from_query_path from ..util.bigquery_id import sql_table_id from ..util.common import random_str from ..util.common import render as render_template from ..util.parallel_topological_sorter import ParallelTopologicalSorter from .dryrun import dryrun from .generate import generate_all QUERY_NAME_RE = re.compile(r"(?P<dataset>[a-zA-z0-9_]+)\.(?P<name>[a-zA-z0-9_]+)") VERSION_RE = re.compile(r"_v[0-9]+") DESTINATION_TABLE_RE = re.compile(r"^[a-zA-Z0-9_$]{0,1024}$") DEFAULT_DAG_NAME = "bqetl_default" DEFAULT_PARALLELISM = 10 @click.group(help="Commands for managing queries.") @click.pass_context def query(ctx): """Create the CLI group for the query command.""" # create temporary directory generated content is written to # the directory will be deleted automatically after the command exits ctx.ensure_object(dict) ctx.obj["TMP_DIR"] = ctx.with_resource(tempfile.TemporaryDirectory()) @query.command( help="""Create a new query with name <dataset>.<query_name>, for example: telemetry_derived.active_profiles. Use the `--project_id` option to change the project the query is added to; default is `moz-fx-data-shared-prod`. Views are automatically generated in the publicly facing dataset. Examples: \b ./bqetl query create telemetry_derived.deviations_v1 \\ --owner=example@mozilla.com \b # The query version gets autocompleted to v1. Queries are created in the # _derived dataset and accompanying views in the public dataset. ./bqetl query create telemetry.deviations --owner=example@mozilla.com """, ) @click.argument("name") @sql_dir_option @project_id_option( ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod") ) @click.option( "--owner", "-o", help="Owner of the query (email address)", default="example@mozilla.com", ) @click.option( "--init", "-i", help="Create an init.sql file to initialize the table", default=False, is_flag=True, ) @click.option( "--dag", "-d", help=( f"Name of the DAG the query should be scheduled under." "If there is no DAG name specified, the query is" f"scheduled by default in DAG {DEFAULT_DAG_NAME}." "To skip the automated scheduling use --no_schedule." "To see available DAGs run `bqetl dag info`." "To create a new DAG run `bqetl dag create`." ), default=DEFAULT_DAG_NAME, ) @click.option( "--no_schedule", "--no-schedule", help=( "Using this option creates the query without scheduling information." " Use `bqetl query schedule` to add it manually if required." ), default=False, is_flag=True, ) @click.pass_context def create(ctx, name, sql_dir, project_id, owner, init, dag, no_schedule): """CLI command for creating a new query.""" # create directory structure for query try: match = QUERY_NAME_RE.match(name) name = match.group("name") dataset = match.group("dataset") version = "_" + name.split("_")[-1] if not VERSION_RE.match(version): version = "_v1" else: name = "_".join(name.split("_")[:-1]) except AttributeError: click.echo( "New queries must be named like:" + " <dataset>.<table> or <dataset>.<table>_v[n]" ) sys.exit(1) derived_path = None view_path = None path = Path(sql_dir) if dataset.endswith("_derived"): # create a directory for the corresponding view derived_path = path / project_id / dataset / (name + version) derived_path.mkdir(parents=True) view_path = path / project_id / dataset.replace("_derived", "") / name view_path.mkdir(parents=True) else: # check if there is a corresponding derived dataset if (path / project_id / (dataset + "_derived")).exists(): derived_path = path / project_id / (dataset + "_derived") / (name + version) derived_path.mkdir(parents=True) view_path = path / project_id / dataset / name view_path.mkdir(parents=True) dataset = dataset + "_derived" else: # some dataset that is not specified as _derived # don't automatically create views derived_path = path / project_id / dataset / (name + version) derived_path.mkdir(parents=True) click.echo(f"Created query in {derived_path}") if view_path: click.echo(f"Created corresponding view in {view_path}") view_file = view_path / "view.sql" view_dataset = dataset.replace("_derived", "") view_file.write_text( reformat( f"""CREATE OR REPLACE VIEW `{project_id}.{view_dataset}.{name}` AS SELECT * FROM `{project_id}.{dataset}.{name}{version}`""" ) + "\n" ) # create query.sql file query_file = derived_path / "query.sql" query_file.write_text( reformat( f"""-- Query for {dataset}.{name}{version} -- For more information on writing queries see: -- https://docs.telemetry.mozilla.org/cookbooks/bigquery/querying.html SELECT * FROM table WHERE submission_date = @submission_date""" ) + "\n" ) # create default metadata.yaml metadata_file = derived_path / "metadata.yaml" metadata = Metadata( friendly_name=string.capwords(name.replace("_", " ")), description="Please provide a description for the query", owners=[owner], labels={"incremental": True}, bigquery=BigQueryMetadata( time_partitioning=PartitionMetadata(field="", type=PartitionType.DAY), clustering=ClusteringMetadata(fields=[]), ), ) metadata.write(metadata_file) # optionally create init.sql if init: init_file = derived_path / "init.sql" init_file.write_text( reformat( f""" -- SQL for initializing the query destination table. CREATE OR REPLACE TABLE `{ConfigLoader.get('default', 'project', fallback="moz-fx-data-shared-prod")}.{dataset}.{name}{version}` AS SELECT * FROM table""" ) + "\n" ) dataset_metadata_file = derived_path.parent / "dataset_metadata.yaml" if not dataset_metadata_file.exists(): dataset_name = str(dataset_metadata_file.parent.name) dataset_metadata = DatasetMetadata( friendly_name=string.capwords(dataset_name.replace("_", " ")), description="Please provide a description for the dataset", dataset_base_acl="derived", user_facing=False, ) dataset_metadata.write(dataset_metadata_file) click.echo(f"Created dataset metadata in {dataset_metadata_file}") if view_path: dataset_metadata_file = view_path.parent / "dataset_metadata.yaml" if not dataset_metadata_file.exists(): dataset_name = str(dataset_metadata_file.parent.name) dataset_metadata = DatasetMetadata( friendly_name=string.capwords(dataset_name.replace("_", " ")), description="Please provide a description for the dataset", dataset_base_acl="view", user_facing=True, ) dataset_metadata.write(dataset_metadata_file) click.echo(f"Created dataset metadata in {dataset_metadata_file}") if no_schedule: click.echo( click.style( "WARNING: This query has been created without " "scheduling information. Use `bqetl query schedule`" " to manually add it to a DAG or " "`bqetl query create --help` for more options.", fg="yellow", ) ) else: ctx.invoke(schedule, name=derived_path, dag=dag) @query.command( help="""Schedule an existing query Examples: \b ./bqetl query schedule telemetry_derived.deviations_v1 \\ --dag=bqetl_deviations \b # Set a specific name for the task ./bqetl query schedule telemetry_derived.deviations_v1 \\ --dag=bqetl_deviations \\ --task-name=deviations """, ) @click.argument("name") @sql_dir_option @project_id_option() @click.option( "--dag", "-d", help=( "Name of the DAG the query should be scheduled under. " "To see available DAGs run `bqetl dag info`. " "To create a new DAG run `bqetl dag create`." ), ) @click.option( "--depends_on_past", "--depends-on-past", help="Only execute query if previous scheduled run succeeded.", default=False, type=bool, ) @click.option( "--task_name", "--task-name", help=( "Custom name for the Airflow task. By default the task name is a " "combination of the dataset and table name." ), ) def schedule(name, sql_dir, project_id, dag, depends_on_past, task_name): """CLI command for scheduling a query.""" query_files = paths_matching_name_pattern(name, sql_dir, project_id) if query_files == []: click.echo(f"Name doesn't refer to any queries: {name}", err=True) sys.exit(1) sql_dir = Path(sql_dir) dags = DagCollection.from_file(sql_dir.parent / "dags.yaml") dags_to_be_generated = set() for query_file in query_files: try: metadata = Metadata.of_query_file(query_file) except FileNotFoundError: click.echo(f"Cannot schedule {query_file}. No metadata.yaml found.") continue if dag: # check if DAG already exists existing_dag = dags.dag_by_name(dag) if not existing_dag: click.echo( ( f"DAG {dag} does not exist. " "To see available DAGs run `bqetl dag info`. " "To create a new DAG run `bqetl dag create`." ), err=True, ) sys.exit(1) # write scheduling information to metadata file metadata.scheduling = {} metadata.scheduling["dag_name"] = dag if depends_on_past: metadata.scheduling["depends_on_past"] = depends_on_past if task_name: metadata.scheduling["task_name"] = task_name metadata.write(query_file.parent / METADATA_FILE) logging.info( f"Updated {query_file.parent / METADATA_FILE} with scheduling" " information. For more information about scheduling queries see: " "https://github.com/mozilla/bigquery-etl#scheduling-queries-in-airflow" ) # update dags since new task has been added dags = get_dags(None, sql_dir.parent / "dags.yaml", sql_dir=sql_dir) dags_to_be_generated.add(dag) else: dags = get_dags(None, sql_dir.parent / "dags.yaml", sql_dir=sql_dir) if metadata.scheduling == {}: click.echo(f"No scheduling information for: {query_file}", err=True) sys.exit(1) else: dags_to_be_generated.add(metadata.scheduling["dag_name"]) # re-run DAG generation for the affected DAG for d in dags_to_be_generated: existing_dag = dags.dag_by_name(d) logging.info(f"Running DAG generation for {existing_dag.name}") output_dir = sql_dir.parent / "dags" dags.dag_to_airflow(output_dir, existing_dag) @query.command( help="""Get information about all or specific queries. Examples: \b # Get info for specific queries ./bqetl query info telemetry_derived.* \b # Get cost and last update timestamp information ./bqetl query info telemetry_derived.clients_daily_v6 \\ --cost --last_updated """, ) @click.argument("name", required=False) @sql_dir_option @project_id_option() @click.option("--cost", help="Include information about query costs", is_flag=True) @click.option( "--last_updated", help="Include timestamps when destination tables were last updated", is_flag=True, ) @click.pass_context def info(ctx, name, sql_dir, project_id, cost, last_updated): """Return information about all or specific queries.""" if name is None: name = "*.*" query_files = paths_matching_name_pattern(name, sql_dir, project_id) if query_files == []: # run SQL generators if no matching query has been found ctx.invoke( generate_all, output_dir=ctx.obj["TMP_DIR"], ignore=["derived_view_schemas", "stable_views"], ) query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id) for query_file in query_files: query_file_path = Path(query_file) table = query_file_path.parent.name dataset = query_file_path.parent.parent.name project = query_file_path.parent.parent.parent.name try: metadata = Metadata.of_query_file(query_file) except FileNotFoundError: metadata = None click.secho(f"{project}.{dataset}.{table}", bold=True) click.echo(f"path: {query_file}") if metadata is None: click.echo("No metadata") else: click.echo(f"description: {metadata.description}") click.echo(f"owners: {metadata.owners}") if metadata.scheduling == {}: click.echo("scheduling: not scheduled") else: click.echo("scheduling:") click.echo(f" dag_name: {metadata.scheduling['dag_name']}") if cost or last_updated: if not is_authenticated(): click.echo( "Authentication to GCP required for " "accessing cost and last_updated." ) else: client = bigquery.Client() end_date = date.today().strftime("%Y-%m-%d") start_date = (date.today() - timedelta(7)).strftime("%Y-%m-%d") result = client.query( f""" SELECT SUM(cost_usd) AS cost, MAX(creation_time) AS last_updated FROM `moz-fx-data-shared-prod.monitoring_derived.bigquery_etl_scheduled_queries_cost_v1` WHERE submission_date BETWEEN '{start_date}' AND '{end_date}' AND dataset = '{dataset}' AND table = '{table}' """ # noqa E501 ).result() if result.total_rows == 0: if last_updated: click.echo("last_updated: never") if cost: click.echo("Cost over the last 7 days: none") for row in result: if last_updated: click.echo(f" last_updated: {row.last_updated}") if cost: click.echo( f" Cost over the last 7 days: {round(row.cost, 2)} USD" ) click.echo("") def _backfill_query( query_file_path, project_id, date_partition_parameter, exclude, max_rows, dry_run, no_partition, args, partitioning_type, backfill_date, destination_table, ): """Run a query backfill for a specific date.""" project, dataset, table = extract_from_query_path(query_file_path) match partitioning_type: case PartitionType.DAY: partition = backfill_date.strftime("%Y%m%d") case PartitionType.MONTH: partition = backfill_date.strftime("%Y%m") case _: raise ValueError(f"Unsupported partitioning type: {partitioning_type}") backfill_date = backfill_date.strftime("%Y-%m-%d") if backfill_date not in exclude: if destination_table is None: destination_table = f"{project}.{dataset}.{table}" if not no_partition: destination_table = f"{destination_table}${partition}" if not QUALIFIED_TABLE_NAME_RE.match(destination_table): click.echo( "Destination table must be named like:" + " <project>.<dataset>.<table>" ) sys.exit(1) click.echo( f"Run backfill for {destination_table} " f"with @{date_partition_parameter}={backfill_date}" ) arguments = [ "query", f"--parameter={date_partition_parameter}:DATE:{backfill_date}", "--use_legacy_sql=false", "--replace", f"--max_rows={max_rows}", f"--project_id={project_id}", "--format=none", ] + args if dry_run: arguments += ["--dry_run"] _run_query( [query_file_path], project_id=project_id, dataset_id=dataset, destination_table=destination_table, public_project_id=ConfigLoader.get( "default", "public_project", fallback="mozilla-public-data" ), query_arguments=arguments, ) else: click.echo( f"Skip {query_file_path} with @{date_partition_parameter}={backfill_date}" ) return True @query.command( help="""Run a backfill for a query. Additional parameters will get passed to bq. Examples: \b # Backfill for specific date range # second comment line ./bqetl query backfill telemetry_derived.ssl_ratios_v1 \\ --start_date=2021-03-01 \\ --end_date=2021-03-31 \b # Dryrun backfill for specific date range and exclude date ./bqetl query backfill telemetry_derived.ssl_ratios_v1 \\ --start_date=2021-03-01 \\ --end_date=2021-03-31 \\ --exclude=2021-03-03 \\ --dry_run """, context_settings=dict( ignore_unknown_options=True, allow_extra_args=True, ), ) @click.argument("name") @sql_dir_option @project_id_option(required=True) @click.option( "--start_date", "--start-date", "-s", help="First date to be backfilled", type=click.DateTime(formats=["%Y-%m-%d"]), required=True, ) @click.option( "--end_date", "--end-date", "-e", help="Last date to be backfilled", type=click.DateTime(formats=["%Y-%m-%d"]), default=str(date.today()), ) @click.option( "--exclude", "-x", multiple=True, help="Dates excluded from backfill. Date format: yyyy-mm-dd", default=[], ) @click.option( "--dry_run/--no_dry_run", "--dry-run/--no-dry-run", help="Dry run the backfill" ) @click.option( "--max_rows", "-n", type=int, default=100, help="How many rows to return in the result", ) @click.option( "--parallelism", "-p", type=int, default=8, help="How many threads to run backfill in parallel", ) @click.option( "--no_partition", "--no-partition", is_flag=True, default=False, help="Disable writing results to a partition. Overwrites entire destination table.", ) @click.option( "--destination_table", "--destination-table", required=False, help=( "Destination table name results are written to. " + "If not set, determines destination table based on query." ), ) @click.pass_context def backfill( ctx, name, sql_dir, project_id, start_date, end_date, exclude, dry_run, max_rows, parallelism, no_partition, destination_table, ): """Run a backfill.""" if not is_authenticated(): click.echo( "Authentication to GCP required. Run `gcloud auth login` " "and check that the project is set correctly." ) sys.exit(1) query_files = paths_matching_name_pattern(name, sql_dir, project_id) if query_files == []: # run SQL generators if no matching query has been found ctx.invoke( generate_all, output_dir=ctx.obj["TMP_DIR"], ignore=["derived_view_schemas", "stable_views", "country_code_lookup"], ) query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id) dates = [start_date + timedelta(i) for i in range((end_date - start_date).days + 1)] for query_file in query_files: query_file_path = Path(query_file) depends_on_past = False date_partition_parameter = "submission_date" try: metadata = Metadata.of_query_file(str(query_file_path)) depends_on_past = metadata.scheduling.get( "depends_on_past", depends_on_past ) date_partition_parameter = metadata.scheduling.get( "date_partition_parameter", date_partition_parameter ) # For backwards compatibility assume partitioning type is day # in case metadata is missing if metadata.bigquery: partitioning_type = metadata.bigquery.time_partitioning.type else: partitioning_type = PartitionType.DAY click.echo( "Bigquery partitioning type not set. Using PartitionType.DAY" ) match partitioning_type: case PartitionType.DAY: dates = [ start_date + timedelta(i) for i in range((end_date - start_date).days + 1) ] case PartitionType.MONTH: dates = list( rrule( freq=MONTHLY, dtstart=start_date.replace(day=1), until=end_date, ) ) # Dates in excluded must be the first day of the month to match `dates` exclude = [ date.fromisoformat(day).replace(day=1).strftime("%Y-%m-%d") for day in exclude ] case _: raise ValueError( f"Unsupported partitioning type: {partitioning_type}" ) except FileNotFoundError: click.echo(f"No metadata defined for {query_file_path}") if depends_on_past and exclude != []: click.echo( f"Warning: depends_on_past = True for {query_file_path} but the" f"following dates will be excluded from the backfill: {exclude}" ) client = bigquery.Client(project=project_id) try: project, dataset, table = extract_from_query_path(query_file_path) client.get_table(f"{project}.{dataset}.{table}") except NotFound: ctx.invoke(initialize, name=query_file, dry_run=dry_run) backfill_query = partial( _backfill_query, query_file_path, project_id, date_partition_parameter, exclude, max_rows, dry_run, no_partition, ctx.args, partitioning_type, destination_table=destination_table, ) if not depends_on_past: # run backfill for dates in parallel if depends_on_past is false with Pool(parallelism) as p: result = p.map(backfill_query, dates, chunksize=1) if not all(result): sys.exit(1) else: # if data depends on previous runs, then execute backfill sequentially for backfill_date in dates: backfill_query(backfill_date) @query.command( help="""Run a query. Additional parameters will get passed to bq.<br /> If a destination_table is set, the query result will be written to BigQuery. Without a destination_table specified, the results are not stored.<br /> If the `name` is not found within the `sql/` folder bqetl assumes it hasn't been generated yet and will start the generating process for all `sql_generators/` files. This generation process will take some time and run dryrun calls against BigQuery but this is expected. <br /> Additional parameters (all parameters that are not specified in the Options) must come after the query-name. Otherwise the first parameter that is not an option is interpreted as the query-name and since it can't be found the generation process will start. Examples: \b # Run a query by name ./bqetl query run telemetry_derived.ssl_ratios_v1 \b # Run a query file ./bqetl query run /path/to/query.sql \b # Run a query and save the result to BigQuery ./bqetl query run telemetry_derived.ssl_ratios_v1 \ --project_id=moz-fx-data-shared-prod \ --dataset_id=telemetry_derived \ --destination_table=ssl_ratios_v1 """, context_settings=dict( ignore_unknown_options=True, allow_extra_args=True, ), ) @click.argument("name") @sql_dir_option @project_id_option() @click.option( "--public_project_id", "--public-project-id", default=ConfigLoader.get( "default", "public_project", fallback="mozilla-public-data" ), help="Project with publicly accessible data", ) @click.option( "--destination_table", "--destination-table", required=False, help=( "Destination table name results are written to. " + "If not set, the query result will not be written to BigQuery." ), ) @click.option( "--dataset_id", "--dataset-id", required=False, help=( "Destination dataset results are written to. " + "If not set, determines destination dataset based on query." ), ) @click.pass_context def run( ctx, name, sql_dir, project_id, public_project_id, destination_table, dataset_id, ): """Run a query.""" if not is_authenticated(): click.echo( "Authentication to GCP required. Run `gcloud auth login` " "and check that the project is set correctly." ) sys.exit(1) query_files = paths_matching_name_pattern(name, sql_dir, project_id) if query_files == []: # run SQL generators if no matching query has been found ctx.invoke( generate_all, output_dir=ctx.obj["TMP_DIR"], ignore=["derived_view_schemas", "stable_views", "country_code_lookup"], ) query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id) _run_query( query_files, project_id, public_project_id, destination_table, dataset_id, ctx.args, ) def _run_query( query_files, project_id, public_project_id, destination_table, dataset_id, query_arguments, addl_templates: typing.Optional[dict] = None, mapped_values=None, ): client = bigquery.Client(project_id) """Run a query.""" if dataset_id is not None: # dataset ID was parsed by argparse but needs to be passed as parameter # when running the query query_arguments.append("--dataset_id={}".format(dataset_id)) if project_id is not None: query_arguments.append(f"--project_id={project_id}") if addl_templates is None: addl_templates = {} for query_file in query_files: use_public_table = False query_file = Path(query_file) try: metadata = Metadata.of_query_file(query_file) if metadata.is_public_bigquery(): if not validate_metadata.validate_public_data(metadata, query_file): sys.exit(1) # Change the destination table to write results to the public dataset; # a view to the public table in the internal dataset is created # when CI runs if ( dataset_id is not None and destination_table is not None and re.match(DESTINATION_TABLE_RE, destination_table) ): destination_table = "{}:{}.{}".format( public_project_id, dataset_id, destination_table ) query_arguments.append( "--destination_table={}".format(destination_table) ) use_public_table = True else: print( "ERROR: Cannot run public dataset query. Parameters" " --destination_table=<table without dataset ID> and" " --dataset_id=<dataset> required" ) sys.exit(1) except yaml.YAMLError as e: logging.error(e) sys.exit(1) except FileNotFoundError: logging.warning("No metadata.yaml found for {}", query_file) if not use_public_table and destination_table is not None: # destination table was parsed by argparse, however if it wasn't modified to # point to a public table it needs to be passed as parameter for the query if re.match(QUALIFIED_TABLE_NAME_RE, destination_table): project, dataset, table = qualified_table_name_matching( destination_table ) destination_table = "{}:{}.{}".format(project, dataset, table) query_arguments.append("--destination_table={}".format(destination_table)) if bool(list(filter(lambda x: x.startswith("--parameter"), query_arguments))): # need to do this as parameters are not supported with legacy sql query_arguments.append("--use_legacy_sql=False") # this assumed query command should always be passed inside query_arguments if "query" not in query_arguments: query_arguments = ["query"] + query_arguments # write rendered query to a temporary file; # query string cannot be passed directly to bq as SQL comments will be interpreted as CLI arguments with tempfile.NamedTemporaryFile(mode="w+") as query_stream: query_stream.write( render_template( query_file.name, template_folder=str(query_file.parent), templates_dir="", format=False, **addl_templates, ) ) query_stream.seek(0) query_content = query_stream.read() if mapped_values is not None and "{mapped_values}" in query_content: # Format the query template and run the query. print(f"Running for {mapped_values}...") query = query_content.format(mapped_values=mapped_values) job = client.query( query=query, job_config=bigquery.QueryJobConfig( use_query_cache=False, use_legacy_sql=False, ), ) job.result() else: # run the query as shell command so that passed parameters can be used as is. subprocess.check_call(["bq"] + query_arguments, stdin=query_stream) @query.command( help="""Run a multipart query. Examples: \b # Run a multipart query ./bqetl query run_multipart /path/to/query.sql """, context_settings=dict( ignore_unknown_options=True, allow_extra_args=True, ), ) @click.argument( "query_dir", type=click.Path(file_okay=False), ) @click.option( "--using", default="document_id", help="comma separated list of join columns to use when combining results", ) @click.option( "--parallelism", default=4, type=int, help="Maximum number of queries to execute concurrently", ) @click.option( "--dataset_id", "--dataset-id", help="Default dataset, if not specified all tables must be qualified with dataset", ) @project_id_option() @temp_dataset_option() @click.option( "--destination_table", required=True, help="table where combined results will be written", ) @click.option( "--time_partitioning_field", type=lambda f: bigquery.TimePartitioning(field=f), help="time partition field on the destination table", ) @click.option( "--clustering_fields", type=lambda f: f.split(","), help="comma separated list of clustering fields on the destination table", ) @click.option( "--dry_run", "--dry-run", is_flag=True, default=False, help="Print bytes that would be processed for each part and don't run queries", ) @click.option( "--parameters", "--parameter", multiple=True, default=[], type=lambda p: bigquery.ScalarQueryParameter(*p.split(":", 2)), metavar="NAME:TYPE:VALUE", help="query parameter(s) to pass when running parts", ) @click.option( "--priority", default="INTERACTIVE", type=click.Choice(["BATCH", "INTERACTIVE"]), help=( "Priority for BigQuery query jobs; BATCH priority will significantly slow " "down queries if reserved slots are not enabled for the billing project; " "defaults to INTERACTIVE" ), ) @click.option( "--schema_update_options", "--schema_update_option", multiple=True, type=click.Choice( [ bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION, bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION, # Airflow passes an empty string when the field addition date doesn't # match the run date. # See https://github.com/mozilla/telemetry-airflow/blob/ # e49fa7e6b3f5ec562dd248d257770c2303cf0cba/dags/utils/gcp.py#L515 "", ] ), default=[], help="Optional options for updating the schema.", ) def run_multipart( query_dir, using, parallelism, dataset_id, project_id, temp_dataset, destination_table, time_partitioning_field, clustering_fields, dry_run, parameters, priority, schema_update_options, ): """Run a multipart query.""" if dataset_id is not None and "." not in dataset_id and project_id is not None: dataset_id = f"{project_id}.{dataset_id}" if "." not in destination_table and dataset_id is not None: destination_table = f"{dataset_id}.{destination_table}" client = bigquery.Client(project_id) with ThreadPool(parallelism) as pool: parts = pool.starmap( _run_part, [ ( client, part, query_dir, temp_dataset, dataset_id, dry_run, parameters, priority, ) for part in sorted(next(os.walk(query_dir))[2]) if part.startswith("part") and part.endswith(".sql") ], chunksize=1, ) if not dry_run: total_bytes = sum(job.total_bytes_processed for _, job in parts) query = ( f"SELECT\n *\nFROM\n `{sql_table_id(parts[0][1].destination)}`" + "".join( f"\nFULL JOIN\n `{sql_table_id(job.destination)}`" f"\nUSING\n ({using})" for _, job in parts[1:] ) ) try: job = client.query( query=query, job_config=bigquery.QueryJobConfig( destination=destination_table, time_partitioning=time_partitioning_field, clustering_fields=clustering_fields, write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, use_legacy_sql=False, priority=priority, schema_update_options=schema_update_options, ), ) job.result() logging.info( f"Processed {job.total_bytes_processed:,d} bytes to combine results" ) total_bytes += job.total_bytes_processed logging.info(f"Processed {total_bytes:,d} bytes in total") finally: for _, job in parts: client.delete_table(sql_table_id(job.destination).split("$")[0]) logging.info(f"Deleted {len(parts)} temporary tables") def _run_part( client, part, query_dir, temp_dataset, dataset_id, dry_run, parameters, priority ): """Run a query part.""" with open(os.path.join(query_dir, part)) as sql_file: query = sql_file.read() job_config = bigquery.QueryJobConfig( destination=temp_dataset.temp_table(), default_dataset=dataset_id, use_legacy_sql=False, dry_run=dry_run, query_parameters=parameters, priority=priority, allow_large_results=True, ) job = client.query(query=query, job_config=job_config) if job.dry_run: logging.info(f"Would process {job.total_bytes_processed:,d} bytes for {part}") else: job.result() logging.info(f"Processed {job.total_bytes_processed:,d} bytes for {part}") return part, job @query.command( help="""Validate a query. Checks formatting, scheduling information and dry runs the query. Examples: ./bqetl query validate telemetry_derived.clients_daily_v6 \b # Validate query not in shared-prod ./bqetl query validate \\ --use_cloud_function=false \\ --project_id=moz-fx-data-marketing-prod \\ ga_derived.blogs_goals_v1 """, ) @click.argument("name", required=False) @sql_dir_option @project_id_option() @use_cloud_function_option @click.option( "--validate_schemas", "--validate-schemas", help="Require dry run schema to match destination table and file if present.", is_flag=True, default=False, ) @respect_dryrun_skip_option(default=False) @no_dryrun_option(default=False) @click.pass_context def validate( ctx, name, sql_dir, project_id, use_cloud_function, validate_schemas, respect_dryrun_skip, no_dryrun, ): """Validate queries by dry running, formatting and checking scheduling configs.""" if name is None: name = "*.*" query_files = paths_matching_name_pattern(name, sql_dir, project_id) dataset_dirs = set() for query in query_files: ctx.invoke(format, paths=[str(query)]) if not no_dryrun: ctx.invoke( dryrun, paths=[str(query)], use_cloud_function=use_cloud_function, project=project_id, validate_schemas=validate_schemas, respect_skip=respect_dryrun_skip, ) validate_metadata.validate(query.parent) dataset_dirs.add(query.parent.parent) if no_dryrun: click.echo("Dry run skipped for query files.") for dataset_dir in dataset_dirs: validate_metadata.validate_datasets(dataset_dir) def _initialize_in_parallel( project, table, dataset, query_file, arguments, parallelism, addl_templates, ): mapped_values = [f"sample_id = {i}" for i in list(range(0, 100))] with ThreadPool(parallelism) as pool: start = default_timer() # Process all subsets in the query in parallel (eg. all sample_ids). pool.map( partial( _run_query, [query_file], project, None, table, dataset, arguments, addl_templates, ), mapped_values, ) print(f"Job completed in {default_timer() - start}") @query.command( help="""Create and initialize the destination table for the query. Only for queries that have an `init.sql` file. Examples: ./bqetl query initialize telemetry_derived.ssl_ratios_v1 """, ) @click.argument("name") @sql_dir_option @project_id_option() @click.option( "--dry_run/--no_dry_run", "--dry-run/--no-dry-run", help="Dry run the initialization", ) @click.pass_context def initialize(ctx, name, sql_dir, project_id, dry_run): """Create the destination table for the provided query.""" if not is_authenticated(): click.echo("Authentication required for creating tables.", err=True) sys.exit(1) if Path(name).exists(): # Allow name to be a path query_files = [Path(name)] else: query_files = paths_matching_name_pattern(name, sql_dir, project_id) if not query_files: click.echo( f"Couldn't find directory matching `{name}`. Failed to initialize query.", err=True, ) sys.exit(1) for query_file in query_files: sql_content = query_file.read_text() client = bigquery.Client() # Enable initialization from query.sql files # Create the table by deploying the schema and metadata, then run the init. # This does not currently verify the accuracy of the schema or that it # matches the query. if "is_init()" in sql_content: project = query_file.parent.parent.parent.name dataset = query_file.parent.parent.name destination_table = query_file.parent.name full_table_id = f"{project}.{dataset}.{destination_table}" try: table = client.get_table(full_table_id) except NotFound: table = bigquery.Table(full_table_id) if table.created: raise PreconditionFailed( f"Table {full_table_id} already exists. The initialization process is terminated." ) ctx.invoke(deploy, name=full_table_id, force=True) arguments = [ "query", "--use_legacy_sql=false", "--replace", "--format=none", ] if dry_run: arguments += ["--dry_run"] if "parallel_run" in sql_content: _initialize_in_parallel( project=project, table=destination_table, dataset=dataset, query_file=query_file, arguments=arguments, parallelism=DEFAULT_PARALLELISM, addl_templates={ "is_init": lambda: True, "parallel_run": lambda: True, }, ) else: _run_query( project_id=project, public_project_id=None, destination_table=destination_table, dataset_id=dataset, query_arguments=arguments, addl_templates={ "is_init": lambda: True, }, ) else: init_files = Path(query_file.parent).rglob("init.sql") for init_file in init_files: project = init_file.parent.parent.parent.name with open(init_file) as init_file_stream: init_sql = init_file_stream.read() dataset = Path(init_file).parent.parent.name destination_table = query_file.parent.name job_config = bigquery.QueryJobConfig( dry_run=dry_run, default_dataset=f"{project}.{dataset}", destination=f"{project}.{dataset}.{destination_table}", ) if "CREATE MATERIALIZED VIEW" in init_sql: click.echo(f"Create materialized view for {init_file}") # existing materialized view have to be deleted before re-creation view_name = query_file.parent.name client.delete_table( f"{project}.{dataset}.{view_name}", not_found_ok=True ) else: click.echo(f"Create destination table for {init_file}") job = client.query(init_sql, job_config=job_config) if not dry_run: job.result() @query.command( help="""Render a query Jinja template. Examples: ./bqetl query render telemetry_derived.ssl_ratios_v1 \\ --output-dir=/tmp """, context_settings=dict( ignore_unknown_options=True, allow_extra_args=True, ), ) @click.argument("name") @sql_dir_option @click.option( "--output-dir", "--output_dir", help="Output directory generated SQL is written to. " + "If not specified, rendered queries are printed to console.", type=click.Path(file_okay=False), required=False, ) def render(name, sql_dir, output_dir): """Render a query Jinja template.""" if name is None: name = "*.*" query_files = paths_matching_name_pattern(name, sql_dir, project_id=None) resolved_sql_dir = Path(sql_dir).resolve() for query_file in query_files: table_name = query_file.parent.name dataset_id = query_file.parent.parent.name project_id = query_file.parent.parent.parent.name jinja_params = { **{ "project_id": project_id, "dataset_id": dataset_id, "table_name": table_name, }, } rendered_sql = ( render_template( query_file.name, template_folder=query_file.parent, templates_dir="", format=False, **jinja_params, ) + "\n" ) if not any(s in str(query_file) for s in skip_format()): rendered_sql = reformat(rendered_sql, trailing_newline=True) if output_dir: output_file = output_dir / query_file.resolve().relative_to( resolved_sql_dir ) output_file.parent.mkdir(parents=True, exist_ok=True) output_file.write_text(rendered_sql) else: click.echo(query_file) click.echo(rendered_sql) def _parse_partition_setting(partition_date): params = partition_date.split(":") if len(params) != 3: return None # Check date format try: datetime.datetime.strptime(params[2], "%Y-%m-%d").date() except ValueError: return None # Check column name if re.match(r"^\w+$", params[0]): return {params[0]: params[2]} def _validate_partition_date(ctx, param, partition_date): """Process the CLI parameter check_date and set the parameter for BigQuery.""" # Will be None if launched from Airflow. Also ctx.args is not populated at this stage. if partition_date: parsed = _parse_partition_setting(partition_date) if parsed is None: raise click.BadParameter("Format must be <column-name>::<yyyy-mm-dd>") return parsed return None def _parse_check_output(output: str) -> str: output = output.replace("\n", " ") if "ETL Data Check Failed:" in output: return f"ETL Data Check Failed:{output.split('ETL Data Check Failed:')[1]}" return output @query.group(help="Commands for managing query schemas.") def schema(): """Create the CLI group for the query schema command.""" pass @schema.command( help=""" Update the query schema based on the destination table schema and the query schema. If no schema.yaml file exists for a query, one will be created. Examples: ./bqetl query schema update telemetry_derived.clients_daily_v6 # Update schema including downstream dependencies (requires GCP) ./bqetl query schema update telemetry_derived.clients_daily_v6 --update-downstream """, ) @click.argument("name") @sql_dir_option @click.option( "--project-id", "--project_id", help="GCP project ID", default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"), callback=is_valid_project, ) @click.option( "--update-downstream", "--update_downstream", help="Update downstream dependencies. GCP authentication required.", default=False, is_flag=True, ) @click.option( "--tmp-dataset", "--tmp_dataset", help="GCP datasets for creating updated tables temporarily.", default="tmp", ) @use_cloud_function_option @respect_dryrun_skip_option(default=True) @parallelism_option def update( name, sql_dir, project_id, update_downstream, tmp_dataset, use_cloud_function, respect_dryrun_skip, parallelism, ): """CLI command for generating the query schema.""" if not is_authenticated(): click.echo( "Authentication to GCP required. Run `gcloud auth login` " "and check that the project is set correctly." ) sys.exit(1) query_files = paths_matching_name_pattern( name, sql_dir, project_id, files=["query.sql"] ) dependency_graph = get_dependency_graph([sql_dir], without_views=True) manager = multiprocessing.Manager() tmp_tables = manager.dict({}) # order query files to make sure derived_from dependencies are resolved query_file_graph = {} for query_file in query_files: query_file_graph[query_file] = [] try: metadata = Metadata.of_query_file(str(query_file)) if metadata and metadata.schema and metadata.schema.derived_from: for derived_from in metadata.schema.derived_from: parent_queries = [ query for query in paths_matching_name_pattern( ".".join(derived_from.table), sql_dir, project_id ) ] if len(parent_queries) > 0: query_file_graph[query_file].append(parent_queries[0]) except FileNotFoundError: query_file_graph[query_file] = [] ts = ParallelTopologicalSorter( query_file_graph, parallelism=parallelism, with_follow_up=update_downstream ) ts.map( partial( _update_query_schema_with_downstream, sql_dir, project_id, tmp_dataset, dependency_graph, tmp_tables, use_cloud_function, respect_dryrun_skip, update_downstream, ) ) if len(tmp_tables) > 0: client = bigquery.Client() # delete temporary tables for _, table in tmp_tables.items(): client.delete_table(table, not_found_ok=True) def _update_query_schema_with_downstream( sql_dir, project_id, tmp_dataset, dependency_graph, tmp_tables={}, use_cloud_function=True, respect_dryrun_skip=True, update_downstream=False, query_file=None, follow_up_queue=None, ): try: changed = _update_query_schema( query_file, sql_dir, project_id, tmp_dataset, tmp_tables, use_cloud_function, respect_dryrun_skip, ) if update_downstream: # update downstream dependencies if changed: if not is_authenticated(): click.echo( "Cannot update downstream dependencies." "Authentication to GCP required. Run `gcloud auth login` " "and check that the project is set correctly." ) sys.exit(1) project, dataset, table = extract_from_query_path(query_file) identifier = f"{project}.{dataset}.{table}" tmp_identifier = f"{project}.{tmp_dataset}.{table}_{random_str(12)}" # create temporary table with updated schema if identifier not in tmp_tables: schema = Schema.from_schema_file(query_file.parent / SCHEMA_FILE) schema.deploy(tmp_identifier) tmp_tables[identifier] = tmp_identifier # get downstream dependencies that will be updated in the next iteration dependencies = [ p for k, refs in dependency_graph.items() for p in paths_matching_name_pattern( k, sql_dir, project_id, files=("query.sql",) ) if identifier in refs ] for d in dependencies: click.echo(f"Update downstream dependency schema for {d}") if follow_up_queue: follow_up_queue.put(d) except Exception: print_exc() def _update_query_schema( query_file, sql_dir, project_id, tmp_dataset, tmp_tables={}, use_cloud_function=True, respect_dryrun_skip=True, ): """ Update the schema of a specific query file. Return True if the schema changed, False if it is unchanged. """ if respect_dryrun_skip and str(query_file) in DryRun.skipped_files(): click.echo(f"{query_file} dry runs are skipped. Cannot update schemas.") return tmp_tables = copy.deepcopy(tmp_tables) query_file_path = Path(query_file) existing_schema_path = query_file_path.parent / SCHEMA_FILE project_name, dataset_name, table_name = extract_from_query_path(query_file_path) try: metadata = Metadata.of_query_file(str(query_file_path)) except FileNotFoundError: metadata = None click.echo(f"No metadata defined for {query_file_path}") # pull in updates from parent schemas if metadata and metadata.schema and metadata.schema.derived_from: for derived_from in metadata.schema.derived_from: parent_queries = [ query for query in paths_matching_name_pattern( ".".join(derived_from.table), sql_dir, project_id ) ] if len(parent_queries) == 0: click.echo( f"derived_from query {derived_from.table} does not exist.", err=True, ) else: parent_schema = Schema.from_schema_file( parent_queries[0].parent / SCHEMA_FILE ) parent_project, parent_dataset, parent_table = extract_from_query_path( parent_queries[0] ) parent_identifier = f"{parent_project}.{parent_dataset}.{parent_table}" if parent_identifier not in tmp_tables: tmp_parent_identifier = ( f"{parent_project}.{tmp_dataset}.{parent_table}_" + random_str(12) ) parent_schema.deploy(tmp_parent_identifier) tmp_tables[parent_identifier] = tmp_parent_identifier if existing_schema_path.is_file(): existing_schema = Schema.from_schema_file(existing_schema_path) else: existing_schema = Schema.empty() existing_schema.merge(parent_schema, exclude=derived_from.exclude) # use temporary table tmp_identifier = ( f"{project_name}.{tmp_dataset}.{table_name}_{random_str(12)}" ) existing_schema.deploy(tmp_identifier) tmp_tables[ f"{project_name}.{dataset_name}.{table_name}" ] = tmp_identifier existing_schema.to_yaml_file(existing_schema_path) # replace temporary table references sql_content = render_template( query_file_path.name, template_folder=str(query_file_path.parent), templates_dir="", format=False, ) for orig_table, tmp_table in tmp_tables.items(): table_parts = orig_table.split(".") for i in range(len(table_parts)): if ".".join(table_parts[i:]) in sql_content: sql_content = sql_content.replace(".".join(table_parts[i:]), tmp_table) break query_schema = None try: query_schema = Schema.from_query_file( query_file_path, content=sql_content, use_cloud_function=use_cloud_function, respect_skip=respect_dryrun_skip, ) except Exception: if not existing_schema_path.exists(): click.echo( click.style( f"Cannot automatically update {query_file_path}. " f"Please update {query_file_path / SCHEMA_FILE} manually.", fg="red", ), err=True, ) return # update bigquery metadata try: client = bigquery.Client() table = client.get_table(f"{project_name}.{dataset_name}.{table_name}") metadata_file_path = query_file_path.parent / METADATA_FILE if ( table.time_partitioning and metadata and ( metadata.bigquery is None or metadata.bigquery.time_partitioning is None ) ): metadata.set_bigquery_partitioning( field=table.time_partitioning.field, partition_type=table.time_partitioning.type_.lower(), required=table.time_partitioning.require_partition_filter, expiration_days=table.time_partitioning.expiration_ms / 86400000.0 if table.time_partitioning.expiration_ms else None, ) click.echo(f"Partitioning metadata added to {metadata_file_path}") if ( table.clustering_fields and metadata and (metadata.bigquery is None or metadata.bigquery.clustering is None) ): metadata.set_bigquery_clustering(table.clustering_fields) click.echo(f"Clustering metadata added to {metadata_file_path}") if metadata: metadata.write(metadata_file_path) except NotFound: click.echo( f"Destination table {project_name}.{dataset_name}.{table_name} " "does not exist in BigQuery. Run bqetl query schema deploy " "<dataset>.<table> to create the destination table." ) except FileNotFoundError: click.echo( f"No metadata file for {project_name}.{dataset_name}.{table_name}." " Skip schema update." ) return partitioned_by = None try: metadata = Metadata.of_query_file(query_file_path) if metadata.bigquery and metadata.bigquery.time_partitioning: partitioned_by = metadata.bigquery.time_partitioning.field except FileNotFoundError: pass table_schema = Schema.for_table( project_name, dataset_name, table_name, partitioned_by, use_cloud_function=use_cloud_function, respect_skip=respect_dryrun_skip, ) changed = True if existing_schema_path.is_file(): existing_schema = Schema.from_schema_file(existing_schema_path) old_schema = copy.deepcopy(existing_schema) if table_schema: existing_schema.merge(table_schema) if query_schema: existing_schema.merge(query_schema) existing_schema.to_yaml_file(existing_schema_path) changed = not existing_schema.equal(old_schema) else: query_schema.merge(table_schema) query_schema.to_yaml_file(existing_schema_path) click.echo(f"Schema {existing_schema_path} updated.") return changed @schema.command( help="""Deploy the query schema. Examples: ./bqetl query schema deploy telemetry_derived.clients_daily_v6 """, ) @click.argument("name") @sql_dir_option @click.option( "--project-id", "--project_id", help="GCP project ID", default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"), callback=is_valid_project, ) @click.option( "--force/--noforce", help="Deploy the schema file without validating that it matches the query", default=False, ) @use_cloud_function_option @respect_dryrun_skip_option(default=True) @click.option( "--skip-existing", "--skip_existing", help="Skip updating existing tables. " + "This option ensures that only new tables get deployed.", default=False, is_flag=True, ) @click.option( "--skip-external-data", "--skip_external_data", help="Skip publishing external data, such as Google Sheets.", default=False, is_flag=True, ) @click.option( "--destination_table", "--destination-table", required=False, help=( "Destination table name results are written to. " + "If not set, determines destination table based on query. " + "Must be fully qualified (project.dataset.table)." ), ) @parallelism_option @click.pass_context def deploy( ctx, name, sql_dir, project_id, force, use_cloud_function, respect_dryrun_skip, skip_existing, skip_external_data, destination_table, parallelism, ): """CLI command for deploying destination table schemas.""" if not is_authenticated(): click.echo( "Authentication to GCP required. Run `gcloud auth login` " "and check that the project is set correctly." ) sys.exit(1) client = bigquery.Client() query_files = paths_matching_name_pattern(name, sql_dir, project_id, ["query.*"]) if not query_files: # run SQL generators if no matching query has been found ctx.invoke( generate_all, output_dir=ctx.obj["TMP_DIR"], ignore=["derived_view_schemas", "stable_views"], ) query_files = paths_matching_name_pattern( name, ctx.obj["TMP_DIR"], project_id, ["query.*"] ) def _deploy(query_file): if respect_dryrun_skip and str(query_file) in DryRun.skipped_files(): click.echo(f"{query_file} dry runs are skipped. Cannot validate schemas.") return query_file_path = Path(query_file) existing_schema_path = query_file_path.parent / SCHEMA_FILE if not existing_schema_path.is_file(): click.echo(f"No schema file found for {query_file}") return try: table_name = query_file_path.parent.name dataset_name = query_file_path.parent.parent.name project_name = query_file_path.parent.parent.parent.name if destination_table: full_table_id = destination_table else: full_table_id = f"{project_name}.{dataset_name}.{table_name}" existing_schema = Schema.from_schema_file(existing_schema_path) if not force and str(query_file_path).endswith("query.sql"): query_schema = Schema.from_query_file( query_file_path, use_cloud_function=use_cloud_function, respect_skip=respect_dryrun_skip, ) if not existing_schema.equal(query_schema): click.echo( f"Query {query_file_path} does not match " f"schema in {existing_schema_path}. " f"To update the local schema file, " f"run `./bqetl query schema update " f"{dataset_name}.{table_name}`", err=True, ) sys.exit(1) with NamedTemporaryFile(suffix=".json") as tmp_schema_file: existing_schema.to_json_file(Path(tmp_schema_file.name)) bigquery_schema = client.schema_from_json(tmp_schema_file.name) try: table = client.get_table(full_table_id) except NotFound: table = bigquery.Table(full_table_id) table.schema = bigquery_schema _attach_metadata(query_file_path, table) if not table.created: client.create_table(table) click.echo(f"Destination table {full_table_id} created.") elif not skip_existing: client.update_table( table, [ "schema", "friendly_name", "description", "time_partitioning", "clustering_fields", "labels", ], ) click.echo(f"Schema (and metadata) updated for {full_table_id}.") except Exception: print_exc() return query_file with ThreadPool(parallelism) as pool: failed_deploys = [r for r in pool.map(_deploy, query_files) if r] if not skip_external_data: failed_external_deploys = _deploy_external_data( name, sql_dir, project_id, skip_existing ) failed_deploys += failed_external_deploys if len(failed_deploys) > 0: click.echo("The following tables could not be deployed:") for failed_deploy in failed_deploys: click.echo(failed_deploy) sys.exit(1) click.echo("All tables have been deployed.") def _attach_metadata(query_file_path: Path, table: bigquery.Table) -> None: """Add metadata from query file's metadata.yaml to table object.""" try: metadata = Metadata.of_query_file(query_file_path) except FileNotFoundError: return table.description = metadata.description table.friendly_name = metadata.friendly_name if metadata.bigquery and metadata.bigquery.time_partitioning: table.time_partitioning = bigquery.TimePartitioning( metadata.bigquery.time_partitioning.type.bigquery_type, field=metadata.bigquery.time_partitioning.field, require_partition_filter=( metadata.bigquery.time_partitioning.require_partition_filter ), expiration_ms=metadata.bigquery.time_partitioning.expiration_ms, ) if metadata.bigquery and metadata.bigquery.clustering: table.clustering_fields = metadata.bigquery.clustering.fields # BigQuery only allows for string type labels with specific requirements to be published: # https://cloud.google.com/bigquery/docs/labels-intro#requirements if metadata.labels: table.labels = { key: value for key, value in metadata.labels.items() if isinstance(value, str) } def _deploy_external_data( name, sql_dir, project_id, skip_existing, ) -> list: """Publish external data tables.""" # whether a table should be created from external data is defined in the metadata metadata_files = paths_matching_name_pattern( name, sql_dir, project_id, ["metadata.yaml"] ) client = bigquery.Client() failed_deploys = [] for metadata_file_path in metadata_files: metadata = Metadata.from_file(metadata_file_path) if not metadata.external_data: # skip all tables that are not created from external data continue existing_schema_path = metadata_file_path.parent / SCHEMA_FILE if not existing_schema_path.is_file(): # tables created from external data must specify a schema click.echo(f"No schema file found for {metadata_file_path}") continue try: table_name = metadata_file_path.parent.name dataset_name = metadata_file_path.parent.parent.name project_name = metadata_file_path.parent.parent.parent.name full_table_id = f"{project_name}.{dataset_name}.{table_name}" existing_schema = Schema.from_schema_file(existing_schema_path) try: table = client.get_table(full_table_id) except NotFound: table = bigquery.Table(full_table_id) with NamedTemporaryFile(suffix=".json") as tmp_schema_file: existing_schema.to_json_file(Path(tmp_schema_file.name)) bigquery_schema = client.schema_from_json(tmp_schema_file.name) table.schema = bigquery_schema _attach_metadata(metadata_file_path, table) if not table.created: if metadata.external_data.format in ( ExternalDataFormat.GOOGLE_SHEETS, ExternalDataFormat.CSV, ): external_config = bigquery.ExternalConfig( metadata.external_data.format.value.upper() ) external_config.source_uris = metadata.external_data.source_uris external_config.ignore_unknown_values = True external_config.autodetect = False for key, v in metadata.external_data.options.items(): setattr(external_config.options, key, v) table.external_data_configuration = external_config table = client.create_table(table) click.echo(f"Destination table {full_table_id} created.") else: click.echo( f"External data format {metadata.external_data.format} unsupported." ) elif not skip_existing: client.update_table( table, [ "schema", "friendly_name", "description", "labels", ], ) click.echo(f"Schema (and metadata) updated for {full_table_id}.") except Exception: print_exc() failed_deploys.append(metadata_file_path) return failed_deploys def _validate_schema_from_path( query_file_path, use_cloud_function=True, respect_dryrun_skip=True ): """Dry Runs and validates a query schema from its path.""" return ( DryRun( query_file_path, use_cloud_function=use_cloud_function, respect_skip=respect_dryrun_skip, ).validate_schema(), query_file_path, ) @schema.command( help="""Validate the query schema Examples: ./bqetl query schema validate telemetry_derived.clients_daily_v6 """, name="validate", ) @click.argument("name") @sql_dir_option @click.option( "--project-id", "--project_id", help="GCP project ID", default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"), callback=is_valid_project, ) @use_cloud_function_option @respect_dryrun_skip_option(default=True) @click.pass_context def validate_schema( ctx, name, sql_dir, project_id, use_cloud_function, respect_dryrun_skip ): """Validate the defined query schema with the query and the destination table.""" query_files = paths_matching_name_pattern(name, sql_dir, project_id) if query_files == []: # run SQL generators if no matching query has been found ctx.invoke( generate_all, output_dir=ctx.obj["TMP_DIR"], ignore=["derived_view_schemas", "stable_views"], ) query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id) _validate_schema = partial( _validate_schema_from_path, use_cloud_function=use_cloud_function, respect_dryrun_skip=respect_dryrun_skip, ) with Pool(8) as p: result = p.map(_validate_schema, query_files, chunksize=1) all_valid = True for is_valid, query_file_path in result: if is_valid is False: if all_valid: click.echo("\nSchemas for the following queries are invalid:") all_valid = False click.echo(query_file_path) if not all_valid: sys.exit(1) else: click.echo("\nAll schemas are valid.")