sql/moz-fx-data-shared-prod/telemetry_derived/clients_daily_scalar_aggregates_v1.sql.py (261 lines of code) (raw):

#!/usr/bin/env python3 """clients_daily_scalar_aggregates query generator.""" import argparse import json import subprocess import sys import textwrap import urllib.request from pathlib import Path from time import sleep from bigquery_etl.format_sql.formatter import reformat from bigquery_etl.util import probe_filters from bigquery_etl.util.common import snake_case sys.path.append(str(Path(__file__).parent.parent.parent.resolve())) PROBE_INFO_SERVICE = ( "https://probeinfo.telemetry.mozilla.org/firefox/all/main/all_probes" ) p = argparse.ArgumentParser() p.add_argument( "--agg-type", type=str, help="One of scalars/keyed_scalars/keyed_booleans", required=True, ) p.add_argument( "--json-output", action="store_true", help="Output the result wrapped in json parseable as an XCOM", ) p.add_argument( "--wait-seconds", type=int, default=0, help="Add a delay before executing the script to allow time for the xcom sidecar to complete startup", ) def generate_sql( agg_type, aggregates, additional_queries, additional_partitions, select_clause, querying_table, json_output, ): """Create a SQL query for the clients_daily_scalar_aggregates dataset.""" query = textwrap.dedent( f"""-- Query generated by: sql/telemetry_derived/clients_daily_scalar_aggregates.sql.py --agg-type {agg_type} WITH valid_build_ids AS ( SELECT DISTINCT(build.build.id) AS build_id FROM `moz-fx-data-shared-prod.telemetry.buildhub2` ), filtered AS ( SELECT *, SPLIT(application.version, '.')[OFFSET(0)] AS app_version, DATE(submission_timestamp) as submission_date, normalized_os as os, application.build_id AS app_build_id, normalized_channel AS channel FROM `moz-fx-data-shared-prod.telemetry_stable.main_v5` INNER JOIN valid_build_ids ON (application.build_id = build_id) WHERE DATE(submission_timestamp) = @submission_date AND normalized_channel in ( "release", "beta", "nightly" ) AND client_id IS NOT NULL), {additional_queries} sampled_data AS ( SELECT * FROM {querying_table} WHERE channel IN ("nightly", "beta") OR (channel = "release" AND os != "Windows") OR (channel = "release" AND os = "Windows" AND MOD(sample_id, @sample_size) = 0) ), -- Using `min` for when `agg_type` is `count` returns null when all rows are null aggregated AS ( SELECT submission_date, sample_id, client_id, profile_group_id, os, app_version, app_build_id, channel, {aggregates} FROM sampled_data GROUP BY submission_date, sample_id, client_id, profile_group_id, os, app_version, app_build_id, channel {additional_partitions}) {select_clause} """ ) if json_output: return json.dumps(query) else: return query def _get_generic_keyed_scalar_sql(probes, value_type): probes_struct = [] for probe, processes in probes.items(): for process in processes: probes_struct.append( f"('{probe}', '{process}', payload.processes.{process}.keyed_scalars.{probe})" ) probes_struct.sort() probes_arr = ",\n\t\t\t".join(probes_struct) additional_queries = f""" grouped_metrics AS (SELECT sample_id, client_id, profile_group_id, submission_date, os, app_version, app_build_id, channel, ARRAY<STRUCT< name STRING, process STRING, value ARRAY<STRUCT<key STRING, value {value_type}>> >>[ {probes_arr} ] as metrics FROM filtered), flattened_metrics AS (SELECT sample_id, client_id, profile_group_id, submission_date, os, app_version, app_build_id, channel, metrics.name AS metric, metrics.process AS process, value.key AS key, value.value AS value FROM grouped_metrics CROSS JOIN UNNEST(metrics) AS metrics, UNNEST(metrics.value) AS value), """ querying_table = "flattened_metrics" additional_partitions = """, metric, process, key """ return { "additional_queries": additional_queries, "additional_partitions": additional_partitions, "querying_table": querying_table, } def get_keyed_boolean_probes_sql_string(probes): """Put together the subsets of SQL required to query keyed booleans.""" sql_strings = _get_generic_keyed_scalar_sql(probes, "BOOLEAN") sql_strings[ "probes_string" ] = """ metric, key, process, SUM(CASE WHEN value = True THEN 1 ELSE 0 END) AS true_col, SUM(CASE WHEN value = False THEN 1 ELSE 0 END) AS false_col """ sql_strings[ "select_clause" ] = """ SELECT sample_id, client_id, profile_group_id, submission_date, os, app_version, app_build_id, channel, ARRAY_CONCAT_AGG(ARRAY<STRUCT< metric STRING, metric_type STRING, key STRING, process STRING, agg_type STRING, value FLOAT64 >> [ (metric, 'keyed-scalar-boolean', key, process, 'true', true_col), (metric, 'keyed-scalar-boolean', key, process, 'false', false_col) ] ) AS scalar_aggregates FROM aggregated GROUP BY sample_id, client_id, profile_group_id, submission_date, os, app_version, app_build_id, channel """ return sql_strings def get_keyed_scalar_probes_sql_string(probes): """Put together the subsets of SQL required to query keyed scalars.""" sql_strings = _get_generic_keyed_scalar_sql(probes, "INT64") sql_strings[ "probes_string" ] = """ metric, process, key, MAX(value) AS max, MIN(value) AS min, AVG(value) AS avg, SUM(value) AS sum, IF(MIN(value) IS NULL, NULL, COUNT(*)) AS count """ sql_strings[ "select_clause" ] = """ SELECT sample_id, client_id, profile_group_id, submission_date, os, app_version, app_build_id, channel, ARRAY_CONCAT_AGG(ARRAY<STRUCT< metric STRING, metric_type STRING, key STRING, process STRING, agg_type STRING, value FLOAT64 >> [ (metric, 'keyed-scalar', key, process, 'max', max), (metric, 'keyed-scalar', key, process, 'min', min), (metric, 'keyed-scalar', key, process, 'avg', avg), (metric, 'keyed-scalar', key, process, 'sum', sum), (metric, 'keyed-scalar', key, process, 'count', count) ] ) AS scalar_aggregates FROM aggregated GROUP BY sample_id, client_id, profile_group_id, submission_date, os, app_version, app_build_id, channel """ return sql_strings def get_scalar_probes_sql_strings(probes, scalar_type): """Put together the subsets of SQL required to query scalars or booleans.""" if scalar_type == "keyed_scalars": return get_keyed_scalar_probes_sql_string(probes["keyed"]) if scalar_type == "keyed_booleans": return get_keyed_boolean_probes_sql_string(probes["keyed_boolean"]) probe_structs = [] for probe, processes in probes["scalars"].items(): for process in processes: probe_structs.append( ( f"('{probe}', 'scalar', '', '{process}', 'max', " f"max(CAST(payload.processes.{process}.scalars.{probe} AS INT64)))" ) ) probe_structs.append( ( f"('{probe}', 'scalar', '', '{process}', 'avg', " f"avg(CAST(payload.processes.{process}.scalars.{probe} AS INT64)))" ) ) probe_structs.append( ( f"('{probe}', 'scalar', '', '{process}', 'min', " f"min(CAST(payload.processes.{process}.scalars.{probe} AS INT64)))" ) ) probe_structs.append( ( f"('{probe}', 'scalar', '', '{process}', 'sum', " f"sum(CAST(payload.processes.{process}.scalars.{probe} AS INT64)))" ) ) probe_structs.append( f"('{probe}', 'scalar', '', '{process}', 'count', IF(MIN(payload.processes.{process}.scalars.{probe}) IS NULL, NULL, COUNT(*)))" ) for probe, processes in probes["booleans"].items(): for process in processes: probe_structs.append( ( f"('{probe}', 'boolean', '', '{process}', 'false', " f"SUM(case when payload.processes.{process}.scalars.{probe} = False " "THEN 1 ELSE 0 END))" ) ) probe_structs.append( ( f"('{probe}', 'boolean', '', '{process}', 'true', " f"SUM(case when payload.processes.{process}.scalars.{probe} = True " "THEN 1 ELSE 0 END))" ) ) probe_structs.sort() probes_arr = ",\n\t\t\t".join(probe_structs) probes_string = f""" ARRAY<STRUCT< metric STRING, metric_type STRING, key STRING, process STRING, agg_type STRING, value FLOAT64 >> [ {probes_arr} ] AS scalar_aggregates """ select_clause = """ SELECT * FROM aggregated """ return {"probes_string": probes_string, "select_clause": select_clause} def save_scalars_by_type(scalars_dict, scalar, process): """Add scalars to scalars_dict based on their type.""" if scalars_dict is None: return processes = scalars_dict.setdefault(scalar, set()) processes.add(process) scalars_dict[scalar] = processes def filter_scalars_dict(scalars_dict, required_probes): """Filter out scalars that are not required.""" return { scalar: process for scalar, process in scalars_dict.items() if scalar in required_probes } def get_scalar_probes(scalar_type): """Find all scalar probes in main summary. Note: that non-integer scalar probes are not included. """ project = "moz-fx-data-shared-prod" main_summary_scalars = {} main_summary_record_scalars = {} main_summary_boolean_record_scalars = {} main_summary_boolean_scalars = {} process = subprocess.Popen( [ "bq", "show", "--schema", "--format=json", f"{project}:telemetry_stable.main_v5", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = process.communicate() if process.returncode > 0: raise Exception( f"Call to bq exited non-zero: {process.returncode}", stdout, stderr ) main_summary_schema = json.loads(stdout) scalars_fields = [] for field in main_summary_schema: if field["name"] != "payload": continue for payload_field in field["fields"]: if payload_field["name"] == "processes": for processes_field in payload_field["fields"]: if processes_field["name"] in ["parent", "content", "gpu"]: process_field = processes_field["name"] for type_field in processes_field["fields"]: if type_field["name"] == scalar_type: scalars_fields.append( {"scalars": type_field, "process": process_field} ) break if len(scalars_fields) == 0: return for scalars_and_process in scalars_fields: for scalar in scalars_and_process["scalars"].get("fields", {}): scalars_dict = None if "name" not in scalar: continue if scalar.get("type", "") == "INTEGER": scalars_dict = main_summary_scalars elif scalar.get("type", "") == "BOOLEAN": scalars_dict = main_summary_boolean_scalars elif scalar.get("type", "") == "RECORD": if scalar["fields"][1]["type"] == "BOOLEAN": scalars_dict = main_summary_boolean_record_scalars else: scalars_dict = main_summary_record_scalars save_scalars_by_type( scalars_dict, scalar["name"], scalars_and_process["process"] ) # Find the intersection between relevant scalar probes # and those that exist in main summary with urllib.request.urlopen(PROBE_INFO_SERVICE) as url: data = json.loads(url.read()) excluded_probes = probe_filters.get_etl_excluded_probes_quickfix("desktop") scalar_probes = ( set( [ snake_case(x.replace("scalar/", "")) for x in data.keys() if x.startswith("scalar/") ] ) - excluded_probes ) return { "scalars": filter_scalars_dict(main_summary_scalars, scalar_probes), "booleans": filter_scalars_dict( main_summary_boolean_scalars, scalar_probes ), "keyed": filter_scalars_dict(main_summary_record_scalars, scalar_probes), "keyed_boolean": filter_scalars_dict( main_summary_boolean_record_scalars, scalar_probes ), } def main(argv, out=print): """Print a clients_daily_scalar_aggregates query to stdout.""" opts = vars(p.parse_args(argv[1:])) sql_string = "" if opts["agg_type"] in ("scalars", "keyed_scalars", "keyed_booleans"): scalar_type = ( opts["agg_type"] if (opts["agg_type"] == "scalars") else "keyed_scalars" ) scalar_probes = get_scalar_probes(scalar_type) sql_string = get_scalar_probes_sql_strings(scalar_probes, opts["agg_type"]) else: raise ValueError( "agg-type must be one of scalars, keyed_scalars, keyed_booleans" ) sleep(opts["wait_seconds"]) out( reformat( generate_sql( opts["agg_type"], sql_string["probes_string"], sql_string.get("additional_queries", ""), sql_string.get("additional_partitions", ""), sql_string["select_clause"], sql_string.get("querying_table", "filtered"), opts["json_output"], ) ) ) if __name__ == "__main__": main(sys.argv)