# jobs/webcompat-kb/webcompat_kb/bugzilla.py (1,144 lines of code) (raw):

"""ETL job that reads web-compatibility related bugs from Bugzilla and writes
them, their history and their relationships to BigQuery."""

import argparse
import enum
import json
import logging
import os
import re
import time
from typing import (
    Any,
    Iterable,
    Iterator,
    MutableMapping,
    Optional,
    Self,
)
from collections import defaultdict
from collections.abc import Sequence, Mapping
from dataclasses import dataclass
from datetime import datetime, timedelta

import bugdantic
from google.cloud import bigquery

from .base import EtlJob
from .bqhelpers import BigQuery


class BugLoadError(Exception):
    """Raised when bug data or bug history could not be loaded."""


@dataclass(frozen=True)
class Bug:
    """Normalised representation of a single Bugzilla bug."""

    id: int
    summary: str
    status: str
    resolution: str
    product: str
    component: str
    creator: str
    see_also: list[str]
    depends_on: list[int]
    blocks: list[int]
    priority: Optional[int]
    severity: Optional[int]
    creation_time: datetime
    assigned_to: Optional[str]
    keywords: list[str]
    url: str
    user_story: str
    last_resolved: Optional[datetime]
    last_change_time: datetime
    size_estimate: Optional[str]
    whiteboard: str
    webcompat_priority: Optional[str]
    webcompat_score: Optional[int]

    @property
    def parsed_user_story(self) -> Mapping[str, Any]:
        """The user story parsed into a key -> value(s) mapping."""
        return parse_user_story(self.user_story)

    @property
    def resolved(self) -> Optional[datetime]:
        """Time the bug was last resolved, or None if it is not currently
        RESOLVED/VERIFIED (or has no recorded resolution time)."""
        if self.status in {"RESOLVED", "VERIFIED"} and self.last_resolved:
            return self.last_resolved
        return None

    @classmethod
    def from_bugzilla(cls, bug: bugdantic.bugzilla.Bug) -> Self:
        """Build a Bug from a bugdantic bug.

        Every field checked below must be populated on `bug`; a missing one
        fails the corresponding assertion.
        """
        assert bug.id is not None
        assert bug.summary is not None
        assert bug.status is not None
        assert bug.resolution is not None
        assert bug.product is not None
        assert bug.component is not None
        assert bug.creator is not None
        assert bug.see_also is not None
        assert bug.depends_on is not None
        assert bug.blocks is not None
        assert bug.priority is not None
        assert bug.severity is not None
        assert bug.creation_time is not None
        assert bug.assigned_to is not None
        assert bug.keywords is not None
        assert bug.url is not None
        assert bug.last_change_time is not None
        assert bug.whiteboard is not None
        assert bug.cf_user_story is not None

        return cls(
            id=bug.id,
            summary=bug.summary,
            status=bug.status,
            resolution=bug.resolution,
            product=bug.product,
            component=bug.component,
            see_also=bug.see_also,
            depends_on=bug.depends_on,
            blocks=bug.blocks,
            priority=extract_int_from_field(
                bug.priority,
                value_map={
                    "--": None,
                },
            ),
            severity=extract_int_from_field(
                bug.severity,
                value_map={
                    "n/a": None,
                    "--": None,
                    "blocker": 1,
                    "critical": 1,
                    "major": 2,
                    "normal": 3,
                    "minor": 4,
                    "trivial": 4,
                    "enhancement": 4,
                },
            ),
            creation_time=bug.creation_time,
            # The placeholder assignee is stored as "unassigned" (None)
            assigned_to=bug.assigned_to
            if bug.assigned_to != "nobody@mozilla.org"
            else None,
            keywords=bug.keywords,
            url=bug.url,
            user_story=bug.cf_user_story,
            last_resolved=bug.cf_last_resolved,
            last_change_time=bug.last_change_time,
            whiteboard=bug.whiteboard,
            creator=bug.creator,
            size_estimate=(
                bug.cf_size_estimate if bug.cf_size_estimate != "---" else None
            ),
            webcompat_priority=(
                bug.cf_webcompat_priority
                if bug.cf_webcompat_priority != "---"
                else None
            ),
            webcompat_score=extract_int_from_field(
                bug.cf_webcompat_score,
                value_map={
                    "---": None,
                    "?": None,
                },
            ),
        )

    def to_json(self) -> Mapping[str, Any]:
        """Return the fields as a dict with datetimes converted to ISO 8601
        strings, so that the result can be passed to `json.dump`."""
        return {
            key: value.isoformat() if isinstance(value, datetime) else value
            for key, value in vars(self).items()
        }

    @classmethod
    def from_json(cls, bug_data: Mapping[str, Any]) -> Self:
        """Inverse of `to_json`: build a Bug from its dict representation."""
        last_resolved = bug_data["last_resolved"]
        return cls(
            id=bug_data["id"],
            summary=bug_data["summary"],
            status=bug_data["status"],
            resolution=bug_data["resolution"],
            product=bug_data["product"],
            component=bug_data["component"],
            see_also=bug_data["see_also"],
            depends_on=bug_data["depends_on"],
            blocks=bug_data["blocks"],
            priority=bug_data["priority"],
            severity=bug_data["severity"],
            creation_time=datetime.fromisoformat(bug_data["creation_time"]),
            assigned_to=bug_data["assigned_to"],
            keywords=bug_data["keywords"],
            url=bug_data["url"],
            user_story=bug_data["user_story"],
            last_resolved=datetime.fromisoformat(last_resolved)
            if last_resolved is not None
            else None,
            last_change_time=datetime.fromisoformat(bug_data["last_change_time"]),
            whiteboard=bug_data["whiteboard"],
            creator=bug_data["creator"],
            size_estimate=bug_data["size_estimate"],
            webcompat_priority=bug_data["webcompat_priority"],
            webcompat_score=bug_data["webcompat_score"],
        )


@dataclass(frozen=True)
class BugHistoryChange:
    """A change to a single field within one history entry."""

    field_name: str
    added: str
    removed: str


@dataclass(frozen=True)
class BugHistoryEntry:
    """All tracked field changes made to one bug by one user at one time."""

    number: int
    who: str
    change_time: datetime
    changes: list[BugHistoryChange]


