bugbot/bug/analyzer.py (139 lines of code) (raw):

# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this file, # You can obtain one at http://mozilla.org/MPL/2.0/. from functools import cached_property from typing import Any, Iterable, NamedTuple from libmozdata import versions as lmdversions from libmozdata.bugzilla import Bugzilla from bugbot import utils from bugbot.components import ComponentName class VersionStatus(NamedTuple): """A representation of a version status flag""" channel: str version: int status: str @property def flag(self) -> str: return utils.get_flag(self.version, "status", self.channel) class BugAnalyzer: """A class to analyze a bug""" def __init__(self, bug: dict, store: "BugsStore"): """Constructor Args: bug: The bug to analyze store: The store of bugs """ self._bug = bug self._store = store @property def id(self) -> int: """The bug id.""" return self._bug["id"] @property def component(self) -> ComponentName: """The component that the bug is in.""" return ComponentName(self._bug["product"], self._bug["component"]) @property def is_security(self) -> bool: """Whether the bug is a security bug.""" return any("core-security" in group for group in self._bug["groups"]) @property def regressed_by_bugs(self) -> list["BugAnalyzer"]: """The bugs that regressed the bug.""" return [ self._store.get_bug_by_id(bug_id) for bug_id in self._bug["regressed_by"] ] @property def oldest_fixed_firefox_version(self) -> int | None: """The oldest version of Firefox that was fixed by this bug.""" fixed_versions = sorted( int(key[len("cf_status_firefox") :]) for key, value in self._bug.items() if key.startswith("cf_status_firefox") and "esr" not in key and value in ("fixed", "verified") ) if not fixed_versions: return None return fixed_versions[0] @property def latest_firefox_version_status(self) -> str | None: """The version status for the latest version of Firefox. The latest version is the highest version number that has a status flag set (not `---`). """ versions_status = sorted( (int(key[len("cf_status_firefox") :]), value) for key, value in self._bug.items() if value != "---" and key.startswith("cf_status_firefox") and "esr" not in key ) if not versions_status: return None return versions_status[-1][1] def get_field(self, field: str) -> Any: """Get a field value from the bug. Args: field: The field name. Returns: The field value. If the field is not found, `None` is returned. """ return self._bug.get(field) def detect_version_status_updates(self) -> list[VersionStatus]: """Detect the status for the version flags that should be updated. The status of the version flags is determined by the status of the regressor bug. Returns: A list of `VersionStatus` objects. """ if len(self._bug["regressed_by"]) > 1: # Currently only bugs with one regressor are supported return [] regressor_bug = self.regressed_by_bugs[0] regressed_version = regressor_bug.oldest_fixed_firefox_version if not regressed_version: return [] fixed_version = self.oldest_fixed_firefox_version # If the latest status flag is wontfix or fix-optional, we ignore # setting flags with the status "affected" to newer versions. is_latest_wontfix = self.latest_firefox_version_status in ( "wontfix", "fix-optional", ) flag_updates = [] for flag, channel, version in self._store.current_version_flags: if flag not in self._bug and channel == "esr": # It is okay if an ESR flag is absent (we try two, the current # and the previous). However, the absence of other flags is a # sign of something wrong. continue if self._bug[flag] != "---": # We don't override existing flags # XXX maybe check for consistency? continue if fixed_version and fixed_version <= version: # Bug was fixed in an earlier version, don't set the flag continue if ( version >= regressed_version # ESR: If the regressor was uplifted, so the regression affects # this version. or regressor_bug.get_field(flag) in ("fixed", "verified") ): if is_latest_wontfix: continue flag_updates.append(VersionStatus(channel, version, "affected")) else: flag_updates.append(VersionStatus(channel, version, "unaffected")) return flag_updates class BugNotInStoreError(LookupError): """The bug was not found the bugs store.""" class BugsStore: """A class to retrieve bugs.""" def __init__( self, bugs: Iterable[dict] = (), versions_map: dict[str, int] | None = None ): self.bugs = {bug["id"]: BugAnalyzer(bug, self) for bug in bugs} self.versions_map = versions_map def get_bug_by_id(self, bug_id: int) -> BugAnalyzer: """Get a bug by its id. Args: bug_id: The id of the bug to retrieve. Returns: A `BugAnalyzer` object representing the bug. Raises: BugNotFoundError: The bug was not found in the store. """ try: return self.bugs[bug_id] except KeyError as error: raise BugNotInStoreError(f"Bug {bug_id} is not the bugs store") from error def fetch_regressors(self, include_fields: list[str] | None = None): """Fetches the regressors for all the bugs in the store. Args: include_fields: The fields to include when fetching the bugs. """ bug_ids = ( bug_id for bug in self.bugs.values() if bug.get_field("regressed_by") for bug_id in bug.get_field("regressed_by") ) self.fetch_bugs(bug_ids, include_fields) def fetch_bugs( self, bug_ids: Iterable[int], include_fields: list[str] | None = None ): """Fetches the bugs from Bugzilla. Args: bug_ids: The ids of the bugs to fetch. include_fields: The fields to include when fetching the bugs. """ bug_ids = { bug_id for bug_id in bug_ids # TODO: We only fetch bugs that aren't already in the store. # However, the new fetch request might be specifying fields that # aren't in the existing bug. We need at some point to handle such # cases (currently, we do not have this requirement). if bug_id not in self.bugs } if not bug_ids: return def bug_handler(bug): self.bugs[bug["id"]] = BugAnalyzer(bug, self) Bugzilla(bug_ids, bughandler=bug_handler, include_fields=include_fields).wait() @cached_property def current_version_flags(self) -> list[tuple[str, str, int]]: """The current version flags.""" active_versions = [] channel_version_map = ( self.versions_map if self.versions_map else lmdversions.get(base=True) ) for channel in ("release", "beta", "nightly"): version = int(channel_version_map[channel]) flag = utils.get_flag(version, "status", channel) active_versions.append((flag, channel, version)) esr_versions = { channel_version_map["esr"], channel_version_map["esr_previous"], } for version in esr_versions: channel = "esr" flag = utils.get_flag(version, "status", channel) active_versions.append((flag, channel, version)) return active_versions