"""bigquery-etl CLI query command."""
import copy
import datetime
import logging
import multiprocessing
import os
import re
import string
import subprocess
import sys
import tempfile
import typing
from datetime import date, timedelta
from functools import partial
from multiprocessing.pool import Pool, ThreadPool
from pathlib import Path
from tempfile import NamedTemporaryFile
from timeit import default_timer
from traceback import print_exc
import click
import yaml
from dateutil.rrule import MONTHLY, rrule
from google.cloud import bigquery
from google.cloud.exceptions import NotFound, PreconditionFailed
from ..backfill.utils import QUALIFIED_TABLE_NAME_RE, qualified_table_name_matching
from ..cli.format import format
from ..cli.utils import (
is_authenticated,
is_valid_project,
no_dryrun_option,
parallelism_option,
paths_matching_name_pattern,
project_id_option,
respect_dryrun_skip_option,
sql_dir_option,
temp_dataset_option,
use_cloud_function_option,
)
from ..config import ConfigLoader
from ..dependency import get_dependency_graph
from ..dryrun import DryRun
from ..format_sql.format import skip_format
from ..format_sql.formatter import reformat
from ..metadata import validate_metadata
from ..metadata.parse_metadata import (
METADATA_FILE,
BigQueryMetadata,
ClusteringMetadata,
DatasetMetadata,
ExternalDataFormat,
Metadata,
PartitionMetadata,
PartitionType,
)
from ..query_scheduling.dag_collection import DagCollection
from ..query_scheduling.generate_airflow_dags import get_dags
from ..schema import SCHEMA_FILE, Schema
from ..util import extract_from_query_path
from ..util.bigquery_id import sql_table_id
from ..util.common import random_str
from ..util.common import render as render_template
from ..util.parallel_topological_sorter import ParallelTopologicalSorter
from .dryrun import dryrun
from .generate import generate_all
QUERY_NAME_RE = re.compile(r"(?P<dataset>[a-zA-Z0-9_]+)\.(?P<name>[a-zA-Z0-9_]+)")
VERSION_RE = re.compile(r"_v[0-9]+")
DESTINATION_TABLE_RE = re.compile(r"^[a-zA-Z0-9_$]{0,1024}$")
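# Illustrative matches for the patterns above (examples only, not exhaustive):
#   QUERY_NAME_RE:        "telemetry_derived.ssl_ratios_v1"
#                         -> dataset="telemetry_derived", name="ssl_ratios_v1"
#   VERSION_RE:           "_v1", "_v42"
#   DESTINATION_TABLE_RE: "ssl_ratios_v1", "ssl_ratios_v1$20210301"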
DEFAULT_DAG_NAME = "bqetl_default"
DEFAULT_PARALLELISM = 10
@click.group(help="Commands for managing queries.")
@click.pass_context
def query(ctx):
"""Create the CLI group for the query command."""
# create temporary directory generated content is written to
# the directory will be deleted automatically after the command exits
ctx.ensure_object(dict)
ctx.obj["TMP_DIR"] = ctx.with_resource(tempfile.TemporaryDirectory())
@query.command(
help="""Create a new query with name
<dataset>.<query_name>, for example: telemetry_derived.active_profiles.
Use the `--project_id` option to change the project the query is added to;
default is `moz-fx-data-shared-prod`. Views are automatically generated
in the publicly facing dataset.
Examples:
\b
./bqetl query create telemetry_derived.deviations_v1 \\
--owner=example@mozilla.com
\b
# The query version gets autocompleted to v1. Queries are created in the
# _derived dataset and accompanying views in the public dataset.
./bqetl query create telemetry.deviations --owner=example@mozilla.com
""",
)
@click.argument("name")
@sql_dir_option
@project_id_option(
ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod")
)
@click.option(
"--owner",
"-o",
help="Owner of the query (email address)",
default="example@mozilla.com",
)
@click.option(
"--init",
"-i",
help="Create an init.sql file to initialize the table",
default=False,
is_flag=True,
)
@click.option(
"--dag",
"-d",
help=(
f"Name of the DAG the query should be scheduled under."
"If there is no DAG name specified, the query is"
f"scheduled by default in DAG {DEFAULT_DAG_NAME}."
"To skip the automated scheduling use --no_schedule."
"To see available DAGs run `bqetl dag info`."
"To create a new DAG run `bqetl dag create`."
),
default=DEFAULT_DAG_NAME,
)
@click.option(
"--no_schedule",
"--no-schedule",
help=(
"Using this option creates the query without scheduling information."
" Use `bqetl query schedule` to add it manually if required."
),
default=False,
is_flag=True,
)
@click.pass_context
def create(ctx, name, sql_dir, project_id, owner, init, dag, no_schedule):
"""CLI command for creating a new query."""
# create directory structure for query
try:
match = QUERY_NAME_RE.match(name)
name = match.group("name")
dataset = match.group("dataset")
version = "_" + name.split("_")[-1]
if not VERSION_RE.match(version):
version = "_v1"
else:
name = "_".join(name.split("_")[:-1])
except AttributeError:
click.echo(
"New queries must be named like:"
+ " <dataset>.<table> or <dataset>.<table>_v[n]"
)
sys.exit(1)
derived_path = None
view_path = None
path = Path(sql_dir)
if dataset.endswith("_derived"):
# create a directory for the corresponding view
derived_path = path / project_id / dataset / (name + version)
derived_path.mkdir(parents=True)
view_path = path / project_id / dataset.replace("_derived", "") / name
view_path.mkdir(parents=True)
else:
# check if there is a corresponding derived dataset
if (path / project_id / (dataset + "_derived")).exists():
derived_path = path / project_id / (dataset + "_derived") / (name + version)
derived_path.mkdir(parents=True)
view_path = path / project_id / dataset / name
view_path.mkdir(parents=True)
dataset = dataset + "_derived"
else:
# some dataset that is not specified as _derived
# don't automatically create views
derived_path = path / project_id / dataset / (name + version)
derived_path.mkdir(parents=True)
click.echo(f"Created query in {derived_path}")
if view_path:
click.echo(f"Created corresponding view in {view_path}")
view_file = view_path / "view.sql"
view_dataset = dataset.replace("_derived", "")
view_file.write_text(
reformat(
f"""CREATE OR REPLACE VIEW
`{project_id}.{view_dataset}.{name}`
AS SELECT * FROM
`{project_id}.{dataset}.{name}{version}`"""
)
+ "\n"
)
# create query.sql file
query_file = derived_path / "query.sql"
query_file.write_text(
reformat(
f"""-- Query for {dataset}.{name}{version}
-- For more information on writing queries see:
-- https://docs.telemetry.mozilla.org/cookbooks/bigquery/querying.html
SELECT * FROM table WHERE submission_date = @submission_date"""
)
+ "\n"
)
# create default metadata.yaml
metadata_file = derived_path / "metadata.yaml"
metadata = Metadata(
friendly_name=string.capwords(name.replace("_", " ")),
description="Please provide a description for the query",
owners=[owner],
labels={"incremental": True},
bigquery=BigQueryMetadata(
time_partitioning=PartitionMetadata(field="", type=PartitionType.DAY),
clustering=ClusteringMetadata(fields=[]),
),
)
metadata.write(metadata_file)
# optionally create init.sql
if init:
init_file = derived_path / "init.sql"
init_file.write_text(
reformat(
f"""
-- SQL for initializing the query destination table.
CREATE OR REPLACE TABLE
`{ConfigLoader.get('default', 'project', fallback="moz-fx-data-shared-prod")}.{dataset}.{name}{version}`
AS SELECT * FROM table"""
)
+ "\n"
)
dataset_metadata_file = derived_path.parent / "dataset_metadata.yaml"
if not dataset_metadata_file.exists():
dataset_name = str(dataset_metadata_file.parent.name)
dataset_metadata = DatasetMetadata(
friendly_name=string.capwords(dataset_name.replace("_", " ")),
description="Please provide a description for the dataset",
dataset_base_acl="derived",
user_facing=False,
)
dataset_metadata.write(dataset_metadata_file)
click.echo(f"Created dataset metadata in {dataset_metadata_file}")
if view_path:
dataset_metadata_file = view_path.parent / "dataset_metadata.yaml"
if not dataset_metadata_file.exists():
dataset_name = str(dataset_metadata_file.parent.name)
dataset_metadata = DatasetMetadata(
friendly_name=string.capwords(dataset_name.replace("_", " ")),
description="Please provide a description for the dataset",
dataset_base_acl="view",
user_facing=True,
)
dataset_metadata.write(dataset_metadata_file)
click.echo(f"Created dataset metadata in {dataset_metadata_file}")
if no_schedule:
click.echo(
click.style(
"WARNING: This query has been created without "
"scheduling information. Use `bqetl query schedule`"
" to manually add it to a DAG or "
"`bqetl query create --help` for more options.",
fg="yellow",
)
)
else:
ctx.invoke(schedule, name=derived_path, dag=dag)
@query.command(
help="""Schedule an existing query
Examples:
\b
./bqetl query schedule telemetry_derived.deviations_v1 \\
--dag=bqetl_deviations
\b
# Set a specific name for the task
./bqetl query schedule telemetry_derived.deviations_v1 \\
--dag=bqetl_deviations \\
--task-name=deviations
""",
)
@click.argument("name")
@sql_dir_option
@project_id_option()
@click.option(
"--dag",
"-d",
help=(
"Name of the DAG the query should be scheduled under. "
"To see available DAGs run `bqetl dag info`. "
"To create a new DAG run `bqetl dag create`."
),
)
@click.option(
"--depends_on_past",
"--depends-on-past",
help="Only execute query if previous scheduled run succeeded.",
default=False,
type=bool,
)
@click.option(
"--task_name",
"--task-name",
help=(
"Custom name for the Airflow task. By default the task name is a "
"combination of the dataset and table name."
),
)
def schedule(name, sql_dir, project_id, dag, depends_on_past, task_name):
"""CLI command for scheduling a query."""
query_files = paths_matching_name_pattern(name, sql_dir, project_id)
if query_files == []:
click.echo(f"Name doesn't refer to any queries: {name}", err=True)
sys.exit(1)
sql_dir = Path(sql_dir)
dags = DagCollection.from_file(sql_dir.parent / "dags.yaml")
dags_to_be_generated = set()
for query_file in query_files:
try:
metadata = Metadata.of_query_file(query_file)
except FileNotFoundError:
click.echo(f"Cannot schedule {query_file}. No metadata.yaml found.")
continue
if dag:
# check if DAG already exists
existing_dag = dags.dag_by_name(dag)
if not existing_dag:
click.echo(
(
f"DAG {dag} does not exist. "
"To see available DAGs run `bqetl dag info`. "
"To create a new DAG run `bqetl dag create`."
),
err=True,
)
sys.exit(1)
# write scheduling information to metadata file
metadata.scheduling = {}
metadata.scheduling["dag_name"] = dag
if depends_on_past:
metadata.scheduling["depends_on_past"] = depends_on_past
if task_name:
metadata.scheduling["task_name"] = task_name
metadata.write(query_file.parent / METADATA_FILE)
logging.info(
f"Updated {query_file.parent / METADATA_FILE} with scheduling"
" information. For more information about scheduling queries see: "
"https://github.com/mozilla/bigquery-etl#scheduling-queries-in-airflow"
)
# update dags since new task has been added
dags = get_dags(None, sql_dir.parent / "dags.yaml", sql_dir=sql_dir)
dags_to_be_generated.add(dag)
else:
dags = get_dags(None, sql_dir.parent / "dags.yaml", sql_dir=sql_dir)
if metadata.scheduling == {}:
click.echo(f"No scheduling information for: {query_file}", err=True)
sys.exit(1)
else:
dags_to_be_generated.add(metadata.scheduling["dag_name"])
# re-run DAG generation for the affected DAG
for d in dags_to_be_generated:
existing_dag = dags.dag_by_name(d)
logging.info(f"Running DAG generation for {existing_dag.name}")
output_dir = sql_dir.parent / "dags"
dags.dag_to_airflow(output_dir, existing_dag)
@query.command(
help="""Get information about all or specific queries.
Examples:
\b
# Get info for specific queries
./bqetl query info telemetry_derived.*
\b
# Get cost and last update timestamp information
./bqetl query info telemetry_derived.clients_daily_v6 \\
--cost --last_updated
""",
)
@click.argument("name", required=False)
@sql_dir_option
@project_id_option()
@click.option("--cost", help="Include information about query costs", is_flag=True)
@click.option(
"--last_updated",
help="Include timestamps when destination tables were last updated",
is_flag=True,
)
@click.pass_context
def info(ctx, name, sql_dir, project_id, cost, last_updated):
"""Return information about all or specific queries."""
if name is None:
name = "*.*"
query_files = paths_matching_name_pattern(name, sql_dir, project_id)
if query_files == []:
# run SQL generators if no matching query has been found
ctx.invoke(
generate_all,
output_dir=ctx.obj["TMP_DIR"],
ignore=["derived_view_schemas", "stable_views"],
)
query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id)
for query_file in query_files:
query_file_path = Path(query_file)
table = query_file_path.parent.name
dataset = query_file_path.parent.parent.name
project = query_file_path.parent.parent.parent.name
try:
metadata = Metadata.of_query_file(query_file)
except FileNotFoundError:
metadata = None
click.secho(f"{project}.{dataset}.{table}", bold=True)
click.echo(f"path: {query_file}")
if metadata is None:
click.echo("No metadata")
else:
click.echo(f"description: {metadata.description}")
click.echo(f"owners: {metadata.owners}")
if metadata.scheduling == {}:
click.echo("scheduling: not scheduled")
else:
click.echo("scheduling:")
click.echo(f" dag_name: {metadata.scheduling['dag_name']}")
if cost or last_updated:
if not is_authenticated():
click.echo(
"Authentication to GCP required for "
"accessing cost and last_updated."
)
else:
client = bigquery.Client()
end_date = date.today().strftime("%Y-%m-%d")
start_date = (date.today() - timedelta(7)).strftime("%Y-%m-%d")
result = client.query(
f"""
SELECT
SUM(cost_usd) AS cost,
MAX(creation_time) AS last_updated
FROM `moz-fx-data-shared-prod.monitoring_derived.bigquery_etl_scheduled_queries_cost_v1`
WHERE submission_date BETWEEN '{start_date}' AND '{end_date}'
AND dataset = '{dataset}'
AND table = '{table}'
""" # noqa E501
).result()
if result.total_rows == 0:
if last_updated:
click.echo("last_updated: never")
if cost:
click.echo("Cost over the last 7 days: none")
for row in result:
if last_updated:
click.echo(f" last_updated: {row.last_updated}")
if cost:
click.echo(
f" Cost over the last 7 days: {round(row.cost, 2)} USD"
)
click.echo("")
def _backfill_query(
query_file_path,
project_id,
date_partition_parameter,
exclude,
max_rows,
dry_run,
no_partition,
args,
partitioning_type,
backfill_date,
destination_table,
):
"""Run a query backfill for a specific date."""
project, dataset, table = extract_from_query_path(query_file_path)
match partitioning_type:
case PartitionType.DAY:
partition = backfill_date.strftime("%Y%m%d")
case PartitionType.MONTH:
partition = backfill_date.strftime("%Y%m")
case _:
raise ValueError(f"Unsupported partitioning type: {partitioning_type}")
backfill_date = backfill_date.strftime("%Y-%m-%d")
if backfill_date not in exclude:
if destination_table is None:
destination_table = f"{project}.{dataset}.{table}"
if not no_partition:
destination_table = f"{destination_table}${partition}"
if not QUALIFIED_TABLE_NAME_RE.match(destination_table):
click.echo(
"Destination table must be named like:" + " <project>.<dataset>.<table>"
)
sys.exit(1)
click.echo(
f"Run backfill for {destination_table} "
f"with @{date_partition_parameter}={backfill_date}"
)
arguments = [
"query",
f"--parameter={date_partition_parameter}:DATE:{backfill_date}",
"--use_legacy_sql=false",
"--replace",
f"--max_rows={max_rows}",
f"--project_id={project_id}",
"--format=none",
] + args
if dry_run:
arguments += ["--dry_run"]
_run_query(
[query_file_path],
project_id=project_id,
dataset_id=dataset,
destination_table=destination_table,
public_project_id=ConfigLoader.get(
"default", "public_project", fallback="mozilla-public-data"
),
query_arguments=arguments,
)
else:
click.echo(
f"Skip {query_file_path} with @{date_partition_parameter}={backfill_date}"
)
return True
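# Illustrative single-day invocation of _backfill_query (hypothetical table name):
#   with partitioning_type=PartitionType.DAY, backfill_date=date(2021, 3, 1) and
#   no_partition=False, the destination becomes
#   "moz-fx-data-shared-prod.telemetry_derived.ssl_ratios_v1$20210301" and bq is
#   called with "--parameter=submission_date:DATE:2021-03-01 --replace ...".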
@query.command(
help="""Run a backfill for a query. Additional parameters will get passed to bq.
Examples:
\b
# Backfill for specific date range
./bqetl query backfill telemetry_derived.ssl_ratios_v1 \\
--start_date=2021-03-01 \\
--end_date=2021-03-31
\b
# Dryrun backfill for specific date range and exclude date
./bqetl query backfill telemetry_derived.ssl_ratios_v1 \\
--start_date=2021-03-01 \\
--end_date=2021-03-31 \\
--exclude=2021-03-03 \\
--dry_run
""",
context_settings=dict(
ignore_unknown_options=True,
allow_extra_args=True,
),
)
@click.argument("name")
@sql_dir_option
@project_id_option(required=True)
@click.option(
"--start_date",
"--start-date",
"-s",
help="First date to be backfilled",
type=click.DateTime(formats=["%Y-%m-%d"]),
required=True,
)
@click.option(
"--end_date",
"--end-date",
"-e",
help="Last date to be backfilled",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=str(date.today()),
)
@click.option(
"--exclude",
"-x",
multiple=True,
help="Dates excluded from backfill. Date format: yyyy-mm-dd",
default=[],
)
@click.option(
"--dry_run/--no_dry_run", "--dry-run/--no-dry-run", help="Dry run the backfill"
)
@click.option(
"--max_rows",
"-n",
type=int,
default=100,
help="How many rows to return in the result",
)
@click.option(
"--parallelism",
"-p",
type=int,
default=8,
help="How many threads to run backfill in parallel",
)
@click.option(
"--no_partition",
"--no-partition",
is_flag=True,
default=False,
help="Disable writing results to a partition. Overwrites entire destination table.",
)
@click.option(
"--destination_table",
"--destination-table",
required=False,
help=(
"Destination table name results are written to. "
+ "If not set, determines destination table based on query."
),
)
@click.pass_context
def backfill(
ctx,
name,
sql_dir,
project_id,
start_date,
end_date,
exclude,
dry_run,
max_rows,
parallelism,
no_partition,
destination_table,
):
"""Run a backfill."""
if not is_authenticated():
click.echo(
"Authentication to GCP required. Run `gcloud auth login` "
"and check that the project is set correctly."
)
sys.exit(1)
query_files = paths_matching_name_pattern(name, sql_dir, project_id)
if query_files == []:
# run SQL generators if no matching query has been found
ctx.invoke(
generate_all,
output_dir=ctx.obj["TMP_DIR"],
ignore=["derived_view_schemas", "stable_views", "country_code_lookup"],
)
query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id)
dates = [start_date + timedelta(i) for i in range((end_date - start_date).days + 1)]
for query_file in query_files:
query_file_path = Path(query_file)
depends_on_past = False
date_partition_parameter = "submission_date"
try:
metadata = Metadata.of_query_file(str(query_file_path))
depends_on_past = metadata.scheduling.get(
"depends_on_past", depends_on_past
)
date_partition_parameter = metadata.scheduling.get(
"date_partition_parameter", date_partition_parameter
)
# For backwards compatibility assume partitioning type is day
# in case metadata is missing
if metadata.bigquery:
partitioning_type = metadata.bigquery.time_partitioning.type
else:
partitioning_type = PartitionType.DAY
click.echo(
"Bigquery partitioning type not set. Using PartitionType.DAY"
)
match partitioning_type:
case PartitionType.DAY:
dates = [
start_date + timedelta(i)
for i in range((end_date - start_date).days + 1)
]
case PartitionType.MONTH:
dates = list(
rrule(
freq=MONTHLY,
dtstart=start_date.replace(day=1),
until=end_date,
)
)
# Dates in excluded must be the first day of the month to match `dates`
exclude = [
date.fromisoformat(day).replace(day=1).strftime("%Y-%m-%d")
for day in exclude
]
case _:
raise ValueError(
f"Unsupported partitioning type: {partitioning_type}"
)
except FileNotFoundError:
click.echo(f"No metadata defined for {query_file_path}")
if depends_on_past and exclude != []:
click.echo(
f"Warning: depends_on_past = True for {query_file_path} but the"
f"following dates will be excluded from the backfill: {exclude}"
)
client = bigquery.Client(project=project_id)
try:
project, dataset, table = extract_from_query_path(query_file_path)
client.get_table(f"{project}.{dataset}.{table}")
except NotFound:
ctx.invoke(initialize, name=query_file, dry_run=dry_run)
backfill_query = partial(
_backfill_query,
query_file_path,
project_id,
date_partition_parameter,
exclude,
max_rows,
dry_run,
no_partition,
ctx.args,
partitioning_type,
destination_table=destination_table,
)
if not depends_on_past:
# run backfill for dates in parallel if depends_on_past is false
with Pool(parallelism) as p:
result = p.map(backfill_query, dates, chunksize=1)
if not all(result):
sys.exit(1)
else:
# if data depends on previous runs, then execute backfill sequentially
for backfill_date in dates:
backfill_query(backfill_date)
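# Illustrative date expansion for the backfill above, assuming
# start_date=2021-01-15 and end_date=2021-03-10:
#   PartitionType.DAY:   2021-01-15, 2021-01-16, ..., 2021-03-10 (one run per day)
#   PartitionType.MONTH: 2021-01-01, 2021-02-01, 2021-03-01 (rrule over month starts)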
@query.command(
help="""Run a query. Additional parameters will get passed to bq.<br />
If a destination_table is set, the query result will be written to BigQuery. Without a destination_table specified, the results are not stored.<br />
    If the `name` is not found within the `sql/` folder, bqetl assumes it hasn't been generated yet
    and will run the generation process for all `sql_generators/` files.
    This generation process takes some time and issues dry-run calls against BigQuery; this is expected. <br />
    Additional parameters (any parameters not listed under Options) must come after the query name.
    Otherwise, the first parameter that is not an option is interpreted as the query name and,
    since it can't be found, the generation process will start.
Examples:
\b
# Run a query by name
./bqetl query run telemetry_derived.ssl_ratios_v1
\b
# Run a query file
./bqetl query run /path/to/query.sql
\b
# Run a query and save the result to BigQuery
./bqetl query run telemetry_derived.ssl_ratios_v1 \
--project_id=moz-fx-data-shared-prod \
--dataset_id=telemetry_derived \
--destination_table=ssl_ratios_v1
""",
context_settings=dict(
ignore_unknown_options=True,
allow_extra_args=True,
),
)
@click.argument("name")
@sql_dir_option
@project_id_option()
@click.option(
"--public_project_id",
"--public-project-id",
default=ConfigLoader.get(
"default", "public_project", fallback="mozilla-public-data"
),
help="Project with publicly accessible data",
)
@click.option(
"--destination_table",
"--destination-table",
required=False,
help=(
"Destination table name results are written to. "
+ "If not set, the query result will not be written to BigQuery."
),
)
@click.option(
"--dataset_id",
"--dataset-id",
required=False,
help=(
"Destination dataset results are written to. "
+ "If not set, determines destination dataset based on query."
),
)
@click.pass_context
def run(
ctx,
name,
sql_dir,
project_id,
public_project_id,
destination_table,
dataset_id,
):
"""Run a query."""
if not is_authenticated():
click.echo(
"Authentication to GCP required. Run `gcloud auth login` "
"and check that the project is set correctly."
)
sys.exit(1)
query_files = paths_matching_name_pattern(name, sql_dir, project_id)
if query_files == []:
# run SQL generators if no matching query has been found
ctx.invoke(
generate_all,
output_dir=ctx.obj["TMP_DIR"],
ignore=["derived_view_schemas", "stable_views", "country_code_lookup"],
)
query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id)
_run_query(
query_files,
project_id,
public_project_id,
destination_table,
dataset_id,
ctx.args,
)
def _run_query(
query_files,
project_id,
public_project_id,
destination_table,
dataset_id,
query_arguments,
addl_templates: typing.Optional[dict] = None,
mapped_values=None,
):
    """Run a query."""
    client = bigquery.Client(project_id)
if dataset_id is not None:
# dataset ID was parsed by argparse but needs to be passed as parameter
# when running the query
query_arguments.append("--dataset_id={}".format(dataset_id))
if project_id is not None:
query_arguments.append(f"--project_id={project_id}")
if addl_templates is None:
addl_templates = {}
for query_file in query_files:
use_public_table = False
query_file = Path(query_file)
try:
metadata = Metadata.of_query_file(query_file)
if metadata.is_public_bigquery():
if not validate_metadata.validate_public_data(metadata, query_file):
sys.exit(1)
# Change the destination table to write results to the public dataset;
# a view to the public table in the internal dataset is created
# when CI runs
if (
dataset_id is not None
and destination_table is not None
and re.match(DESTINATION_TABLE_RE, destination_table)
):
destination_table = "{}:{}.{}".format(
public_project_id, dataset_id, destination_table
)
query_arguments.append(
"--destination_table={}".format(destination_table)
)
use_public_table = True
else:
print(
"ERROR: Cannot run public dataset query. Parameters"
" --destination_table=<table without dataset ID> and"
" --dataset_id=<dataset> required"
)
sys.exit(1)
except yaml.YAMLError as e:
logging.error(e)
sys.exit(1)
except FileNotFoundError:
            logging.warning("No metadata.yaml found for %s", query_file)
if not use_public_table and destination_table is not None:
# destination table was parsed by argparse, however if it wasn't modified to
# point to a public table it needs to be passed as parameter for the query
if re.match(QUALIFIED_TABLE_NAME_RE, destination_table):
project, dataset, table = qualified_table_name_matching(
destination_table
)
destination_table = "{}:{}.{}".format(project, dataset, table)
query_arguments.append("--destination_table={}".format(destination_table))
        if any(arg.startswith("--parameter") for arg in query_arguments):
# need to do this as parameters are not supported with legacy sql
query_arguments.append("--use_legacy_sql=False")
        # ensure the `query` command is always the first entry in query_arguments
if "query" not in query_arguments:
query_arguments = ["query"] + query_arguments
# write rendered query to a temporary file;
# query string cannot be passed directly to bq as SQL comments will be interpreted as CLI arguments
with tempfile.NamedTemporaryFile(mode="w+") as query_stream:
query_stream.write(
render_template(
query_file.name,
template_folder=str(query_file.parent),
templates_dir="",
format=False,
**addl_templates,
)
)
query_stream.seek(0)
query_content = query_stream.read()
if mapped_values is not None and "{mapped_values}" in query_content:
# Format the query template and run the query.
print(f"Running for {mapped_values}...")
query = query_content.format(mapped_values=mapped_values)
job = client.query(
query=query,
job_config=bigquery.QueryJobConfig(
use_query_cache=False,
use_legacy_sql=False,
),
)
job.result()
else:
                # run the query as a shell command so that passed parameters can be used as-is
subprocess.check_call(["bq"] + query_arguments, stdin=query_stream)
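# Illustrative destination_table rewriting in _run_query (hypothetical names):
#   public query with --dataset_id=telemetry_derived --destination_table=ssl_ratios_v1
#     -> "--destination_table=mozilla-public-data:telemetry_derived.ssl_ratios_v1"
#   non-public query with --destination_table=moz-fx-data-shared-prod.telemetry_derived.ssl_ratios_v1
#     -> "--destination_table=moz-fx-data-shared-prod:telemetry_derived.ssl_ratios_v1"
#   (the fully qualified name is converted to bq's "project:dataset.table" form)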
@query.command(
help="""Run a multipart query.
Examples:
\b
# Run a multipart query
./bqetl query run_multipart /path/to/query.sql
""",
context_settings=dict(
ignore_unknown_options=True,
allow_extra_args=True,
),
)
@click.argument(
"query_dir",
type=click.Path(file_okay=False),
)
@click.option(
"--using",
default="document_id",
help="comma separated list of join columns to use when combining results",
)
@click.option(
"--parallelism",
default=4,
type=int,
help="Maximum number of queries to execute concurrently",
)
@click.option(
"--dataset_id",
"--dataset-id",
help="Default dataset, if not specified all tables must be qualified with dataset",
)
@project_id_option()
@temp_dataset_option()
@click.option(
"--destination_table",
required=True,
help="table where combined results will be written",
)
@click.option(
"--time_partitioning_field",
type=lambda f: bigquery.TimePartitioning(field=f),
help="time partition field on the destination table",
)
@click.option(
"--clustering_fields",
type=lambda f: f.split(","),
help="comma separated list of clustering fields on the destination table",
)
@click.option(
"--dry_run",
"--dry-run",
is_flag=True,
default=False,
help="Print bytes that would be processed for each part and don't run queries",
)
@click.option(
"--parameters",
"--parameter",
multiple=True,
default=[],
type=lambda p: bigquery.ScalarQueryParameter(*p.split(":", 2)),
metavar="NAME:TYPE:VALUE",
help="query parameter(s) to pass when running parts",
)
@click.option(
"--priority",
default="INTERACTIVE",
type=click.Choice(["BATCH", "INTERACTIVE"]),
help=(
"Priority for BigQuery query jobs; BATCH priority will significantly slow "
"down queries if reserved slots are not enabled for the billing project; "
"defaults to INTERACTIVE"
),
)
@click.option(
"--schema_update_options",
"--schema_update_option",
multiple=True,
type=click.Choice(
[
bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,
# Airflow passes an empty string when the field addition date doesn't
# match the run date.
# See https://github.com/mozilla/telemetry-airflow/blob/
# e49fa7e6b3f5ec562dd248d257770c2303cf0cba/dags/utils/gcp.py#L515
"",
]
),
default=[],
help="Optional options for updating the schema.",
)
def run_multipart(
query_dir,
using,
parallelism,
dataset_id,
project_id,
temp_dataset,
destination_table,
time_partitioning_field,
clustering_fields,
dry_run,
parameters,
priority,
schema_update_options,
):
"""Run a multipart query."""
if dataset_id is not None and "." not in dataset_id and project_id is not None:
dataset_id = f"{project_id}.{dataset_id}"
if "." not in destination_table and dataset_id is not None:
destination_table = f"{dataset_id}.{destination_table}"
client = bigquery.Client(project_id)
with ThreadPool(parallelism) as pool:
parts = pool.starmap(
_run_part,
[
(
client,
part,
query_dir,
temp_dataset,
dataset_id,
dry_run,
parameters,
priority,
)
for part in sorted(next(os.walk(query_dir))[2])
if part.startswith("part") and part.endswith(".sql")
],
chunksize=1,
)
if not dry_run:
total_bytes = sum(job.total_bytes_processed for _, job in parts)
query = (
f"SELECT\n *\nFROM\n `{sql_table_id(parts[0][1].destination)}`"
+ "".join(
f"\nFULL JOIN\n `{sql_table_id(job.destination)}`"
f"\nUSING\n ({using})"
for _, job in parts[1:]
)
)
try:
job = client.query(
query=query,
job_config=bigquery.QueryJobConfig(
destination=destination_table,
time_partitioning=time_partitioning_field,
clustering_fields=clustering_fields,
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
use_legacy_sql=False,
priority=priority,
schema_update_options=schema_update_options,
),
)
job.result()
logging.info(
f"Processed {job.total_bytes_processed:,d} bytes to combine results"
)
total_bytes += job.total_bytes_processed
logging.info(f"Processed {total_bytes:,d} bytes in total")
finally:
for _, job in parts:
client.delete_table(sql_table_id(job.destination).split("$")[0])
logging.info(f"Deleted {len(parts)} temporary tables")
def _run_part(
client, part, query_dir, temp_dataset, dataset_id, dry_run, parameters, priority
):
"""Run a query part."""
with open(os.path.join(query_dir, part)) as sql_file:
query = sql_file.read()
job_config = bigquery.QueryJobConfig(
destination=temp_dataset.temp_table(),
default_dataset=dataset_id,
use_legacy_sql=False,
dry_run=dry_run,
query_parameters=parameters,
priority=priority,
allow_large_results=True,
)
job = client.query(query=query, job_config=job_config)
if job.dry_run:
logging.info(f"Would process {job.total_bytes_processed:,d} bytes for {part}")
else:
job.result()
logging.info(f"Processed {job.total_bytes_processed:,d} bytes for {part}")
return part, job
@query.command(
help="""Validate a query.
Checks formatting, scheduling information and dry runs the query.
Examples:
./bqetl query validate telemetry_derived.clients_daily_v6
\b
# Validate query not in shared-prod
./bqetl query validate \\
--use_cloud_function=false \\
--project_id=moz-fx-data-marketing-prod \\
ga_derived.blogs_goals_v1
""",
)
@click.argument("name", required=False)
@sql_dir_option
@project_id_option()
@use_cloud_function_option
@click.option(
"--validate_schemas",
"--validate-schemas",
help="Require dry run schema to match destination table and file if present.",
is_flag=True,
default=False,
)
@respect_dryrun_skip_option(default=False)
@no_dryrun_option(default=False)
@click.pass_context
def validate(
ctx,
name,
sql_dir,
project_id,
use_cloud_function,
validate_schemas,
respect_dryrun_skip,
no_dryrun,
):
"""Validate queries by dry running, formatting and checking scheduling configs."""
if name is None:
name = "*.*"
query_files = paths_matching_name_pattern(name, sql_dir, project_id)
dataset_dirs = set()
for query in query_files:
ctx.invoke(format, paths=[str(query)])
if not no_dryrun:
ctx.invoke(
dryrun,
paths=[str(query)],
use_cloud_function=use_cloud_function,
project=project_id,
validate_schemas=validate_schemas,
respect_skip=respect_dryrun_skip,
)
validate_metadata.validate(query.parent)
dataset_dirs.add(query.parent.parent)
if no_dryrun:
click.echo("Dry run skipped for query files.")
for dataset_dir in dataset_dirs:
validate_metadata.validate_datasets(dataset_dir)
def _initialize_in_parallel(
project,
table,
dataset,
query_file,
arguments,
parallelism,
addl_templates,
):
mapped_values = [f"sample_id = {i}" for i in list(range(0, 100))]
with ThreadPool(parallelism) as pool:
start = default_timer()
# Process all subsets in the query in parallel (eg. all sample_ids).
pool.map(
partial(
_run_query,
[query_file],
project,
None,
table,
dataset,
arguments,
addl_templates,
),
mapped_values,
)
print(f"Job completed in {default_timer() - start}")
@query.command(
help="""Create and initialize the destination table for the query.
Only for queries that have an `init.sql` file.
Examples:
./bqetl query initialize telemetry_derived.ssl_ratios_v1
""",
)
@click.argument("name")
@sql_dir_option
@project_id_option()
@click.option(
"--dry_run/--no_dry_run",
"--dry-run/--no-dry-run",
help="Dry run the initialization",
)
@click.pass_context
def initialize(ctx, name, sql_dir, project_id, dry_run):
"""Create the destination table for the provided query."""
if not is_authenticated():
click.echo("Authentication required for creating tables.", err=True)
sys.exit(1)
if Path(name).exists():
# Allow name to be a path
query_files = [Path(name)]
else:
query_files = paths_matching_name_pattern(name, sql_dir, project_id)
if not query_files:
click.echo(
f"Couldn't find directory matching `{name}`. Failed to initialize query.",
err=True,
)
sys.exit(1)
for query_file in query_files:
sql_content = query_file.read_text()
client = bigquery.Client()
# Enable initialization from query.sql files
# Create the table by deploying the schema and metadata, then run the init.
# This does not currently verify the accuracy of the schema or that it
# matches the query.
if "is_init()" in sql_content:
project = query_file.parent.parent.parent.name
dataset = query_file.parent.parent.name
destination_table = query_file.parent.name
full_table_id = f"{project}.{dataset}.{destination_table}"
try:
table = client.get_table(full_table_id)
except NotFound:
table = bigquery.Table(full_table_id)
if table.created:
raise PreconditionFailed(
f"Table {full_table_id} already exists. The initialization process is terminated."
)
ctx.invoke(deploy, name=full_table_id, force=True)
arguments = [
"query",
"--use_legacy_sql=false",
"--replace",
"--format=none",
]
if dry_run:
arguments += ["--dry_run"]
if "parallel_run" in sql_content:
_initialize_in_parallel(
project=project,
table=destination_table,
dataset=dataset,
query_file=query_file,
arguments=arguments,
parallelism=DEFAULT_PARALLELISM,
addl_templates={
"is_init": lambda: True,
"parallel_run": lambda: True,
},
)
else:
                _run_query(
                    query_files=[query_file],
                    project_id=project,
public_project_id=None,
destination_table=destination_table,
dataset_id=dataset,
query_arguments=arguments,
addl_templates={
"is_init": lambda: True,
},
)
else:
init_files = Path(query_file.parent).rglob("init.sql")
for init_file in init_files:
project = init_file.parent.parent.parent.name
with open(init_file) as init_file_stream:
init_sql = init_file_stream.read()
dataset = Path(init_file).parent.parent.name
destination_table = query_file.parent.name
job_config = bigquery.QueryJobConfig(
dry_run=dry_run,
default_dataset=f"{project}.{dataset}",
destination=f"{project}.{dataset}.{destination_table}",
)
if "CREATE MATERIALIZED VIEW" in init_sql:
click.echo(f"Create materialized view for {init_file}")
                        # existing materialized views have to be deleted before re-creation
view_name = query_file.parent.name
client.delete_table(
f"{project}.{dataset}.{view_name}", not_found_ok=True
)
else:
click.echo(f"Create destination table for {init_file}")
job = client.query(init_sql, job_config=job_config)
if not dry_run:
job.result()
@query.command(
help="""Render a query Jinja template.
Examples:
./bqetl query render telemetry_derived.ssl_ratios_v1 \\
--output-dir=/tmp
""",
context_settings=dict(
ignore_unknown_options=True,
allow_extra_args=True,
),
)
@click.argument("name")
@sql_dir_option
@click.option(
"--output-dir",
"--output_dir",
help="Output directory generated SQL is written to. "
+ "If not specified, rendered queries are printed to console.",
type=click.Path(file_okay=False),
required=False,
)
def render(name, sql_dir, output_dir):
"""Render a query Jinja template."""
if name is None:
name = "*.*"
query_files = paths_matching_name_pattern(name, sql_dir, project_id=None)
resolved_sql_dir = Path(sql_dir).resolve()
for query_file in query_files:
table_name = query_file.parent.name
dataset_id = query_file.parent.parent.name
project_id = query_file.parent.parent.parent.name
jinja_params = {
**{
"project_id": project_id,
"dataset_id": dataset_id,
"table_name": table_name,
},
}
rendered_sql = (
render_template(
query_file.name,
template_folder=query_file.parent,
templates_dir="",
format=False,
**jinja_params,
)
+ "\n"
)
if not any(s in str(query_file) for s in skip_format()):
rendered_sql = reformat(rendered_sql, trailing_newline=True)
if output_dir:
            output_file = Path(output_dir) / query_file.resolve().relative_to(
                resolved_sql_dir
            )
output_file.parent.mkdir(parents=True, exist_ok=True)
output_file.write_text(rendered_sql)
else:
click.echo(query_file)
click.echo(rendered_sql)
def _parse_partition_setting(partition_date):
params = partition_date.split(":")
if len(params) != 3:
return None
# Check date format
try:
datetime.datetime.strptime(params[2], "%Y-%m-%d").date()
except ValueError:
return None
# Check column name
if re.match(r"^\w+$", params[0]):
return {params[0]: params[2]}
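# Illustrative parses (the segment between the two colons is currently unused):
#   _parse_partition_setting("submission_date::2023-09-26")
#     -> {"submission_date": "2023-09-26"}
#   _parse_partition_setting("submission_date:2023-09-26")  -> None (wrong arity)
#   _parse_partition_setting("submission_date::2023-99-99") -> None (invalid date)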
def _validate_partition_date(ctx, param, partition_date):
"""Process the CLI parameter check_date and set the parameter for BigQuery."""
# Will be None if launched from Airflow. Also ctx.args is not populated at this stage.
if partition_date:
parsed = _parse_partition_setting(partition_date)
if parsed is None:
raise click.BadParameter("Format must be <column-name>::<yyyy-mm-dd>")
return parsed
return None
def _parse_check_output(output: str) -> str:
output = output.replace("\n", " ")
if "ETL Data Check Failed:" in output:
return f"ETL Data Check Failed:{output.split('ETL Data Check Failed:')[1]}"
return output
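# Illustrative example (hypothetical check output):
#   _parse_check_output("INFO some log line\nETL Data Check Failed: row count 0 < 1")
#     -> "ETL Data Check Failed: row count 0 < 1"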
@query.group(help="Commands for managing query schemas.")
def schema():
"""Create the CLI group for the query schema command."""
pass
@schema.command(
help="""
Update the query schema based on the destination table schema and the query schema.
If no schema.yaml file exists for a query, one will be created.
Examples:
./bqetl query schema update telemetry_derived.clients_daily_v6
# Update schema including downstream dependencies (requires GCP)
./bqetl query schema update telemetry_derived.clients_daily_v6 --update-downstream
""",
)
@click.argument("name")
@sql_dir_option
@click.option(
"--project-id",
"--project_id",
help="GCP project ID",
default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"),
callback=is_valid_project,
)
@click.option(
"--update-downstream",
"--update_downstream",
help="Update downstream dependencies. GCP authentication required.",
default=False,
is_flag=True,
)
@click.option(
"--tmp-dataset",
"--tmp_dataset",
help="GCP datasets for creating updated tables temporarily.",
default="tmp",
)
@use_cloud_function_option
@respect_dryrun_skip_option(default=True)
@parallelism_option
def update(
name,
sql_dir,
project_id,
update_downstream,
tmp_dataset,
use_cloud_function,
respect_dryrun_skip,
parallelism,
):
"""CLI command for generating the query schema."""
if not is_authenticated():
click.echo(
"Authentication to GCP required. Run `gcloud auth login` "
"and check that the project is set correctly."
)
sys.exit(1)
query_files = paths_matching_name_pattern(
name, sql_dir, project_id, files=["query.sql"]
)
dependency_graph = get_dependency_graph([sql_dir], without_views=True)
manager = multiprocessing.Manager()
tmp_tables = manager.dict({})
# order query files to make sure derived_from dependencies are resolved
query_file_graph = {}
for query_file in query_files:
query_file_graph[query_file] = []
try:
metadata = Metadata.of_query_file(str(query_file))
if metadata and metadata.schema and metadata.schema.derived_from:
for derived_from in metadata.schema.derived_from:
parent_queries = [
query
for query in paths_matching_name_pattern(
".".join(derived_from.table), sql_dir, project_id
)
]
if len(parent_queries) > 0:
query_file_graph[query_file].append(parent_queries[0])
except FileNotFoundError:
query_file_graph[query_file] = []
ts = ParallelTopologicalSorter(
query_file_graph, parallelism=parallelism, with_follow_up=update_downstream
)
ts.map(
partial(
_update_query_schema_with_downstream,
sql_dir,
project_id,
tmp_dataset,
dependency_graph,
tmp_tables,
use_cloud_function,
respect_dryrun_skip,
update_downstream,
)
)
if len(tmp_tables) > 0:
client = bigquery.Client()
# delete temporary tables
for _, table in tmp_tables.items():
client.delete_table(table, not_found_ok=True)
def _update_query_schema_with_downstream(
sql_dir,
project_id,
tmp_dataset,
dependency_graph,
tmp_tables={},
use_cloud_function=True,
respect_dryrun_skip=True,
update_downstream=False,
query_file=None,
follow_up_queue=None,
):
try:
changed = _update_query_schema(
query_file,
sql_dir,
project_id,
tmp_dataset,
tmp_tables,
use_cloud_function,
respect_dryrun_skip,
)
if update_downstream:
# update downstream dependencies
if changed:
if not is_authenticated():
click.echo(
"Cannot update downstream dependencies."
"Authentication to GCP required. Run `gcloud auth login` "
"and check that the project is set correctly."
)
sys.exit(1)
project, dataset, table = extract_from_query_path(query_file)
identifier = f"{project}.{dataset}.{table}"
tmp_identifier = f"{project}.{tmp_dataset}.{table}_{random_str(12)}"
# create temporary table with updated schema
if identifier not in tmp_tables:
schema = Schema.from_schema_file(query_file.parent / SCHEMA_FILE)
schema.deploy(tmp_identifier)
tmp_tables[identifier] = tmp_identifier
# get downstream dependencies that will be updated in the next iteration
dependencies = [
p
for k, refs in dependency_graph.items()
for p in paths_matching_name_pattern(
k, sql_dir, project_id, files=("query.sql",)
)
if identifier in refs
]
for d in dependencies:
click.echo(f"Update downstream dependency schema for {d}")
if follow_up_queue:
follow_up_queue.put(d)
except Exception:
print_exc()
def _update_query_schema(
query_file,
sql_dir,
project_id,
tmp_dataset,
tmp_tables={},
use_cloud_function=True,
respect_dryrun_skip=True,
):
"""
Update the schema of a specific query file.
Return True if the schema changed, False if it is unchanged.
"""
if respect_dryrun_skip and str(query_file) in DryRun.skipped_files():
click.echo(f"{query_file} dry runs are skipped. Cannot update schemas.")
return
tmp_tables = copy.deepcopy(tmp_tables)
query_file_path = Path(query_file)
existing_schema_path = query_file_path.parent / SCHEMA_FILE
project_name, dataset_name, table_name = extract_from_query_path(query_file_path)
try:
metadata = Metadata.of_query_file(str(query_file_path))
except FileNotFoundError:
metadata = None
click.echo(f"No metadata defined for {query_file_path}")
# pull in updates from parent schemas
if metadata and metadata.schema and metadata.schema.derived_from:
for derived_from in metadata.schema.derived_from:
parent_queries = [
query
for query in paths_matching_name_pattern(
".".join(derived_from.table), sql_dir, project_id
)
]
if len(parent_queries) == 0:
click.echo(
f"derived_from query {derived_from.table} does not exist.",
err=True,
)
else:
parent_schema = Schema.from_schema_file(
parent_queries[0].parent / SCHEMA_FILE
)
parent_project, parent_dataset, parent_table = extract_from_query_path(
parent_queries[0]
)
parent_identifier = f"{parent_project}.{parent_dataset}.{parent_table}"
if parent_identifier not in tmp_tables:
tmp_parent_identifier = (
f"{parent_project}.{tmp_dataset}.{parent_table}_"
+ random_str(12)
)
parent_schema.deploy(tmp_parent_identifier)
tmp_tables[parent_identifier] = tmp_parent_identifier
if existing_schema_path.is_file():
existing_schema = Schema.from_schema_file(existing_schema_path)
else:
existing_schema = Schema.empty()
existing_schema.merge(parent_schema, exclude=derived_from.exclude)
# use temporary table
tmp_identifier = (
f"{project_name}.{tmp_dataset}.{table_name}_{random_str(12)}"
)
existing_schema.deploy(tmp_identifier)
tmp_tables[
f"{project_name}.{dataset_name}.{table_name}"
] = tmp_identifier
existing_schema.to_yaml_file(existing_schema_path)
# replace temporary table references
sql_content = render_template(
query_file_path.name,
template_folder=str(query_file_path.parent),
templates_dir="",
format=False,
)
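    # Illustrative rewrite (hypothetical temp table): a reference to
    # `telemetry_derived.clients_daily_v6` or
    # `moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6` in the rendered
    # SQL is replaced with its deployed temporary counterpart, e.g.
    # `moz-fx-data-shared-prod.tmp.clients_daily_v6_ab12cd34ef56`.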
for orig_table, tmp_table in tmp_tables.items():
table_parts = orig_table.split(".")
for i in range(len(table_parts)):
if ".".join(table_parts[i:]) in sql_content:
sql_content = sql_content.replace(".".join(table_parts[i:]), tmp_table)
break
query_schema = None
try:
query_schema = Schema.from_query_file(
query_file_path,
content=sql_content,
use_cloud_function=use_cloud_function,
respect_skip=respect_dryrun_skip,
)
except Exception:
if not existing_schema_path.exists():
click.echo(
click.style(
f"Cannot automatically update {query_file_path}. "
f"Please update {query_file_path / SCHEMA_FILE} manually.",
fg="red",
),
err=True,
)
return
# update bigquery metadata
try:
client = bigquery.Client()
table = client.get_table(f"{project_name}.{dataset_name}.{table_name}")
metadata_file_path = query_file_path.parent / METADATA_FILE
if (
table.time_partitioning
and metadata
and (
metadata.bigquery is None or metadata.bigquery.time_partitioning is None
)
):
metadata.set_bigquery_partitioning(
field=table.time_partitioning.field,
partition_type=table.time_partitioning.type_.lower(),
required=table.time_partitioning.require_partition_filter,
expiration_days=table.time_partitioning.expiration_ms / 86400000.0
if table.time_partitioning.expiration_ms
else None,
)
click.echo(f"Partitioning metadata added to {metadata_file_path}")
if (
table.clustering_fields
and metadata
and (metadata.bigquery is None or metadata.bigquery.clustering is None)
):
metadata.set_bigquery_clustering(table.clustering_fields)
click.echo(f"Clustering metadata added to {metadata_file_path}")
if metadata:
metadata.write(metadata_file_path)
except NotFound:
click.echo(
f"Destination table {project_name}.{dataset_name}.{table_name} "
"does not exist in BigQuery. Run bqetl query schema deploy "
"<dataset>.<table> to create the destination table."
)
except FileNotFoundError:
click.echo(
f"No metadata file for {project_name}.{dataset_name}.{table_name}."
" Skip schema update."
)
return
partitioned_by = None
try:
metadata = Metadata.of_query_file(query_file_path)
if metadata.bigquery and metadata.bigquery.time_partitioning:
partitioned_by = metadata.bigquery.time_partitioning.field
except FileNotFoundError:
pass
table_schema = Schema.for_table(
project_name,
dataset_name,
table_name,
partitioned_by,
use_cloud_function=use_cloud_function,
respect_skip=respect_dryrun_skip,
)
changed = True
if existing_schema_path.is_file():
existing_schema = Schema.from_schema_file(existing_schema_path)
old_schema = copy.deepcopy(existing_schema)
if table_schema:
existing_schema.merge(table_schema)
if query_schema:
existing_schema.merge(query_schema)
existing_schema.to_yaml_file(existing_schema_path)
changed = not existing_schema.equal(old_schema)
else:
query_schema.merge(table_schema)
query_schema.to_yaml_file(existing_schema_path)
click.echo(f"Schema {existing_schema_path} updated.")
return changed
@schema.command(
help="""Deploy the query schema.
Examples:
./bqetl query schema deploy telemetry_derived.clients_daily_v6
""",
)
@click.argument("name")
@sql_dir_option
@click.option(
"--project-id",
"--project_id",
help="GCP project ID",
default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"),
callback=is_valid_project,
)
@click.option(
"--force/--noforce",
help="Deploy the schema file without validating that it matches the query",
default=False,
)
@use_cloud_function_option
@respect_dryrun_skip_option(default=True)
@click.option(
"--skip-existing",
"--skip_existing",
help="Skip updating existing tables. "
+ "This option ensures that only new tables get deployed.",
default=False,
is_flag=True,
)
@click.option(
"--skip-external-data",
"--skip_external_data",
help="Skip publishing external data, such as Google Sheets.",
default=False,
is_flag=True,
)
@click.option(
"--destination_table",
"--destination-table",
required=False,
help=(
"Destination table name results are written to. "
+ "If not set, determines destination table based on query. "
+ "Must be fully qualified (project.dataset.table)."
),
)
@parallelism_option
@click.pass_context
def deploy(
ctx,
name,
sql_dir,
project_id,
force,
use_cloud_function,
respect_dryrun_skip,
skip_existing,
skip_external_data,
destination_table,
parallelism,
):
"""CLI command for deploying destination table schemas."""
if not is_authenticated():
click.echo(
"Authentication to GCP required. Run `gcloud auth login` "
"and check that the project is set correctly."
)
sys.exit(1)
client = bigquery.Client()
query_files = paths_matching_name_pattern(name, sql_dir, project_id, ["query.*"])
if not query_files:
# run SQL generators if no matching query has been found
ctx.invoke(
generate_all,
output_dir=ctx.obj["TMP_DIR"],
ignore=["derived_view_schemas", "stable_views"],
)
query_files = paths_matching_name_pattern(
name, ctx.obj["TMP_DIR"], project_id, ["query.*"]
)
def _deploy(query_file):
if respect_dryrun_skip and str(query_file) in DryRun.skipped_files():
click.echo(f"{query_file} dry runs are skipped. Cannot validate schemas.")
return
query_file_path = Path(query_file)
existing_schema_path = query_file_path.parent / SCHEMA_FILE
if not existing_schema_path.is_file():
click.echo(f"No schema file found for {query_file}")
return
try:
table_name = query_file_path.parent.name
dataset_name = query_file_path.parent.parent.name
project_name = query_file_path.parent.parent.parent.name
if destination_table:
full_table_id = destination_table
else:
full_table_id = f"{project_name}.{dataset_name}.{table_name}"
existing_schema = Schema.from_schema_file(existing_schema_path)
if not force and str(query_file_path).endswith("query.sql"):
query_schema = Schema.from_query_file(
query_file_path,
use_cloud_function=use_cloud_function,
respect_skip=respect_dryrun_skip,
)
if not existing_schema.equal(query_schema):
click.echo(
f"Query {query_file_path} does not match "
f"schema in {existing_schema_path}. "
f"To update the local schema file, "
f"run `./bqetl query schema update "
f"{dataset_name}.{table_name}`",
err=True,
)
sys.exit(1)
with NamedTemporaryFile(suffix=".json") as tmp_schema_file:
existing_schema.to_json_file(Path(tmp_schema_file.name))
bigquery_schema = client.schema_from_json(tmp_schema_file.name)
try:
table = client.get_table(full_table_id)
except NotFound:
table = bigquery.Table(full_table_id)
table.schema = bigquery_schema
_attach_metadata(query_file_path, table)
if not table.created:
client.create_table(table)
click.echo(f"Destination table {full_table_id} created.")
elif not skip_existing:
client.update_table(
table,
[
"schema",
"friendly_name",
"description",
"time_partitioning",
"clustering_fields",
"labels",
],
)
click.echo(f"Schema (and metadata) updated for {full_table_id}.")
except Exception:
print_exc()
return query_file
with ThreadPool(parallelism) as pool:
failed_deploys = [r for r in pool.map(_deploy, query_files) if r]
if not skip_external_data:
failed_external_deploys = _deploy_external_data(
name, sql_dir, project_id, skip_existing
)
failed_deploys += failed_external_deploys
if len(failed_deploys) > 0:
click.echo("The following tables could not be deployed:")
for failed_deploy in failed_deploys:
click.echo(failed_deploy)
sys.exit(1)
click.echo("All tables have been deployed.")
def _attach_metadata(query_file_path: Path, table: bigquery.Table) -> None:
"""Add metadata from query file's metadata.yaml to table object."""
try:
metadata = Metadata.of_query_file(query_file_path)
except FileNotFoundError:
return
table.description = metadata.description
table.friendly_name = metadata.friendly_name
if metadata.bigquery and metadata.bigquery.time_partitioning:
table.time_partitioning = bigquery.TimePartitioning(
metadata.bigquery.time_partitioning.type.bigquery_type,
field=metadata.bigquery.time_partitioning.field,
require_partition_filter=(
metadata.bigquery.time_partitioning.require_partition_filter
),
expiration_ms=metadata.bigquery.time_partitioning.expiration_ms,
)
if metadata.bigquery and metadata.bigquery.clustering:
table.clustering_fields = metadata.bigquery.clustering.fields
# BigQuery only allows for string type labels with specific requirements to be published:
# https://cloud.google.com/bigquery/docs/labels-intro#requirements
if metadata.labels:
table.labels = {
key: value
for key, value in metadata.labels.items()
if isinstance(value, str)
}
def _deploy_external_data(
name,
sql_dir,
project_id,
skip_existing,
) -> list:
"""Publish external data tables."""
# whether a table should be created from external data is defined in the metadata
metadata_files = paths_matching_name_pattern(
name, sql_dir, project_id, ["metadata.yaml"]
)
client = bigquery.Client()
failed_deploys = []
for metadata_file_path in metadata_files:
metadata = Metadata.from_file(metadata_file_path)
if not metadata.external_data:
# skip all tables that are not created from external data
continue
existing_schema_path = metadata_file_path.parent / SCHEMA_FILE
if not existing_schema_path.is_file():
# tables created from external data must specify a schema
click.echo(f"No schema file found for {metadata_file_path}")
continue
try:
table_name = metadata_file_path.parent.name
dataset_name = metadata_file_path.parent.parent.name
project_name = metadata_file_path.parent.parent.parent.name
full_table_id = f"{project_name}.{dataset_name}.{table_name}"
existing_schema = Schema.from_schema_file(existing_schema_path)
try:
table = client.get_table(full_table_id)
except NotFound:
table = bigquery.Table(full_table_id)
with NamedTemporaryFile(suffix=".json") as tmp_schema_file:
existing_schema.to_json_file(Path(tmp_schema_file.name))
bigquery_schema = client.schema_from_json(tmp_schema_file.name)
table.schema = bigquery_schema
_attach_metadata(metadata_file_path, table)
if not table.created:
if metadata.external_data.format in (
ExternalDataFormat.GOOGLE_SHEETS,
ExternalDataFormat.CSV,
):
external_config = bigquery.ExternalConfig(
metadata.external_data.format.value.upper()
)
external_config.source_uris = metadata.external_data.source_uris
external_config.ignore_unknown_values = True
external_config.autodetect = False
for key, v in metadata.external_data.options.items():
setattr(external_config.options, key, v)
table.external_data_configuration = external_config
table = client.create_table(table)
click.echo(f"Destination table {full_table_id} created.")
else:
click.echo(
f"External data format {metadata.external_data.format} unsupported."
)
elif not skip_existing:
client.update_table(
table,
[
"schema",
"friendly_name",
"description",
"labels",
],
)
click.echo(f"Schema (and metadata) updated for {full_table_id}.")
except Exception:
print_exc()
failed_deploys.append(metadata_file_path)
return failed_deploys
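# Illustrative metadata.yaml snippet that _deploy_external_data acts on
# (hypothetical values; key names follow the attributes read above):
#   external_data:
#     format: google_sheets
#     source_uris:
#       - https://docs.google.com/spreadsheets/d/<sheet-id>
#     options:
#       skip_leading_rows: 1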
def _validate_schema_from_path(
query_file_path, use_cloud_function=True, respect_dryrun_skip=True
):
"""Dry Runs and validates a query schema from its path."""
return (
DryRun(
query_file_path,
use_cloud_function=use_cloud_function,
respect_skip=respect_dryrun_skip,
).validate_schema(),
query_file_path,
)
@schema.command(
help="""Validate the query schema
Examples:
./bqetl query schema validate telemetry_derived.clients_daily_v6
""",
name="validate",
)
@click.argument("name")
@sql_dir_option
@click.option(
"--project-id",
"--project_id",
help="GCP project ID",
default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"),
callback=is_valid_project,
)
@use_cloud_function_option
@respect_dryrun_skip_option(default=True)
@click.pass_context
def validate_schema(
ctx, name, sql_dir, project_id, use_cloud_function, respect_dryrun_skip
):
"""Validate the defined query schema with the query and the destination table."""
query_files = paths_matching_name_pattern(name, sql_dir, project_id)
if query_files == []:
# run SQL generators if no matching query has been found
ctx.invoke(
generate_all,
output_dir=ctx.obj["TMP_DIR"],
ignore=["derived_view_schemas", "stable_views"],
)
query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id)
_validate_schema = partial(
_validate_schema_from_path,
use_cloud_function=use_cloud_function,
respect_dryrun_skip=respect_dryrun_skip,
)
with Pool(8) as p:
result = p.map(_validate_schema, query_files, chunksize=1)
all_valid = True
for is_valid, query_file_path in result:
if is_valid is False:
if all_valid:
click.echo("\nSchemas for the following queries are invalid:")
all_valid = False
click.echo(query_file_path)
if not all_valid:
sys.exit(1)
else:
click.echo("\nAll schemas are valid.")