@dataclass(frozen=True)
class HistoryChange:
    """Flattened (hashable) form of a single field change, combining the
    fields of BugHistoryEntry and BugHistoryChange."""

    number: int
    who: str
    change_time: datetime
    field_name: str
    added: str
    removed: str


class PropertyChange(enum.StrEnum):
    added = "added"
    removed = "removed"


@dataclass(frozen=True)
class PropertyHistoryItem:
    change_time: datetime
    change: PropertyChange


class PropertyHistory:
    """Representation of the history of a specific boolean property
    (i.e. one that can be present or not)"""

    def __init__(self) -> None:
        self.data: list[PropertyHistoryItem] = []

    def __len__(self) -> int:
        return len(self.data)

    def add(self, change_time: datetime, change: PropertyChange) -> None:
        """Record that the property was added or removed at `change_time`."""
        self.data.append(PropertyHistoryItem(change_time=change_time, change=change))

    def missing_initial_add(self) -> bool:
        """Check whether the history lacks an initial "added" event.

        This is the case when there are no events at all, or when the
        earliest recorded event is a removal. Sorts `self.data` in place by
        change time."""
        self.data.sort(key=lambda x: x.change_time)
        return len(self.data) == 0 or self.data[0].change == PropertyChange.removed


BugId = int
BugsById = Mapping[BugId, Bug]
MutBugsById = MutableMapping[BugId, Bug]
HistoryByBug = Mapping[BugId, Sequence[BugHistoryEntry]]

# Bugzilla search parameters, keyed by a category name used only for logging
BUG_QUERIES: Mapping[str, dict[str, str | list[str]]] = {
    "webcompat_product": {
        "component": [
            "Knowledge Base",
            "Privacy: Site Reports",
            "Site Reports",
            "Interventions",
        ],
        "product": "Web Compatibility",
        "j_top": "OR",
        "f1": "bug_status",
        "o1": "changedafter",
        "v1": "2020-01-01",
        "f2": "resolution",
        "o2": "isempty",
    },
    "webcompat_other": {
        "f1": "product",
        "o1": "notequals",
        "v1": "Web Compatibility",
        "f2": "OP",
        "j2": "OR",
        "f3": "keywords",
        "o3": "casesubstring",
        "v3": "webcompat:",
        "f4": "keywords",
        "o4": "casesubstring",
        "v4": "parity-",
        "f5": "cf_user_story",
        "o5": "casesubstring",
        "v5": "web-feature:",
        "f6": "CP",
        "f7": "OP",
        "j7": "OR",
        "f8": "bug_status",
        "o8": "changedafter",
        "v8": "2020-01-01",
        "f9": "resolution",
        "o9": "isempty",
        "f10": "CP",
    },
}


@dataclass
class BugLinkConfig:
    """Name and column names of a table linking one bug to another."""

    table_name: str
    from_field_name: str
    to_field_name: str


@dataclass
class ExternalLinkConfig:
    """Name and column name of a table linking kb bugs to external URLs, plus
    the substrings identifying the see-also URLs that belong in it."""

    table_name: str
    field_name: str
    match_substrs: list[str]

    def get_links(
        self, all_bugs: BugsById, kb_bugs: set[BugId]
    ) -> Mapping[BugId, set[str]]:
        """Collect matching see-also links for each kb bug.

        Links are taken from the kb bug itself and from each of its
        dependencies that is present in `all_bugs`."""
        rv: defaultdict[int, set[str]] = defaultdict(set)
        for kb_bug_id in kb_bugs:
            bug_ids = [kb_bug_id] + all_bugs[kb_bug_id].depends_on
            for bug_id in bug_ids:
                if bug_id not in all_bugs:
                    continue
                for entry in all_bugs[bug_id].see_also:
                    if any(substr in entry for substr in self.match_substrs):
                        rv[kb_bug_id].add(entry)
        return rv


EXTERNAL_LINK_CONFIGS = {
    config.table_name: config
    for config in [
        ExternalLinkConfig(
            "interventions",
            "code_url",
            ["github.com/mozilla-extensions/webcompat-addon"],
        ),
        ExternalLinkConfig(
            "other_browser_issues",
            "issue_url",
            ["bugs.chromium.org", "bugs.webkit.org", "crbug.com"],
        ),
        ExternalLinkConfig(
            "standards_issues",
            "issue_url",
            ["github.com/w3c", "github.com/whatwg", "github.com/wicg"],
        ),
        ExternalLinkConfig(
            "standards_positions", "discussion_url", ["standards-positions"]
        ),
    ]
}


def extract_int_from_field(
    field_value: Optional[str], value_map: Optional[Mapping[str, Optional[int]]] = None
) -> Optional[int]:
    """Convert a textual field value into an integer.

    :param field_value: Raw field value; None or "" gives None.
    :param value_map: Optional explicit conversions, looked up by the
                      lowercased field value before any other parsing.
    :returns: The mapped value if there is one, otherwise the first run of
              digits in the value, otherwise None (with a warning logged).
    """
    if field_value:
        lowered = field_value.lower()
        if value_map and lowered in value_map:
            return value_map[lowered]

        match = re.search(r"\d+", field_value)
        if match:
            return int(match.group())

        logging.warning(
            f"Unexpected field value '{field_value}', could not convert to integer"
        )
    return None


def parse_user_story(input_string: str) -> Mapping[str, str | list[str]]:
    """Parse "key:value" lines into a mapping.

    Each line is split at its first colon; lines without a colon are ignored.
    Keys and values are kept verbatim (no whitespace stripping). A key that
    occurs more than once maps to the list of its values in order of
    appearance."""
    result: dict[str, str | list[str]] = {}
    if not input_string:
        return result

    for line in input_string.splitlines():
        key, separator, value = line.partition(":")
        if not separator:
            continue
        if key not in result:
            result[key] = value
            continue
        current_value = result[key]
        if isinstance(current_value, list):
            current_value.append(value)
        else:
            result[key] = [current_value, value]
    return result


