sync/bug.py (474 lines of code) (raw):

from __future__ import annotations import base64 import re import sys import traceback import urllib.parse import bugsy import newrelic from . import log from .env import Environment from typing import Any from bugsy import Bug env = Environment() logger = log.get_logger(__name__) max_comment_length = 65535 # Hack because bugsy has an incomplete list of statuses if "REOPENED" not in bugsy.bug.VALID_STATUS: bugsy.bug.VALID_STATUS.append("REOPENED") # Trying to send this field makes bug creation fail if "cc_detail" in bugsy.bug.ARRAY_TYPES: bugsy.bug.ARRAY_TYPES.remove("cc_detail") def bz_url_from_api_url(api_url): if api_url is None: return None parts = urllib.parse.urlparse(api_url) bz_url = (parts.scheme, parts.netloc, "", "", "", "") return urllib.parse.urlunparse(bz_url) def bug_number_from_url(url: str) -> str | None: if url is None: return None bugs = urllib.parse.parse_qs(urllib.parse.urlsplit(url).query).get("id") if bugs: return bugs[0] return None status_re = re.compile(r"\[wptsync ([^\[ ]+)(?: ([^\[ ]+))?\]") def get_sync_data(whiteboard: str) -> tuple[str | None, str | None]: matches = status_re.findall(whiteboard) if matches: subtype, status = matches[0] if not status: status = None return subtype, status return None, None def set_sync_data(whiteboard: str, subtype: str | None, status: str | None) -> str: if subtype is None: raise ValueError if status: text = f"[wptsync {subtype} {status}]" else: text = "[wptsync %s]" % subtype new = status_re.sub(text, whiteboard) separator = ', ' if len(whiteboard) else '' if new == whiteboard: new = whiteboard + separator + text return new def check_valid_comment(text): if len(text) > max_comment_length: # The maximum comment length is in "characters", not bytes text = text[:max_comment_length - 3] + "[\u2026]" logger.error("Truncating comment that exceeds maximum length") return text class Bugzilla: bug_cache: dict[int, Bug] = {} def __init__(self, config): self.api_url = config["bugzilla"]["url"] self.bz_url = bz_url_from_api_url(self.api_url) self.bugzilla = bugsy.Bugsy(bugzilla_url=self.api_url, api_key=config["bugzilla"]["apikey"]) if "flags" not in self.bugzilla.DEFAULT_SEARCH: self.bugzilla.DEFAULT_SEARCH += ["flags"] def bug_ctx(self, bug_id: int) -> BugContext: return BugContext(self, bug_id) def bugzilla_url(self, bug_id: int) -> str: return f"{self.bz_url}/show_bug.cgi?id={bug_id}" def id_from_url(self, url, bz_url=None): if bz_url is None: bz_url = self.bz_url if not url.startswith(bz_url): return None parts = urllib.parse.urlsplit(url) query = urllib.parse.parse_qs(parts.query) if "id" not in query or len(query["id"]) != 1: return None return query["id"][0] def _get_bug(self, bug_id: int, force_update: bool = False) -> Bug | None: if bug_id not in self.bug_cache or force_update: try: bug = self.bugzilla.get(bug_id) except bugsy.BugsyException: logger.error("Failed to retrieve bug with id %s" % bug_id) return None except Exception as e: logger.error(f"Failed to retrieve bug with id {bug_id}: {e}") newrelic.agent.record_exception() return None self.bug_cache[bug_id] = bug return self.bug_cache[bug_id] def comment(self, bug_id: int, comment: str, **kwargs: Any ) -> None: bug = self._get_bug(bug_id) if bug is None: logger.error(f"Failed to find bug {bug_id} to add comment:\n{comment}") return body = { "comment": check_valid_comment(comment) } body.update(kwargs) self.bugzilla.request(f'bug/{bug.id}/comment', method='POST', json=body) def new(self, summary: str, comment: str, product: str, component: str, whiteboard: str | None = None, priority: str | None = None, url: str | None = None, bug_type: str = "task", assign_to_sync: bool = True ): # type (...) -> int bug = bugsy.Bug(self.bugzilla, type=bug_type, summary=summary, product=product, component=component) if assign_to_sync: # Self-assign bugs by default to get them off triage radars bz_username = env.config["bugzilla"]["username"] if bz_username: bug._bug["assigned_to"] = bz_username bug.add_comment(comment) if priority is not None: if priority not in ("P1", "P2", "P3", "P4", "P5"): raise ValueError("Invalid bug priority %s" % priority) bug._bug["priority"] = priority if whiteboard: bug._bug["whiteboard"] = whiteboard if url: bug._bug["url"] = url self.bugzilla.put(bug) self.bug_cache[bug.id] = bug return bug.id def set_component(self, bug: Bug | int, product: str | None = None, component: str | None = None ): # type (...) -> None if not isinstance(bug, bugsy.Bug): bug = self._get_bug(bug) if bug is None: logger.error("Failed to find bug %s to set component: %s::%s" % (bug, product, component)) return if product is not None: bug.product = product if component is not None: bug.component = component if product is not None or component is not None: try: self.bugzilla.put(bug) except bugsy.BugsyException: logger.error(f"Failed to set component {bug.product} :: {bug.component}") def set_whiteboard(self, bug: Bug | int, whiteboard: str ): if not isinstance(bug, bugsy.Bug): bug = self._get_bug(bug) if not bug: return None bug._bug["whiteboard"] = whiteboard try: self.bugzilla.put(bug) except bugsy.errors.BugsyException: logger.warning(traceback.format_exc()) except Exception as e: logger.warning(f"Problem setting Bug {bug.id} Whiteboard: {e}") newrelic.agent.record_exception() def get_whiteboard(self, bug: Bug | int) -> str | None: if not isinstance(bug, bugsy.Bug): bug = self._get_bug(bug, True) if not bug: return None return bug._bug.get("whiteboard", "") def get_status(self, bug: Bug | int) -> tuple[str, str] | None: if not isinstance(bug, bugsy.Bug): bug = self._get_bug(bug) if not bug: return None return (bug.status, bug.resolution) def set_status(self, bug: Bug | int, status: str, resolution: str | None = None ) -> None: if not isinstance(bug, bugsy.Bug): bug = self._get_bug(bug) if not bug: return None bug.status = status if resolution is not None: assert status == "RESOLVED" bug.resolution = resolution self.bugzilla.put(bug) def get_dupe(self, bug_id: Bug | int) -> int | None: if not isinstance(bug_id, bugsy.Bug): bug = self._get_bug(bug_id) else: bug = bug_id if not bug: return None return bug._bug.get("dupe_of") class BugContext: def __init__(self, bugzilla: Bugzilla, bug_id: int ): self.bugzilla = bugzilla self.bug_id = bug_id def __enter__(self) -> BugContext: self.bug: Bug = self.bugzilla._get_bug(self.bug_id) self._comments = None self.comment: dict[str, Any] | None = None self.attachments: list[dict[str, Any]] = [] self.depends: dict[str, list[int]] = {"add": [], "remove": []} self.blocks: dict[str, list[int]] = {"add": [], "remove": []} self.dirty: set[str] = set() return self def __exit__(self, *args: Any, **kwargs: Any) -> None: if self.dirty: # Apparently we can't add comments atomically with other changes if "comment" in self.dirty: self.dirty.remove("comment") if self.comment is not None: self.bugzilla.comment(self.bug_id, **self.comment) if "attachment" in self.dirty: for attachment in self.attachments: self.bugzilla.bugzilla.request('bug/{}/attachment'.format(self.bug._bug['id']), method='POST', json=attachment) if "depends" in self.dirty: self.bug.depends_on.extend(self.depends["add"]) for item in self.depends["remove"]: if item in self.bug.depends_on: self.bug.depends_on.remove(item) if "blocks" in self.dirty: self.bug.blocks.extend(self.blocks["add"]) for item in self.blocks["remove"]: if item in self.bug.blocks: self.bug.blocks.remove(item) if self.dirty: self.bugzilla.bugzilla.put(self.bug) # Poison the object so it can't be used outside a context manager self.bug = None def __setitem__(self, name: str, value: Any ): if name == "comment": return self.add_comment(value) self.bug._bug[name] = value self.dirty.add(name) def add_comment(self, comment: str, check_dupe: bool = True, comment_tags: list[str] | None = None, is_private: bool = False, is_markdown: bool = False ): if self.comment is not None: raise ValueError("Can only set one comment per bug") comment = check_valid_comment(comment) if check_dupe: comments = self.get_comments() for item in comments: if item.text == comment: return False self.comment = {"comment": comment, "is_markdown": is_markdown, "is_private": is_private} if comment_tags is not None: self.comment["comment_tags"] = comment_tags self.dirty.add("comment") return True def get_comments(self): # type () -> List[bugsy.Comment] if self._comments is None: self._comments = self.bug.get_comments() return self._comments def needinfo(self, *requestees: str) -> None: if not requestees: return bug: Bug = self.bugzilla._get_bug(self.bug_id) flags = bug._bug.get("flags", []) existing = {item["requestee"] for item in flags if item["name"] == "needinfo" and item["status"] == "?"} for requestee in requestees: if requestee not in existing: flags.append({ 'name': 'needinfo', 'requestee': requestee, 'status': '?', }) self.bug._bug["flags"] = flags self.dirty.add("flags") def add_attachment(self, data: bytes, file_name: str, summary: str, content_type: str = "text/plain", comment=None, # type Optional[Text] is_patch: bool = False, is_private: bool = False, is_markdown: bool = False, flags: list[str] | None = None ): body: dict[str, Any] = { "data": base64.encodebytes(data).decode("ascii"), "file_name": file_name, "summary": summary, "content_type": content_type } if comment: body["comment"] = comment if is_patch: body["is_patch"] = is_patch if is_private: body["is_private"] = is_private if is_markdown: body["is_markdown"] = is_markdown if flags: body["flags"] = flags self.attachments.append(body) self.dirty.add("attachment") def add_depends(self, bug_id: int) -> None: self.depends["add"].append(bug_id) self.dirty.add("depends") def remove_depends(self, bug_id: int) -> None: self.depends["remove"].append(bug_id) self.dirty.add("depends") def add_blocks(self, bug_id: int) -> None: self.blocks["add"].append(bug_id) self.dirty.add("blocks") def remove_blocks(self, bug_id: int) -> None: self.blocks["remove"].append(bug_id) self.dirty.add("blocks") class MockBugzilla(Bugzilla): def __init__(self, config): self.api_url = config["bugzilla"]["url"] self.bz_url = bz_url_from_api_url(self.api_url) self.output = sys.stdout self.known_bugs = [] self.dupes = {} def _log(self, data: str | bytes) -> None: data = str(data) self.output.write(data) self.output.write("\n") def bug_ctx(self, bug_id: int) -> BugContext: return MockBugContext(self, bug_id) def new(self, summary: str, comment: str, product: str, component: str, whiteboard: str | None = None, priority: str | None = None, url: str | None = None, bug_type: str = "task", assign_to_sync: bool = True, ) -> int: self._log("Creating a bug in component {product} :: {component}\nSummary: {summary}\n" "Comment: {comment}\nWhiteboard: {whiteboard}\nPriority: {priority}\n" "URL: {url}\nType: {bug_type}\nAssign to sync: {assign_to_sync}".format( product=product, component=component, summary=summary, comment=comment, whiteboard=whiteboard, priority=priority, url=url, bug_type=bug_type, assign_to_sync=assign_to_sync)) if self.known_bugs: bug_id = self.known_bugs[-1] + 1 else: bug_id = 100000 self.known_bugs.append(bug_id) return bug_id def comment(self, bug_id: int, comment: str, **kwargs: Any ) -> None: self._log(f"Posting to bug {bug_id}:\n{comment}") def set_component(self, bug_id: int, product: str | None = None, component: str | None = None) -> None: self._log(f"Setting bug {bug_id} product: {product} component: {component}") def set_whiteboard(self, bug_id: int, whiteboard: str) -> None: self._log(f"Setting bug {bug_id} whiteboard: {whiteboard}") def get_whiteboard(self, bug: Bug | int) -> str: return "fake data" def get_status(self, bug: Bug | int) -> tuple[str, str]: return ("NEW", "") def set_status(self, bug: Bug | int, status: str, resolution: str | None = None) -> None: self._log(f"Setting bug {bug} status {status}") def get_dupe(self, bug: Bug | int) -> int | None: return self.dupes.get(bug) class MockBugContext(BugContext): def __init__(self, bugzilla: MockBugzilla, bug_id: int) -> None: self.bugzilla = bugzilla self.bug_id = bug_id def __enter__(self) -> BugContext: self.changes: list[str] = [] self.comment: dict[str, Any] | None = None return self def __exit__(self, *args: Any, **kwargs: Any) -> None: for item in self.changes: self.bugzilla._log("%s\n" % item) # type: ignore def __setitem__(self, name: str, value: str) -> None: self.changes.append("Setting bug {} {} {}".format(self.bug_id, name, value)) def add_comment(self, comment: str, check_dupe: bool = True, comment_tags: list[str] | None = None, is_private: bool = False, is_markdown: bool = False ) -> None: if self.comment is not None: raise ValueError("Can only set one comment per bug") self.comment = {"comment": comment, "is_markdown": is_markdown, "is_private": is_private} if comment_tags is not None: self.comment["comment_tags"] = comment_tags def get_comments(self): # type () -> List[bugsy.Comment] return [] def needinfo(self, *requestees: str) -> None: for requestee in requestees: self.changes.append(f"Setting bug {self.bug_id} needinfo {requestee}") def add_attachment(self, data: bytes, file_name: str, summary: str, content_type: str = "text/plain", comment=None, # type Optional[Text] is_patch: bool = False, is_private: bool = False, is_markdown: bool = False, flags: list[str] | None = None ): body: dict[str, Any] = { "data": base64.encodebytes(data), "file_name": file_name, "summary": summary, "content_type": content_type } if comment: body["comment"] = comment if is_patch: body["is_patch"] = is_patch if is_private: body["is_private"] = is_private if is_markdown: body["is_markdown"] = is_markdown if flags: body["flags"] = flags self.changes.append("Setting bug %s add_attachment: %r" % (self.bug_id, body)) def add_depends(self, bug_id: int) -> None: self.changes.append(f"Setting bug {self.bug_id} add_depends {bug_id}") def remove_depends(self, bug_id: int) -> None: self.changes.append(f"Setting bug {self.bug_id} remove_depends {bug_id}") def add_blocks(self, bug_id: int) -> None: self.changes.append(f"Setting bug {self.bug_id} add_blocks {bug_id}") def remove_blocks(self, bug_id: int) -> None: self.changes.append(f"Setting bug {self.bug_id} remove_blocks {bug_id}")