backfill/2022-08-15-clients-first-seen/backfill.py (57 lines of code) (raw):

# Based on https://github.com/mozilla/bigquery-etl/blob/main/sql/moz-fx-data-shared-prod/telemetry_derived/clients_first_seen_v1/init.sql
"""Backfill a clients_first_seen table from clients_daily_v6, one sample_id at a time."""

from argparse import ArgumentParser, ArgumentTypeError
from functools import partial
from multiprocessing.pool import ThreadPool

from google.cloud import bigquery

# Creates an empty destination table: the clients_daily_v6 columns (minus
# submission_date) plus first_seen_date / second_seen_date. `WHERE FALSE`
# means no rows are copied.
CREATE_TABLE_QUERY = """
CREATE OR REPLACE TABLE
  {dataset}.{table}
PARTITION BY
  first_seen_date
CLUSTER BY
  normalized_channel,
  sample_id
AS
SELECT
  CAST(NULL AS DATE) AS first_seen_date,
  CAST(NULL AS DATE) AS second_seen_date,
  cd.* EXCEPT (submission_date)
FROM
  `moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6` AS cd
WHERE
  FALSE;
"""

# Two statements per sample_id:
#   1. (re)create tmp.clients_first_seen_dates_{sample_id} holding the first
#      and second submission_date seen for every client_id in that sample;
#   2. insert into the destination table the clients_daily_v6 row of each
#      client on its first_seen_date.
# NOTE: the constant name keeps its original spelling ("PARTITON").
PARTITON_QUERY = """
CREATE OR REPLACE TABLE
  `moz-fx-data-shared-prod.tmp.clients_first_seen_dates_{sample_id}`
PARTITION BY
  first_seen_date
AS
WITH base AS (
  SELECT
    client_id,
    ARRAY_AGG(submission_date ORDER BY submission_date) AS dates_seen,
  FROM
    `moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6`
  WHERE
    submission_date >= DATE('2010-01-01')
    AND sample_id = {sample_id}
  GROUP BY
    client_id
)
SELECT
  client_id,
  IF(ARRAY_LENGTH(dates_seen) > 0, dates_seen[OFFSET(0)], NULL) AS first_seen_date,
  IF(ARRAY_LENGTH(dates_seen) > 1, dates_seen[OFFSET(1)], NULL) AS second_seen_date,
FROM
  base;

INSERT
  {dataset}.{table}
SELECT
  cfsd.first_seen_date,
  cfsd.second_seen_date,
  cd.* EXCEPT (submission_date)
FROM
  telemetry_derived.clients_daily_v6 AS cd
LEFT JOIN
  tmp.clients_first_seen_dates_{sample_id} AS cfsd
ON
  (cd.submission_date = cfsd.first_seen_date AND cd.client_id = cfsd.client_id)
WHERE
  cfsd.client_id IS NOT NULL
  AND cd.submission_date >= DATE('2010-01-01')
  AND sample_id = {sample_id};
"""


def _str_to_bool(value):
    """Convert a command-line string such as ``true`` or ``False`` to a bool.

    Raises:
        ArgumentTypeError: if ``value`` is not a recognised boolean spelling.
    """
    normalized = value.strip().lower()
    if normalized in {"1", "true", "t", "yes", "y"}:
        return True
    # The empty string is kept falsy, as it was when the raw string was used.
    if normalized in {"", "0", "false", "f", "no", "n"}:
        return False
    raise ArgumentTypeError(f"expected a boolean value, got {value!r}")


parser = ArgumentParser()
parser.add_argument(
    "--project_id",
    "--project-id",
    default="moz-fx-data-shared-prod",
    help="ID of the project in which to find tables",
)
parser.add_argument(
    "--dataset",
    default="analysis",
    help="Dataset name to create clients_first_seen_table in",
)
parser.add_argument(
    "--table",
    default="ascholtz_clients_first_seen_v1",
    help="Name of the destination table to be created",
)
parser.add_argument(
    "--parallelism",
    "-p",
    # Without type=int a value given on the command line arrives as a string
    # and ThreadPool rejects it.
    type=int,
    default=10,
    help="Number of threads run in parallel",
)
parser.add_argument(
    "--dry_run",
    # Accept both the bare flag (`--dry_run`) and an explicit value
    # (`--dry_run true|false`). Previously any non-empty value, including
    # "False", was treated as truthy.
    nargs="?",
    const=True,
    default=False,
    type=_str_to_bool,
    help="Issue a dry run",
)


def _create_temp_table(client, job_config, dataset, table, sample_id):
    """Run PARTITON_QUERY for one sample_id and wait for it to finish.

    The query (re)creates the temporary first-seen-dates table for
    ``sample_id`` and then inserts the matching rows into ``dataset.table``.

    Args:
        client: BigQuery client used to issue the query.
        job_config: Query job configuration passed to ``client.query``.
        dataset: Dataset of the destination table.
        table: Name of the destination table.
        sample_id: The sample_id whose clients are processed.
    """
    # Render the SQL once so that exactly what is logged is what is executed.
    query = PARTITON_QUERY.format(sample_id=sample_id, dataset=dataset, table=table)
    print(f"Create temporary table for sample_id={sample_id}")
    print(query)
    client.query(query, job_config=job_config).result()


def main():
    """Backfill clients_first_seen_v1 in parallel."""
    args = parser.parse_args()
    client = bigquery.Client(args.project_id)

    if args.dry_run:
        print("Do a dry run")
    job_config = bigquery.QueryJobConfig(
        dry_run=bool(args.dry_run), use_query_cache=False
    )

    # create the destination table
    client.query(
        CREATE_TABLE_QUERY.format(dataset=args.dataset, table=args.table),
        job_config=job_config,
    ).result()

    process_sample = partial(
        _create_temp_table, client, job_config, args.dataset, args.table
    )
    with ThreadPool(args.parallelism) as pool:
        # create a temporary table for each sample_id
        # NOTE(review): only sample_ids 0 and 1 are processed; if the full
        # backfill is intended this range presumably needs widening - confirm.
        pool.map(process_sample, range(0, 2))


if __name__ == "__main__":
    main()