in sql/moz-fx-data-shared-prod/telemetry_derived/clients_daily_scalar_aggregates_v1.sql.py [0:0]
def get_scalar_probes(scalar_type):
    """Find all scalar probes in the ``telemetry_stable.main_v5`` schema.

    The table schema is fetched with ``bq show --schema``. For each of the
    ``parent``, ``content`` and ``gpu`` entries under ``payload.processes``,
    the child field named ``scalar_type`` is located and its sub-fields are
    bucketed by BigQuery type: ``INTEGER``, ``BOOLEAN``, and ``RECORD`` (split
    on whether the record's second sub-field is ``BOOLEAN``). Sub-fields of
    any other type are handed to ``save_scalars_by_type`` with a ``None``
    bucket.

    The buckets are then filtered down to the probes that are listed under a
    ``scalar/`` key by ``PROBE_INFO_SERVICE`` and are not in the ETL exclusion
    list for ``"desktop"``.

    Args:
        scalar_type: Name of the field to look for under each process record.

    Returns:
        A dict with the keys ``"scalars"``, ``"booleans"``, ``"keyed"`` and
        ``"keyed_boolean"``, each holding the ``filter_scalars_dict`` result
        for the matching bucket; or ``None`` when no process record has a
        field named ``scalar_type``.

    Raises:
        Exception: If the ``bq`` command exits with a non-zero status,
            including termination by a signal (negative return code).
    """
    project = "moz-fx-data-shared-prod"
    main_summary_scalars = {}
    main_summary_record_scalars = {}
    main_summary_boolean_record_scalars = {}
    main_summary_boolean_scalars = {}

    process = subprocess.Popen(
        [
            "bq",
            "show",
            "--schema",
            "--format=json",
            f"{project}:telemetry_stable.main_v5",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = process.communicate()
    # A negative return code means the child was killed by a signal; treat it
    # as a failure too instead of trying to parse truncated output.
    if process.returncode != 0:
        raise Exception(
            f"Call to bq exited non-zero: {process.returncode}", stdout, stderr
        )
    main_summary_schema = json.loads(stdout)

    # Collect payload.processes.<process>.<scalar_type> for each known process.
    scalars_fields = []
    for field in main_summary_schema:
        if field["name"] != "payload":
            continue
        for payload_field in field["fields"]:
            if payload_field["name"] != "processes":
                continue
            for processes_field in payload_field["fields"]:
                process_field = processes_field["name"]
                if process_field not in ("parent", "content", "gpu"):
                    continue
                for type_field in processes_field["fields"]:
                    if type_field["name"] == scalar_type:
                        scalars_fields.append(
                            {"scalars": type_field, "process": process_field}
                        )
                        # Only the first matching field per process is used.
                        break

    if not scalars_fields:
        # No process record exposes `scalar_type`; callers receive None.
        return

    for scalars_and_process in scalars_fields:
        for scalar in scalars_and_process["scalars"].get("fields", []):
            if "name" not in scalar:
                continue

            field_type = scalar.get("type", "")
            scalars_dict = None
            if field_type == "INTEGER":
                scalars_dict = main_summary_scalars
            elif field_type == "BOOLEAN":
                scalars_dict = main_summary_boolean_scalars
            elif field_type == "RECORD":
                # NOTE(review): assumes every RECORD scalar has at least two
                # sub-fields and that the second one carries the value type
                # (presumably a key/value pair) -- confirm against the schema.
                if scalar["fields"][1]["type"] == "BOOLEAN":
                    scalars_dict = main_summary_boolean_record_scalars
                else:
                    scalars_dict = main_summary_record_scalars

            # For any other type `scalars_dict` is still None here and is
            # passed through as-is; presumably the helper ignores it -- verify.
            save_scalars_by_type(
                scalars_dict, scalar["name"], scalars_and_process["process"]
            )

    # Find the intersection between relevant scalar probes
    # and those that exist in main summary
    with urllib.request.urlopen(PROBE_INFO_SERVICE) as url:
        data = json.loads(url.read())

    excluded_probes = probe_filters.get_etl_excluded_probes_quickfix("desktop")
    scalar_probes = {
        snake_case(x.replace("scalar/", ""))
        for x in data
        if x.startswith("scalar/")
    } - excluded_probes

    return {
        "scalars": filter_scalars_dict(main_summary_scalars, scalar_probes),
        "booleans": filter_scalars_dict(
            main_summary_boolean_scalars, scalar_probes
        ),
        "keyed": filter_scalars_dict(main_summary_record_scalars, scalar_probes),
        "keyed_boolean": filter_scalars_dict(
            main_summary_boolean_record_scalars, scalar_probes
        ),
    }