def get_glean_aggregations_from_bq()

in glam/api/views.py [0:0]
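Builds and runs a parameterized BigQuery query against the per-product, per-channel
glam_{product}_{channel}_aggregates table in the glam_etl dataset and returns the
matching aggregate rows as a list of dicts. The body relies on module-level names
defined elsewhere in views.py: bigquery (google-cloud-bigquery), orjson,
datetime/timezone, GLAM_BQ_PROD_PROJECT, _get_firefox_shas, and _log_probe_query.

The expected shape of req_data follows from the unpacking at the top of the function;
a minimal sketch with illustrative values (none of these values come from the source):

req_data = {
    "channel": "nightly",
    "product": "fenix",
    "probe": "some_glean_metric",   # hypothetical metric name
    "num_versions": 3,
    "ping_type": "*",
    "os": "*",
    "aggregation_level": "version",  # anything other than "version" selects build-level rows
}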


def get_glean_aggregations_from_bq(bqClient, request, req_data):

    channel = req_data["channel"]
    product = req_data["product"]
    probe = req_data["probe"]
    num_versions = req_data["num_versions"]
    ping_type = req_data["ping_type"]
    os = req_data["os"]
    aggregation_level = req_data["aggregation_level"]

    table_id = f"glam_{product}_{channel}_aggregates"
    shas = {}
    if aggregation_level == "version":
        build_id_filter = 'AND build_id = "*"'
    else:
        if product == "fog":
            shas = _get_firefox_shas(channel, hourly=True)
        build_id_filter = 'AND build_id != "*"'
    # Build the SQL query with parameters
    query = f"""
        WITH versions AS (
            SELECT
                ARRAY_AGG(DISTINCT version
                ORDER BY
                version DESC
                LIMIT
                @num_versions) AS selected_versions
            FROM
                `{GLAM_BQ_PROD_PROJECT}.glam_etl.{table_id}`
            WHERE
                metric = @metric
            )
            SELECT
            * EXCEPT(selected_versions)
            FROM
                `{GLAM_BQ_PROD_PROJECT}.glam_etl.{table_id}`,
                versions
            WHERE
                metric = @metric
                AND ping_type = @ping_type
                AND os = @os
                {build_id_filter}
                AND version IN UNNEST(versions.selected_versions)
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("metric", "STRING", probe),
            bigquery.ScalarQueryParameter("ping_type", "STRING", ping_type),
            bigquery.ScalarQueryParameter("os", "STRING", os),
            bigquery.ScalarQueryParameter("num_versions", "INT64", num_versions),
        ]
    )
    with bqClient as client:
        # Run the query; the scalar values above are bound by BigQuery as query
        # parameters rather than interpolated into the SQL string.
        query_job = client.query(query, job_config=job_config)

    response = []

    for row in query_job:
        # Build-level results must carry a build_date; skip rows that do not.
        if aggregation_level != "version" and not row.build_date:
            continue
        else:
            # Remove extra +00
            build_date = (
                None
                if aggregation_level == "version"
                else datetime.fromisoformat(row.build_date[:-3]).replace(
                    tzinfo=timezone.utc
                )
            )
        data = {
            "version": row.version,
            "ping_type": row.ping_type,
            "os": row.os,
            "build_id": row.build_id,
            "revision": shas.get(row.build_id, ""),
            "build_date": build_date,
            "metric": row.metric,
            "metric_type": row.metric_type,
            "metric_key": row.metric_key,
            "client_agg_type": row.client_agg_type,
            "total_users": int(
                row.total_users
            ),  # Casting, otherwise this BIGNUMERIC column is read as a string
            "sample_count": (
                # Casting, otherwise this BIGNUMERIC column is read as a string.
                # Fallback to zero as temp workaround for missing total_sample
                # on labeled_distributions.
                int(row.total_sample)
                if row.total_sample
                else 0
            ),
            "histogram": row.histogram and orjson.loads(row.histogram) or "",
            "non_norm_histogram": row.non_norm_histogram
            and orjson.loads(row.non_norm_histogram)
            or "",
            "percentiles": row.percentiles and orjson.loads(row.percentiles) or "",
            "non_norm_percentiles": row.non_norm_percentiles
            and orjson.loads(row.non_norm_percentiles)
            or "",
        }

        # Get the total distinct client IDs for this set of dimensions.
        # data["total_addressable_market"] = counts.get(f"{row.version}-{row.build_id}")

        response.append(data)
    if response:
        _log_probe_query(request)
    return response
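
A minimal sketch of a call site, assuming a configured google-cloud-bigquery Client
(a version whose Client supports the context-manager protocol used above) and a
payload like the req_data sketched before the function; the calling view and
serializer are not part of this excerpt:

from google.cloud import bigquery

client = bigquery.Client(project=GLAM_BQ_PROD_PROJECT)
# "request" is the Django HttpRequest handled by the calling view; it is used only
# for _log_probe_query(request).
rows = get_glean_aggregations_from_bq(client, request, req_data)
for row in rows:
    print(row["version"], row["client_agg_type"], row["total_users"])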