jobs/webcompat-kb/webcompat_kb/metric_changes.py (457 lines of code) (raw):

import argparse import logging import re from collections import defaultdict from dataclasses import dataclass from datetime import datetime, timezone from typing import Iterator, Mapping, Optional, Sequence, cast from google.cloud import bigquery from .base import EtlJob from .bqhelpers import BigQuery, Json from .bugzilla import parse_user_story FIXED_STATES = {"RESOLVED", "VERIFIED"} @dataclass class BugFieldChange: field_name: str added: str removed: str @dataclass class BugChange: who: str change_time: datetime changes: list[BugFieldChange] @dataclass class BugData: number: int status: str resolution: str product: str component: str creator: str creation_time: datetime resolved_time: datetime keywords: Sequence[str] url: str user_story: str @dataclass class BugState: status: str product: str component: str keywords: list[str] url: str user_story: str change_idx: Optional[int] @dataclass class ScoreChange: who: str change_time: datetime score_delta: float reasons: list[str] @dataclass(frozen=True) class ChangeKey: who: str change_time: datetime def get_last_recorded_date(client: BigQuery) -> datetime: query = """ SELECT MAX(change_time) as change_time FROM webcompat_topline_metric_changes""" result = list(client.query(query)) if result: return result[0]["change_time"] return datetime(2024, 1, 1, tzinfo=timezone.utc) def get_bug_changes( client: BigQuery, last_change_time: datetime ) -> Mapping[int, list[BugChange]]: rv: dict[int, list[BugChange]] = {} query = """ SELECT number, who, change_time, changes FROM bugs_history WHERE change_time > @last_change_time ORDER BY change_time ASC """ query_parameters = [ bigquery.ScalarQueryParameter("last_change_time", "TIMESTAMP", last_change_time) ] bug_changes = client.query(query, parameters=query_parameters) for row in bug_changes: bug_id = row.number if bug_id not in rv: rv[bug_id] = [] changes = [ BugFieldChange(change["field_name"], change["added"], change["removed"]) for change in row.changes ] rv[bug_id].append(BugChange(row.who, row.change_time, changes)) logging.info(f"Got {bug_changes.num_results} changes for {len(rv)} bugs") return rv def get_recorded_changes( client: BigQuery, bugs: Iterator[int] ) -> Mapping[int, set[ChangeKey]]: query = """ SELECT number, who, change_time FROM webcompat_topline_metric_changes WHERE number IN UNNEST(@bugs) """ rv = defaultdict(set) query_parameters = [bigquery.ArrayQueryParameter("bugs", "INT64", list(bugs))] for row in client.query(query, parameters=query_parameters): rv[row.number].add(ChangeKey(row.who, row.change_time)) return rv def get_bugs( client: BigQuery, last_change_time: datetime, bugs: Iterator[int], ) -> Mapping[int, BugData]: rv: dict[int, BugData] = {} query = """ SELECT number, status, resolution, product, component, creator, creation_time, resolved_time, keywords, url, user_story_raw FROM bugzilla_bugs WHERE number IN UNNEST(@bugs) OR creation_time > @last_change_time """ query_parameters = [ bigquery.ScalarQueryParameter( "last_change_time", "TIMESTAMP", last_change_time ), bigquery.ArrayQueryParameter("bugs", "INT64", list(bugs)), ] job = client.query(query, parameters=query_parameters) for row in job: rv[row.number] = BugData( row.number, row.status, row.resolution, row.product, row.component, row.creator, row.creation_time, row.resolved_time, row.keywords, row.url, row.user_story_raw, ) logging.info(f"Processing a total of {len(rv)} bugs, including newly opened bugs") return rv def is_webcompat_bug(product: str, component: str, keywords: Sequence[str]) -> bool: return (product == "Web Compatibility" and component == "Site Reports") or ( product != "Web Compatibility" and "webcompat:site-report" in keywords ) header_pattern = re.compile(r"^@@ -(\d+),?(\d+)? \+(\d+),?(\d+)? @@$") def reverse_apply_diff(input_str: str, diff: str) -> str: """Apply a diff in reverse to get the original string""" input_lines = input_str.splitlines(True) input_idx = 0 diff_lines = diff.splitlines(True) diff_idx = 0 output_lines = [] while diff_idx < len(diff_lines): m = header_pattern.match(diff_lines[diff_idx]) if m is None: raise ValueError(f"Bad user story diff (missing header line):\n{diff}") start_line_number = int(m.group(3)) end_line_number = int(m.group(4)) if m.group(4) else None start_idx = max(start_line_number - 1, 0) if start_idx < input_idx or ( end_line_number is not None and end_line_number > len(input_lines) ): raise ValueError(f"Bad user story diff (index out of bounds):\n{diff}") output_lines.extend(input_lines[input_idx:start_idx]) diff_idx += 1 input_idx = start_idx while diff_idx < len(diff_lines) and diff_lines[diff_idx][0] != "@": change_char = diff_lines[diff_idx][0] data = diff_lines[diff_idx][1:] if change_char == "+": input_idx += 1 elif change_char == " ": if input_lines[input_idx].strip() != data.strip(): raise ValueError( f"Bad user story diff (patch doesn't match):\n{diff}\nInput line {input_idx} expected {data} got {input_lines[input_idx]}" ) output_lines.append(data) input_idx += 1 else: output_lines.append(data) diff_idx += 1 output_lines.extend(input_lines[input_idx:]) return "".join(output_lines) def bugs_historic_states( bug_data: Mapping[int, BugData], changes_by_bug: Mapping[int, list[BugChange]], ) -> Mapping[int, list[BugState]]: """Create a per bug list of historic states of that bug The first item in the list is the current state, subsequent items are prior states, in chronological order.""" rv: dict[int, list[BugState]] = {} for bug_id, bug in bug_data.items(): # Initial state corrsponding to what the bug looks like now states = [ BugState( bug.status, bug.product, bug.component, list(bug.keywords), bug.url, bug.user_story, change_idx=None, ) ] bug_changes = changes_by_bug.get(bug_id, []) prev_changes = None for count, change in enumerate(reversed(bug_changes)): index = len(bug_changes) - count - 1 if prev_changes is not None and prev_changes == change.changes: # Sometimes we seem to get duplicate changes we should skip continue current = states[-1] current.change_idx = index # Duplicate the current state prev = BugState( current.status, current.product, current.component, current.keywords[:], current.url, current.user_story, change_idx=None, ) # Apply the delta from current to previous state for field_change in change.changes: if field_change.field_name == "keywords": for keyword in field_change.added.split(", "): if keyword: try: prev.keywords.remove(keyword) except ValueError: # Occasionally keywords change case for prev_keyword in prev.keywords: if prev_keyword.lower() == keyword.lower(): prev.keywords.remove(prev_keyword) logging.warning( f"Didn't find keyword {keyword} using {prev_keyword}" ) break else: raise for keyword in field_change.removed.split(", "): if keyword: prev.keywords.append(keyword) elif field_change.field_name in { "status", "product", "component", "url", }: assert getattr(prev, field_change.field_name) == field_change.added setattr(prev, field_change.field_name, field_change.removed) elif field_change.field_name == "cf_user_story": prev.user_story = reverse_apply_diff( prev.user_story, field_change.added ) else: continue prev_changes = change.changes states.append(prev) rv[bug_id] = states return rv def get_current_scores(client: BigQuery) -> Mapping[int, float]: rv: dict[int, float] = {} query = "SELECT number, IFNULL(triage_score, 0) as score from scored_site_reports" for row in client.query(query): rv[row.number] = row.score return rv def compute_historic_scores( client: BigQuery, historic_states: Mapping[int, list[BugState]], current_scores: Mapping[int, float], ) -> Mapping[int, list[float]]: """Compute the webcompat scores corresponding to different states of bugs.""" rv: dict[int, list[float]] = {} schema = [ bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("index", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("keywords", "STRING", mode="REPEATED"), bigquery.SchemaField("url", "STRING", mode="REQUIRED"), bigquery.SchemaField("user_story", "JSON", mode="REQUIRED"), ] rows: list[Mapping[str, Json]] = [] for bug_id, states in historic_states.items(): rv[bug_id] = [0] * len(states) for i, state in enumerate(states): is_open = state.status not in FIXED_STATES is_webcompat = ( state.product == "Web Compatibility" and state.component == "Site Reports" ) or ( state.product != "Web Compatibility" and "webcompat:site-report" in state.keywords ) if is_open and is_webcompat: rows.append( { "number": bug_id, "index": i, "keywords": cast(Sequence[str], state.keywords), "url": state.url, "user_story": parse_user_story(state.user_story), } ) with client.temporary_table(schema, rows) as tmp_table: score_query = f""" DECLARE crux_yyyymm INT64 DEFAULT 202409; SELECT number, index, url, keywords, user_story, `moz-fx-dev-dschubert-wckb.webcompat_knowledge_base.WEBCOMPAT_METRIC_SCORE_NO_SITE_RANK`(keywords, user_story) * `moz-fx-dev-dschubert-wckb.webcompat_knowledge_base.WEBCOMPAT_METRIC_SCORE_SITE_RANK_MODIFER`(url, crux_yyyymm) as score FROM `{tmp_table.name}` """ bugs_with_webcompat_states = set() scores = tmp_table.query(score_query) for row in scores: bugs_with_webcompat_states.add(row.number) logging.debug( f"Bug {row.number}: {row.url} {row.keywords}, {repr(row.user_story)} SCORE: {row.score}" ) rv[row.number][row.index] = row.score for bug_id, computed_scores in rv.items(): current_score = float(current_scores.get(bug_id, 0)) if computed_scores[0] != current_score and states[0].status not in FIXED_STATES: history_logging = "\n".join( f" {score}: {state}" for score, state in zip(computed_scores, historic_states[bug_id]) ) logging.warning(f"""Bug {bug_id}, current score is {current_score} but computed {computed_scores[0]} STATES: {history_logging} """) logging.info( f"Got {scores.num_results} historic scores for {len(bugs_with_webcompat_states)} bugs" ) return rv def get_change_reasons(changes: Sequence[BugFieldChange]) -> list[str]: reasons = set() for change in changes: if change.field_name == "url": reasons.add("url-updated") elif change.field_name == "cf_user_story": reasons.add("triage") elif change.field_name == "keywords": if "webcompat:sitepatch-applied" in change.added: reasons.add("intervention-added") elif "webcompat:sitepatch-applied" in change.removed: reasons.add("intervention-removed") if "webcompat:site-report" in change.added: reasons.add("site-report-added") elif "webcompat:site-report" in change.removed: reasons.add("site-report-removed") elif change.field_name == "status": if change.added in FIXED_STATES and change.removed not in FIXED_STATES: reasons.add("resolved") elif change.removed in FIXED_STATES and change.added not in FIXED_STATES: reasons.add("reopened") rv = list(reasons) rv.sort() return rv def compute_score_changes( changes_by_bug: Mapping[int, list[BugChange]], bug_data: Mapping[int, BugData], recorded_changes: Mapping[int, set[ChangeKey]], historic_states: Mapping[int, list[BugState]], historic_scores: Mapping[int, list[float]], last_change_time: datetime, ) -> Mapping[int, list[ScoreChange]]: rv: dict[int, list[ScoreChange]] = {} for bug_id, states in historic_states.items(): rv[bug_id] = [] scores = historic_scores[bug_id] changes = changes_by_bug.get(bug_id) bug = bug_data[bug_id] newly_created = bug.creation_time > last_change_time if newly_created: assert bug_id not in recorded_changes assert len(states) == len(scores) prev_score = 0.0 # Iterate through states and scores from oldest to newest for state, score in zip(reversed(states), reversed(scores)): if state.change_idx is None: # This happens on the first iteration when the state represents # the state before any of the current changes were applied. if newly_created: score_change = ScoreChange( who=bug.creator, change_time=bug.creation_time, score_delta=score, reasons=["created"], ) else: score_change = None else: assert changes is not None score_delta = score - prev_score change = changes[state.change_idx] reasons = get_change_reasons(change.changes) if not reasons and score_delta > 0: logging.warning( f"No change reason for {bug_id} with change {change.changes}" ) score_change = ScoreChange( who=change.who, change_time=change.change_time, score_delta=score_delta, reasons=reasons, ) prev_score = score if score_change is not None and score_change.score_delta != 0: change_key = ChangeKey(score_change.who, score_change.change_time) if change_key in recorded_changes[bug_id]: logging.warning( f"Already recorded a change for bug {bug_id} from {change_key.who} at {change_key.change_time}, skipping" ) else: rv[bug_id].append(score_change) if len(rv[bug_id]) == 0 and any( is_webcompat_bug(state.product, state.component, state.keywords) for state in states ): logging.debug(f"""No score changes for WebCompat bug {bug_id}: STATES: {states} CHANGES: {changes} SCORES: {scores} """) changed_bugs = {key: len(value) for key, value in rv.items() if len(value)} logging.info( f"Got {sum(changed_bugs.values())} score changes in {len(changed_bugs)} bugs" ) return rv def insert_score_changes( client: BigQuery, score_changes: Mapping[int, Sequence[ScoreChange]], ) -> None: changes_table_name = "webcompat_topline_metric_changes" schema = [ bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("who", "STRING", mode="REQUIRED"), bigquery.SchemaField("change_time", "TIMESTAMP", mode="REQUIRED"), bigquery.SchemaField("score_delta", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("reasons", "STRING", mode="REPEATED"), ] rows: list[dict[str, Json]] = [] for bug_id, changes in score_changes.items(): for change in changes: rows.append( { "number": bug_id, "who": change.who, "change_time": change.change_time.isoformat(), "score_delta": change.score_delta, "reasons": change.reasons, } ) changes_table = client.ensure_table( changes_table_name, schema=schema, recreate=False ) client.write_table(changes_table, schema, rows, overwrite=False) def update_metric_changes(client: BigQuery, recreate: bool) -> None: schema = [ bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("who", "STRING", mode="REQUIRED"), bigquery.SchemaField("change_time", "TIMESTAMP", mode="REQUIRED"), bigquery.SchemaField("score_delta", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("reasons", "STRING", mode="REPEATED"), ] client.ensure_table("webcompat_topline_metric_changes", schema, recreate) last_recorded_date = get_last_recorded_date(client) logging.info(f"Last change time {last_recorded_date}") changes_by_bug = get_bug_changes(client, last_recorded_date) if not changes_by_bug: return current_bug_data = get_bugs(client, last_recorded_date, iter(changes_by_bug.keys())) recorded_changes = get_recorded_changes(client, iter(changes_by_bug.keys())) historic_states = bugs_historic_states(current_bug_data, changes_by_bug) current_scores = get_current_scores(client) historic_scores = compute_historic_scores(client, historic_states, current_scores) score_changes = compute_score_changes( changes_by_bug, current_bug_data, recorded_changes, historic_states, historic_scores, last_recorded_date, ) insert_score_changes(client, score_changes) class MetricChangesJob(EtlJob): name = "metric_changes" def default_dataset(self, args: argparse.Namespace) -> str: return args.bq_kb_dataset @classmethod def add_arguments(cls, parser: argparse.ArgumentParser) -> None: group = parser.add_argument_group( title="Metric Changes", description="Metric changes arguments" ) group.add_argument( "--recreate-metric-changes", action="store_true", help="Delete and recreate changes table from scratch", ) def main(self, client: BigQuery, args: argparse.Namespace) -> None: update_metric_changes(client, args.recreate_metric_changes)