jobs/webcompat-kb/webcompat_kb/crux.py:

"""ETL job that imports Chrome UX Report (CrUX) popularity data into BigQuery.

Each monthly run copies origin rank data from the public `chrome-ux-report`
project, derives the sightline top-1000 host list and per-host minimum
ranks, and records the completed import in `import_runs`.
"""

import argparse
import logging
import sys
from dataclasses import dataclass
from datetime import datetime, UTC
from typing import Self

from google.cloud import bigquery

from .base import EtlJob, VALID_DATASET_ID
from .bqhelpers import BigQuery, RangePartition


@dataclass
class Config:
    bq_project: str
    bq_crux_dataset: str
    write: bool

    @classmethod
    def from_args(cls, args: argparse.Namespace) -> Self:
        return cls(
            bq_project=args.bq_project_id,
            bq_crux_dataset=args.bq_crux_dataset,
            write=args.write,
        )


def get_latest_crux_dataset(client: BigQuery) -> int:
    """Return the yyyymm of the newest monthly table in the public CrUX project."""
    # CrUX publishes one table per month, named after the yyyymm it covers.
    query = r"""SELECT cast(tables.table_name as int) AS crux_date
FROM `chrome-ux-report.all.INFORMATION_SCHEMA.TABLES` AS tables
WHERE tables.table_schema = "all" AND REGEXP_CONTAINS(tables.table_name, r"20\d\d\d\d")
ORDER BY crux_date DESC
LIMIT 1
"""
    result = list(client.query(query))
    if len(result) != 1:
        raise ValueError("Failed to get latest CrUX dataset")
    return result[0]["crux_date"]


def get_imported_datasets(client: BigQuery, config: Config) -> int:
    """Return the yyyymm of the most recent completed import, or 0 if none."""
    query = f"""
SELECT yyyymm
FROM `{config.bq_crux_dataset}.import_runs`
ORDER BY yyyymm DESC
LIMIT 1
"""
    try:
        result = list(client.query(query))
    except Exception:
        # The import_runs table doesn't exist yet on the first run.
        return 0
    if not result:
        return 0
    return result[0]["yyyymm"]


