bigquery_etl/cli/view.py (337 lines of code) (raw):

"""bigquery-etl CLI view command."""

import logging
import re
import string
import sys
from fnmatch import fnmatchcase
from functools import partial
from graphlib import TopologicalSorter
from multiprocessing.pool import Pool, ThreadPool
from traceback import print_exc

import rich_click as click
from google.cloud import bigquery

from ..cli.utils import (
    parallelism_option,
    paths_matching_name_pattern,
    project_id_option,
    respect_dryrun_skip_option,
    sql_dir_option,
)
from ..config import ConfigLoader
from ..dryrun import DryRun, get_credentials, get_id_token
from ..metadata.parse_metadata import METADATA_FILE, Metadata
from ..util.bigquery_id import sql_table_id
from ..util.client_queue import ClientQueue
from ..view import View, broken_views

# Matches `<dataset>.<view>`. The character class is `A-Z` on purpose: the
# range `A-z` would also admit the punctuation between "Z" and "a" in ASCII
# (`[`, `\`, `]`, `^` and the backtick).
VIEW_NAME_RE = re.compile(r"(?P<dataset>[a-zA-Z0-9_]+)\.(?P<name>[a-zA-Z0-9_]+)")


def _configure_logging(log_level):
    """Configure root logging, reporting a bad level as a CLI error.

    Raises:
        click.ClickException: if `logging.basicConfig` rejects `log_level`.
    """
    try:
        logging.basicConfig(level=log_level, format="%(levelname)s %(message)s")
    except ValueError as e:
        raise click.ClickException(f"argument --log-level: {e}") from e


@click.group(help="Commands for managing views.")
def view():
    """Create the CLI group for the view command."""
    pass


@view.command(
    help="""Create a new view with name
    <dataset>.<view_name>, for example: telemetry_derived.active_profiles.
    Use the `--project_id` option to change the project the view is added to;
    default is `moz-fx-data-shared-prod`.

    Examples:

    \b
    ./bqetl view create telemetry_derived.deviations \\
      --owner=example@mozilla.com
    """,
)
@click.argument("name")
@sql_dir_option
@project_id_option(
    ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod")
)
@click.option(
    "--owner",
    "-o",
    help="Owner of the query (email address)",
    default="example@mozilla.com",
)
def create(name, sql_dir, project_id, owner):
    """Create a new view and write a metadata file next to it.

    `name` must be exactly `<dataset>.<view>`; anything else prints an error
    and exits with status 1.
    """
    # fullmatch (rather than match) so that trailing text such as a third
    # dotted component is rejected instead of being silently dropped.
    match = VIEW_NAME_RE.fullmatch(name)
    if match is None:
        click.echo("New views must be named like: <dataset>.<view>")
        sys.exit(1)

    dataset = match.group("dataset")
    view_name = match.group("name")

    new_view = View.create(project_id, dataset, view_name, sql_dir)
    metadata = Metadata(
        friendly_name=string.capwords(view_name.replace("_", " ")),
        description="Please provide a description for the view",
        owners=[owner],
    )
    metadata.write(new_view.path.parent / METADATA_FILE)
    click.echo(f"Created new view {new_view.path}")


@view.command(
    help="""Validate a view.
    Checks formatting, naming, references and dry runs the view.

    Examples:

    ./bqetl view validate telemetry.clients_daily
    """
)
@click.argument("name", required=False)
@sql_dir_option
@project_id_option(default=None)
@parallelism_option()
def validate(
    name,
    sql_dir,
    project_id,
    parallelism,
):
    """Validate every matching view definition; exit with 1 if any is invalid."""
    view_files = paths_matching_name_pattern(
        name, sql_dir, project_id, files=("view.sql",)
    )
    id_token = get_id_token()
    views = [View.from_file(f, id_token=id_token) for f in view_files]

    with Pool(parallelism) as p:
        result = p.map(_view_is_valid, views)

    if not all(result):
        sys.exit(1)

    click.echo("All views are valid.")


def _view_is_valid(v: View) -> bool:
    """Return `v.is_valid()`; module-level wrapper handed to `Pool.map`."""
    return v.is_valid()


