sync/notify/results.py (389 lines of code) (raw):

from __future__ import annotations import json import os from collections import defaultdict import newrelic import requests from .. import log from .. import tc from .. import commit as sync_commit from ..env import Environment from ..errors import RetryableError from ..meta import Metadata from ..repos import cinnabar from .. import wptfyi from typing import (Any, Callable, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, TYPE_CHECKING) from requests import Response if TYPE_CHECKING: from sync.repos import Repo from sync.downstream import DownstreamSync from sync.meta import MetaLink from sync.tc import TaskGroupView Logs = Mapping[str, Mapping[str, List[Any]]] # Any is really "anything with a json method" ResultsEntry = Tuple[str, Optional[str], "Result"] JobResultsSummary = MutableMapping[str, MutableMapping[str, MutableMapping[str, int]]] logger = log.get_logger(__name__) env = Environment() passing_statuses = frozenset(["PASS", "OK"]) statuses = frozenset(["OK", "PASS", "CRASH", "FAIL", "TIMEOUT", "ERROR", "NOTRUN", "PRECONDITION_FAILED"]) browsers = ["firefox", "chrome", "safari"] class StatusResult: def __init__(self, base: str | None = None, head: str | None = None) -> None: self.base: str | None = None self.head: str | None = None self.head_expected: list[str] = [] self.base_expected: list[str] = [] def set(self, has_changes: bool, status: str, expected: list[str]) -> None: if has_changes: self.head = status self.head_expected = expected else: self.base = status self.base_expected = expected def is_crash(self) -> bool: return self.head == "CRASH" def is_new_non_passing(self) -> bool: return self.base is None and self.head not in passing_statuses def is_regression(self) -> bool: # Regression if we go from a pass to a fail or a fail to a worse # failure and the result isn't marked as a known intermittent return ((self.base in passing_statuses and self.head not in passing_statuses and self.head not in self.head_expected) or (self.base == "FAIL" and self.head in ("TIMEOUT", "ERROR", "CRASH", "NOTRUN") and self.head not in self.head_expected)) def is_disabled(self) -> bool: return self.head == "SKIP" class Result: def __init__(self) -> None: # Mapping {browser: {platform: StatusResult}} self.statuses: MutableMapping[str, MutableMapping[str, StatusResult]] = defaultdict( lambda: defaultdict( StatusResult)) self.bug_links: list[MetaLink] = [] def iter_filter_status(self, fn: Callable, ) -> Iterator[tuple[str, str, StatusResult]]: for browser, by_platform in self.statuses.items(): for platform, status in by_platform.items(): if fn(browser, platform, status): yield browser, platform, status def set_status(self, browser: str, job_name: str, run_has_changes: bool, status: str, expected: list[str]) -> None: self.statuses[browser][job_name].set(run_has_changes, status, expected) def is_consistent(self, browser: str, target: str = "head") -> bool: assert target in ["base", "head"] browser_results: Mapping[str, StatusResult] | None = self.statuses.get( browser) if not browser_results: return True first_result = getattr(next(iter(browser_results.values())), target) return all(getattr(result, target) == first_result for result in browser_results.values()) def is_browser_only_failure(self, target_browser: str = "firefox") -> bool: gh_target = self.statuses[target_browser].get("GitHub") gh_other = [self.statuses.get(browser, {}).get("GitHub") for browser in browsers if browser != target_browser] if gh_target is None: # We don't have enough information to determine GH-only failures return False if gh_target.head in passing_statuses: return False if any(item is None or item.head not in passing_statuses for item in gh_other): return False # If it's passing on all internal platforms, assume a pref has to be # set or something. We could do better than this gecko_ci_statuses = [status for job_name, status in self.statuses[target_browser].items() if job_name != "GitHub"] if (gecko_ci_statuses and all(status.head in passing_statuses for status in gecko_ci_statuses)): return False return True def is_github_only_failure(self, target_browser: str = "firefox") -> bool: gh_status = self.statuses[target_browser].get("GitHub") if not gh_status: return False if gh_status.head in passing_statuses: return False # Check if any non-GitHub status is a pass if any(self.iter_filter_status( lambda browser, platform, status: (browser == target_browser and platform != "GitHub" and status.head in passing_statuses))): return False return True def has_crash(self, target_browser: str = "firefox") -> bool: return any(self.iter_filter_status( lambda browser, _, status: (browser == target_browser and status.is_crash()))) def has_new_non_passing(self, target_browser: str = "firefox") -> bool: return any(self.iter_filter_status( lambda browser, _, status: (browser == target_browser and status.is_new_non_passing()))) def has_regression(self, target_browser: str = "firefox") -> bool: return any(self.iter_filter_status( lambda browser, _, status: (browser == target_browser and status.is_regression()))) def has_disabled(self, target_browser: str = "firefox") -> bool: return any(self.iter_filter_status( lambda browser, _, status: (browser == target_browser and status.is_disabled()))) def has_non_disabled(self, target_browser: str = "firefox") -> bool: return any(self.iter_filter_status( lambda browser, platform, status: (browser == target_browser and platform != "GitHub" and not status.is_disabled()))) def has_passing(self) -> bool: return any(self.iter_filter_status( lambda _browser, _platform, status: status.head in passing_statuses)) def has_link(self, status: str | None = None) -> bool: if status is None: return len(self.bug_links) > 0 return any(item for item in self.bug_links if item.status == status) class TestResult(Result): def __init__(self) -> None: # Mapping {subtestname: SubtestResult} self.subtests: MutableMapping[str, SubtestResult] = defaultdict(SubtestResult) super().__init__() class SubtestResult(Result): pass class ResultsSummary: def __init__(self) -> None: self.parent_tests = 0 self.subtests = 0 self.job_results: JobResultsSummary = defaultdict( lambda: defaultdict( lambda: defaultdict(int))) class Results: def __init__(self) -> None: # Mapping of {test: TestResult} self.test_results: MutableMapping[str, TestResult] = defaultdict(TestResult) self.errors: list[tuple[str, bool]] = [] self.wpt_sha: str | None = None self.treeherder_url: str | None = None def iter_results(self) -> Iterator[ResultsEntry]: for test_name, result in self.test_results.items(): yield test_name, None, result for subtest_name, subtest_result in result.subtests.items(): yield test_name, subtest_name, subtest_result def iter_filter(self, fn: Callable) -> Iterator[ResultsEntry]: for test_name, subtest_name, result in self.iter_results(): if fn(test_name, subtest_name, result): yield test_name, subtest_name, result def add_jobs_from_log_files(self, logs_no_changes: Logs, logs_with_changes: Logs) -> None: for (browser_logs, run_has_changes) in [(logs_with_changes, True), (logs_no_changes, False)]: for browser, browser_job_logs in browser_logs.items(): for job_name, job_logs in browser_job_logs.items(): if not run_has_changes and job_name not in self.test_results: continue for log_data in job_logs: try: json_data = log_data.json() except ValueError: self.errors.append(("Failed to parse data for %s %s" % (browser, job_name), False)) continue self.add_log(json_data, browser, job_name, run_has_changes) def add_log(self, data: dict[str, Any], browser: str, job_name: str, run_has_changes: bool) -> None: for test in data["results"]: use_result = run_has_changes or test["test"] in self.test_results if use_result: status = test["status"] expected = [test.get("expected", status)] + test.get("known_intermittent", []) self.test_results[test["test"]].set_status(browser, job_name, run_has_changes, status, expected) for subtest in test["subtests"]: status = subtest["status"] expected = ([subtest.get("expected", status)] + subtest.get("known_intermittent", [])) (self.test_results[test["test"]] .subtests[subtest["name"]] .set_status(browser, job_name, run_has_changes, status, expected)) def add_metadata(self, metadata: Metadata) -> None: for test, result in self.test_results.items(): for meta_link in metadata.iter_bug_links(test, product="firefox"): if meta_link.subtest is None: result.bug_links.append(meta_link) else: if meta_link.subtest in result.subtests: result.subtests[meta_link.subtest].bug_links.append(meta_link) def browsers(self) -> set[str]: browsers = set() for result in self.test_results.values(): browsers |= {item for item in result.statuses.keys()} return browsers def job_names(self, browser: str) -> set[str]: job_names = set() for result in self.test_results.values(): if browser in result.statuses: for job_name in result.statuses[browser].keys(): job_names.add(job_name) return job_names def summary(self) -> ResultsSummary: summary = ResultsSummary() # Work out how many tests ran, etc. summary.parent_tests = 0 summary.subtests = 0 def update_for_result(result: SubtestResult | TestResult) -> None: for browser, browser_result in result.statuses.items(): for job_name, job_result in browser_result.items(): if job_result.head: summary.job_results[job_result.head][browser][job_name] += 1 for test_result in self.test_results.values(): summary.parent_tests += 1 # type: ignore summary.subtests = len(test_result.subtests) update_for_result(test_result) for subtest_result in test_result.subtests.values(): update_for_result(subtest_result) return summary def iter_crashes(self, target_browser: str = "firefox") -> Iterator[ResultsEntry]: return self.iter_filter(lambda _test, _subtest, result: result.has_crash(target_browser)) def iter_new_non_passing(self, target_browser: str = "firefox") -> Iterator[ResultsEntry]: return self.iter_filter(lambda _test, _subtest, result: result.has_new_non_passing(target_browser)) def iter_regressions(self, target_browser: str = "firefox") -> Iterator[ResultsEntry]: return self.iter_filter(lambda _test, _subtest, result: result.has_regression(target_browser)) def iter_disabled(self, target_browser: str = "firefox") -> Iterator[ResultsEntry]: return self.iter_filter(lambda _test, _subtest, result: result.has_disabled(target_browser)) def iter_browser_only(self, target_browser: str = "firefox") -> Iterator[ResultsEntry]: def is_browser_only(_test: str, _subtest: str | None, result: Result) -> bool: return result.is_browser_only_failure(target_browser) return self.iter_filter(is_browser_only) def get_push_changeset(commit: sync_commit.GeckoCommit) -> str | None: url = ("https://hg.mozilla.org/mozilla-central/json-pushes?changeset=%s&version=2&tipsonly=1" % commit.canonical_rev) headers = {"Accept": "application/json", "User-Agent": "wpt-sync"} resp = requests.get(url, headers=headers) try: resp.raise_for_status() except requests.exceptions.RequestException: if resp.status_code != 404: newrelic.agent.record_exception() return None result = resp.json() pushes = result["pushes"] [changeset] = list(pushes.values())[0]["changesets"] return changeset def get_central_tasks(git_gecko: Repo, sync: DownstreamSync) -> TaskGroupView | None: merge_base_commit = sync_commit.GeckoCommit( git_gecko, git_gecko.merge_base(sync.gecko_commits.head.sha1, env.config["gecko"]["refs"]["central"])[0]) hg_push_sha = get_push_changeset(merge_base_commit) if hg_push_sha is None: return None try: git_push_sha = cinnabar(git_gecko).hg2git(hg_push_sha) except ValueError: newrelic.agent.record_exception() return None push_commit = sync_commit.GeckoCommit(git_gecko, git_push_sha) if push_commit is None: return None taskgroup_id, state, _ = tc.get_taskgroup_id("mozilla-central", push_commit.canonical_rev) if taskgroup_id is None: return None if state != "completed": logger.info("mozilla-central decision task has state %s" % state) return None taskgroup_id = tc.normalize_task_id(taskgroup_id) tasks = tc.TaskGroup(taskgroup_id) wpt_tasks = tasks.view(tc.is_suite_fn("web-platform-tests")) if not wpt_tasks: return None if not wpt_tasks.is_complete(allow_unscheduled=True): return None dest = os.path.join(env.config["root"], env.config["paths"]["try_logs"], "central", push_commit.sha1) wpt_tasks.download_logs(dest, ["wptreport.json"]) return wpt_tasks class LogFile: def __init__(self, path): self.path = path def json(self) -> dict[str, Any]: with open(self.path) as f: return json.load(f) def get_logs(tasks: Iterable[dict[str, Any]], job_prefix: str = "Gecko-") -> Mapping[str, Mapping[str, list[LogFile]]]: logs = defaultdict(list) for task in tasks: job_name = job_prefix + tc.parse_job_name( task.get("task", {}).get("metadata", {}).get("name", "unknown")) runs = task.get("status", {}).get("runs", []) if not runs: continue run = runs[-1] log = run.get("_log_paths", {}).get("wptreport.json") if log: logs[job_name].append(LogFile(log)) return {"firefox": logs} def add_gecko_data(sync: DownstreamSync, results: Results) -> bool: complete_try_push = None for try_push in sorted(sync.try_pushes(), key=lambda x: -x.process_name.seq_id): if try_push.status == "complete": complete_try_push = try_push break if not complete_try_push: logger.info("No complete try push available for PR %s" % sync.pr) return False # Get the list of central tasks and download the wptreport logs central_tasks = get_central_tasks(sync.git_gecko, sync) if not central_tasks: logger.info("Not all mozilla-central results available for PR %s" % sync.pr) return False # TODO: this method should be @mut('sync') assert sync._lock is not None with try_push.as_mut(sync._lock): try_tasks = complete_try_push.tasks() if try_tasks is None: logger.error("Trypush didn't have a taskgroup id") return False try: complete_try_push.download_logs(try_tasks.wpt_tasks, first_only=True) except RetryableError: logger.warning("Downloading logs failed") return False try_log_files = get_logs(try_tasks.wpt_tasks) central_log_files = get_logs(central_tasks) results.add_jobs_from_log_files(central_log_files, try_log_files) results.treeherder_url = complete_try_push.treeherder_url return True def add_wpt_fyi_data(sync: DownstreamSync, results: Results) -> bool: head_sha1 = sync.wpt_commits.head.sha1 logs = [] for target, run_has_changes in [("base", False), ("head", True)]: target_results: MutableMapping[str, dict[str, list[Response]]] = defaultdict( dict) try: runs = wptfyi.get_runs(sha=head_sha1, labels=["pr_%s" % target]) for run in runs: if run["browser_name"] in browsers: browser = run["browser_name"] target_results[browser]["GitHub"] = [requests.get(run["raw_results_url"])] except requests.HTTPError as e: logger.error("Unable to fetch results from wpt.fyi: %s" % e) return False logs.append(target_results) results.add_jobs_from_log_files(*logs) results.wpt_sha = head_sha1 return True def for_sync(sync: DownstreamSync) -> Results: results = Results() if not add_gecko_data(sync, results): results.errors.append(("Failed to get Gecko data", True)) if not add_wpt_fyi_data(sync, results): results.errors.append(("Failed to get wptfyi data", True)) results.add_metadata(Metadata(sync.process_name)) return results