bigquery_etl/glam/clients_daily_scalar_aggregates.py (132 lines of code) (raw):

#!/usr/bin/env python3
"""clients_daily_scalar_aggregates query generator."""
import argparse
import sys
from typing import Dict, List

from jinja2 import Environment, PackageLoader

from bigquery_etl.format_sql.formatter import reformat
from bigquery_etl.util.probe_filters import get_etl_excluded_probes_quickfix

from .utils import get_schema, ping_type_from_table

# Comma-separated column list handed to the SQL template as `attributes`.
ATTRIBUTES = ",".join(
    [
        "client_id",
        "ping_type",
        "submission_date",
        "os",
        "app_version",
        "app_build_id",
        "channel",
    ]
)


def render_main(**kwargs):
    """Create a SQL query for the clients_daily_scalar_aggregates dataset.

    All keyword arguments are forwarded unchanged to the
    ``clients_daily_scalar_aggregates_v1.sql`` template; the rendered text is
    passed through ``reformat`` before being returned.
    """
    env = Environment(loader=PackageLoader("bigquery_etl", "glam/templates"))
    main_sql = env.get_template("clients_daily_scalar_aggregates_v1.sql")
    return reformat(main_sql.render(**kwargs))


def get_labeled_metrics_sql(probes: Dict[str, List[str]]) -> str:
    """Get the SQL for labeled scalar metrics.

    Args:
        probes: Mapping of metric type to the probe names of that type.

    Returns:
        One ``('<probe>', '<metric_type>', metrics.<metric_type>.<probe>)``
        tuple per probe, sorted lexicographically and joined with ``",\\n"``.
        An empty string when there are no probes.
    """
    probes_struct = sorted(
        f"('{probe}', '{metric_type}', metrics.{metric_type}.{probe})"
        for metric_type, _probes in probes.items()
        for probe in _probes
    )
    return ",\n".join(probes_struct)


def get_unlabeled_metrics_sql(probes: Dict[str, List[str]]) -> str:
    """Put together the subsets of SQL required to query scalars or booleans.

    Booleans produce two entries per probe (a count of ``false`` values and a
    count of ``true`` values). Every other metric type produces ``max``,
    ``avg``, ``min``, ``sum`` and ``count`` entries per probe.

    Args:
        probes: Mapping of metric type to the probe names of that type. The
            mapping is only read; it is not modified.

    Returns:
        The generated tuples, sorted lexicographically and joined with
        ``",\\n"``. An empty string when there are no probes.
    """
    probe_structs = []

    # Read the boolean probes with .get() rather than .pop() so the caller's
    # dictionary is left intact.
    for probe in probes.get("boolean", []):
        probe_structs.append(
            f"('{probe}', 'boolean', '', 'false', "
            f"SUM(CAST(NOT metrics.boolean.{probe} AS INT64)))"
        )
        probe_structs.append(
            f"('{probe}', 'boolean', '', 'true', "
            f"SUM(CAST(metrics.boolean.{probe} AS INT64)))"
        )

    for metric_type, _probes in probes.items():
        if metric_type == "boolean":
            # Already handled above with boolean-specific aggregates.
            continue
        # timespans are nested within an object that also carries the unit
        # of time associated with the value
        suffix = ".value" if metric_type == "timespan" else ""
        for probe in _probes:
            column = f"metrics.{metric_type}.{probe}{suffix}"
            for agg_func in ("max", "avg", "min", "sum"):
                probe_structs.append(
                    f"('{probe}', '{metric_type}', '', '{agg_func}', "
                    f"{agg_func}(CAST({column} AS NUMERIC)))"
                )
            # The count is written by hand: it yields NULL when the MIN of
            # the column is NULL, and COUNT(*) otherwise.
            probe_structs.append(
                f"('{probe}', '{metric_type}', '', 'count', "
                f"IF(MIN({column}) IS NULL, NULL, COUNT(*)))"
            )

    probe_structs.sort()
    return ",\n".join(probe_structs)


def get_scalar_metrics(schema: Dict, scalar_type: str) -> Dict[str, List[str]]:
    """Find all scalar probes in a Glean table.

    Metric types are defined in the Glean documentation found here:
    https://mozilla.github.io/glean/book/user/metrics/index.html

    Args:
        schema: Table schema as returned by ``get_schema``. It is iterated as
            a sequence of field mappings, each with ``"name"`` and
            ``"fields"`` keys.
            NOTE(review): the ``Dict`` annotation does not match that usage;
            confirm against ``get_schema``.
        scalar_type: Either ``"unlabeled"`` or ``"labeled"``.

    Returns:
        Mapping of each metric type in the requested group to the probe names
        found under the schema's ``metrics`` field, minus the names returned
        by ``get_etl_excluded_probes_quickfix("fenix")``. Metric types with no
        probes map to an empty list.
    """
    assert scalar_type in ("unlabeled", "labeled"), (
        f"scalar_type must be 'unlabeled' or 'labeled', got {scalar_type!r}"
    )
    metric_type_set = {
        "unlabeled": ["boolean", "counter", "quantity", "timespan"],
        "labeled": ["labeled_counter"],
    }
    wanted_types = metric_type_set[scalar_type]
    scalars: Dict[str, List[str]] = {metric_type: [] for metric_type in wanted_types}
    excluded_metrics = get_etl_excluded_probes_quickfix("fenix")

    # Iterate over every element in the schema under the metrics section and
    # collect a list of metric names.
    for root_field in schema:
        if root_field["name"] != "metrics":
            continue
        for metric_field in root_field["fields"]:
            metric_type = metric_field["name"]
            if metric_type not in wanted_types:
                continue
            for field in metric_field["fields"]:
                if field["name"] not in excluded_metrics:
                    scalars[metric_type].append(field["name"])
    return scalars


def main():
    """Print a clients_daily_scalar_aggregates query to stdout.

    When neither the unlabeled nor the labeled probes yield any SQL, the
    header and an "empty query" notice are printed instead and the process
    exits with status 1.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--no-parameterize",
        action="store_true",
        help="Generate a query without parameters",
    )
    parser.add_argument("--source-table", type=str, help="Name of Glean table")
    # NOTE(review): --product is accepted but not read anywhere in this module.
    parser.add_argument(
        "--product",
        type=str,
        default="org_mozilla_fenix",
    )
    args = parser.parse_args()

    # If set to 1 day, then runs of copy_deduplicate may not be done yet
    submission_date = (
        "date_sub(current_date, interval 2 day)"
        if args.no_parameterize
        else "@submission_date"
    )
    header = (
        "-- Query generated by: python3 -m "
        "bigquery_etl.glam.clients_daily_scalar_aggregates "
        f"--source-table {args.source_table}"
        + (" --no-parameterize" if args.no_parameterize else "")
    )

    schema = get_schema(args.source_table)
    unlabeled_metric_names = get_scalar_metrics(schema, "unlabeled")
    labeled_metric_names = get_scalar_metrics(schema, "labeled")
    unlabeled_metrics = get_unlabeled_metrics_sql(unlabeled_metric_names).strip()
    labeled_metrics = get_labeled_metrics_sql(labeled_metric_names).strip()

    if not unlabeled_metrics and not labeled_metrics:
        print(header)
        print("-- Empty query: no probes found!")
        sys.exit(1)

    print(
        render_main(
            header=header,
            source_table=args.source_table,
            submission_date=submission_date,
            attributes=ATTRIBUTES,
            unlabeled_metrics=unlabeled_metrics,
            labeled_metrics=labeled_metrics,
            ping_type=ping_type_from_table(args.source_table),
        )
    )


if __name__ == "__main__":
    main()