bugbot/utils.py (506 lines of code) (raw):

# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this file, # You can obtain one at http://mozilla.org/MPL/2.0/. import copy import datetime import json import os import random import re from typing import Iterable, Union from urllib.parse import quote_plus, urlencode import dateutil.parser import humanize import pytz import requests from dateutil.relativedelta import relativedelta from libmozdata import utils as lmdutils from libmozdata import versions as lmdversions from libmozdata.bugzilla import Bugzilla, BugzillaShorten from libmozdata.fx_trains import FirefoxTrains from libmozdata.hgmozilla import Mercurial from requests.exceptions import HTTPError from bugbot.constants import ( BOT_MAIN_ACCOUNT, HIGH_PRIORITY, HIGH_SEVERITY, OLD_SEVERITY_MAP, ) _CONFIG = None _CYCLE_SPAN = None _MERGE_DAY = None _TRIAGE_OWNERS = None _DEFAULT_ASSIGNEES = None _CURRENT_VERSIONS = None _CONFIG_PATH = "./configs/" BZ_FIELD_PAT = re.compile(r"^[fovj]([0-9]+)$") PAR_PAT = re.compile(r"\([^\)]*\)") BRA_PAT = re.compile(r"\[[^\]]*\]") DIA_PAT = re.compile("<[^>]*>") UTC_PAT = re.compile(r"UTC\+[^ \t]*") COL_PAT = re.compile(":[^:]*") BACKOUT_PAT = re.compile("^back(s|(ed))?[ \t]*out", re.I) BUG_PAT = re.compile(r"^bug[s]?[ \t]*([0-9]+)", re.I) WHITEBOARD_ACCESS_PAT = re.compile(r"\[access\-s\d\]") MAX_URL_LENGTH = 512 def get_weekdays(): return {"Mon": 0, "Tue": 1, "Wed": 2, "Thu": 3, "Fri": 4, "Sat": 5, "Sun": 6} def _get_config(): global _CONFIG if _CONFIG is None: try: with open(_CONFIG_PATH + "/rules.json", "r") as In: _CONFIG = json.load(In) except IOError: _CONFIG = {} return _CONFIG def get_config(name, entry, default=None): conf = _get_config() if name not in conf: name = "common" rule_conf = conf[name] if entry in rule_conf: return rule_conf[entry] rule_conf = conf["common"] return rule_conf.get(entry, default) def get_receivers(rule_name): receiver_lists = get_config("common", "receiver_list", default={}) receivers = get_config(rule_name, "receivers", []) if isinstance(receivers, str): receivers = receiver_lists[receivers] additional_receivers = get_config(rule_name, "additional_receivers", []) if isinstance(additional_receivers, str): additional_receivers = receiver_lists[additional_receivers] return list(dict.fromkeys([*receivers, *additional_receivers])) def init_random(): now = datetime.datetime.utcnow() now = now.timestamp() random.seed(now) def get_signatures(sgns): if not sgns: return set() res = set() sgns = map(lambda x: x.strip(), sgns.split("[@")) for s in filter(None, sgns): try: i = s.rindex("]") res.add(s[:i].strip()) except ValueError: res.add(s) return res def add_signatures(old, new): added_sgns = "[@ " + "]\n[@ ".join(sorted(new)) + "]" if old: return old + "\n" + added_sgns return added_sgns def get_empty_assignees(params, negation=False): n = get_last_field_num(params) n = int(n) params.update( { "j" + str(n): "OR", "f" + str(n): "OP", "f" + str(n + 1): "assigned_to", "o" + str(n + 1): "equals", "v" + str(n + 1): "nobody@mozilla.org", "f" + str(n + 2): "assigned_to", "o" + str(n + 2): "regexp", "v" + str(n + 2): r"^.*\.bugs$", "f" + str(n + 3): "assigned_to", "o" + str(n + 3): "isempty", "f" + str(n + 4): "CP", } ) if negation: params["n" + str(n)] = 1 return params def is_no_assignee(mail): return mail == "nobody@mozilla.org" or mail.endswith(".bugs") or mail == "" def get_login_info(): with open(_CONFIG_PATH + "config.json", "r") as In: return json.load(In) def get_private(): with open(_CONFIG_PATH + "config.json", "r") as In: return json.load(In)["private"] def get_gcp_service_account_info() -> dict: """Get the GCP service account info from the downloaded key file.""" with open( _CONFIG_PATH + "gcp_service_account.json", "r", encoding="utf-8" ) as json_file: return json.load(json_file) def plural(sword, data, pword=""): if isinstance(data, int): p = data != 1 else: p = len(data) != 1 if not p: return sword if pword: return pword return sword + "s" def english_list(items): assert len(items) > 0 if len(items) == 1: return items[0] return "{} and {}".format(", ".join(items[:-1]), items[-1]) def shorten_long_bz_url(url): if not url or len(url) <= MAX_URL_LENGTH: return url # the url can be very long and line length are limited in email protocol: # https://datatracker.ietf.org/doc/html/rfc5322#section-2.1.1 # So we need to generate a short URL. def url_handler(u, data): data["url"] = u data = {} try: BugzillaShorten(url, url_data=data, url_handler=url_handler).wait() except HTTPError: # workaround for https://github.com/mozilla/bugbot/issues/1402 return "\n".join( [url[i : i + MAX_URL_LENGTH] for i in range(0, len(url), MAX_URL_LENGTH)] ) return data["url"] def get_cycle_span() -> str: """Return the cycle span in the format YYYYMMDD-YYYYMMDD""" global _CYCLE_SPAN if _CYCLE_SPAN is None: schedule = FirefoxTrains().get_release_schedule("nightly") start = lmdutils.get_date_ymd(schedule["nightly_start"]) end = lmdutils.get_date_ymd(schedule["merge_day"]) now = lmdutils.get_date_ymd("today") assert start <= now <= end _CYCLE_SPAN = start.strftime("%Y%m%d") + "-" + end.strftime("%Y%m%d") return _CYCLE_SPAN def get_next_release_date() -> datetime.datetime: """Return the next release date""" schedule = FirefoxTrains().get_release_schedule("beta") release_date = lmdutils.get_date_ymd(schedule["release"]) release_date = release_date.replace(hour=0, minute=0, second=0, microsecond=0) return release_date def is_merge_day(date: datetime.datetime | None = None) -> bool: """Check if the date is the merge day Args: date: the date to check. If None, the current date is used. Returns: True if the date is the merge day """ if date is None: date = lmdutils.get_date_ymd("today") schedule = FirefoxTrains().get_release_schedule("nightly") last_merge = lmdutils.get_date_ymd(schedule["nightly_start"]) next_merge = lmdutils.get_date_ymd(schedule["merge_day"]) return date in (next_merge, last_merge) def get_report_bugs(channel, op="+"): url = "https://bugzilla.mozilla.org/page.cgi?id=release_tracking_report.html" params = { "q": "approval-mozilla-{}:{}:{}:0:and:".format(channel, op, get_cycle_span()) } # allow_redirects=False avoids to load the data # and we'll just get the redirected url to get all the bug ids we need r = requests.get(url, params=params, allow_redirects=False) # something like https://bugzilla.mozilla.org/buglist.cgi?bug_id=1493711,1502766,1499908 url = r.headers["Location"] return url.split("=")[1].split(",") def get_flag(version, name, channel): if name in ["status", "tracking"]: if channel == "esr": return "cf_{}_firefox_esr{}".format(name, version) return "cf_{}_firefox{}".format(name, version) elif name == "approval": if channel == "esr": return "approval-mozilla-esr{}".format(version) return "approval-mozilla-{}".format(channel) def get_needinfo(bug, days=-1): now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc) for flag in bug.get("flags", []): if flag.get("name", "") == "needinfo" and flag["status"] == "?": date = flag["modification_date"] date = dateutil.parser.parse(date) if (now - date).days >= days: yield flag def get_last_field_num(params): s = set() for k in params.keys(): m = BZ_FIELD_PAT.match(k) if m: s.add(int(m.group(1))) x = max(s) + 1 if s else 1 return str(x) def add_prod_comp_to_query(params, prod_comp): n = int(get_last_field_num(params)) params[f"j{n}"] = "OR" params[f"f{n}"] = "OP" n += 1 for pc in prod_comp: prod, comp = pc.split("::") params[f"j{n}"] = "AND" params[f"f{n}"] = "OP" n += 1 params[f"f{n}"] = "product" params[f"o{n}"] = "equals" params[f"v{n}"] = prod n += 1 params[f"f{n}"] = "component" params[f"o{n}"] = "equals" params[f"v{n}"] = comp n += 1 params[f"f{n}"] = "CP" n += 1 params[f"f{n}"] = "CP" def get_bz_search_url(params): return "https://bugzilla.mozilla.org/buglist.cgi?" + urlencode(params, doseq=True) def has_bot_set_ni(bug): bot = get_config("common", "bot_bz_mail") for flag in get_needinfo(bug): if flag["setter"] in bot: return True return False def get_triage_owners(): global _TRIAGE_OWNERS if _TRIAGE_OWNERS is not None: return _TRIAGE_OWNERS # accessible is the union of: # selectable (all product we can see) # enterable (all product a user can file bugs into). prods = get_config("common", "products") url = "https://bugzilla.mozilla.org/rest/product" params = { "type": "accessible", "include_fields": ["name", "components.name", "components.triage_owner"], "names": prods, } r = requests.get(url, params=params) products = r.json()["products"] _TRIAGE_OWNERS = {} for prod in products: prod_name = prod["name"] for comp in prod["components"]: owner = comp["triage_owner"] if owner and not is_no_assignee(owner): comp_name = comp["name"] pc = f"{prod_name}::{comp_name}" if owner not in _TRIAGE_OWNERS: _TRIAGE_OWNERS[owner] = [pc] else: _TRIAGE_OWNERS[owner].append(pc) return _TRIAGE_OWNERS def get_default_assignees(): global _DEFAULT_ASSIGNEES if _DEFAULT_ASSIGNEES is not None: return _DEFAULT_ASSIGNEES # accessible is the union of: # selectable (all product we can see) # enterable (all product a user can file bugs into). prods = get_config("common", "products") url = "https://bugzilla.mozilla.org/rest/product" params = { "type": "accessible", "include_fields": ["name", "components.name", "components.default_assigned_to"], "names": prods, } r = requests.get(url, params=params) products = r.json()["products"] _DEFAULT_ASSIGNEES = {} for prod in products: prod_name = prod["name"] _DEFAULT_ASSIGNEES[prod_name] = dap = {} for comp in prod["components"]: comp_name = comp["name"] assignee = comp["default_assigned_to"] dap[comp_name] = assignee return _DEFAULT_ASSIGNEES def organize(bugs, columns, key=None): if isinstance(bugs, dict): # we suppose that the values are the bugdata dict bugs = bugs.values() def identity(x): return x def bugid_key(x): return -int(x) lambdas = {"id": bugid_key} def mykey(p): return tuple(lambdas.get(c, identity)(x) for x, c in zip(p, columns)) if len(columns) >= 2: res = [tuple(info[c] for c in columns) for info in bugs] else: c = columns[0] res = [info[c] for info in bugs] return sorted(res, key=mykey if not key else key) def merge_bz_changes(c1, c2): if not c1: return c2 if not c2: return c1 assert set(c1.keys()).isdisjoint( c2.keys() ), "Merge changes with common keys is not a good idea" c = copy.deepcopy(c1) c.update(c2) return c def is_test_file(path): e = os.path.splitext(path)[1][1:].lower() return "test" in path and e not in {"ini", "list", "in", "py", "json", "manifest"} def get_better_name(name): if not name: return "" def repl(m): if m.start(0) == 0: return m.group(0) return "" if name.startswith("Nobody;"): s = "Nobody" else: s = PAR_PAT.sub("", name) s = BRA_PAT.sub("", s) s = DIA_PAT.sub("", s) s = COL_PAT.sub(repl, s) s = UTC_PAT.sub("", s) s = s.strip() if s.startswith(":"): s = s[1:] return s.encode("utf-8").decode("utf-8") def is_backout(json): return json.get("backedoutby", "") != "" or bool(BACKOUT_PAT.search(json["desc"])) def get_pushlog(startdate, enddate, channel="nightly"): """Get the pushlog from hg.mozilla.org""" # Get the pushes where startdate <= pushdate <= enddate # pushlog uses strict inequality, it's why we add +/- 1 second fmt = "%Y-%m-%d %H:%M:%S" startdate -= relativedelta(seconds=1) startdate = startdate.strftime(fmt) enddate += relativedelta(seconds=1) enddate = enddate.strftime(fmt) url = "{}/json-pushes".format(Mercurial.get_repo_url(channel)) r = requests.get( url, params={"startdate": startdate, "enddate": enddate, "version": 2, "full": 1}, ) return r.json() def get_bugs_from_desc(desc): """Get a bug number from the patch description""" return BUG_PAT.findall(desc) def get_bugs_from_pushlog(startdate, enddate, channel="nightly"): pushlog = get_pushlog(startdate, enddate, channel=channel) bugs = set() for push in pushlog["pushes"].values(): for chgset in push["changesets"]: if chgset.get("backedoutby", "") != "": continue desc = chgset["desc"] for bug in get_bugs_from_desc(desc): bugs.add(bug) return bugs def get_checked_versions(): # There are different reasons to not return versions: # i) we're merge day: the versions are changing # ii) not consecutive versions numbers # iii) bugzilla updated nightly version but p-d is not updated if is_merge_day(): return {} versions = lmdversions.get(base=True) versions["central"] = versions["nightly"] v = [versions[k] for k in ["release", "beta", "central"]] versions = {k: str(v) for k, v in versions.items()} if v[0] + 2 == v[1] + 1 == v[2]: nightly_bugzilla = get_nightly_version_from_bz() if v[2] != nightly_bugzilla: from . import logger logger.info("Versions mismatch between Bugzilla and product-details") return {} return versions from . import logger logger.info("Not consecutive versions in product/details") return {} def get_info_from_hg(json): res = {} push = json["pushdate"][0] push = datetime.datetime.utcfromtimestamp(push) push = lmdutils.as_utc(push) res["date"] = lmdutils.get_date_str(push) res["backedout"] = json.get("backedoutby", "") != "" m = BUG_PAT.search(json["desc"]) res["bugid"] = m.group(1) if m else "" return res def bz_ignore_case(s): return "[" + "][".join(c + c.upper() for c in s) + "]" def check_product_component(data, bug): prod = bug["product"] comp = bug["component"] pc = prod + "::" + comp return pc in data or comp in data def get_components(data): res = [] for comp in data: if "::" in comp: _, comp = comp.split("::", 1) res.append(comp) return res def get_products_components(data): prods = set() comps = set() for pc in data: if "::" in pc: p, c = pc.split("::", 1) prods.add(p) else: c = pc comps.add(c) return prods, comps def ireplace(old, repl, text): return re.sub("(?i)" + re.escape(old), lambda m: repl, text) def get_human_lag(date): today = pytz.utc.localize(datetime.datetime.utcnow()) dt = dateutil.parser.parse(date) if isinstance(date, str) else date return humanize.naturaldelta(today - dt) def get_nightly_version_from_bz(): def bug_handler(bug, data): status = "cf_status_firefox" N = len(status) for k in bug.keys(): if k.startswith(status): k = k[N:] if k.isdigit(): data.append(int(k)) data = [] Bugzilla(bugids=["1234567"], bughandler=bug_handler, bugdata=data).get_data().wait() return max(data) def nice_round(val): return int(round(100 * val)) def is_bot_email(email: str) -> bool: """Check if the email is belong to a bot or component-watching account. Args: email: the account login email. """ if email.endswith("@disabled.tld"): return False return email.endswith(".bugs") or email.endswith(".tld") def get_last_no_bot_comment_date(bug: dict) -> str: """Get the create date of the last comment by non bot account. Args: bug: the bug dictionary; it must has the comments list. Returns: If no comments or all comments are posted by bots, the creation date of the bug itself will be returned. """ for comment in reversed(bug["comments"]): if not is_bot_email(comment["creator"]): return comment["creation_time"] return bug["comments"][0]["creation_time"] def get_sort_by_bug_importance_key(bug): """ We need bugs with high severity (S1 or S2) or high priority (P1 or P2) to be first (do not need to be high in both). Next, bugs with higher priority and severity are preferred. Finally, for bugs with the same severity and priority, we favour recently changed or created bugs. """ is_important = bug["priority"] in HIGH_PRIORITY or bug["severity"] in HIGH_SEVERITY priority = bug["priority"] if bug["priority"].startswith("P") else "P10" severity = ( bug["severity"] if bug["severity"].startswith("S") else OLD_SEVERITY_MAP.get(bug["severity"], "S10") ) time_order = ( lmdutils.get_timestamp(bug["last_change_time"]) if "last_change_time" in bug else int(bug["id"]) # Bug ID reflects the creation order ) return ( not is_important, severity, priority, time_order * -1, ) def get_mail_to_ni(bug: dict) -> Union[dict, None]: """Get the person that should be needinfoed about the bug. If the bug is assigned, the assignee will be selected. Otherwise, will fallback to the triage owner. Args: bug: The bug that you need to send a needinfo request about. Returns: A dict with the nicname and the email of the person that should receive the needinfo request. If not available will return None. """ for field in ["assigned_to", "triage_owner"]: person = bug.get(field, "") if not is_no_assignee(person): return {"mail": person, "nickname": bug[f"{field}_detail"]["nick"]} return None def get_name_from_user_detail(detail: dict) -> str: """Get the name of the user from the detail object. Returns: The name of the user or the email as a fallback. """ name = detail["real_name"] if is_no_assignee(detail["email"]): name = "nobody" if name.strip() == "": name = detail["name"] if name.strip() == "": name = detail["email"] return name def is_weekend(date: Union[datetime.datetime, str]) -> bool: """Get if the provided date is a weekend day (Saturday or Sunday)""" parsed_date = lmdutils.get_date_ymd(date) return parsed_date.weekday() >= 5 def get_whiteboard_access_rating(whiteboard: str) -> str: """Get the access rating tag from the whiteboard. Args: whiteboard: a whiteboard string that contains an access rating tag. Returns: An access rating tag. """ access_tags = WHITEBOARD_ACCESS_PAT.findall(whiteboard) assert len(access_tags) == 1, "Should have only one access tag" return access_tags[0] def create_bug(bug_data: dict) -> dict: """Create a new bug. Args: bug_data: The bug data to create. Returns: A dictionary with the bug id of the newly created bug. """ resp = requests.post( url=Bugzilla.API_URL, json=bug_data, headers=Bugzilla([]).get_header(), verify=True, timeout=Bugzilla.TIMEOUT, ) resp.raise_for_status() return resp.json() def is_keywords_removed_by_bugbot(bug: dict, keywords: Iterable) -> bool: """Check if the bug had any of the provided keywords removed by bugbot. Args: bug: The bug to check. keywords: The keywords to check. Returns: True if any of the keywords was removed by bugbot, False otherwise. """ return any( keyword in change["removed"] for entry in bug["history"] if entry["who"] == BOT_MAIN_ACCOUNT for change in entry["changes"] if change["field_name"] == "keywords" for keyword in keywords ) def get_bug_bugdash_url(component, tab_name: str) -> str: """ Generate bugdash URL for a component. Args: component: The name of the targeted component. tab_name: The name of the tab that should be active. Returns: A URL pointing to Bugdash based on the provided component and tab. """ # Bugdash uses a single colon instead of a double colon to prefix the product name. encoded_component = quote_plus(f"{component.product}:{component.name}") return f"https://bugdash.moz.tools/?component={encoded_component}#tab.{tab_name}"