sql_generators/glean_usage/__init__.py:

"""GLEAN Usage.""" from functools import cache, partial from pathlib import Path import click from pathos.multiprocessing import ProcessingPool from bigquery_etl.cli.utils import ( is_valid_project, table_matches_patterns, use_cloud_function_option, ) from bigquery_etl.config import ConfigLoader from bigquery_etl.dryrun import get_id_token from sql_generators.glean_usage import ( baseline_clients_daily, baseline_clients_first_seen, baseline_clients_last_seen, clients_last_seen_joined, event_error_monitoring, event_flow_monitoring, event_monitoring_live, events_stream, events_unnested, glean_app_ping_views, metrics_clients_daily, metrics_clients_last_seen, ) from sql_generators.glean_usage.common import get_app_info, list_tables # list of methods for generating queries GLEAN_TABLES = [ glean_app_ping_views.GleanAppPingViews(), baseline_clients_daily.BaselineClientsDailyTable(), baseline_clients_first_seen.BaselineClientsFirstSeenTable(), baseline_clients_last_seen.BaselineClientsLastSeenTable(), events_unnested.EventsUnnestedTable(), metrics_clients_daily.MetricsClientsDaily(), metrics_clients_last_seen.MetricsClientsLastSeen(), clients_last_seen_joined.ClientsLastSeenJoined(), event_monitoring_live.EventMonitoringLive(), event_error_monitoring.EventErrorMonitoring(), event_flow_monitoring.EventFlowMonitoring(), events_stream.EventsStreamTable(), ] @click.command() @click.option( "--target-project", "--target_project", help="GCP project ID", default="moz-fx-data-shared-prod", callback=is_valid_project, ) @click.option( "--output-dir", "--output_dir", help="Output directory generated SQL is written to", type=click.Path(file_okay=False), default="sql", ) @click.option( "--parallelism", "-p", help="Maximum number of tasks to execute concurrently", default=8, ) @click.option( "--except", "-x", "exclude", help="Process all tables except for the given tables", ) @click.option( "--only", "-o", help="Process only the given tables", ) @click.option( "--app_name", "--app-name", help="App to generate per-app dataset metadata and union views for.", ) @use_cloud_function_option def generate( target_project, output_dir, parallelism, exclude, only, app_name, use_cloud_function ): """Generate per-app_id queries and views, and per-app dataset metadata and union views. Note that a file won't be generated if a corresponding file is already present in the target directory, which allows manual overrides of generated files by checking them into the sql/ tree of the default branch of the repository. 
""" table_filter = partial(table_matches_patterns, "*", False) if only: table_filter = partial(table_matches_patterns, only, False) elif exclude: table_filter = partial(table_matches_patterns, exclude, True) @cache def get_tables(table_name="baseline_v1"): baseline_tables = list_tables( project_id=target_project, only_tables=[only] if only else None, table_filter=table_filter, table_name=table_name, ) # filter out skipped apps return [ baseline_table for baseline_table in baseline_tables if baseline_table.split(".")[1] not in [ f"{skipped_app}_stable" for skipped_app in ConfigLoader.get( "generate", "glean_usage", "skip_apps", fallback=[] ) ] ] output_dir = Path(output_dir) / target_project # per app specific datasets app_info = get_app_info() if app_name: app_info = {name: info for name, info in app_info.items() if name == app_name} app_info = [ info for name, info in app_info.items() if name not in ConfigLoader.get("generate", "glean_usage", "skip_apps", fallback=[]) ] id_token = get_id_token() # Prepare parameters so that generation of all Glean datasets can be done in parallel # Parameters to generate per-app_id datasets consist of the function to be called # and baseline tables generate_per_app_id = [ ( partial( table.generate_per_app_id, target_project, output_dir=output_dir, use_cloud_function=use_cloud_function, app_info=app_info, parallelism=parallelism, id_token=id_token, ), baseline_table, ) for table in GLEAN_TABLES for baseline_table in get_tables(table_name=table.base_table_name) ] # Parameters to generate per-app datasets consist of the function to be called # and app_info generate_per_app = [ ( partial( table.generate_per_app, target_project, output_dir=output_dir, use_cloud_function=use_cloud_function, parallelism=parallelism, id_token=id_token, ), info, ) for info in app_info for table in GLEAN_TABLES ] # Parameters to generate datasets that union all app datasets generate_across_apps = [ ( partial( table.generate_across_apps, target_project, output_dir=output_dir, use_cloud_function=use_cloud_function, parallelism=parallelism, ), app_info, ) for table in GLEAN_TABLES ] with ProcessingPool(parallelism) as pool: pool.map( lambda f: f[0](f[1]), generate_per_app_id + generate_per_app + generate_across_apps, )