class BugCache(Mapping):
    """Read-only mapping of bug id to Bug, filled by fetching from Bugzilla."""

    def __init__(self, bz_client: bugdantic.Bugzilla):
        self.bz_client = bz_client
        self.bugs: MutBugsById = {}

    def __getitem__(self, key: BugId) -> Bug:
        return self.bugs[key]

    def __len__(self) -> int:
        return len(self.bugs)

    def __iter__(self) -> Iterator[BugId]:
        return iter(self.bugs)

    def bz_fetch_bugs(
        self,
        params: Optional[dict[str, str | list[str]]] = None,
        bug_ids: Optional[Sequence[BugId]] = None,
    ) -> None:
        """Fetch bugs from Bugzilla and store them in the cache.

        :param params: Search parameters to run as a query.
        :param bug_ids: Explicit ids of the bugs to fetch.
        :raises ValueError: Unless exactly one of `params` and `bug_ids` is
                            given.

        Any error raised while fetching or converting bugs is logged and
        re-raised."""
        # Exactly one of the two selection mechanisms must be supplied
        if (params is None) == (bug_ids is None):
            raise ValueError("Must pass params or ids but not both")

        fields = [
            "id",
            "summary",
            "status",
            "resolution",
            "product",
            "component",
            "creator",
            "see_also",
            "depends_on",
            "blocks",
            "priority",
            "severity",
            "creation_time",
            "assigned_to",
            "keywords",
            "url",
            "cf_user_story",
            "cf_last_resolved",
            "last_change_time",
            "whiteboard",
            "cf_size_estimate",
            "cf_webcompat_priority",
            "cf_webcompat_score",
        ]

        try:
            if params is not None:
                bugs = self.bz_client.search(
                    query=params, include_fields=fields, page_size=200
                )
            else:
                assert bug_ids is not None
                bugs = self.bz_client.bugs(
                    bug_ids, include_fields=fields, page_size=200
                )
            for bug in bugs:
                assert bug.id is not None
                self.bugs[bug.id] = Bug.from_bugzilla(bug)
        except Exception as e:
            logging.error(f"Error: {e}")
            raise

    def missing_relations(self, bugs: BugsById, relation: str) -> set[BugId]:
        """Get the ids listed in attribute `relation` (e.g. "depends_on") of
        any of `bugs` that are not yet in the cache."""
        related_ids: set[BugId] = set()
        for bug in bugs.values():
            related_ids.update(
                bug_id for bug_id in getattr(bug, relation) if bug_id not in self
            )
        return related_ids

    def into_mapping(self) -> BugsById:
        """Convert the data into a plain dict.

        Also reset this object, so we aren't sharing the state between
        multiple places."""
        bugs = self.bugs
        self.bugs = {}
        return bugs


def is_site_report(bug: Bug) -> bool:
    """Check if a bug is a site report: either filed in the Site Reports
    component of Web Compatibility, or filed in another product with the
    webcompat:site-report keyword."""
    return (bug.product == "Web Compatibility" and bug.component == "Site Reports") or (
        bug.product != "Web Compatibility" and "webcompat:site-report" in bug.keywords
    )


def is_etp_report(bug: Bug) -> bool:
    """Check if a bug is in the Web Compatibility "Privacy: Site Reports"
    component."""
    return (
        bug.product == "Web Compatibility" and bug.component == "Privacy: Site Reports"
    )


def is_kb_entry(bug: Bug) -> bool:
    """Get things that are directly in the knowledge base.

    This doesn't include core bugs that should be considered part of the
    knowledge base because they directly block a platform bug."""
    return bug.product == "Web Compatibility" and bug.component == "Knowledge Base"


def is_webcompat_platform_bug(bug: Bug) -> bool:
    """Check if a bug is a platform bug.

    These are only actually in the kb if they also block a site report"""
    return (
        bug.product != "Web Compatibility" and "webcompat:platform-bug" in bug.keywords
    )


def get_kb_bug_core_bugs(
    all_bugs: BugsById, kb_bugs: set[BugId], platform_bugs: set[BugId]
) -> Mapping[BugId, set[BugId]]:
    """Map each kb bug that is not itself a platform bug to the platform bugs
    it depends on. Kb bugs without such a dependency are omitted."""
    rv: defaultdict[BugId, set[BugId]] = defaultdict(set)
    for kb_id in kb_bugs:
        if kb_id in platform_bugs:
            continue
        for bug_id in all_bugs[kb_id].depends_on:
            if bug_id in platform_bugs:
                rv[kb_id].add(bug_id)
    return rv


def get_kb_bug_site_report(
    all_bugs: BugsById, kb_bugs: set[BugId], site_report_bugs: set[BugId]
) -> Mapping[BugId, set[BugId]]:
    """Map each kb bug to its site reports: the site reports it blocks, plus
    itself if it is a site report. Kb bugs without any are omitted."""
    rv: defaultdict[BugId, set[BugId]] = defaultdict(set)
    for kb_id in kb_bugs:
        if kb_id in site_report_bugs:
            rv[kb_id].add(kb_id)
        for bug_id in all_bugs[kb_id].blocks:
            if bug_id in site_report_bugs:
                rv[kb_id].add(bug_id)
    return rv


def get_etp_breakage_reports(
    all_bugs: BugsById, etp_reports: set[BugId]
) -> Mapping[BugId, set[BugId]]:
    """Map each ETP report to the known bugs with the "meta" keyword that it
    depends on or blocks. Reports without any such bug are omitted."""
    rv: dict[BugId, set[BugId]] = {}
    for bug_id in etp_reports:
        report_bug = all_bugs[bug_id]
        meta_bugs = {
            meta_id
            for meta_id in report_bug.depends_on + report_bug.blocks
            if meta_id in all_bugs and "meta" in all_bugs[meta_id].keywords
        }
        if meta_bugs:
            rv[bug_id] = meta_bugs
    return rv


