bigquery_etl/glam/templates/clients_scalar_aggregates_v1.sql (111 lines of code) (raw):
{{ header }}
{% include "clients_scalar_aggregates_v1.udf.sql" %}
{% set aggregate_filter_clause %}
  {#
    Shared JOIN/WHERE fragment appended after a FROM item aliased scalar_aggs.
    It keeps rows whose build date lies within roughly the last year and, when
    filter_version is set, whose app_version is one of the most recent
    num_versions_to_keep versions for the row's channel.
  #}
  {% if filter_version %}
    -- The version-window predicate below evaluates to NULL (and so rejects the
    -- row) whenever latest_version is NULL, so channels without an entry in the
    -- latest-versions table can never survive the filter. An INNER JOIN states
    -- that directly instead of relying on the WHERE clause to undo a LEFT JOIN.
    INNER JOIN
      glam_etl.{{ prefix }}__latest_versions_v1
    USING (channel)
  {% endif %}
  WHERE
    -- allow for builds to be slightly ahead of the current submission date, to
    -- account for a reasonable amount of clock skew
    {{ build_date_udf }}(app_build_id) < DATE_ADD(@submission_date, INTERVAL 3 day)
    -- only keep builds from the last year
    AND {{ build_date_udf }}(app_build_id) > DATE_SUB(@submission_date, INTERVAL 365 day)
    {% if filter_version %}
      -- inclusive window covering the newest num_versions_to_keep versions
      AND app_version BETWEEN (latest_version - {{ num_versions_to_keep }} + 1) AND latest_version
    {% endif %}
{% endset %}
WITH filtered_date_channel AS (
  -- Rows of the source table for the day being processed. Despite the CTE's
  -- name, the only predicate applied here is on submission_date.
  SELECT
    *
  FROM
    {{ source_table }}
  WHERE
    submission_date = @submission_date
),
filtered_aggregates AS (
  -- Flatten the scalar_aggregates array into one row per element and discard
  -- elements that carry no value.
  SELECT
    submission_date,
    {{ attributes }},
    {{ user_data_attributes }},
    agg_type,
    value
  FROM
    filtered_date_channel
  CROSS JOIN
    UNNEST(scalar_aggregates)
  WHERE
    value IS NOT NULL
),
version_filtered_new AS (
  -- Today's flattened rows after the shared build-date (and, when enabled,
  -- version-window) filter has been applied. Attribute columns are qualified
  -- with the scalar_aggs alias, presumably so they stay unambiguous when the
  -- filter clause joins the latest-versions table.
  SELECT
    submission_date,
    {% for attribute in attributes_list %}
      scalar_aggs.{{ attribute }},
    {% endfor %}
    {{ user_data_attributes }},
    agg_type,
    value
  FROM
    filtered_aggregates AS scalar_aggs
  {{ aggregate_filter_clause }}
),
scalar_aggregates_new AS (
  -- Collapse today's rows to a single value per combination of attributes,
  -- user-data attributes and agg_type. How values are combined depends on the
  -- agg_type; an agg_type not listed below yields a NULL value.
  SELECT
    {{ attributes }},
    {{ user_data_attributes }},
    agg_type,
    --format:off
    CASE
      WHEN agg_type = 'max' THEN MAX(value)
      WHEN agg_type = 'min' THEN MIN(value)
      WHEN agg_type IN ('count', 'sum', 'false', 'true') THEN SUM(value)
    END AS value
    --format:on
  FROM
    version_filtered_new
  WHERE
    -- Only values in the closed range [0, 2^40] take part in the aggregation.
    -- The upper bound guards against overflow from very large numbers, which
    -- are typically anomalies. Negative values are invalid and should never
    -- occur, yet have been observed (probably caused by bit flips).
    value >= 0
    AND value <= POW(2, 40)
  GROUP BY
    {{ attributes }},
    {{ user_data_attributes }},
    agg_type
),
filtered_new AS (
  -- Re-pack today's aggregated values into one array per attribute set. Each
  -- array element is a tuple of the user-data attributes, agg_type and value.
  SELECT
    {{ attributes }},
    ARRAY_AGG(({{ user_data_attributes }}, agg_type, value)) AS scalar_aggregates
  FROM
    scalar_aggregates_new
  GROUP BY
    {{ attributes }}
),
filtered_old AS (
  -- Aggregates already stored in the destination table, trimmed with the same
  -- filter clause that is applied to today's rows.
  SELECT
    {% for attribute in attributes_list %}
      scalar_aggs.{{ attribute }},
    {% endfor %}
    scalar_aggregates
  FROM
    {{ destination_table }} AS scalar_aggs
  {{ aggregate_filter_clause }}
),
joined_new_old AS (
  -- Line up each attribute set's stored array with today's array. The full
  -- outer join keeps attribute sets that appear on only one side; the missing
  -- side is replaced by an empty array so the two can be concatenated later.
  SELECT
    {% for attribute in attributes_list %}
      COALESCE(old_data.{{ attribute }}, new_data.{{ attribute }}) AS {{ attribute }},
    {% endfor %}
    COALESCE(old_data.scalar_aggregates, []) AS old_aggs,
    COALESCE(new_data.scalar_aggregates, []) AS new_aggs
  FROM
    filtered_new AS new_data
  FULL OUTER JOIN
    filtered_old AS old_data
  USING ({{ attributes }})
)
-- Final result: one row per attribute set. The stored and new arrays are
-- concatenated and handed to udf_merged_user_data, whose merge logic is
-- defined outside this query.
SELECT
  {{ attributes }},
  udf_merged_user_data(ARRAY_CONCAT(old_aggs, new_aggs)) AS scalar_aggregates
FROM
  joined_new_old