in glam/api/views.py [0:0]
def _parse_glean_json_column(raw):
    """Decode a JSON text column, returning ``""`` when there is nothing to show.

    An empty/NULL column yields ``""``. A column whose decoded payload is
    itself falsy (for example ``{}``) is also reported as ``""``; this keeps
    the response identical to the previous ``x and loads(x) or ""`` spelling.
    """
    if not raw:
        return ""
    return orjson.loads(raw) or ""


def _parse_glean_build_date(raw):
    """Turn a ``build_date`` string from the aggregates table into an aware datetime.

    The last three characters are dropped before parsing (the original code
    describes them as an extra ``+00`` suffix), and the result is then
    explicitly tagged as UTC.
    """
    return datetime.fromisoformat(raw[:-3]).replace(tzinfo=timezone.utc)


def _glean_row_to_dict(row, revision, build_date):
    """Convert one BigQuery result row into the dict returned by the API."""
    return {
        "version": row.version,
        "ping_type": row.ping_type,
        "os": row.os,
        "build_id": row.build_id,
        "revision": revision,
        "build_date": build_date,
        "metric": row.metric,
        "metric_type": row.metric_type,
        "metric_key": row.metric_key,
        "client_agg_type": row.client_agg_type,
        # Casting, otherwise this BIGNUMERIC column is read as a string.
        "total_users": int(row.total_users),
        # Casting, otherwise this BIGNUMERIC column is read as a string.
        # Fallback to zero as temp workaround for missing total_sample
        # on labeled_distributions.
        "sample_count": int(row.total_sample) if row.total_sample else 0,
        "histogram": _parse_glean_json_column(row.histogram),
        "non_norm_histogram": _parse_glean_json_column(row.non_norm_histogram),
        "percentiles": _parse_glean_json_column(row.percentiles),
        "non_norm_percentiles": _parse_glean_json_column(
            row.non_norm_percentiles
        ),
    }


def get_glean_aggregations_from_bq(bqClient, request, req_data):
    """Fetch Glean aggregation rows for one metric from BigQuery.

    The query targets ``glam_{product}_{channel}_aggregates`` and is limited
    to the ``num_versions`` highest versions that have data for the metric.

    Args:
        bqClient: BigQuery client; used as a context manager to run the query.
        request: Incoming request, forwarded to ``_log_probe_query`` when at
            least one row is returned.
        req_data: Mapping providing ``channel``, ``product``, ``probe``,
            ``num_versions``, ``ping_type``, ``os`` and ``aggregation_level``.

    Returns:
        A list with one dict per result row (see ``_glean_row_to_dict`` for
        the keys). When ``aggregation_level`` is not ``"version"``, rows
        without a ``build_date`` are skipped. The list may be empty.

    Raises:
        KeyError: If ``req_data`` lacks one of the keys listed above.
    """
    channel = req_data["channel"]
    product = req_data["product"]
    probe = req_data["probe"]
    num_versions = req_data["num_versions"]
    ping_type = req_data["ping_type"]
    # Named os_name so the stdlib ``os`` module name is not shadowed here.
    os_name = req_data["os"]
    aggregation_level = req_data["aggregation_level"]

    # NOTE(review): product and channel are interpolated into the table name
    # (identifiers cannot be bound as query parameters); presumably they are
    # validated before reaching this function -- confirm against the caller.
    table_id = f"glam_{product}_{channel}_aggregates"

    per_build = aggregation_level != "version"
    shas = {}
    if per_build:
        if product == "fog":
            shas = _get_firefox_shas(channel, hourly=True)
        build_id_filter = 'AND build_id != "*"'
    else:
        # Version-level aggregates are the rows whose build_id is "*".
        build_id_filter = 'AND build_id = "*"'

    # Build the SQL query with parameters
    query = f"""
        WITH versions AS (
            SELECT
                ARRAY_AGG(DISTINCT version
                    ORDER BY
                        version DESC
                    LIMIT
                        @num_versions) AS selected_versions
            FROM
                `{GLAM_BQ_PROD_PROJECT}.glam_etl.{table_id}`
            WHERE
                metric = @metric
        )
        SELECT
            * EXCEPT(selected_versions)
        FROM
            `{GLAM_BQ_PROD_PROJECT}.glam_etl.{table_id}`,
            versions
        WHERE
            metric = @metric
            AND ping_type = @ping_type
            AND os = @os
            {build_id_filter}
            AND version IN UNNEST(versions.selected_versions)
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("metric", "STRING", probe),
            bigquery.ScalarQueryParameter("ping_type", "STRING", ping_type),
            bigquery.ScalarQueryParameter("os", "STRING", os_name),
            bigquery.ScalarQueryParameter("num_versions", "INT64", num_versions),
        ]
    )

    response = []
    with bqClient as client:
        query_job = client.query(query, job_config=job_config)
        # Rows are consumed while the client context is still open.
        for row in query_job:
            if per_build:
                if not row.build_date:
                    continue
                build_date = _parse_glean_build_date(row.build_date)
            else:
                build_date = None
            # NOTE: "total_addressable_market" is not populated; the lookup
            # that used to fill it (keyed by "{version}-{build_id}") is
            # disabled.
            response.append(
                _glean_row_to_dict(row, shas.get(row.build_id, ""), build_date)
            )

    if response:
        _log_probe_query(request)

    return response