def fetch_all_bugs(
    bz_client: bugdantic.Bugzilla,
) -> BugsById:
    """Get all the bugs that should be imported into BigQuery.

    Runs every query in BUG_QUERIES, then repeatedly fetches not-yet-known
    bugs related to site reports, kb entries and ETP reports.

    :returns: A mapping of bug id to Bug for all fetched bugs."""
    bug_cache = BugCache(bz_client)

    for category, filter_config in BUG_QUERIES.items():
        logging.info(f"Fetching {category} bugs")
        bug_cache.bz_fetch_bugs(params=filter_config)

    tried_to_fetch: set[BugId] = set()
    # Add a limit on how many fetches we will try
    recurse_limit = 10
    for _ in range(recurse_limit):
        # Get all blocking bugs for site reports or kb entries or etp site reports
        # This can take more than one iteration if dependencies themselves turn out
        # to be site reports that were excluded by the date cutoff
        missing_relations = bug_cache.missing_relations(
            {
                bug_id: bug
                for bug_id, bug in bug_cache.items()
                if is_site_report(bug) or is_kb_entry(bug) or is_etp_report(bug)
            },
            "depends_on",
        )
        missing_relations |= bug_cache.missing_relations(
            {bug_id: bug for bug_id, bug in bug_cache.items() if is_etp_report(bug)},
            "blocks",
        )
        # If we already tried to fetch a bug don't try to fetch it again
        missing_relations -= tried_to_fetch
        if not missing_relations:
            break

        tried_to_fetch |= missing_relations
        logging.info("Fetching related bugs")
        bug_cache.bz_fetch_bugs(bug_ids=list(missing_relations))
        for bug_id in missing_relations:
            if bug_id not in bug_cache:
                logging.warning(f"Failed to fetch bug {bug_id}")
    else:
        # Only reached when the loop ran out of iterations without a break
        logging.warning(
            f"Failed to fetch all dependencies after {recurse_limit} attempts"
        )

    return bug_cache.into_mapping()