@view.command(
    help="""Publish views.

    Examples:

    # Publish all views
    ./bqetl view publish

    # Publish a specific view
    ./bqetl view publish telemetry.clients_daily
    """
)
@click.argument("name", required=False)
@sql_dir_option
@project_id_option(default=None)
@click.option(
    "--target-project",
    help=(
        "If specified, create views in the target project rather than"
        " the project specified in the file. Only views for "
        " moz-fx-data-shared-prod will be published if this is set."
    ),
)
@click.option("--log-level", default="INFO", help="Defaults to INFO")
@parallelism_option()
@click.option(
    "--dry_run",
    "--dry-run",
    is_flag=True,
    help="Validate view definitions, but do not publish them.",
)
@click.option(
    "--user-facing-only",
    "--user_facing_only",
    is_flag=True,
    help=(
        "Publish user-facing views only. User-facing views are views"
        " part of datasets without suffixes (such as telemetry,"
        " but not telemetry_derived)."
    ),
)
@click.option(
    "--skip-authorized",
    "--skip_authorized",
    is_flag=True,
    help="Don't publish views with labels: {authorized: true} in metadata.yaml",
)
@click.option(
    "--force",
    is_flag=True,
    help="Publish views even if there are no changes to the view query",
)
@click.option(
    "--add-managed-label",
    "--add_managed_label",
    is_flag=True,
    help=(
        'Add a label "managed" to views, that can be used to remove views from BigQuery'
        " when they are removed from --sql-dir."
    ),
)
@respect_dryrun_skip_option(default=False)
def publish(
    name,
    sql_dir,
    project_id,
    target_project,
    log_level,
    parallelism,
    dry_run,
    user_facing_only,
    skip_authorized,
    force,
    add_managed_label,
    respect_dryrun_skip,
):
    """Publish views in dependency order; exit with 1 if any publish fails.

    Views are collected from `sql_dir`, optionally filtered (dry-run skip list,
    unchanged views unless `force`), ordered so that a view is published after
    the collected views it references, and then published one at a time.
    """
    _configure_logging(log_level)

    credentials = get_credentials()

    views = _collect_views(name, sql_dir, project_id, user_facing_only, skip_authorized)

    if respect_dryrun_skip:
        # Look the skip list up once instead of once per view.
        skipped_files = DryRun.skipped_files()
        views = [v for v in views if v.path not in skipped_files]

    if add_managed_label:
        for v in views:
            v.labels["managed"] = ""

    if not force:
        has_changes = partial(_view_has_changes, target_project, credentials)

        # only views with changes
        with Pool(parallelism) as p:
            changes = p.map(has_changes, views)
        views = [v for v, changed in zip(views, changes) if changed]

    views_by_id = {v.view_identifier: v for v in views}

    # Map each view to the references that are themselves being published;
    # references to anything else do not constrain the order.
    view_id_graph = {
        v.view_identifier: {ref for ref in v.table_references if ref in views_by_id}
        for v in views
    }

    view_id_order = TopologicalSorter(view_id_graph).static_order()

    client = bigquery.Client(credentials=credentials)

    result = []
    for view_id in view_id_order:
        try:
            result.append(views_by_id[view_id].publish(target_project, dry_run, client))
        except Exception:
            # Keep going so every failure is reported; the exit code below
            # still reflects that something went wrong.
            print(f"Failed to publish view: {view_id}")
            print_exc()
            result.append(False)

    if not all(result):
        sys.exit(1)

    click.echo("All have been published.")


def _view_has_changes(target_project, credentials, view):
    """Return `view.has_changes(target_project, credentials)`.

    The view is the last parameter so the first two can be bound with
    `functools.partial` before mapping over views.
    """
    return view.has_changes(target_project, credentials)


def _collect_views(name, sql_dir, project_id, user_facing_only, skip_authorized):
    """Load the `view.sql` files matching `name` and apply the requested filters.

    Returns a list of `View` objects. With `user_facing_only`, only views whose
    `is_user_facing` is truthy are kept; with `skip_authorized`, views whose
    metadata carries the `authorized` label are dropped.
    """
    view_files = paths_matching_name_pattern(
        name, sql_dir, project_id, files=("view.sql",)
    )

    id_token = get_id_token()
    views = [View.from_file(f, id_token=id_token) for f in view_files]

    if user_facing_only:
        views = [v for v in views if v.is_user_facing]

    if skip_authorized:
        views = [
            v
            for v in views
            if not (
                v.metadata
                and v.metadata.labels
                # labels with boolean true are translated to ""
                and v.metadata.labels.get("authorized") == ""
            )
        ]

    return views


