jobs/webcompat-kb/webcompat_kb/utils.py:

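"""Maintenance utilities for the webcompat knowledge base BigQuery datasets.

Provides two argparse-driven helpers intended to be run from the command line:

* ``create_test_dataset`` clones the knowledge base tables into a
  ``<dataset>_test`` dataset.
* ``backfill_history`` merges ``bugs_history`` records from a source dataset
  into a destination dataset and rewrites the table.
"""
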
import argparse
import logging
import sys
from dataclasses import dataclass
from datetime import datetime
from typing import Mapping

from google.cloud import bigquery

from .bqhelpers import BigQuery, Json, get_client


def get_parser_create_test_dataset() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--log-level",
        choices=["debug", "info", "warn", "error"],
        default="info",
        help="Log level",
    )
    parser.add_argument(
        "--bq-project", dest="bq_project_id", help="BigQuery project id"
    )
    parser.add_argument("--bq-kb-dataset", help="BigQuery knowledge base dataset id")
    parser.add_argument("--write", action="store_true", help="Write changes")
    return parser


def create_test_dataset() -> None:
    """Clone the knowledge base tables into a ``<dataset>_test`` dataset."""
    logging.basicConfig()
    parser = get_parser_create_test_dataset()
    args = parser.parse_args()
    logging.getLogger().setLevel(
        logging.getLevelNamesMapping()[args.log_level.upper()]
    )

    test_dataset_name = f"{args.bq_kb_dataset}_test"

    tables = [
        "breakage_reports",
        "bugs_history",
        "bugzilla_bugs",
        "core_bugs",
        "etp_breakage_reports",
        "import_runs",
        "interventions",
        "kb_bugs",
        "other_browser_issues",
        "standards_issues",
        "standards_positions",
    ]

    logging.info(f"Will create dataset {args.bq_project_id}.{test_dataset_name}")
    for table in tables:
        logging.info(
            f"Will create table {args.bq_project_id}.{test_dataset_name}.{table} from {args.bq_project_id}.{args.bq_kb_dataset}.{table}"
        )

    # Require explicit confirmation before doing anything; empty input means "n".
    res = ""
    while res not in {"y", "n"}:
        res = input("Continue y/N? ").strip().lower()
        res = "n" if res == "" else res
    if res != "y":
        sys.exit(1)

    client = BigQuery(get_client(args.bq_project_id), test_dataset_name, args.write)

    if not args.write:
        logging.info("Not writing; pass --write to commit changes")
    else:
        client.client.create_dataset(test_dataset_name, exists_ok=True)

    for table_name in tables:
        target = f"{test_dataset_name}.{table_name}"
        if args.write:
            client.delete_table(target, not_found_ok=True)
        else:
            logging.info(f"Would delete table {target}")

        src = f"{args.bq_kb_dataset}.{table_name}"
        query = f"""
CREATE TABLE {target}
CLONE {src}
"""
        if args.write:
            logging.info(f"Creating table {target} from {src}")
            client.query(query)
        else:
            logging.info(f"Would run query:{query}")


@dataclass(frozen=True)
class HistoryKey:
    number: int
    who: str
    change_time: datetime


def get_parser_backfill_history() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--log-level",
        choices=["debug", "info", "warn", "error"],
        default="info",
        help="Log level",
    )
    parser.add_argument(
        "--bq-project", dest="bq_project_id", help="BigQuery project id"
    )
    parser.add_argument(
        "--bq-kb-src-dataset", help="BigQuery knowledge base source dataset id"
    )
    parser.add_argument(
        "--bq-kb-dest-dataset", help="BigQuery knowledge base destination dataset id"
    )
    parser.add_argument(
        "--write", action="store_true", default=False, help="Write changes"
    )
    return parser


def normalize_change(change: dict[str, str]) -> dict[str, str]:
    """Sort comma-separated keyword lists so otherwise-equal changes compare equal."""
    if change["field_name"] == "keywords":
        for key in ["added", "removed"]:
            items = change[key].split(", ")
            items.sort()
            change[key] = ", ".join(items)
    return change


def backfill_history() -> None:
    """Merge bugs_history records from a source dataset into a destination dataset."""
    logging.basicConfig()
    parser = get_parser_backfill_history()
    args = parser.parse_args()
    logging.getLogger().setLevel(
        logging.getLevelNamesMapping()[args.log_level.upper()]
    )

    src_dataset = args.bq_kb_src_dataset
    dest_dataset = args.bq_kb_dest_dataset

    client = BigQuery(get_client(args.bq_project_id), dest_dataset, args.write)

    # Load the existing history rows from both datasets, de-duplicating changes
    # that share the same (number, who, change_time) key.
    existing_records_dest: dict[HistoryKey, list[dict[str, str]]] = {}
    existing_records_src: dict[HistoryKey, list[dict[str, str]]] = {}
    for dataset, records in [
        (dest_dataset, existing_records_dest),
        (src_dataset, existing_records_src),
    ]:
        for row in client.query("""SELECT * FROM bugs_history""", dataset_id=dataset):
            key = HistoryKey(row.number, row.who, row.change_time)
            if key in records:
                logging.warning(
                    f"Got duplicate data for {key}: {row.changes}, {records[key]}"
                )
                for change in row.changes:
                    if change not in records[key]:
                        records[key].append(change)
            else:
                records[key] = row.changes

    logging.info(
        f"Started with {len(existing_records_src)} records in {src_dataset} and {len(existing_records_dest)} in {dest_dataset}"
    )

    new_records: list[tuple[datetime, Mapping[str, Json]]] = []
    new_count = 0
    updated_count = 0
    unchanged_count = 0

    # Merge source records into the set to be written, tracking how many are
    # new, updated, or unchanged relative to the destination dataset.
    for key, changes in existing_records_src.items():
        if key in existing_records_dest:
            existing = [
                normalize_change(change) for change in existing_records_dest[key]
            ]
            new = [normalize_change(change) for change in changes]
            if new == existing or (
                all(item in existing for item in new)
                and all(item in new for item in existing)
            ):
                unchanged_count += 1
            else:
                missing = [item for item in existing if item not in new]
                if missing:
                    logging.warning(
                        f"Updating record {key}, merging {new} with {existing}"
                    )
                    changes.extend(missing)
                updated_count += 1
        else:
            new_count += 1
        new_records.append(
            (
                key.change_time,
                {
                    "number": key.number,
                    "who": key.who,
                    "change_time": key.change_time.isoformat(),
                    "changes": changes,
                },
            )
        )

    # Keep records that only exist in the destination dataset.
    for key, changes in existing_records_dest.items():
        if key not in existing_records_src:
            unchanged_count += 1
            new_records.append(
                (
                    key.change_time,
                    {
                        "number": key.number,
                        "who": key.who,
                        "change_time": key.change_time.isoformat(),
                        "changes": changes,
                    },
                )
            )

    logging.info(
        f"Writing {len(new_records)} records to {dest_dataset}, {unchanged_count} unchanged, {updated_count} updated, {new_count} new"
    )

    new_records.sort()

    schema = [
        bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("who", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("change_time", "TIMESTAMP", mode="REQUIRED"),
        bigquery.SchemaField(
            "changes",
            "RECORD",
            mode="REPEATED",
            fields=[
                bigquery.SchemaField("field_name", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("added", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("removed", "STRING", mode="REQUIRED"),
            ],
        ),
    ]
    client.write_table(
        "bugs_history", schema, [item[1] for item in new_records], overwrite=True
    )
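
# Example invocations, as a sketch only: the console-script names below are
# hypothetical (the real entry-point names are defined in the package
# metadata, not in this module), but the flags match the parsers above.
#
#   create-test-dataset --bq-project <project-id> --bq-kb-dataset <dataset> --write
#   backfill-history --bq-project <project-id> --bq-kb-src-dataset <src> \
#       --bq-kb-dest-dataset <dest> --write
#
# Without --write, create_test_dataset only logs the changes it would make;
# backfill_history passes the flag through to the BigQuery helper, which is
# assumed to gate the final write_table call.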