class BugHistoryUpdater:
    """Computes the bug history entries that are not yet stored in BigQuery."""

    def __init__(
        self,
        bq_client: BigQuery,
        bz_client: bugdantic.Bugzilla,
    ):
        self.bq_client = bq_client
        self.bz_client = bz_client

    def run(self, all_bugs: BugsById, recreate: bool) -> HistoryByBug:
        """Get the history entries to write for `all_bugs`.

        :param all_bugs: The bugs being imported.
        :param recreate: If True, ignore what is already stored and return
                         the full history of every bug.
        :returns: History entries per bug; empty if there is nothing new."""
        existing_records: HistoryByBug
        new_bugs: BugsById
        existing_bugs_history: HistoryByBug
        if not recreate:
            existing_records = self.bigquery_fetch_history(all_bugs.keys())
            new_bugs, existing_bugs = self.group_bugs(all_bugs)
            existing_bugs_history = self.missing_records(
                existing_records, self.existing_bugs_history(existing_bugs)
            )
        else:
            existing_records = {}
            new_bugs = all_bugs
            existing_bugs_history = {}

        new_bugs_history = self.missing_records(
            existing_records, self.new_bugs_history(new_bugs)
        )

        if not (new_bugs_history or existing_bugs_history):
            logging.info("No relevant history updates")
            return {}

        return self.merge_history(existing_bugs_history, new_bugs_history)

    def group_bugs(self, all_bugs: BugsById) -> tuple[BugsById, BugsById]:
        """Split bugs into (new bugs, bugs already in the bugzilla_bugs table)."""
        existing_ids = self.bigquery_fetch_imported_ids()
        new_bugs: dict[BugId, Bug] = {}
        existing_bugs: dict[BugId, Bug] = {}
        for bug_id, bug in all_bugs.items():
            target = existing_bugs if bug_id in existing_ids else new_bugs
            target[bug_id] = bug
        return new_bugs, existing_bugs

    def merge_history(self, *sources: HistoryByBug) -> HistoryByBug:
        """Concatenate the per-bug entries of several histories, in order."""
        history: defaultdict[BugId, list[BugHistoryEntry]] = defaultdict(list)
        for source in sources:
            for bug_id, changes in source.items():
                history[bug_id].extend(changes)
        return history

    def new_bugs_history(self, new_bugs: BugsById) -> HistoryByBug:
        """Get the full Bugzilla history of `new_bugs`, plus synthetic entries
        for keywords that have no recorded addition."""
        history = self.bugzilla_fetch_history(new_bugs.keys())
        synthetic_history = self.create_initial_history_entry(new_bugs, history)
        return self.merge_history(history, synthetic_history)

    def existing_bugs_history(self, existing_bugs: BugsById) -> HistoryByBug:
        """Get history entries made since the last completed history import
        for those of `existing_bugs` that changed since then.

        Returns an empty mapping if there is no previous completed import."""
        last_import_time = self.bigquery_last_import()

        if last_import_time is None:
            logging.info("No previous history update found")
            return {}

        updated_bugs = {
            bug_id
            for bug_id, bug in existing_bugs.items()
            if bug.last_change_time > last_import_time
        }

        if not updated_bugs:
            logging.info(f"No updated bugs since {last_import_time.isoformat()}")
            return {}

        logging.info(
            f"Fetching history for bugs {','.join(str(item) for item in updated_bugs)} updated since {last_import_time.isoformat()}"
        )

        bugs_full_history = self.bugzilla_fetch_history(updated_bugs)
        # Filter down to only recent updates, since we always get the full history
        bugs_history = {}
        for bug_id, bug_full_history in bugs_full_history.items():
            bug_history = [
                item for item in bug_full_history if item.change_time > last_import_time
            ]
            if bug_history:
                bugs_history[bug_id] = bug_history

        return bugs_history

    def missing_records(
        self, existing_records: HistoryByBug, updates: HistoryByBug
    ) -> HistoryByBug:
        """Get the changes in `updates` that aren't in `existing_records`.

        If there are no existing records `updates` is returned as is.
        Otherwise the comparison is made per field change, duplicates are
        dropped, and the order in which changes occur in `updates` is kept."""
        if not existing_records:
            return updates

        existing_history = set(self.flatten_history(existing_records))
        # dict.fromkeys de-duplicates while preserving insertion order, so
        # the result doesn't depend on set iteration order
        candidates = dict.fromkeys(self.flatten_history(updates))
        return self.unflatten_history(
            item for item in candidates if item not in existing_history
        )

    def bigquery_fetch_imported_ids(self) -> set[int]:
        """Get the ids of all bugs in the bugzilla_bugs table."""
        query = """SELECT number FROM bugzilla_bugs"""
        res = self.bq_client.query(query)
        return {bug["number"] for bug in res}

    def bigquery_last_import(self) -> Optional[datetime]:
        """Get the latest run_at of an import run with completed history fetch."""
        query = """
                SELECT MAX(run_at) AS last_run_at
                FROM import_runs
                WHERE is_history_fetch_completed = TRUE
            """
        res = self.bq_client.query(query)
        row = list(res)[0]
        return row["last_run_at"]

    def bugzilla_fetch_history(self, ids: Iterable[int]) -> HistoryByBug:
        """Fetch the history of the given bugs from Bugzilla.

        Bugs are requested in chunks; bugs missing from a response (including
        because the request failed) are retried, up to three attempts in
        total.

        :raises BugLoadError: If the history of some bugs is still missing
                              after the final attempt."""
        history: dict[int, list[bugdantic.bugzilla.History]] = {}
        chunk_size = 100
        ids_list = list(ids)
        max_retries = 3
        for attempt in range(max_retries):
            chunks = [
                ids_list[offset : offset + chunk_size]
                for offset in range(0, len(ids_list), chunk_size)
            ]
            retry_ids: list[int] = []
            for i, bug_ids in enumerate(chunks):
                logging.info(
                    f"Fetching history from bugzilla for {','.join(str(item) for item in bug_ids)} ({i + 1}/{len(chunks)})"
                )
                try:
                    results = self.bz_client.search(
                        query={"id": bug_ids}, include_fields=["id", "history"]
                    )
                except Exception as e:
                    # Best effort: a failed chunk is retried as a whole
                    logging.warning(f"Search request failed:\n{e}")
                    results = []

                for bug in results:
                    assert bug.id is not None
                    assert bug.history is not None
                    history[bug.id] = bug.history

                # Add anything missing from the response to the retry list
                retry_ids.extend(item for item in bug_ids if item not in history)
                # Pause between requests
                time.sleep(1)

            ids_list = retry_ids
            if not ids_list:
                break
            # Only wait and announce a retry if another attempt will happen
            if attempt + 1 < max_retries:
                logging.info(f"Retrying {len(retry_ids)} bugs with missing history")
                time.sleep(10)

        if ids_list:
            raise BugLoadError(
                f"Failed to fetch bug history for {','.join(str(item) for item in ids_list)}"
            )

        return self.bugzilla_to_history_entry(history)

    def bugzilla_to_history_entry(
        self, updated_history: Mapping[int, list[bugdantic.bugzilla.History]]
    ) -> HistoryByBug:
        """Convert bugdantic history records to BugHistoryEntry objects.

        Only changes to keywords, status, url and cf_user_story are kept;
        records without any such change are dropped."""
        relevant_fields = ["keywords", "status", "url", "cf_user_story"]
        rv: dict[int, list[BugHistoryEntry]] = defaultdict(list)

        for bug_id, history in updated_history.items():
            # Need to ensure we have an entry for every bug even if there isn't any history
            rv[bug_id] = []
            for record in history:
                relevant_changes = [
                    BugHistoryChange(
                        field_name=change.field_name,
                        added=change.added,
                        removed=change.removed,
                    )
                    for change in record.changes
                    if change.field_name in relevant_fields
                ]
                if relevant_changes:
                    rv[bug_id].append(
                        BugHistoryEntry(
                            number=bug_id,
                            who=record.who,
                            change_time=record.when,
                            changes=relevant_changes,
                        )
                    )

        return rv

    def bigquery_fetch_history(self, bug_ids: Iterable[int]) -> HistoryByBug:
        """Get the stored history entries of the given bugs from the
        bugs_history table. Returns an empty mapping when no ids are given."""
        rv: defaultdict[int, list[BugHistoryEntry]] = defaultdict(list)

        formatted_numbers = ", ".join(str(bug_id) for bug_id in bug_ids)
        if not formatted_numbers:
            # "IN ()" is not valid SQL, and there is nothing to look up anyway
            return rv

        query = f"""
                    SELECT *
                    FROM bugs_history
                    WHERE number IN ({formatted_numbers})
                """
        result = self.bq_client.query(query)
        for row in result:
            rv[row["number"]].append(
                BugHistoryEntry(
                    row["number"],
                    row["who"],
                    row["change_time"],
                    changes=[
                        BugHistoryChange(
                            change["field_name"], change["added"], change["removed"]
                        )
                        for change in row["changes"]
                    ],
                )
            )

        return rv

    def keyword_history(
        self, history: HistoryByBug
    ) -> Mapping[BugId, Mapping[str, PropertyHistory]]:
        """Get the time each keyword has been added and removed from each bug"""
        keyword_history: defaultdict[int, dict[str, PropertyHistory]] = defaultdict(
            dict
        )

        for bug_id, records in history.items():
            for record in records:
                for change in record.changes:
                    if change.field_name != "keywords":
                        continue
                    for src, change_type in [
                        (change.added, PropertyChange.added),
                        (change.removed, PropertyChange.removed),
                    ]:
                        if not src:
                            continue
                        # Multiple keywords are separated by ", "
                        for keyword in src.split(", "):
                            if keyword not in keyword_history[bug_id]:
                                keyword_history[bug_id][keyword] = PropertyHistory()
                            keyword_history[bug_id][keyword].add(
                                change_time=record.change_time,
                                change=change_type,
                            )

        return keyword_history

    def get_missing_keywords(
        self,
        bug_id: int,
        current_keywords: list[str],
        keyword_history: Mapping[BugId, Mapping[str, PropertyHistory]],
    ) -> set[str]:
        """Get the keywords of a bug whose initial addition isn't recorded in
        `keyword_history`."""
        bug_keyword_history = keyword_history.get(bug_id, {})

        # Check if keyword exists, but is not in "added" history
        missing_keywords = {
            keyword
            for keyword in current_keywords
            if keyword not in bug_keyword_history
        }

        # Check for keywords that have "removed" record as the earliest
        # event in the sorted timeline
        missing_keywords.update(
            keyword
            for keyword, history in bug_keyword_history.items()
            if history.missing_initial_add()
        )

        return missing_keywords

    def create_initial_history_entry(
        self, all_bugs: BugsById, history: HistoryByBug
    ) -> HistoryByBug:
        """Backfill history entries for bug creation.

        If a bug has keywords set, but there isn't a history entry corresponding
        to the keyword being added, we assume they were set on bug creation, and
        create a history entry to represent that."""
        result: dict[int, list[BugHistoryEntry]] = {}

        keyword_history = self.keyword_history(history)
        for bug_id, bug in all_bugs.items():
            missing_keywords = self.get_missing_keywords(
                bug_id, bug.keywords, keyword_history
            )

            if missing_keywords:
                logging.debug(
                    f"Adding initial history for bug {bug_id} with keywords {missing_keywords}"
                )
                record = BugHistoryEntry(
                    number=bug.id,
                    who=bug.creator,
                    change_time=bug.creation_time,
                    changes=[
                        BugHistoryChange(
                            added=", ".join(sorted(missing_keywords)),
                            field_name="keywords",
                            removed="",
                        )
                    ],
                )
                result[bug.id] = [record]
        return result

    def flatten_history(self, history: HistoryByBug) -> Iterable[HistoryChange]:
        """Yield one HistoryChange per field change in `history`."""
        for records in history.values():
            for record in records:
                for change in record.changes:
                    yield HistoryChange(
                        record.number,
                        record.who,
                        record.change_time,
                        change.field_name,
                        change.added,
                        change.removed,
                    )

    def unflatten_history(self, diff: Iterable[HistoryChange]) -> HistoryByBug:
        """Inverse of `flatten_history`: changes with the same bug, user and
        time are grouped into one BugHistoryEntry."""
        changes: dict[tuple[int, str, datetime], BugHistoryEntry] = {}
        for item in diff:
            key = (item.number, item.who, item.change_time)

            if key not in changes:
                changes[key] = BugHistoryEntry(
                    number=item.number,
                    who=item.who,
                    change_time=item.change_time,
                    changes=[],
                )
            changes[key].changes.append(
                BugHistoryChange(
                    field_name=item.field_name,
                    added=item.added,
                    removed=item.removed,
                )
            )

        rv: dict[int, list[BugHistoryEntry]] = {}
        for change in changes.values():
            rv.setdefault(change.number, []).append(change)

        return rv