@view.command(
    help="""Remove managed views that are not present in the sql dir.

    Examples:

    # Clean managed views in shared prod
    ./bqetl view clean --target-project=moz-fx-data-shared-prod --skip-authorized

    # Clean managed user facing views in mozdata
    ./bqetl view clean --target-project=mozdata --user-facing-only --skip-authorized
    """
)
@click.argument("name", required=False)
@sql_dir_option
@project_id_option(default=None)
@click.option(
    "--target-project",
    help=(
        "If specified, clean views in the target project rather than"
        " the project specified in the file. Only views for "
        " moz-fx-data-shared-prod will be included if this is set."
    ),
)
@click.option("--log-level", default="INFO", help="Defaults to INFO")
@parallelism_option()
@click.option(
    "--dry_run",
    "--dry-run",
    is_flag=True,
    help="Identify views to delete, but do not delete them.",
)
@click.option(
    "--user-facing-only",
    "--user_facing_only",
    is_flag=True,
    help=(
        "Remove user-facing views only. User-facing views are views"
        " part of datasets without suffixes (such as telemetry,"
        " but not telemetry_derived)."
    ),
)
@click.option(
    "--skip-authorized",
    "--skip_authorized",
    is_flag=True,
    help="Don't remove views with labels: {authorized: true} in metadata.yaml",
)
def clean(
    name,
    sql_dir,
    project_id,
    target_project,
    log_level,
    parallelism,
    dry_run,
    user_facing_only,
    skip_authorized,
):
    """Delete managed views that no longer have a definition in `sql_dir`.

    The set of views to remove is the managed views found in BigQuery minus
    the identifiers of the views collected from `sql_dir`. With `dry_run` the
    candidates are only printed.

    Raises:
        click.ClickException: if neither `project_id` nor `target_project` is
            given, or if `log_level` is not a valid logging level.
    """
    _configure_logging(log_level)

    if project_id is None and target_project is None:
        raise click.ClickException("command requires --project-id or --target-project")

    expected_view_ids = {
        v.target_view_identifier(target_project)
        for v in _collect_views(
            name, sql_dir, project_id, user_facing_only, skip_authorized
        )
    }

    # Only needed when filtering, and the same for every dataset, so read the
    # configuration once up front rather than once per dataset.
    if user_facing_only:
        non_user_facing_suffixes = tuple(
            ConfigLoader.get(
                "default", "non_user_facing_dataset_suffixes", fallback=[]
            )
        )
    else:
        non_user_facing_suffixes = ()

    client_q = ClientQueue([project_id], parallelism)
    with client_q.client() as client:
        datasets = [
            dataset
            for dataset in client.list_datasets(target_project)
            if not user_facing_only
            or not dataset.dataset_id.endswith(non_user_facing_suffixes)
        ]

    with ThreadPool(parallelism) as p:
        managed_view_ids = {
            view_id
            for view_ids in p.starmap(
                client_q.with_client,
                (
                    (_list_managed_views, dataset, name, skip_authorized)
                    for dataset in datasets
                ),
                chunksize=1,
            )
            for view_id in view_ids
        }

        remove_view_ids = sorted(managed_view_ids - expected_view_ids)
        p.starmap(
            client_q.with_client,
            ((_remove_view, view_id, dry_run) for view_id in remove_view_ids),
            chunksize=1,
        )


def _list_managed_views(client, dataset, pattern, skip_authorized):
    """Return the ids of views in `dataset` that carry the `managed` label.

    Ids have the form `<catalog>.<schema>.<name>`. When `pattern` is not None,
    only ids whose `sql_table_id` matches `*{pattern}` are kept; when
    `skip_authorized` is set, views that also carry the `authorized` label are
    left out.
    """
    query = f"""
        SELECT
          table_catalog || "." || table_schema || "." || table_name AS table_id,
          CONTAINS_SUBSTR(option_value, 'STRUCT("authorized", "")') AS is_authorized
        FROM
          `{dataset.project}.{dataset.dataset_id}.INFORMATION_SCHEMA.VIEWS`
        INNER JOIN
          `{dataset.project}.{dataset.dataset_id}.INFORMATION_SCHEMA.TABLE_OPTIONS`
        USING
          (table_catalog, table_schema, table_name)
        WHERE
          option_name = "labels"
          AND CONTAINS_SUBSTR(option_value, 'STRUCT("managed", "")')
    """
    # running a query against information schema instead of using the API
    # to list tables is much faster
    job = client.query(query)
    result = list(job.result())

    return [
        row.table_id
        for row in result
        if (pattern is None or fnmatchcase(sql_table_id(row.table_id), f"*{pattern}"))
        and (not skip_authorized or not row.is_authorized)
    ]


def _remove_view(client, view_id, dry_run):
    """Delete `view_id` through `client`, or only announce it when `dry_run`."""
    if dry_run:
        click.echo(f"Would delete {view_id}")
    else:
        click.echo(f"Deleting {view_id}")
        client.delete_table(view_id)


@view.command(
    help="""List broken views.

    Examples:

    # List all broken views
    ./bqetl view list-broken

    # List broken views, processing only telemetry
    ./bqetl view list-broken --only telemetry
    """
)
@project_id_option()
@parallelism_option()
@click.option(
    "--only",
    "-o",
    help="Process only the given tables",
)
@click.option(
    "--log-level",
    "--log_level",
    help="Log level.",
    default=logging.getLevelName(logging.INFO),
    type=str.upper,
)
def list_broken(project_id, parallelism, only, log_level):
    """List broken views by delegating to `broken_views.list_broken_views`."""
    broken_views.list_broken_views(project_id, parallelism, only, log_level)