in jobs/update_orphaning_dashboard_etl.py [0:0]
# Histogram columns copied into the longitudinal shim, in destination-table
# column order. Names also listed in _KEYED_HISTOGRAM_COLUMNS are read from
# payload.keyed_histograms; all others are read from payload.histograms.
_HISTOGRAM_COLUMNS = (
    "update_check_code_notify",
    "update_check_extended_error_notify",
    "update_check_no_update_notify",
    "update_not_pref_update_auto_notify",
    "update_ping_count_notify",
    "update_unable_to_apply_notify",
    "update_download_code_partial",
    "update_download_code_complete",
    "update_state_code_partial_stage",
    "update_state_code_complete_stage",
    "update_state_code_unknown_stage",
    "update_state_code_partial_startup",
    "update_state_code_complete_startup",
    "update_state_code_unknown_startup",
    "update_status_error_code_complete_startup",
    "update_status_error_code_partial_startup",
    "update_status_error_code_unknown_startup",
    "update_status_error_code_complete_stage",
    "update_status_error_code_partial_stage",
    "update_status_error_code_unknown_stage",
)
_KEYED_HISTOGRAM_COLUMNS = frozenset({"update_check_extended_error_notify"})

# Every per-client array is ordered newest subsession first and capped at 1000 entries.
_ORDER_BY_LATEST_SUBSESSION = "ORDER BY subsession_start_date DESC, profile_subsession_counter DESC LIMIT 1000"

# Plain string (not an f-string): it contains literal braces and %-directives.
_HIST_TO_JSON_UDF = """
-- This function uses mozfun.hist.extract to tolerate compact string encodings
-- and then turns the parsed struct back into a JSON string to maintain compatibility
-- with the existing python-level logic that expects JSON blobs.
-- Note that we only include "values" in the JSON output since that's the only
-- histogram field used in the python code.
CREATE TEMP FUNCTION hist_to_json(h STRING) AS (
  IF(
    h IS NULL,
    NULL,
    FORMAT(
      '{"values":{%s}}',
      ARRAY_TO_STRING(
        ARRAY(
          SELECT FORMAT('"%d":%d', key, value)
          FROM UNNEST(mozfun.hist.extract(h).`values`)
        ),
        ','
      )
    )
  )
);
"""


def _longitudinal_shim_sql():
    """Return the SQL that builds one row of per-ping arrays per client.

    The query reads ``moz-fx-data-shared-prod.telemetry.main`` restricted to
    ``sample_id = 42``, non-null build versions and
    ``DATE(submission_timestamp)`` between the ``@date_from`` and ``@date_to``
    query parameters (inclusive), then groups by ``client_id``.
    """
    order = _ORDER_BY_LATEST_SUBSESSION
    extract_columns = []
    aggregate_columns = []
    for name in _HISTOGRAM_COLUMNS:
        if name in _KEYED_HISTOGRAM_COLUMNS:
            # Keyed histograms keep their key; each per-key histogram is converted.
            extract_columns.append(
                f"ARRAY(SELECT AS STRUCT key, hist_to_json(value) AS value "
                f"FROM UNNEST(payload.keyed_histograms.{name})) AS {name}"
            )
        else:
            extract_columns.append(f"hist_to_json(payload.histograms.{name}) AS {name}")
        aggregate_columns.append(f"ARRAY_AGG(STRUCT({name}) {order}) AS {name}")
    extract_sql = ",\n    ".join(extract_columns)
    aggregate_sql = ",\n  ".join(aggregate_columns)
    # NOTE(review): the IGNORE NULLS arrays below drop null entries, so they can be
    # shorter than (and not index-aligned with) the other arrays — confirm the
    # downstream python code does not zip them positionally.
    return _HIST_TO_JSON_UDF + f"""WITH
main_sample_1pct AS (
  SELECT
    submission_timestamp,
    client_id,
    STRUCT(
      CAST(environment.build.version AS STRING) AS version,
      mozfun.norm.truncate_version(environment.build.version, 'major') AS major_version,
      environment.build.application_name AS application_name
    ) AS build,
    STRUCT(
      STRUCT(
        environment.settings.update.channel AS channel,
        environment.settings.update.enabled AS enabled
      ) AS update
    ) AS settings,
    payload.info.session_length,
    payload.info.profile_subsession_counter,
    environment.settings.update.enabled,
    payload.info.subsession_start_date,
    payload.info.subsession_length,
    {extract_sql}
  FROM
    `moz-fx-data-shared-prod.telemetry.main`
  WHERE
    -- Date filter lives here (not after aggregation) so partition pruning does
    -- not depend on predicate pushdown through the CTE.
    DATE(submission_timestamp) BETWEEN @date_from AND @date_to
    AND sample_id = 42
    AND environment.build.version IS NOT NULL
)
SELECT
  client_id,
  STRUCT(
    ARRAY_AGG(build.version {order}) AS version,
    ARRAY_AGG(build.major_version {order}) AS major_version,
    ARRAY_AGG(build.application_name {order}) AS application_name
  ) AS build,
  STRUCT(
    STRUCT(
      ARRAY_AGG(STRUCT(settings.update.channel) {order}) AS channel,
      ARRAY_AGG(STRUCT(settings.update.enabled) {order}) AS enabled
    ) AS update
  ) AS settings,
  ARRAY_AGG(session_length IGNORE NULLS {order}) AS session_length,
  ARRAY_AGG(enabled IGNORE NULLS {order}) AS enabled,
  ARRAY_AGG(subsession_start_date IGNORE NULLS {order}) AS subsession_start_date,
  ARRAY_AGG(subsession_length IGNORE NULLS {order}) AS subsession_length,
  {aggregate_sql}
FROM
  main_sample_1pct
GROUP BY
  client_id
"""


def longitudinal_shim_aggregate(date_from, date_to, destination_project, destination_dataset, destination_table):
    """Aggregate selected update metrics per client into a destination table.

    Runs the query from ``_longitudinal_shim_sql`` for pings whose submission
    date lies in ``[date_from, date_to]`` and writes the result to
    ``destination_project.destination_dataset.destination_table`` with
    ``WRITE_TRUNCATE`` (the table is overwritten). Waits for the job to finish.

    date_from, date_to: bound to the ``@date_from`` / ``@date_to`` DATE query
        parameters (presumably ``datetime.date`` or ISO date strings — confirm
        against callers).
    destination_project, destination_dataset, destination_table: identify the
        output table.
    """
    bq = bigquery.Client()
    longitudinal_shim_sql = _longitudinal_shim_sql()
    # Build the reference directly; Client.dataset() is deprecated.
    table_ref = bigquery.TableReference(
        bigquery.DatasetReference(destination_project, destination_dataset),
        destination_table,
    )
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("date_from", "DATE", date_from),
            bigquery.ScalarQueryParameter("date_to", "DATE", date_to),
        ]
    )
    job_config.destination = table_ref
    job_config.write_disposition = 'WRITE_TRUNCATE'
    print("Query is: " + longitudinal_shim_sql)
    print("Starting BigQuery aggregation to destination table " + str(table_ref) + "...")
    query_job = bq.query(longitudinal_shim_sql, job_config=job_config)
    # Wait for query execution
    query_job.result()
    print("BigQuery aggregation finished")