class BigQueryImporter:
    """Class to handle all writes to BigQuery"""

    def __init__(self, bq_client: BigQuery):
        self.client = bq_client

    def write_table(
        self,
        table_name: str,
        schema: list[bigquery.SchemaField],
        rows: Sequence[Mapping[str, Any]],
        overwrite: bool,
    ) -> None:
        """Ensure the table exists, then write `rows` to it."""
        table = self.client.ensure_table(table_name, schema, False)
        self.client.write_table(table, schema, rows, overwrite)

    def convert_bug(self, bug: Bug) -> Mapping[str, Any]:
        """Convert a Bug into a row for the bugzilla_bugs table."""
        resolved = bug.resolved
        return {
            "number": bug.id,
            "title": bug.summary,
            "status": bug.status,
            "resolution": bug.resolution,
            "product": bug.product,
            "component": bug.component,
            "creator": bug.creator,
            "severity": bug.severity,
            "priority": bug.priority,
            "creation_time": bug.creation_time.isoformat(),
            "assigned_to": bug.assigned_to,
            "keywords": bug.keywords,
            "url": bug.url,
            "user_story": bug.parsed_user_story,
            "user_story_raw": bug.user_story,
            "resolved_time": resolved.isoformat() if resolved is not None else None,
            "whiteboard": bug.whiteboard,
            "size_estimate": bug.size_estimate,
            "webcompat_priority": bug.webcompat_priority,
            "webcompat_score": bug.webcompat_score,
            "depends_on": bug.depends_on,
            "blocks": bug.blocks,
        }

    def convert_history_entry(self, entry: BugHistoryEntry) -> Mapping[str, Any]:
        """Convert a BugHistoryEntry into a row for the bugs_history table."""
        return {
            "number": entry.number,
            "who": entry.who,
            "change_time": entry.change_time.isoformat(),
            "changes": [
                {
                    "field_name": change.field_name,
                    "added": change.added,
                    "removed": change.removed,
                }
                for change in entry.changes
            ],
        }

    def insert_bugs(self, all_bugs: BugsById) -> None:
        """Replace the contents of the bugzilla_bugs table with `all_bugs`."""
        table = "bugzilla_bugs"
        schema = [
            bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("status", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("resolution", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("product", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("component", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("creator", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("severity", "INTEGER"),
            bigquery.SchemaField("priority", "INTEGER"),
            bigquery.SchemaField("creation_time", "TIMESTAMP", mode="REQUIRED"),
            bigquery.SchemaField("assigned_to", "STRING"),
            bigquery.SchemaField("keywords", "STRING", mode="REPEATED"),
            bigquery.SchemaField("url", "STRING"),
            bigquery.SchemaField("user_story", "JSON"),
            bigquery.SchemaField("user_story_raw", "STRING"),
            bigquery.SchemaField("resolved_time", "TIMESTAMP"),
            bigquery.SchemaField("size_estimate", "STRING"),
            bigquery.SchemaField("whiteboard", "STRING"),
            bigquery.SchemaField("webcompat_priority", "STRING"),
            bigquery.SchemaField("webcompat_score", "INTEGER"),
            bigquery.SchemaField("depends_on", "INTEGER", mode="REPEATED"),
            bigquery.SchemaField("blocks", "INTEGER", mode="REPEATED"),
        ]
        rows = [self.convert_bug(bug) for bug in all_bugs.values()]
        self.write_table(table, schema, rows, overwrite=True)

    def insert_history_changes(
        self, history_entries: HistoryByBug, recreate: bool
    ) -> None:
        """Write history entries to the bugs_history table, passing `recreate`
        as the table write's `overwrite` flag."""
        schema = [
            bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("who", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("change_time", "TIMESTAMP", mode="REQUIRED"),
            bigquery.SchemaField(
                "changes",
                "RECORD",
                mode="REPEATED",
                fields=[
                    bigquery.SchemaField("field_name", "STRING", mode="REQUIRED"),
                    bigquery.SchemaField("added", "STRING", mode="REQUIRED"),
                    bigquery.SchemaField("removed", "STRING", mode="REQUIRED"),
                ],
            ),
        ]

        rows = [
            self.convert_history_entry(entry)
            for entries in history_entries.values()
            for entry in entries
        ]
        self.write_table("bugs_history", schema, rows, overwrite=recreate)

    def insert_bug_list(
        self, table_name: str, field_name: str, bugs: Iterable[BugId]
    ) -> None:
        """Replace the contents of a single-column table of bug ids."""
        schema = [bigquery.SchemaField(field_name, "INTEGER", mode="REQUIRED")]
        rows = [{field_name: bug_id} for bug_id in bugs]
        self.write_table(table_name, schema, rows, overwrite=True)

    def insert_bug_links(
        self, link_config: BugLinkConfig, links_by_bug: Mapping[BugId, Iterable[BugId]]
    ) -> None:
        """Replace the contents of a bug-to-bug link table, writing one row
        per (from bug, to bug) pair."""
        schema = [
            bigquery.SchemaField(
                link_config.from_field_name, "INTEGER", mode="REQUIRED"
            ),
            bigquery.SchemaField(link_config.to_field_name, "INTEGER", mode="REQUIRED"),
        ]
        rows = [
            {
                link_config.from_field_name: from_bug_id,
                link_config.to_field_name: to_bug_id,
            }
            for from_bug_id, to_bug_ids in links_by_bug.items()
            for to_bug_id in to_bug_ids
        ]
        self.write_table(link_config.table_name, schema, rows, overwrite=True)

    def insert_external_links(
        self,
        link_config: ExternalLinkConfig,
        links_by_bug: Mapping[BugId, Iterable[str]],
    ) -> None:
        """Replace the contents of a bug-to-URL link table, writing one row
        per (kb bug, link) pair."""
        schema = [
            bigquery.SchemaField("knowledge_base_bug", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField(link_config.field_name, "STRING", mode="REQUIRED"),
        ]
        rows = [
            {"knowledge_base_bug": bug_id, link_config.field_name: link_text}
            for bug_id, links in links_by_bug.items()
            for link_text in links
        ]
        self.write_table(link_config.table_name, schema, rows, overwrite=True)

    def record_import_run(
        self,
        start_time: float,
        count: int,
        history_count: Optional[int],
        last_change_time: datetime,
    ) -> None:
        """Add a row describing this import to the import_runs table.

        :param start_time: `time.monotonic()` value taken when the run began.
        :param count: Number of bugs imported.
        :param history_count: Number of bugs with updated history, or None if
                              history wasn't fetched.
        :param last_change_time: Most recent change time of any imported bug.
        """
        elapsed_time = time.monotonic() - start_time
        # The recorded time is the latest bug change time moved back by the
        # duration of the run
        run_at = last_change_time - timedelta(seconds=elapsed_time)
        formatted_time = run_at.strftime("%Y-%m-%dT%H:%M:%SZ")

        rows_to_insert = [
            {
                "run_at": formatted_time,
                "bugs_imported": count,
                "bugs_history_updated": history_count
                if history_count is not None
                else 0,
                "is_history_fetch_completed": history_count is not None,
            },
        ]
        bugbug_runs_table = "import_runs"
        self.client.insert_rows(bugbug_runs_table, rows_to_insert)


def get_kb_entries(all_bugs: BugsById, site_report_blockers: set[BugId]) -> set[BugId]:
    """Get the ids of all bugs considered to be knowledge base entries."""
    direct_kb_entries = {bug_id for bug_id, bug in all_bugs.items() if is_kb_entry(bug)}
    kb_blockers = {
        dependency
        for bug_id in direct_kb_entries
        for dependency in all_bugs[bug_id].depends_on
        if dependency in all_bugs
    }

    # We include any bug that's blocking a site report but isn't in Web Compatibility
    platform_site_report_blockers = {
        bug_id
        for bug_id in site_report_blockers
        if bug_id not in kb_blockers
        and all_bugs[bug_id].product != "Web Compatibility"
    }

    # We also include all other bugs that are platform bugs but aren't a dependency
    # of a kb entry
    # TODO: This is probably too many bugs; platform bugs that don't block any site reports
    # should likely be excluded
    platform_kb_entries = {
        bug_id
        for bug_id in all_bugs
        if bug_id not in kb_blockers and is_webcompat_platform_bug(all_bugs[bug_id])
    }

    return direct_kb_entries | platform_site_report_blockers | platform_kb_entries


def group_bugs(
    all_bugs: BugsById,
) -> tuple[set[BugId], set[BugId], set[BugId], set[BugId]]:
    """Extract groups of bugs according to their types

    :returns: A tuple of (site reports, ETP reports, kb bugs, platform bugs),
              where platform bugs are all bugs outside Web Compatibility."""
    site_reports = {bug_id for bug_id, bug in all_bugs.items() if is_site_report(bug)}
    etp_reports = {bug_id for bug_id, bug in all_bugs.items() if is_etp_report(bug)}
    site_report_blockers = {
        dependency
        for bug_id in site_reports
        for dependency in all_bugs[bug_id].depends_on
        # This might not be true if the dependency is a bug we can't access
        if dependency in all_bugs
    }
    kb_bugs = get_kb_entries(all_bugs, site_report_blockers)
    platform_bugs = {
        bug_id for bug_id, bug in all_bugs.items() if bug.product != "Web Compatibility"
    }
    return site_reports, etp_reports, kb_bugs, platform_bugs


def write_bugs(
    path: str,
    all_bugs: BugsById,
    site_reports: set[BugId],
    etp_reports: set[BugId],
    kb_bugs: set[BugId],
    platform_bugs: set[BugId],
    bug_links: Iterable[tuple[BugLinkConfig, Mapping[BugId, set[BugId]]]],
    external_links: Iterable[tuple[ExternalLinkConfig, Mapping[BugId, set[str]]]],
) -> None:
    """Write the bugs, their groups and their links to `path` as JSON.

    Link data is stored under the table name of its config."""
    data: dict[str, Any] = {}
    data["all_bugs"] = {bug_id: bug.to_json() for bug_id, bug in all_bugs.items()}
    data["site_report"] = list(site_reports)
    data["etp_reports"] = list(etp_reports)
    data["kb_bugs"] = list(kb_bugs)
    data["platform_bugs"] = list(platform_bugs)
    for bug_link_config, link_data in bug_links:
        data[bug_link_config.table_name] = {
            bug_id: list(values) for bug_id, values in link_data.items()
        }
    for external_link_config, external_link_data in external_links:
        data[external_link_config.table_name] = {
            bug_id: list(values) for bug_id, values in external_link_data.items()
        }

    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f)


def load_bugs(
    bz_client: bugdantic.Bugzilla, load_bug_data_path: Optional[str]
) -> BugsById:
    """Load bugs from a JSON file written by `write_bugs` if a path is given,
    otherwise fetch them from Bugzilla.

    :raises BugLoadError: If reading the file or fetching the bugs fails."""
    if load_bug_data_path is not None:
        try:
            logging.info(f"Reading bug data from {load_bug_data_path}")
            with open(load_bug_data_path, encoding="utf-8") as f:
                data = json.load(f)
            # JSON object keys are always strings, so restore the int ids
            return {
                int(bug_id): Bug.from_json(bug_data)
                for bug_id, bug_data in data["all_bugs"].items()
            }
        except Exception as e:
            raise BugLoadError(f"Reading bugs from {load_bug_data_path} failed") from e
    else:
        try:
            return fetch_all_bugs(bz_client)
        except Exception as e:
            raise BugLoadError(
                "Fetching bugs from Bugzilla was not completed due to an error, aborting."
            ) from e


def run(
    bq_client: BigQuery,
    bq_dataset_id: str,
    bz_client: bugdantic.Bugzilla,
    write: bool,
    include_history: bool,
    recreate_history: bool,
    write_bug_data_path: Optional[str],
    load_bug_data_path: Optional[str],
) -> None:
    """Load the bugs, compute history and links, and import it all to BigQuery.

    :param bq_client: BigQuery client used for all reads and writes.
    :param bq_dataset_id: Not used by this function.
    :param bz_client: Bugzilla client.
    :param write: Not used by this function.
    :param include_history: Whether to read and update bug history.
    :param recreate_history: Whether to rebuild the history from scratch.
    :param write_bug_data_path: If given, path to dump the bug data to as JSON.
    :param load_bug_data_path: If given, path of a JSON file to read the bugs
                               from instead of fetching them from Bugzilla.
    """
    start_time = time.monotonic()

    all_bugs = load_bugs(bz_client, load_bug_data_path)

    history_changes = None
    if include_history:
        history_updater = BugHistoryUpdater(bq_client, bz_client)
        try:
            history_changes = history_updater.run(all_bugs, recreate_history)
        except Exception as e:
            logging.error(f"Exception updating history: {e}")
            raise
    else:
        logging.info("Not updating bug history")

    site_reports, etp_reports, kb_bugs, platform_bugs = group_bugs(all_bugs)

    # Links between different kinds of bugs
    bug_links = [
        (
            BugLinkConfig("breakage_reports", "knowledge_base_bug", "breakage_bug"),
            get_kb_bug_site_report(all_bugs, kb_bugs, site_reports),
        ),
        (
            BugLinkConfig("core_bugs", "knowledge_base_bug", "core_bug"),
            get_kb_bug_core_bugs(all_bugs, kb_bugs, platform_bugs),
        ),
        (
            BugLinkConfig("etp_breakage_reports", "breakage_bug", "etp_meta_bug"),
            get_etp_breakage_reports(all_bugs, etp_reports),
        ),
    ]

    # Links between bugs and external data sources
    external_links = [
        (config, config.get_links(all_bugs, kb_bugs))
        for config in EXTERNAL_LINK_CONFIGS.values()
    ]

    if write_bug_data_path is not None:
        write_bugs(
            write_bug_data_path,
            all_bugs,
            site_reports,
            etp_reports,
            kb_bugs,
            platform_bugs,
            bug_links,
            external_links,
        )

    # NOTE: max() raises ValueError if no bugs were loaded, which aborts the
    # run before anything is written to BigQuery
    last_change_time_max = max(bug.last_change_time for bug in all_bugs.values())

    # Finally do the actual import
    importer = BigQueryImporter(bq_client)
    importer.insert_bugs(all_bugs)
    if history_changes is not None:
        importer.insert_history_changes(history_changes, recreate=recreate_history)

    importer.insert_bug_list("kb_bugs", "number", kb_bugs)

    for bug_link_config, data in bug_links:
        importer.insert_bug_links(bug_link_config, data)

    for external_link_config, links_by_bug in external_links:
        importer.insert_external_links(external_link_config, links_by_bug)

    importer.record_import_run(
        start_time,
        len(all_bugs),
        len(history_changes) if history_changes is not None else None,
        last_change_time_max,
    )


class BugzillaJob(EtlJob):
    """ETL job importing Bugzilla data."""

    name = "bugzilla"

    @classmethod
    def add_arguments(cls, parser: argparse.ArgumentParser) -> None:
        """Register the Bugzilla-specific command line arguments."""
        group = parser.add_argument_group(
            title="Bugzilla", description="Bugzilla import arguments"
        )
        group.add_argument(
            "--bugzilla-api-key",
            help="Bugzilla API key",
            default=os.environ.get("BUGZILLA_API_KEY"),
        )
        group.add_argument(
            "--bugzilla-no-history",
            dest="bugzilla_include_history",
            action="store_false",
            default=True,
            help="Don't read or update bug history",
        )
        group.add_argument(
            "--bugzilla-recreate-history",
            action="store_true",
            help="Re-read bug history from scratch",
        )
        group.add_argument(
            "--bugzilla-write-bug-data",
            action="store",
            help="Path to write bug data as a JSON file",
        )
        group.add_argument(
            "--bugzilla-load-bug-data",
            action="store",
            help="Path to JSON file to load bug data from",
        )

    def default_dataset(self, args: argparse.Namespace) -> str:
        return args.bq_kb_dataset

    def main(self, bq_client: BigQuery, args: argparse.Namespace) -> None:
        """Create the Bugzilla client from the arguments and run the import."""
        bz_config = bugdantic.BugzillaConfig(
            "https://bugzilla.mozilla.org",
            args.bugzilla_api_key,
            allow_writes=args.write,
        )
        bz_client = bugdantic.Bugzilla(bz_config)

        run(
            bq_client,
            args.bq_kb_dataset,
            bz_client,
            args.write,
            args.bugzilla_include_history,
            args.bugzilla_recreate_history,
            args.bugzilla_write_bug_data,
            args.bugzilla_load_bug_data,
        )