def longitudinal_shim_aggregate()

in jobs/update_orphaning_dashboard_etl.py [0:0]


def longitudinal_shim_aggregate(date_from, date_to, destination_project, destination_dataset, destination_table):
    """Aggregate selected update metrics per client into a destination table.

    Runs a BigQuery query over ``moz-fx-data-shared-prod.telemetry.main``
    restricted to ``sample_id = 42`` and to pings whose
    ``environment.build.version`` is not NULL. It keeps pings whose
    ``DATE(submission_timestamp)`` lies in ``[date_from, date_to]`` (both ends
    inclusive) and groups them by ``client_id``.

    Every per-client output column is an ``ARRAY_AGG`` capped at 1000 entries.
    Entries are ordered by ``subsession_start_date DESC``, then
    ``profile_subsession_counter DESC``. Histogram columns are re-serialized by
    the ``hist_to_json`` temp function into ``{"values": {...}}`` JSON strings.
    The keyed histogram ``update_check_extended_error_notify`` becomes an array
    of ``(key, json_string)`` structs.

    The query result overwrites the destination table (``WRITE_TRUNCATE``).
    The call blocks until the query job has finished.

    Args:
        date_from: First submission date to include. It is bound as a BigQuery
            ``DATE`` query parameter (presumably a ``datetime.date``; confirm
            with callers).
        date_to: Last submission date to include (inclusive). It is bound the
            same way as ``date_from``.
        destination_project: GCP project that owns the destination table.
        destination_dataset: Dataset ID of the destination table.
        destination_table: Table ID to (over)write.
    """

    bq = bigquery.Client()

    # NOTE(review): session_length, enabled, subsession_start_date and
    # subsession_length are aggregated with IGNORE NULLS, while every other
    # array keeps NULL entries. Position i in one array therefore need not
    # correspond to position i in another for the same client. Confirm that
    # downstream code does not rely on index alignment across these columns.
    longitudinal_shim_sql = """
    -- This function uses mozfun.hist.extract to tolerate compact string encodings
    -- and then turns the parsed struct back into a JSON string to maintain compatibility
    -- with the existing python-level logic that expects JSON blobs.
    -- Note that we only include "values" in the JSON output since that's the only
    -- histogram field used in the python code.
    CREATE TEMP FUNCTION hist_to_json(h STRING) AS (
    IF
      (h IS NULL,
        NULL,
        FORMAT('{"values":{%s}}', ARRAY_TO_STRING(ARRAY(
            SELECT
              FORMAT('"%d":%d', key, value)
            FROM
              UNNEST(mozfun.hist.extract(h).`values`)), ','))) );

    WITH
        main_sample_1pct AS (
        SELECT
            submission_timestamp,
            client_id,
            struct(cast(environment.build.version as STRING) as version, mozfun.norm.truncate_version(environment.build.version, 'major') as major_version, environment.build.application_name as application_name) as build,
            struct(struct(environment.settings.update.channel as channel, environment.settings.update.enabled as enabled) as update) as settings,
            payload.info.session_length,
            payload.info.profile_subsession_counter,
            environment.settings.update.enabled,
            payload.info.subsession_start_date,
            payload.info.subsession_length,
            hist_to_json(payload.histograms.update_check_code_notify) AS update_check_code_notify,
            ARRAY(SELECT AS STRUCT key, hist_to_json(value) AS value FROM UNNEST(payload.keyed_histograms.update_check_extended_error_notify)) AS update_check_extended_error_notify,
            hist_to_json(payload.histograms.update_check_no_update_notify) AS update_check_no_update_notify,
            hist_to_json(payload.histograms.update_not_pref_update_auto_notify) AS update_not_pref_update_auto_notify,
            hist_to_json(payload.histograms.update_ping_count_notify) AS update_ping_count_notify,
            hist_to_json(payload.histograms.update_unable_to_apply_notify) AS update_unable_to_apply_notify,
            hist_to_json(payload.histograms.update_download_code_partial) AS update_download_code_partial,
            hist_to_json(payload.histograms.update_download_code_complete) AS update_download_code_complete,
            hist_to_json(payload.histograms.update_state_code_partial_stage) AS update_state_code_partial_stage,
            hist_to_json(payload.histograms.update_state_code_complete_stage) AS update_state_code_complete_stage,
            hist_to_json(payload.histograms.update_state_code_unknown_stage) AS update_state_code_unknown_stage,
            hist_to_json(payload.histograms.update_state_code_partial_startup) AS update_state_code_partial_startup,
            hist_to_json(payload.histograms.update_state_code_complete_startup) AS update_state_code_complete_startup,
            hist_to_json(payload.histograms.update_state_code_unknown_startup) AS update_state_code_unknown_startup,
            hist_to_json(payload.histograms.update_status_error_code_complete_startup) AS update_status_error_code_complete_startup,
            hist_to_json(payload.histograms.update_status_error_code_partial_startup) AS update_status_error_code_partial_startup,
            hist_to_json(payload.histograms.update_status_error_code_unknown_startup) AS update_status_error_code_unknown_startup,
            hist_to_json(payload.histograms.update_status_error_code_complete_stage) AS update_status_error_code_complete_stage,
            hist_to_json(payload.histograms.update_status_error_code_partial_stage) AS update_status_error_code_partial_stage,
            hist_to_json(payload.histograms.update_status_error_code_unknown_stage) AS update_status_error_code_unknown_stage,
        FROM
            `moz-fx-data-shared-prod.telemetry.main`
        WHERE
            sample_id = 42
            AND environment.build.version IS NOT NULL )
    SELECT
        client_id,
        struct(array_agg(build.version order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as version, array_agg(build.major_version order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as major_version, array_agg(build.application_name order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as application_name) as build,
        struct(struct(array_agg(struct(settings.update.channel) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as channel, array_agg(struct(settings.update.enabled) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as enabled) as update) as settings,
        array_agg(session_length ignore nulls order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as session_length,
        array_agg(enabled ignore nulls order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as enabled,
        array_agg(subsession_start_date ignore nulls order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as subsession_start_date,
        array_agg(subsession_length ignore nulls order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as subsession_length,
        array_agg(struct(update_check_code_notify) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_check_code_notify,
        array_agg(struct(update_check_extended_error_notify) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_check_extended_error_notify,
        array_agg(struct(update_check_no_update_notify) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_check_no_update_notify,
        array_agg(struct(update_not_pref_update_auto_notify) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_not_pref_update_auto_notify,
        array_agg(struct(update_ping_count_notify) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_ping_count_notify,
        array_agg(struct(update_unable_to_apply_notify) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_unable_to_apply_notify,
        array_agg(struct(update_download_code_partial) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_download_code_partial,
        array_agg(struct(update_download_code_complete) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_download_code_complete,
        array_agg(struct(update_state_code_partial_stage) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_state_code_partial_stage,
        array_agg(struct(update_state_code_complete_stage) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_state_code_complete_stage,
        array_agg(struct(update_state_code_unknown_stage) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_state_code_unknown_stage,
        array_agg(struct(update_state_code_partial_startup) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_state_code_partial_startup,
        array_agg(struct(update_state_code_complete_startup) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_state_code_complete_startup,
        array_agg(struct(update_state_code_unknown_startup) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_state_code_unknown_startup,
        array_agg(struct(update_status_error_code_complete_startup) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_status_error_code_complete_startup,
        array_agg(struct(update_status_error_code_partial_startup) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_status_error_code_partial_startup,
        array_agg(struct(update_status_error_code_unknown_startup) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_status_error_code_unknown_startup,
        array_agg(struct(update_status_error_code_complete_stage) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_status_error_code_complete_stage,
        array_agg(struct(update_status_error_code_partial_stage) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_status_error_code_partial_stage,
        array_agg(struct(update_status_error_code_unknown_stage) order by subsession_start_date desc, profile_subsession_counter desc limit 1000) as update_status_error_code_unknown_stage
    FROM
        main_sample_1pct
    WHERE
        DATE(submission_timestamp) >= @date_from
        AND DATE(submission_timestamp) <= @date_to
    GROUP BY
        client_id
    """

    # Build the destination reference explicitly instead of going through the
    # deprecated Client.dataset() helper. str() of the reference is still
    # "project.dataset.table".
    table_ref = bigquery.TableReference(
        bigquery.DatasetReference(destination_project, destination_dataset),
        destination_table,
    )
    job_config = bigquery.QueryJobConfig(
        # Dates are bound as typed query parameters (@date_from / @date_to)
        # rather than being formatted into the SQL text.
        query_parameters=[
            bigquery.ScalarQueryParameter("date_from", "DATE", date_from),
            bigquery.ScalarQueryParameter("date_to", "DATE", date_to),
        ]
    )
    job_config.destination = table_ref
    # Replace whatever the destination table currently holds.
    job_config.write_disposition = 'WRITE_TRUNCATE'

    print("Query is: " + longitudinal_shim_sql)
    print("Starting BigQuery aggregation to destination table " + str(table_ref) + "...")
    query_job = bq.query(longitudinal_shim_sql, job_config=job_config)
    # Wait for query execution to finish before reporting success.
    query_job.result()
    print("BigQuery aggregation finished")