probe_scraper/fog_checks.py (157 lines of code) (raw):
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
This file contains various checks for Firefox on Glean (FOG).
FOG is Glean, yes, but is sufficiently different that it benefits from doing
its own expiry checks. Sending its own emails. Filing its own bugs.
"""
import re
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, TypedDict
from probe_scraper import probe_expiry_alert
from .glean_checks import get_current_metrics_by_repo
from .parsers.repositories import Repository
# Body of the reminder email built in file_bugs_and_get_emails_for_expiring_metrics.
# It is filled in with str.format(), which supplies:
# * {version}: the latest Firefox Nightly version, as an int, and
# * {expiring_bugs_list}: newline-joined entries, one per bug url, each
#   followed by the names of the metrics that bug covers.
EXPIRED_METRICS_EMAIL_TEMPLATE = """
Each metric in the following list will soon expire at the end of Firefox {version}.
For your convenience, we've filed bugs to track the work of removing or renewing them:
{expiring_bugs_list}
What to do about this:
1. If the metric is no longer needed, remove it from its `metrics.yaml` file.
2. If the metric is still required, extend its expiration.
If you have any problems, please ask for help on the #glean Matrix channel[1]. We'll give you a hand.
What happens if you don't fix this:
The expiring metric will expire, causing a test failure which
* makes sheriffs unhappy,
* prevents developers from landing code, and
* generally makes for a bad time.
You will continue to get this e-mail as a reminder to clean up.
Your Friendly Neighbourhood Glean Team
[1] https://chat.mozilla.org/#/room/#glean:mozilla.org
This is an automated message sent from probe-scraper. See https://github.com/mozilla/probe-scraper for details.
""" # noqa
###
# Types for Annotations:
###
class Email(TypedDict):
    """
    A single email to send: just a subject line and a message body.
    """

    # The subject line of the email.
    subject: str
    # The full text of the email body.
    message: str
class EmailInfo(TypedDict):
    """
    A batch of emails together with the addresses they should be sent to.
    """

    # Every address that should receive the emails below.
    addresses: List[str]
    # The emails themselves.
    emails: List[Email]
# The full list of all repos that are FOG style. Must:
# * Expire based on Firefox Desktop Nightly Version, and
# * Use Bugzilla for its bug urls
# Only repos named here are looked at by the checks in this file.
FOG_REPOS: Set[str] = {"firefox-desktop", "gecko"}
# The BMO (bugzilla.mozilla.org) whiteboard tag to use for auto-filed bugs.
# Handed to probe_expiry_alert.file_bugs.
BUG_WHITEBOARD_TAG = "[metric-expiry-alert]"
# The BMO Title, templated by version ({version}) and metric family ({probe}).
# Handed to probe_expiry_alert.file_bugs.
BUG_SUMMARY_TEMPLATE = (
    "Remove or update metrics expiring at the end of Firefox {version}: {probe}"
)
# BE ALERT: We regex on this template to find existing bugs.
# SEE probe_expiry_alert.find_existing_bugs FOR DETAILS.
# IF YOU MODIFY THIS WITHOUT CARE WE WILL FILE DUPLICATE BUGS.
# Please be kind to your Sheriffs and only modify with care.
#
# The text is Markdown with {version}, {probes} and {notes} placeholders.
# It is handed as-is to probe_expiry_alert.file_bugs, which presumably fills
# the placeholders in -- confirm there before renaming any of them.
BUG_DESCRIPTION_TEMPLATE = """
The following metrics will expire at the end of Firefox Nightly release: [version {version}][1].
```
{probes}
```
{notes}
What to do about this:
1. If one, some, or all of the metrics are no longer needed, please remove them from their `metrics.yaml` definition file.
2. If one, some, or all of the metrics are still required, please submit a patch to extend their expiry.
If you have any problems, please ask for help on the [#glean Matrix room](https://chat.mozilla.org/#/room/#glean:mozilla.org) or the #data-help Slack channel.
We'll give you a hand.
Your Friendly Neighbourhood Glean Team
[1]: https://wiki.mozilla.org/Release_Management/Calendar
---
This bug was auto-filed by [probe-scraper](https://github.com/mozilla/probe-scraper).
""" # noqa
# One or more consecutive digits. bug_number_from_url searches a bug url with
# this and takes the first run of digits it finds as the bug number.
BUG_NUMBER_PATTERN = re.compile(r"\d+")
def get_expiring_metrics(
    metrics: Dict[str, Dict], latest_nightly_version: str
) -> Dict[str, Dict]:
    """
    Filter the provided dict of metric name to metric info to just the expiring ones.

    A metric counts as expiring when its `expires` value is either the literal
    "expired", or a version number equal to `latest_nightly_version` + 1.
    Metrics that expire "never", and metrics whose `expires` can't be read as
    a version number at all, are left out.

    `metrics` maps metric name to metric info; each info dict must have an
    `expires` entry (a missing one raises KeyError).
    `latest_nightly_version` must be something `int()` accepts (a bad one
    raises ValueError).

    Returns a new dict holding the same info dicts (not copies) for just the
    expiring metrics.
    """
    # We start warning one version ahead.
    target_version = int(latest_nightly_version) + 1

    expiring_metrics = {}
    for metric_name, metric in metrics.items():
        expires = metric["expires"]
        if expires == "never":
            continue
        if expires == "expired":
            # Also include manually-expired ones.
            # This is not only technically correct, but makes testing easier.
            expiring_metrics[metric_name] = metric
            continue
        try:
            expiry_version = int(expires)
        except (TypeError, ValueError):
            # Expires cannot be parsed as a version. Treat as unexpired.
            # int() raises ValueError for unparseable strings (e.g. dates) but
            # TypeError for things like None, so we need to catch both.
            # TODO: Should we send emails for unparseable expiry versions?
            continue
        if expiry_version == target_version:
            expiring_metrics[metric_name] = metric
    return expiring_metrics
def bug_number_from_url(url: str) -> Optional[int]:
    """
    Work out which bug a bug url points at.

    Only urls containing "bugz" are understood. For those, the first run of
    digits in the url is taken to be the bug number.

    Returns the bug number, or None if the url isn't a bugzilla one or no
    number could be pulled out of it.
    """
    if "bugz" in url:
        found = BUG_NUMBER_PATTERN.search(url)
        if found is None:
            # A bugzilla-looking url with no digits in it at all.
            return None
        try:
            return int(found[0])
        except Exception:
            print(f"Can't figure out bug number for url: {url}")
            return None

    # Not a bugzilla url. We don't understand you.
    print(f"Can't figure out bug number for non-bugzilla url: {url}")
    return None
def file_bugs(
    expiring_metrics: Dict[str, Dict],
    latest_nightly_version: str,
    bugzilla_api_key: str,
    dry_run: bool = True,
) -> Dict[str, List[str]]:
    """
    Find existing and file new Bugzilla bugs for expiring metrics.
    Needs a network connection.
    If `dry_run`, doesn't file any new bugs, returning a fake bug url for all expiring metrics.

    `expiring_metrics` maps metric name to metric info. Each info dict must
    have a `bugs` list of bug urls, and may have `notification_emails`.
    `latest_nightly_version` must be something `int()` accepts.

    Returns a dict of bug url to the list of metric names that bug covers.
    """
    next_version = str(int(latest_nightly_version) + 1)

    # We try our best to reuse pieces of probe_expiry_alert.
    # Swizzle and filter expiring_metrics into a list of ProbeDetails structs.
    expiring_probes: List[probe_expiry_alert.ProbeDetails] = []
    for metric_name, metric in expiring_metrics.items():
        bug_numbers: List[Optional[int]] = [
            bug_number_from_url(url) for url in metric["bugs"]
        ]
        # The biggest bug number stands in for "the most recent bug".
        biggest_bug_number: Optional[int] = max(
            (bug for bug in bug_numbers if bug is not None), default=None
        )
        if biggest_bug_number is not None:
            product, component = probe_expiry_alert.get_bug_component(
                biggest_bug_number, bugzilla_api_key
            )
        else:
            product, component = None, None
        # No usable bug (or no component found for it): use the defaults.
        if product is None and component is None:
            product = probe_expiry_alert.BUG_DEFAULT_PRODUCT
            component = probe_expiry_alert.BUG_DEFAULT_COMPONENT
        expiring_probes.append(
            probe_expiry_alert.ProbeDetails(
                metric_name,
                product,
                component,
                metric.get("notification_emails", []),
                biggest_bug_number,
            )
        )

    # Debug print time
    print(f"Found {len(expiring_probes)} 'probes' expiring in nightly {next_version}:")
    print([probe.name for probe in expiring_probes])

    metrics_to_bug_numbers = probe_expiry_alert.file_bugs(
        expiring_probes,
        str(latest_nightly_version),
        bugzilla_api_key,
        dry_run,
        BUG_WHITEBOARD_TAG,
        BUG_SUMMARY_TEMPLATE,
        BUG_DESCRIPTION_TEMPLATE,
    )

    if dry_run:
        # Nothing real was filed, so don't bother building bug urls.
        # Hand back a real list (as the return type promises) rather than a
        # live view onto the caller's dict.
        return {"https://example.com/fake_bug_url/": list(expiring_metrics)}

    # Swizzle out to a bug url -> List[metric names] dict
    bug_urls_to_metrics = defaultdict(list)
    for metric_name, bug_number in metrics_to_bug_numbers.items():
        bug_urls_to_metrics[
            probe_expiry_alert.BUGZILLA_BUG_LINK_TEMPLATE.format(bug_id=bug_number)
        ].append(metric_name)
    return bug_urls_to_metrics
def file_bugs_and_get_emails_for_expiring_metrics(
    repositories: List[Repository],
    metrics_by_repo: Dict[str, Dict[str, Dict[str, Any]]],
    bugzilla_api_key: Optional[str],
    dry_run: bool = True,
) -> Optional[Dict[str, EmailInfo]]:
    """
    If the provided repositories and metrics contain FOG-using repos:
    * Determine which metrics are expiring in the next version.
    * File bugs in Bugzilla for them, in the product and component of the most recent bug.
      At most one bug per metric category. (Doesn't happen if you don't provide an API key.)
    * Return a list of emails to send. At most one per FOG repo.

    `repositories` supplies each repo's default notification emails.
    `metrics_by_repo` maps repo name to that repo's metrics.
    `bugzilla_api_key` and `dry_run` are passed straight on to `file_bugs`.

    Returns None if `metrics_by_repo` has no FOG repos in it at all.
    Otherwise returns a dict of "expired_metrics_<repo name>" to the emails
    (and addresses) for that repo. Repos with nothing expiring get no entry,
    so the dict may be empty.
    """
    if FOG_REPOS.isdisjoint(metrics_by_repo):
        print("No FOG-using repositories. Nothing to do.")
        return None

    # Glean repositories have a default list of notification emails we should include as well.
    repo_addresses = {
        repo.name: repo.notification_emails
        for repo in repositories
        if repo.name in FOG_REPOS
    }
    current_metrics_by_repo = get_current_metrics_by_repo(metrics_by_repo)
    # The same for every repo, so only ask once.
    # (We know at least one FOG repo is present, so it'll always be needed.)
    latest_nightly_version: str = probe_expiry_alert.get_latest_nightly_version()

    emails = {}
    # Sorted so the output and the order of emails are the same on every run.
    for fog_repo in sorted(FOG_REPOS):
        if fog_repo not in metrics_by_repo:
            continue
        current_metrics: Dict[str, Dict] = current_metrics_by_repo[fog_repo]
        expiring_metrics: Dict[str, Dict] = get_expiring_metrics(
            current_metrics, latest_nightly_version
        )
        print(f"Found {len(expiring_metrics)} expiring metrics in {fog_repo}.")
        if not expiring_metrics:
            continue

        # A repo can have metrics without being in `repositories`,
        # in which case it has no default addresses to add.
        metrics_addresses = set(repo_addresses.get(fog_repo) or [])
        for metric in expiring_metrics.values():
            # `file_bugs` tolerates metrics without notification emails, so we do too.
            metrics_addresses.update(metric.get("notification_emails", []))
        addresses = sorted(metrics_addresses)

        filed_bugs: Dict[str, List[str]] = file_bugs(
            expiring_metrics, latest_nightly_version, bugzilla_api_key, dry_run
        )
        expiring_bugs_list = []
        for bug_url, bug_metrics in filed_bugs.items():
            # Sort the metric names for easier reading
            expiring_metrics_list_str = "\n".join(sorted(bug_metrics))
            expiring_bugs_list.append(f"{bug_url}:\n{expiring_metrics_list_str}")

        # Nothing expiring? No emails needed.
        if not expiring_bugs_list:
            continue

        emails[f"expired_metrics_{fog_repo}"] = EmailInfo(
            emails=[
                {
                    "subject": f"Expired metrics in {fog_repo}",
                    "message": EXPIRED_METRICS_EMAIL_TEMPLATE.format(
                        expiring_bugs_list="\n".join(expiring_bugs_list),
                        version=int(latest_nightly_version),
                    ),
                }
            ],
            addresses=addresses,
        )
    return emails