def update_crux_data(client: BigQuery, config: Config, date: int) -> None:
    """Copy one month of per-country and global origin ranks into origin_ranks."""
    query = f"""
SELECT yyyymm, origin, country_code, experimental.popularity.rank AS rank
FROM `chrome-ux-report.experimental.country`
WHERE yyyymm = {date}
UNION ALL
SELECT yyyymm, origin, "global" AS country_code, experimental.popularity.rank AS rank
FROM `chrome-ux-report.experimental.global`
WHERE yyyymm = {date}
"""
    schema = [
        bigquery.SchemaField("yyyymm", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("origin", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("country_code", "STRING", mode="REQUIRED", max_length=8),
        bigquery.SchemaField("rank", "INTEGER", mode="REQUIRED"),
    ]
    if config.write:
        query = f"INSERT `{config.bq_project}.{config.bq_crux_dataset}.origin_ranks` (yyyymm, origin, country_code, rank)\n({query})"
    # Without --write the bare SELECT still runs, acting as a dry-run check.
    logging.info("Updating CrUX data")
    client.ensure_table(
        "origin_ranks", schema, partition=RangePartition("yyyymm", 201701, 202501)
    )
    client.query(query)


def update_sightline_data(client: BigQuery, config: Config, date: int) -> None:
    """Record the hosts in the CrUX top-1000 bucket for the sightline countries."""
    if config.write:
        insert_str = f"INSERT `{config.bq_project}.{config.bq_crux_dataset}.sightline_top_1000` (yyyymm, host)"
    else:
        insert_str = ""
    # CrUX ranks are bucketed (1000, 10000, ...), so rank = 1000 matches
    # exactly the origins in the top-1000 bucket.
    query = f"""
{insert_str}
SELECT DISTINCT yyyymm, NET.HOST(origin) AS host,
FROM `{config.bq_crux_dataset}.origin_ranks` AS crux_ranks
JOIN (
  SELECT country_code
  FROM UNNEST(JSON_VALUE_ARRAY('["global", "us", "fr", "de", "es", "it", "mx"]')) AS country_code
) AS countries
ON crux_ranks.country_code = countries.country_code
WHERE crux_ranks.rank = 1000 AND crux_ranks.yyyymm = {date}"""
    schema = [
        bigquery.SchemaField("yyyymm", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("host", "STRING"),
    ]
    logging.info("Updating sightline data")
    client.ensure_table(
        "sightline_top_1000",
        schema,
        partition=RangePartition("yyyymm", 201701, 202501),
    )
    client.query(query)


def update_min_rank_data(client: BigQuery, config: Config, date: int) -> None:
    """Compute each host's minimum global, local, and sightline rank for one month."""
    if config.write:
        insert_str = f"INSERT `{config.bq_project}.{config.bq_crux_dataset}.host_min_ranks` (yyyymm, host, global_rank, local_rank, sightline_rank)"
    else:
        insert_str = ""
    query = f"""
{insert_str}
SELECT
  {date} AS yyyymm,
  NET.HOST(origin) AS host,
  MIN(IF(origin_ranks.country_code = "global", origin_ranks.rank, NULL)) AS global_rank,
  MIN(IF(origin_ranks.country_code != "global", origin_ranks.rank, NULL)) AS local_rank,
  MIN(IF(country_code IS NOT NULL, origin_ranks.rank, NULL)) AS sightline_rank
FROM `{config.bq_crux_dataset}.origin_ranks` AS origin_ranks
LEFT JOIN UNNEST(JSON_VALUE_ARRAY('["global", "us", "fr", "de", "es", "it", "mx"]')) AS country_code
ON origin_ranks.country_code = country_code
WHERE origin_ranks.yyyymm = {date}
GROUP BY host
"""
    schema = [
        bigquery.SchemaField("yyyymm", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("host", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("global_rank", "INTEGER"),
        bigquery.SchemaField("local_rank", "INTEGER"),
        bigquery.SchemaField("sightline_rank", "INTEGER"),
    ]
    logging.info("Updating host_min_ranks data")
    client.ensure_table(
        "host_min_ranks", schema, partition=RangePartition("yyyymm", 201701, 202501)
    )
    client.query(query)


def update_import_date(
    client: BigQuery, config: Config, run_at: datetime, data_yyyymm: int
) -> None:
    """Record a completed import in import_runs (no-op without --write)."""
    if not config.write:
        return
    formatted_time = run_at.strftime("%Y-%m-%dT%H:%M:%SZ")
    rows_to_insert = [
        {
            "run_at": formatted_time,
            "yyyymm": data_yyyymm,
        },
    ]
    logging.info("Updating last run date")
    schema = [
        bigquery.SchemaField("run_at", "TIMESTAMP", mode="REQUIRED"),
        bigquery.SchemaField("yyyymm", "INTEGER", mode="REQUIRED"),
    ]
    runs_table = client.ensure_table("import_runs", schema)
    client.insert_rows(runs_table, rows_to_insert)


def get_previous_month_yyyymm(date: datetime) -> int:
    """Return the yyyymm integer for the calendar month before `date`."""
    year = date.year
    month = date.month - 1
    if month == 0:
        year -= 1
        month = 12
    return year * 100 + month


class CruxJob(EtlJob):
    name = "crux"

    @classmethod
    def add_arguments(cls, parser: argparse.ArgumentParser) -> None:
        group = parser.add_argument_group(
            title="CrUX", description="CrUX update arguments"
        )
        group.add_argument(
            "--bq-crux-dataset",
            default="crux_imported",
            help="BigQuery CrUX import dataset",
        )

    def set_default_args(
        self, parser: argparse.ArgumentParser, args: argparse.Namespace
    ) -> None:
        if not VALID_DATASET_ID.match(args.bq_crux_dataset):
            parser.print_usage()
            logging.error(f"Invalid crux dataset id {args.bq_crux_dataset}")
            sys.exit(1)

    def default_dataset(self, args: argparse.Namespace) -> str:
        return args.bq_crux_dataset

    def main(self, client: BigQuery, args: argparse.Namespace) -> None:
        run_at = datetime.now(UTC)
        config = Config.from_args(args)
        last_import_yyyymm = get_imported_datasets(client, config)
        last_yyyymm = get_previous_month_yyyymm(run_at)
        logging.debug(f"Last CrUX import was {last_import_yyyymm}")
        logging.debug(f"Last month was {last_yyyymm}")
        # Skip if the most recent fully-elapsed month was already imported.
        if last_import_yyyymm >= last_yyyymm:
            logging.info(f"Already have a CrUX import for {last_yyyymm}")
            return
        latest_yyyymm = get_latest_crux_dataset(client)
        logging.debug(f"Latest CrUX data is {latest_yyyymm}")
        if last_import_yyyymm >= latest_yyyymm:
            logging.info("No new CrUX data available")
            return
        update_crux_data(client, config, latest_yyyymm)
        update_sightline_data(client, config, latest_yyyymm)
        update_min_rank_data(client, config, latest_yyyymm)
        update_import_date(client, config, run_at, latest_yyyymm)