sync/downstream.py (927 lines of code) (raw):

# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. """Functionality to support VCS syncing for WPT.""" from __future__ import annotations import os import re import subprocess import traceback from collections import defaultdict from datetime import datetime import enum import git import newrelic from . import bugcomponents from . import gitutils from . import log from . import notify from . import trypush from . import commit as sync_commit from .base import FrozenDict, entry_point from .commit import GeckoCommit, WptCommit from .env import Environment from .errors import AbortError from .gitutils import update_repositories from .lock import SyncLock, mut, constructor from .projectutil import Mach, WPT from .sync import LandableStatus, SyncProcess from .trypush import TryPush from git.objects.tree import Tree from git.repo.base import Repo from typing import Any, List, Mapping, MutableMapping, cast, TYPE_CHECKING logger = log.get_logger(__name__) env = Environment() @enum.unique class DownstreamAction(enum.Enum): ready = 0 manual_fix = 1 try_push = 2 try_push_stability = 3 wait_try = 4 wait_upstream = 5 try_rebase = 6 def reason_str(self): # type () -> Text return {DownstreamAction.ready: "", DownstreamAction.manual_fix: "", DownstreamAction.try_push: "valid try push required", DownstreamAction.try_push_stability: "stability try push required", DownstreamAction.wait_try: "waiting for try to complete", DownstreamAction.wait_upstream: "waiting for PR to be merged", DownstreamAction.try_rebase: "Need to rebase try push"}.get(self, "") class DownstreamSync(SyncProcess): sync_type = "downstream" obj_id = "pr" statuses = ("open", "complete") status_transitions = [("open", "complete"), ("complete", "open")] # Unfortunately, if a backout occurs @classmethod @constructor(lambda args: ("downstream", str(args["pr_id"]))) def new(cls, lock: SyncLock, git_gecko: Repo, git_wpt: Repo, wpt_base: str, pr_id: int, pr_title: str, pr_body: str, ) -> DownstreamSync: # TODO: add PR link to the comment sync = super().new(lock, git_gecko, git_wpt, pr=pr_id, gecko_base=DownstreamSync.gecko_landing_branch(), gecko_head=DownstreamSync.gecko_landing_branch(), wpt_base=wpt_base, wpt_head="origin/pr/%s" % pr_id) with sync.as_mut(lock): sync.create_bug(git_wpt, pr_id, pr_title, pr_body) return sync def make_bug_comment(self, git_wpt: Repo, pr_id: int, pr_title: str, pr_body: str | None) -> str: pr_msg = env.gh_wpt.cleanup_pr_body(pr_body) # TODO: Ensure we have the right set of commits before geting here author = self.wpt_commits[0].author if self.wpt_commits else b"" msg = ["Sync web-platform-tests PR %s into mozilla-central" " (this bug is closed when the sync is complete)." % pr_id, "", "PR: %s" % env.gh_wpt.pr_url(pr_id), "" "Details from upstream follow.", "", "%s wrote:" % author.decode("utf8", "ignore"), "> %s" % pr_title] if pr_msg: msg.append("> ") msg.extend("> %s" % line for line in pr_msg.split("\n")) return "\n".join(msg) @classmethod def has_metadata(cls, message: bytes) -> bool: required_keys = ["wpt-commits", "wpt-pr"] metadata = sync_commit.get_metadata(message) return all(item in metadata for item in required_keys) @property def landable_status(self) -> LandableStatus: if self.skip: return LandableStatus.skip if self.metadata_ready: return LandableStatus.ready if self.error: return LandableStatus.error return LandableStatus.missing_try_results @property def pr_head(self) -> WptCommit: """Head commit of the PR. Typically this is equal to self.wpt_commits.head but if the PR is rebased onto master then the head of the PR won't match the commit that actually merged unless it happens to be a fast-forward""" return sync_commit.WptCommit(self.git_wpt, "origin/pr/%s" % self.pr) @SyncProcess.error.setter # type: ignore @mut() def error(self, value: str | None) -> Any | None: if self.pr: if value is not None: env.gh_wpt.add_labels(self.pr, "mozilla:gecko-blocked") elif self.error is not None: env.gh_wpt.remove_labels(self.pr, "mozilla:gecko-blocked") return SyncProcess.error.fset(self, value) # type: ignore @property def pr_status(self): return self.data.get("pr-status", "open") @pr_status.setter # type: ignore @mut() def pr_status(self, value): self.data["pr-status"] = value @property def notify_bugs(self) -> FrozenDict: return FrozenDict(**self.data.get("notify-bugs", {})) @notify_bugs.setter # type: ignore @mut() def notify_bugs(self, value: FrozenDict) -> None: self.data["notify-bugs"] = value.as_dict() @property def next_action(self) -> DownstreamAction: """Work out the next action for the sync based on the current status. Returns a DownstreamAction indicating the next step to take.""" if self.data.get("force-metadata-ready"): # This is mostly for testing return DownstreamAction.ready if self.skip: return DownstreamAction.ready if self.error: if self.tried_to_rebase is False: return DownstreamAction.try_rebase return DownstreamAction.manual_fix latest_try_push = self.latest_valid_try_push if (latest_try_push and not latest_try_push.taskgroup_id): if latest_try_push.status == "open": return DownstreamAction.wait_try elif latest_try_push.infra_fail: if self.tried_to_rebase is False: return DownstreamAction.try_rebase return DownstreamAction.manual_fix assert self.pr is not None pr = env.gh_wpt.get_pull(self.pr) if pr.merged: # Wait till PR is merged to do anything if not latest_try_push: if self.requires_stability_try: logger.debug("Sync for PR %s requires a stability try push" % self.pr) return DownstreamAction.try_push_stability elif self.requires_try: return DownstreamAction.try_push else: return DownstreamAction.ready if latest_try_push.status != "complete": return DownstreamAction.wait_try if self.requires_stability_try and not latest_try_push.stability: return DownstreamAction.try_push_stability # If we have infra failure, flag for human intervention. Retrying stability # runs would be very costly if latest_try_push.infra_fail: tasks = latest_try_push.tasks() if tasks is None: return DownstreamAction.manual_fix # Check if we had any successful tests if tasks.has_completed_tests(): return DownstreamAction.ready else: return DownstreamAction.manual_fix return DownstreamAction.ready else: return DownstreamAction.wait_upstream @property def metadata_ready(self) -> bool: return self.next_action == DownstreamAction.ready @property def results_notified(self) -> bool: return self.data.get("results-notified", False) @results_notified.setter # type: ignore @mut() def results_notified(self, value): self.data["results-notified"] = value @property def skip(self) -> bool: return self.data.get("skip", False) @skip.setter # type: ignore @mut() def skip(self, value: bool) -> None: self.data["skip"] = value @property def tried_to_rebase(self) -> bool: return self.data.get("tried_to_rebase", False) @tried_to_rebase.setter # type: ignore @mut() def tried_to_rebase(self, value: bool) -> None: self.data["tried_to_rebase"] = value @mut() def try_rebase(self) -> None: logger.info("Rebasing onto %s" % self.gecko_landing_branch()) initial_tried_to_rebase = self.tried_to_rebase self.tried_to_rebase = True commit_hash_before_rebase = self.gecko_commits.base.sha1 self.gecko_rebase(self.gecko_landing_branch(), abort_on_fail=True) commit_hash_after_rebase = self.gecko_commits.base.sha1 if commit_hash_before_rebase == commit_hash_after_rebase: self.tried_to_rebase = initial_tried_to_rebase @property def wpt(self) -> WPT: git_work = self.wpt_worktree.get() git_work.git.reset(hard=True) return WPT(os.path.join(git_work.working_dir)) @property def requires_try(self) -> bool: return not self.skip and len(self.gecko_commits) > 0 @property def requires_stability_try(self) -> bool: return self.requires_try and self.has_affected_tests_readonly @property def latest_valid_try_push(self) -> TryPush | None: """Try push for the current head of the PR, if any. In legacy cases we don't store the wpt-head for the try push so we always assume that any try push is valid""" latest_try_push = self.latest_try_push if latest_try_push is None: return None if len(self.gecko_commits) == 0: # Something most likely is not correct, but we can't fix it here. return latest_try_push if self.metadata_commit is not None: if len(self.gecko_commits) == 1: # Apparently we only have a metadata commit and the actual change got rebased away # In this case the metadata commit is probably wrong, but we can't fix that here return latest_try_push gecko_head = self.gecko_commits[-2] else: gecko_head = self.gecko_commits[-1] # Check if the try push is for the current PR head if (latest_try_push.wpt_head and latest_try_push.wpt_head not in (self.pr_head.sha1, self.wpt_commits.head.sha1)): logger.info("Got more commits since latest try push") return None if latest_try_push.gecko_head not in {gecko_head.sha1, self.metadata_commit.sha1 if self.metadata_commit else None}: logger.info("Gecko commits changed since latest try push") return None return latest_try_push @mut() def try_paths(self) -> Mapping[str, list[str]]: """Return a mapping of {test_type: path} for tests that should be run on try. Paths are relative to the gecko root""" affected_tests = self.affected_tests() base_path = env.config["gecko"]["path"]["wpt"] # Filter out paths that aren't in the head. # This can happen if the files were moved in a previous PR that we haven't yet # merged affected_paths = {} head_tree = self.gecko_commits.head.commit.tree def contains(tree: Tree, path: str) -> bool: path_parts = path.split(os.path.sep) for part in path_parts: try: tree = tree[part] except KeyError: return False return True for test_type, wpt_paths in affected_tests.items(): paths = [] for path in wpt_paths: gecko_path = os.path.join(base_path, path) if contains(head_tree, gecko_path): paths.append(gecko_path) if paths: affected_paths[test_type] = paths if not affected_paths: # Default to just running infra tests infra_path = os.path.join(base_path, "infrastructure/") affected_paths = { "testharness": [infra_path], "reftest": [infra_path], "wdspec": [infra_path], } return affected_paths @mut() def next_try_push(self, try_cls: type = trypush.TryFuzzyCommit) -> TryPush | None: """Schedule a new try push for the sync, if required. A stability try push will only be scheduled if the upstream PR is approved, which we check directly from GitHub. Therefore returning None is not an indication that the sync is ready to land, just that there's no further action at this time. """ if self.skip or self.status != "open": return None self.update_commits() # Ensure affected tests is up to date self.affected_tests() action = self.next_action if action == DownstreamAction.try_rebase: self.try_rebase() action = self.next_action if action == DownstreamAction.try_push: return TryPush.create(self._lock, self, affected_tests=self.try_paths(), stability=False, hacks=False, try_cls=try_cls) elif action == DownstreamAction.try_push_stability: return TryPush.create(self._lock, self, affected_tests=self.try_paths(), stability=True, hacks=False, try_cls=try_cls) return None @mut() def create_bug(self, git_wpt: Repo, pr_id: int, pr_title: str, pr_body: str | None) -> None: if self.bug is not None: return comment = self.make_bug_comment(git_wpt, pr_id, pr_title, pr_body) summary = f"[wpt-sync] Sync PR {pr_id} - {pr_title}" if len(summary) > 255: summary = summary[:254] + "\u2026" bug = env.bz.new(summary=summary, comment=comment, product="Testing", component="web-platform-tests", whiteboard="[wptsync downstream]", priority="P4", url=env.gh_wpt.pr_url(pr_id)) self.bug = bug # type: ignore @mut() def update_wpt_commits(self) -> None: """Update the set of commits in the PR from the latest upstream.""" if not self.wpt_commits.head or self.wpt_commits.head.sha1 != self.pr_head.sha1: self.wpt_commits.head = self.pr_head # type: ignore if (len(self.wpt_commits) == 0 and self.git_wpt.is_ancestor(self.wpt_commits.head.commit, self.git_wpt.rev_parse("origin/master"))): # The commits landed on master so we need to change the commit # range to not use origin/master as a base base_commit = None assert isinstance(self.wpt_commits.head, sync_commit.WptCommit) assert self.wpt_commits.head.pr() == self.pr for commit in self.git_wpt.iter_commits(self.wpt_commits.head.sha1): wpt_commit = sync_commit.WptCommit(self.git_wpt, commit) if wpt_commit.pr() != self.pr: base_commit = wpt_commit break assert base_commit is not None self.data["wpt-base"] = base_commit.sha1 self.wpt_commits.base = base_commit # type: ignore @mut() def update_github_check(self) -> None: if not env.config["web-platform-tests"]["github"]["checks"]["enabled"]: return title = "gecko/sync" head_sha = self.wpt_commits.head.sha1 # TODO: maybe just get this from GitHub rather than store it existing = self.data.get("check") check_id = None if existing is not None: if existing.get("sha1") == head_sha: check_id = existing.get("id") assert self.bug is not None url = env.bz.bugzilla_url(self.bug) external_id = str(self.bug) # For now hardcode the status at completed status = "completed" conclusion = "neutral" completed_at = datetime.now() output = {"title": "Gecko sync for PR %s" % self.pr, "summary": "Gecko sync status: %s" % self.landable_status.reason_str(), "test": self.build_check_text(head_sha)} try: logger.info("Generating GH check status") resp = env.gh_wpt.set_check(title, check_id=check_id, commit_sha=head_sha, url=url, external_id=external_id, status=status, started_at=None, conclusion=conclusion, completed_at=completed_at, output=output) self.data["check"] = {"id": resp["id"], "sha1": head_sha} except AssertionError: raise except Exception: # Just log errors trying to update the check status, but otherwise don't fail newrelic.agent.record_exception() import traceback logger.error("Creating PR status check failed") logger.error(traceback.format_exc()) def build_check_text(self, commit_sha: str) -> str: text = """ # Summary [Bugzilla](%(bug_link)s) # Try pushes %(try_push_section)s %(error_section)s """ try_pushes = [try_push for try_push in sorted(self.try_pushes(), key=lambda x: -x.process_name.seq_id) if try_push.wpt_head == commit_sha] if not try_pushes: try_push_section = "No current try pushes" else: items = [] for try_push in try_pushes: link_str = "Try push" + (" (stability)" if try_push.stability else "") items.append(" * [{}]({}): {}{}".format(link_str, try_push.treeherder_url, try_push.status, " infra-fail" if try_push.infra_fail else "")) try_push_section = "\n".join(items) error_section = "# Errors:\n ```%s```" % self.error if self.error else "" assert self.bug is not None return text % {"bug_link": env.bz.bugzilla_url(self.bug), "try_push_section": try_push_section, "error_section": error_section} def files_changed(self) -> set[str]: # TODO: Would be nice to do this from mach with a gecko worktree return set(self.wpt.files_changed().decode("utf8", "replace").split("\n")) @property def metadata_commit(self) -> GeckoCommit | None: if len(self.gecko_commits) == 0: return None if self.gecko_commits[-1].metadata.get("wpt-type") == "metadata": commit = self.gecko_commits[-1] assert isinstance(commit, GeckoCommit) return commit return None @mut() def ensure_metadata_commit(self) -> GeckoCommit: if self.metadata_commit: return self.metadata_commit git_work = self.gecko_worktree.get() if "metadata-commit" in self.data: gitutils.cherry_pick(git_work, self.data["metadata-commit"]) # We only care about the cached value inside this function, so # remove it now so we don't have to maintain it del self.data["metadata-commit"] else: assert all(item.metadata.get("wpt-type") != "metadata" for item in self.gecko_commits) metadata = { "wpt-pr": str(self.pr), "wpt-type": "metadata" } msg = sync_commit.Commit.make_commit_msg( b"Bug %s [wpt PR %s] - Update wpt metadata, a=testonly" % (str(self.bug).encode("utf8") if self.bug is not None else b"None", str(self.pr).encode("utf8") if self.pr is not None else b"None"), metadata) sync_commit.create_commit(git_work, msg, allow_empty=True) commit = git_work.commit("HEAD") return sync_commit.GeckoCommit(self.git_gecko, commit.hexsha) @mut() def set_bug_component(self, files_changed: set[str]) -> None: new_component = bugcomponents.get(self.gecko_worktree.get(), files_changed, default=("Testing", "web-platform-tests")) env.bz.set_component(self.bug, *new_component) @mut() def move_metadata(self, renames: dict[str, str]) -> None: if not renames: return self.ensure_metadata_commit() gecko_work = self.gecko_worktree.get() metadata_base = env.config["gecko"]["path"]["meta"] for old_path, new_path in renames.items(): old_meta_path = os.path.join(metadata_base, old_path + ".ini") if os.path.exists(os.path.join(gecko_work.working_dir, old_meta_path)): new_meta_path = os.path.join(metadata_base, new_path + ".ini") dir_name = os.path.join(gecko_work.working_dir, os.path.dirname(new_meta_path)) if not os.path.exists(dir_name): os.makedirs(dir_name) gecko_work.index.move((old_meta_path, new_meta_path)) self._commit_metadata() @mut() def update_bug_components(self, renames: dict[str, str]) -> None: if not renames: return self.ensure_metadata_commit() gecko_work = self.gecko_worktree.get() bugcomponents.update(gecko_work, renames) self._commit_metadata() @mut() def _commit_metadata(self, amend: bool = True) -> None: assert self.metadata_commit gecko_work = self.gecko_worktree.get() if gecko_work.is_dirty(): logger.info("Updating metadata commit") try: gecko_work.git.commit(amend=True, no_edit=True) except git.GitCommandError as e: if amend and e.status == 1 and "--allow-empty" in e.stdout: logger.warning("Amending commit made it empty, resetting") gecko_work.git.reset("HEAD^") @mut() def update_commits(self) -> bool: exception = None try: self.update_wpt_commits() # Check if this sync reverts some unlanded earlier PR and if so mark both # as skip and don't try to apply the commits here reverts = self.reverts_syncs() if reverts: all_open = all(item.status == "open" for item in reverts) for revert_sync in reverts: if revert_sync.status == "open": logger.info("Skipping sync for PR %s because it is later reverted" % revert_sync.pr) with SyncLock.for_process(revert_sync.process_name) as revert_lock: assert isinstance(revert_lock, SyncLock) with revert_sync.as_mut(revert_lock): revert_sync.skip = True # type: ignore # TODO: If this commit reverts some closed syncs, then set the metadata # commit of this commit to the revert of the metadata commit from that # sync if all_open: logger.info("Sync was a revert of other open syncs, skipping") self.skip = True # type: ignore return False old_gecko_head = self.gecko_commits.head.sha1 logger.debug(f"PR {self.pr} gecko HEAD was {old_gecko_head}") def plain_apply() -> bool: logger.info("Applying on top of the current commits") self.wpt_to_gecko_commits() return True def rebase_apply() -> bool: logger.info("Applying with a rebase onto latest integration branch") new_base = self.gecko_integration_branch() gecko_work = self.gecko_worktree.get() reset_head = "HEAD" if (len(self.gecko_commits) > 0 and self.gecko_commits[0].metadata.get("wpt-type") == "dependent"): # If we have any dependent commits first reset to the new # head. This prevents conflicts if the dependents already # landed # TODO: Actually check if they landed? reset_head = new_base gecko_work.git.reset(reset_head, hard=True) self.gecko_rebase(new_base, abort_on_fail=True) self.wpt_to_gecko_commits() return True def dependents_apply() -> bool: logger.info("Applying with upstream dependents") dependencies = self.unlanded_commits_same_files() if dependencies: logger.info("Found dependencies:\n%s" % "\n".join(item.msg.splitlines()[0].decode("utf8", "replace") for item in dependencies)) self.wpt_to_gecko_commits(dependencies) assert self.bug is not None env.bz.comment(self.bug, "PR %s applied with additional changes from upstream: %s" % (self.pr, ", ".join(item.sha1 for item in dependencies))) return True return False error = None for fn in [plain_apply, rebase_apply, dependents_apply]: try: if fn(): error = None break else: logger.error("Applying with %s was a no-op" % fn.__name__) except Exception as e: import traceback error = e logger.error("Applying with %s errored" % fn.__name__) logger.error(traceback.format_exc()) if error is not None: raise error logger.debug(f"PR {self.pr} gecko HEAD now {self.gecko_commits.head.sha1}") if old_gecko_head == self.gecko_commits.head.sha1: logger.info("Gecko commits did not change for PR %s" % self.pr) return False # If we have a metadata commit already, ensure it's applied now if "metadata-commit" in self.data: self.ensure_metadata_commit() renames = self.wpt_renames() self.move_metadata(renames) self.update_bug_components(renames) files_changed = self.files_changed() self.set_bug_component(files_changed) except Exception as e: exception = e raise finally: # If we managed to apply all the commits without error, reset the error flag # otherwise update it with the current exception self.error = exception return True @mut() def wpt_to_gecko_commits(self, dependencies: list[WptCommit] | None = None) -> None: """Create a patch based on wpt branch, apply it to corresponding gecko branch. If there is a commit with wpt-type metadata, this function will remove it. The sha1 will be stashed in self.data["metadata-commit"] so it can be restored next time we call ensure_metadata_commit() """ # The logic here is that we can retain any dependent commits as long as we have # at least the set in the dependencies array, followed by the gecko commits created # from the wpt_commits, interspersed with any number of manifest commits, # followed by zero or one metadata commits if dependencies: expected_commits: list[tuple[str, WptCommit | None, bool]] = [ (item.sha1, item, True) for item in dependencies] else: # If no dependencies are supplied, retain the ones that we alredy have, if any expected_commits = [] for commit in self.gecko_commits: assert isinstance(commit, sync_commit.GeckoCommit) if commit.metadata.get("wpt-type") == "dependency": expected_commits.append((commit.metadata["wpt-commit"], None, True)) else: break # Expect all the new commits for commit in self.wpt_commits: assert isinstance(commit, WptCommit) if not commit.is_merge: expected_commits.append((commit.sha1, commit, False)) existing = [ commit for commit in self.gecko_commits if commit.metadata.get("wpt-commit") and commit.metadata.get("wpt-type") in ("dependency", None)] if TYPE_CHECKING: existing_commits = cast(List[GeckoCommit], existing) else: existing_commits = existing retain_commits = 0 for gecko_commit, (wpt_sha1, _, _) in zip(existing_commits, expected_commits): if gecko_commit.metadata.get("wpt-commit") != wpt_sha1: break retain_commits += 1 keep_commits = existing_commits[:retain_commits] maybe_add_commits = expected_commits[retain_commits:] # Strip out any leading commits that come from currently applied dependencies that are # not being retained strip_count = 0 for _, wpt_commit, _ in maybe_add_commits: if wpt_commit is not None: break strip_count += 1 add_commits = maybe_add_commits[strip_count:] if len(keep_commits) == len(existing_commits) and not add_commits: logger.info("Commits did not change") return logger.info("Keeping %i existing commits; adding %i new commits" % (len(keep_commits), len(add_commits))) if self.metadata_commit: # If we have a metadata commit, store it in self.data["metadata-commit"] # remove it when updating commits, and reapply it when we next call # ensure_metadata_commit self.data["metadata-commit"] = self.metadata_commit.sha1 reset_head = None if not keep_commits: reset_head = self.data["gecko-base"] elif len(keep_commits) < len(existing_commits): reset_head = keep_commits[-1] elif ("metadata-commit" in self.data and self.gecko_commits[-1].metadata.get("wpt-type") == "metadata"): reset_head = self.gecko_commits[-2] # Clear the set of affected tests since there are updates del self.data["affected-tests"] gecko_work = self.gecko_worktree.get() if reset_head: self.gecko_commits.head = reset_head # type: ignore gecko_work.git.reset(hard=True) for _, wpt_commit, is_dependency in add_commits: assert wpt_commit is not None logger.info("Moving commit %s" % wpt_commit.sha1) if is_dependency: metadata = { "wpt-type": "dependency", "wpt-commit": wpt_commit.sha1 } msg_filter = None else: metadata = { "wpt-pr": str(self.pr), "wpt-commit": wpt_commit.sha1 } msg_filter = self.message_filter wpt_commit.move(gecko_work, dest_prefix=env.config["gecko"]["path"]["wpt"], msg_filter=msg_filter, metadata=metadata, patch_fallback=True) def unlanded_commits_same_files(self) -> list[WptCommit]: from . import landing sync_point = landing.load_sync_point(self.git_gecko, self.git_wpt) base = sync_point["upstream"] head = "origin/master" changed = self.wpt_commits.files_changed commits = [] for commit in self.git_wpt.iter_commits(f"{base}..{head}", reverse=True, paths=list(changed)): wpt_commit = sync_commit.WptCommit(self.git_wpt, commit) # Check for same-pr rather than same-commit because we always # use the commits on the PR branch, not the merged commits. # The other option is to use the GH API to decide if the PR # merged and if so what the merge commit was, although in that # case we would still not know the commit prior to merge, which # is what we need if wpt_commit.pr() == self.pr: break commits.append(wpt_commit) return commits def message_filter(self, msg: bytes) -> tuple[bytes, dict[str, str]]: msg = sync_commit.try_filter(msg) parts = msg.split(b"\n", 1) if len(parts) > 1: summary, body = parts else: summary = parts[0] body = b"" new_msg = b"Bug %s [wpt PR %s] - %s, a=testonly\n\nSKIP_BMO_CHECK\n%s" % ( str(self.bug).encode("utf8"), str(self.pr).encode("utf8"), summary, body) return new_msg, {} @mut() def affected_tests(self, revish: Any | None = None) -> Mapping[str, list[str]]: # TODO? support files, harness changes -- don't want to update metadata if "affected-tests" not in self.data: tests_by_type: MutableMapping[str, list[str]] = defaultdict(list) logger.info("Updating MANIFEST.json") self.wpt.manifest() args = ["--show-type", "--new"] if revish: args.append(revish) logger.info("Getting a list of tests affected by changes.") try: output = self.wpt.tests_affected(*args) except subprocess.CalledProcessError: logger.error("Calling wpt tests-affected failed") return tests_by_type if output: for item_bytes in output.strip().split(b"\n"): item = item_bytes.decode("utf8", "replace") path, test_type = item.strip().split("\t") tests_by_type[test_type].append(path) self.data["affected-tests"] = tests_by_type return self.data["affected-tests"] @property def affected_tests_readonly(self) -> Mapping[str, list[str]]: if "affected-tests" not in self.data: logger.warning("Trying to get affected tests before it's set") return {} return self.data["affected-tests"] @property def has_affected_tests_readonly(self) -> bool: if "affected-tests" not in self.data: logger.warning("Trying to get affected tests before it's set") return True return bool(self.data["affected-tests"]) @mut() def update_metadata(self, log_files, stability=False): meta_path = env.config["gecko"]["path"]["meta"] gecko_work = self.gecko_worktree.get() mach = Mach(gecko_work.working_dir) args = [] if stability: help_text = mach.wpt_update("--help").decode("utf8") if "--stability " in help_text: args.extend(["--stability", "wpt-sync Bug %s" % self.bug]) else: args.append("--update-intermittent") args.extend(log_files) logger.debug("Updating metadata") output = mach.wpt_update(*args) prefix = b"disabled:" disabled = [] for line in output.split(b"\n"): if line.startswith(prefix): disabled.append(line[len(prefix):].decode("utf8", "replace").strip()) if gecko_work.is_dirty(untracked_files=True, path=meta_path): self.ensure_metadata_commit() gecko_work.git.add(meta_path, all=True) self._commit_metadata() return disabled @mut() def try_notify(self, force: bool = False) -> None: newrelic.agent.record_custom_event("try_notify", params={ "sync_bug": self.bug, "sync_pr": self.pr }) if self.results_notified and not force: return if not self.bug: logger.error("Sync for PR %s has no associated bug" % self.pr) return if not self.affected_tests(): logger.debug("PR %s doesn't have affected tests so skipping results notification" % self.pr) newrelic.agent.record_custom_event("try_notify_no_affected", params={ "sync_bug": self.bug, "sync_pr": self.pr }) return logger.info("Trying to generate results notification for PR %s" % self.pr) results = notify.results.for_sync(self) if not results: # TODO handle errors here better, perhaps logger.error("Failed to get results notification for PR %s" % self.pr) newrelic.agent.record_custom_event("try_notify_failed", params={ "sync_bug": self.bug, "sync_pr": self.pr }) return message, truncated = notify.msg.for_results(results) with env.bz.bug_ctx(self.bug) as bug: if truncated: bug.add_attachment(data=message.encode("utf8"), file_name="wpt-results.md", summary="Notable wpt changes", is_markdown=True, comment=truncated) else: env.bz.comment(self.bug, message, is_markdown=True) bugs = notify.bugs.for_sync(self, results) notify.bugs.update_metadata(self, bugs) self.results_notified = True # type: ignore with SyncLock.for_process(self.process_name) as lock: assert isinstance(lock, SyncLock) for try_push in self.try_pushes(): with try_push.as_mut(lock): try_push.cleanup_logs() def reverts_syncs(self) -> set[DownstreamSync]: """Return a set containing the previous syncs reverted by this one, if any""" revert_re = re.compile(b"This reverts commit ([0-9A-Fa-f]+)") unreverted_commits = defaultdict(set) for commit in self.wpt_commits: if not commit.msg.startswith(b"Revert "): # If not everything is a revert then return return set() revert_shas = revert_re.findall(commit.msg) if len(revert_shas) == 0: return set() # Just use the first match for now sha = revert_shas[0].decode("ascii") try: # Reassign the hash here, in case a short hash was used for reverting. sha = str(self.git_wpt.rev_parse(sha)) except (ValueError, git.BadName): # Commit isn't in this repo (could be upstream) return set() pr = env.gh_wpt.pr_for_commit(sha) if pr is None: return set() sync = DownstreamSync.for_pr(self.git_gecko, self.git_wpt, pr) if sync is None: return set() assert isinstance(sync, DownstreamSync) if sync not in unreverted_commits: # Ensure we have the latest commits for the reverted sync with SyncLock.for_process(sync.process_name) as revert_lock: assert isinstance(revert_lock, SyncLock) with sync.as_mut(revert_lock): sync.update_wpt_commits() unreverted_commits[sync] = {item.sha1 for item in sync.wpt_commits} if sha in unreverted_commits[sync]: unreverted_commits[sync].remove(sha) # If the commit is not part of the sync, check if the PR was squashed and then reverted, # in that case all commits of the sync should be reverted. elif sha == env.gh_wpt.merge_sha(pr): unreverted_commits[sync] = set() rv = {sync for sync, unreverted in unreverted_commits.items() if not unreverted} return rv @entry_point("downstream") def new_wpt_pr(git_gecko: Repo, git_wpt: Repo, pr_data: Mapping[str, Any], raise_on_error: bool = True, repo_update: bool = True) -> None: """ Start a new downstream sync """ if pr_data["user"]["login"] == env.config["web-platform-tests"]["github"]["user"]: raise ValueError("Tried to create a downstream sync for a PR created " "by the wpt bot") if repo_update: update_repositories(git_gecko, git_wpt) pr_id = pr_data["number"] if DownstreamSync.for_pr(git_gecko, git_wpt, pr_id): return wpt_base = "origin/%s" % pr_data["base"]["ref"] with SyncLock("downstream", str(pr_id)) as lock: sync = DownstreamSync.new(lock, git_gecko, git_wpt, wpt_base, pr_id, pr_data["title"], pr_data["body"] or "") with sync.as_mut(lock): try: sync.update_commits() sync.update_github_check() except Exception as e: sync.error = e if raise_on_error: raise traceback.print_exc() logger.error(e) # Now wait for the status to change before we take any actions @entry_point("downstream") @mut("try_push", "sync") def try_push_complete(git_gecko, git_wpt, try_push, sync): if not try_push.taskgroup_id: logger.error("No taskgroup id set for try push") return if not try_push.status == "complete": # Ensure we don't have some old set of tasks tasks = try_push.tasks() if not tasks.complete(allow_unscheduled=True): logger.info("Try push %s is not complete" % try_push.treeherder_url) return logger.info("Try push %s is complete" % try_push.treeherder_url) try: if not tasks.validate(): try_push.infra_fail = True if len(sync.latest_busted_try_pushes()) > 5: message = ("Too many busted try pushes. " "Check the try results for infrastructure issues.") sync.error = message env.bz.comment(sync.bug, message) try_push.status = "complete" raise AbortError(message) elif len(tasks.failed_builds()): message = ("Try push had build failures") sync.error = message env.bz.comment(sync.bug, message) try_push.status = "complete" try_push.infra_fail = True raise AbortError(message) else: logger.info(f"Try push {try_push!r} for PR {sync.pr} complete") disabled = [] if tasks.has_failures(): if sync.affected_tests(): log_files = [] wpt_tasks = try_push.download_logs(tasks.wpt_tasks) for task in wpt_tasks: for run in task.get("status", {}).get("runs", []): log = run.get("_log_paths", {}).get("wptreport.json") if log: log_files.append(log) if not log_files: raise ValueError("No log files found for try push %r" % try_push) disabled = sync.update_metadata(log_files, stability=try_push.stability) else: env.bz.comment(sync.bug, ("The PR was not expected to affect any tests, " "but the try push wasn't a success. " "Check the try results for infrastructure " "issues")) # TODO: consider marking the push an error here so that we can't # land without manual intervention if try_push.stability and disabled: logger.info("The following tests were disabled:\n%s" % "\n".join(disabled)) # TODO notify relevant people about test expectation changes, stability env.bz.comment(sync.bug, ("The following tests were disabled " "based on stability try push:\n %s" % "\n".join(disabled))) try_push.status = "complete" sync.next_try_push() finally: sync.update_github_check() else: sync.next_try_push() sync.update_github_check() if sync.metadata_commit is not None and len(sync.gecko_commits) == 1: # Apparently we only have a metadata commit and the actual change got rebased away # In this case the metadata commit is probably wrong, # and we just want to skip this sync. sync.skip = True if sync.landable_status == LandableStatus.ready: sync.try_notify() @entry_point("downstream") @mut("sync") def update_pr(git_gecko: Repo, git_wpt: Repo, sync: DownstreamSync, action: str, merge_sha: str, base_sha: str, merged_by: str | None = None, ) -> None: try: if action == "closed" and not merge_sha: sync.pr_status = "closed" # type: ignore if sync.bug: env.bz.set_status(sync.bug, "RESOLVED", "INVALID") sync.finish() elif action == "closed": # We are storing the wpt base as a reference sync.data["wpt-base"] = base_sha sync.next_try_push() elif action == "reopened" or action == "open": sync.status = "open" # type: ignore sync.pr_status = "open" # type: ignore sync.next_try_push() assert sync.bug is not None if sync.bug: status = env.bz.get_status(sync.bug) if status is not None and status[0] == "RESOLVED": env.bz.set_status(sync.bug, "REOPENED") sync.update_github_check() except Exception as e: sync.error = e raise