bigquery_etl/glam/templates/clients_histogram_aggregates_v1.sql (102 lines of code) (raw):
{{ header }}
{% include "clients_histogram_aggregates_v1.udf.sql" %}
{% set aggregate_filter_clause %}
{% if filter_version %}
LEFT JOIN
glam_etl.{{ prefix }}__latest_versions_v1
USING (channel)
{% endif %}
WHERE
-- allow for builds to be slighly ahead of the current submission date, to
-- account for a reasonable amount of clock skew
{{ build_date_udf }}(app_build_id) < DATE_ADD(@submission_date, INTERVAL 3 day)
-- only keep builds from the last year
AND {{ build_date_udf }}(app_build_id) > DATE_SUB(@submission_date, INTERVAL 365 day)
{% if filter_version %}
AND app_version BETWEEN (latest_version - {{ num_versions_to_keep }} + 1) AND latest_version
{% endif %}
{% endset %}
WITH extracted_accumulated AS (
SELECT
*
FROM
glam_etl.{{ prefix }}__clients_histogram_aggregates_v1
{% if parameterize %}
WHERE
sample_id >= @min_sample_id
AND sample_id <= @max_sample_id
{% endif %}
),
filtered_accumulated AS (
SELECT
{{ attributes }},
histogram_aggregates
FROM
extracted_accumulated
{{ aggregate_filter_clause }}
),
-- unnest the daily data
extracted_daily AS (
SELECT
* EXCEPT (app_version, histogram_aggregates),
CAST(app_version AS INT64) AS app_version,
unnested_histogram_aggregates as histogram_aggregates
FROM
glam_etl.{{ prefix }}__view_clients_daily_histogram_aggregates_v1,
UNNEST(histogram_aggregates) unnested_histogram_aggregates
WHERE
{% if parameterize %}
submission_date = @submission_date
{% else %}
submission_date = DATE_SUB(current_date, interval 2 day)
{% endif %}
AND value IS NOT NULL
AND ARRAY_LENGTH(value) > 0
),
filtered_daily AS (
SELECT
{{ attributes }},
histogram_aggregates.*
FROM
extracted_daily
{{ aggregate_filter_clause }}
),
-- re-aggregate based on the latest version
aggregated_daily AS (
SELECT
{{ attributes }},
{{ metric_attributes }},
mozfun.map.sum(ARRAY_CONCAT_AGG(mozfun.glam.histogram_filter_high_values(value))) AS value
FROM
filtered_daily
GROUP BY
{{ attributes }},
{{ metric_attributes }}
),
-- note: this seems costly, if it's just going to be unnested again
transformed_daily AS (
SELECT
{{ attributes }},
ARRAY_AGG(
STRUCT<
metric STRING,
metric_type STRING,
key STRING,
agg_type STRING,
aggregates ARRAY<STRUCT<key STRING, value INT64>>
>({{ metric_attributes }}, value)
) AS histogram_aggregates
FROM
aggregated_daily
GROUP BY
{{ attributes }}
)
SELECT
{% for attribute in attributes_list %}
COALESCE(accumulated.{{ attribute }}, daily.{{ attribute }}) AS {{ attribute }},
{% endfor %}
udf_merged_user_data(
ARRAY_CONCAT(
COALESCE(accumulated.histogram_aggregates, []),
COALESCE(daily.histogram_aggregates, [])
)
) AS histogram_aggregates
FROM
filtered_accumulated AS accumulated
FULL OUTER JOIN
transformed_daily AS daily
USING ({{ attributes }})