def get_scalar_probes()

in sql/moz-fx-data-shared-prod/telemetry_derived/clients_daily_scalar_aggregates_v1.sql.py [0:0]


def get_scalar_probes(scalar_type):
    """Find all scalar probes in main summary.

    Reads the schema of ``telemetry_stable.main_v5`` with the ``bq`` CLI, collects
    the ``scalar_type`` field found under ``payload.processes`` for the
    ``parent``, ``content`` and ``gpu`` processes, groups its sub-fields by
    BigQuery type and finally keeps only the probes that are also listed by
    the probe info service (minus the ETL-excluded probes).

    Note: only INTEGER, BOOLEAN and RECORD fields are collected; fields of any
    other type are skipped.

    Args:
        scalar_type: Name of the field to look up under each process record
            (it is compared against the schema field names).

    Returns:
        A dict with the keys ``"scalars"``, ``"booleans"``, ``"keyed"`` and
        ``"keyed_boolean"``, each holding the result of ``filter_scalars_dict``
        for the corresponding group. ``None`` is returned when the schema
        contains no matching ``scalar_type`` field.

    Raises:
        Exception: If the ``bq`` command does not exit with status 0. The
            exception args carry the message, stdout and stderr.
    """
    project = "moz-fx-data-shared-prod"
    main_summary_scalars = {}
    main_summary_record_scalars = {}
    main_summary_boolean_record_scalars = {}
    main_summary_boolean_scalars = {}

    completed = subprocess.run(
        [
            "bq",
            "show",
            "--schema",
            "--format=json",
            f"{project}:telemetry_stable.main_v5",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # A negative return code means bq was terminated by a signal; treat it as
    # a failure too instead of trying to parse truncated output.
    if completed.returncode != 0:
        raise Exception(
            f"Call to bq exited non-zero: {completed.returncode}",
            completed.stdout,
            completed.stderr,
        )
    main_summary_schema = json.loads(completed.stdout)

    # Walk payload -> processes -> {parent, content, gpu} -> <scalar_type>.
    scalars_fields = []
    for field in main_summary_schema:
        if field["name"] != "payload":
            continue

        for payload_field in field["fields"]:
            if payload_field["name"] != "processes":
                continue

            for processes_field in payload_field["fields"]:
                process_name = processes_field["name"]
                if process_name not in ("parent", "content", "gpu"):
                    continue

                for type_field in processes_field["fields"]:
                    if type_field["name"] == scalar_type:
                        scalars_fields.append(
                            {"scalars": type_field, "process": process_name}
                        )
                        # Only the first matching field per process is used.
                        break

    if not scalars_fields:
        return None

    for scalars_and_process in scalars_fields:
        for scalar in scalars_and_process["scalars"].get("fields", {}):
            if "name" not in scalar:
                continue

            field_type = scalar.get("type", "")
            if field_type == "INTEGER":
                scalars_dict = main_summary_scalars
            elif field_type == "BOOLEAN":
                scalars_dict = main_summary_boolean_scalars
            elif field_type == "RECORD":
                # The type of the record's second sub-field decides the group.
                # NOTE(review): presumably records are (key, value) pairs, so
                # index 1 is the value - confirm against the schema.
                if scalar["fields"][1]["type"] == "BOOLEAN":
                    scalars_dict = main_summary_boolean_record_scalars
                else:
                    scalars_dict = main_summary_record_scalars
            else:
                # Unsupported type: nothing to record for this field.
                continue

            save_scalars_by_type(
                scalars_dict, scalar["name"], scalars_and_process["process"]
            )

    # Find the intersection between relevant scalar probes
    # and those that exist in main summary
    with urllib.request.urlopen(PROBE_INFO_SERVICE) as url:
        data = json.loads(url.read())

    excluded_probes = probe_filters.get_etl_excluded_probes_quickfix("desktop")
    scalar_probes = {
        snake_case(name.replace("scalar/", ""))
        for name in data
        if name.startswith("scalar/")
    } - excluded_probes

    return {
        "scalars": filter_scalars_dict(main_summary_scalars, scalar_probes),
        "booleans": filter_scalars_dict(main_summary_boolean_scalars, scalar_probes),
        "keyed": filter_scalars_dict(main_summary_record_scalars, scalar_probes),
        "keyed_boolean": filter_scalars_dict(
            main_summary_boolean_record_scalars, scalar_probes
        ),
    }