sync/landing.py (1,123 lines of code) (raw):

from __future__ import annotations import os import shutil from collections import defaultdict import enum import git from celery.exceptions import OperationalError from . import bug from . import bugcomponents from . import commit as sync_commit from . import downstream from . import gitutils from . import log from . import tree from . import load from . import repos from . import trypush from . import upstream from .base import entry_point from .commit import GeckoCommit, first_non_merge from .env import Environment from .gitutils import update_repositories from .lock import SyncLock, constructor, mut from .errors import AbortError, RetryableError from .projectutil import Mach from .repos import cinnabar, pygit2_get from .sync import LandableStatus, SyncProcess from typing import Any, IO, List, Tuple, Union, cast, TYPE_CHECKING from git.repo.base import Repo if TYPE_CHECKING: from sync.base import ProcessName from sync.commit import Commit, WptCommit from sync.downstream import DownstreamSync from sync.gh import AttrDict from sync.trypush import TryPush, TryPushTasks from sync.upstream import UpstreamSync LandableCommits = List[Tuple[int, Union[DownstreamSync, UpstreamSync], List[WptCommit]]] env = Environment() logger = log.get_logger(__name__) class SyncPoint: def __init__(self, data: Any | None = None) -> None: self._items: dict[str, str] = {} if data is not None: self._items.update(data) def __getitem__(self, key: str) -> str: return self._items[key] def __setitem__(self, key: str, value: str) -> None: self._items[key] = value def load(self, fp): with open(fp, "rb") as f: self.loads(f) def loads(self, data: bytes) -> None: for line in data.split(b"\n"): if line: key, value = (item.decode("utf8") for item in line.split(b": ", 1)) self._items[key] = value def dump(self, fp: IO[bytes]) -> None: fp.write(self.dumps().encode("utf8") + b"\n") def dumps(self) -> str: return "\n".join(f"{key}: {value}" for key, value in self._items.items()) @enum.unique class TryPushResult(enum.Enum): success = 0 acceptable_failures = 1 infra_fail = 2 too_many_failures = 3 pending = 4 def is_failure(self) -> bool: return self in (TryPushResult.infra_fail, TryPushResult.too_many_failures) def is_ok(self) -> bool: return self in (TryPushResult.success, TryPushResult.acceptable_failures) class LandingSync(SyncProcess): sync_type = "landing" obj_id = "bug" statuses = ("open", "complete") status_transitions = [("open", "complete"), ("complete", "open")] def __init__(self, git_gecko: Repo, git_wpt: Repo, process_name: ProcessName) -> None: super().__init__(git_gecko, git_wpt, process_name) self._unlanded_gecko_commits = None @classmethod @constructor(lambda args: ("landing", None)) def new(cls, lock: SyncLock, git_gecko: Repo, git_wpt: Repo, wpt_base: str, wpt_head: str, bug: int | None = None, ) -> LandingSync: # There is some chance here we create a bug but never create the branch. # Probably need something to clean up orphan bugs # The gecko branch is a new one based on master gecko_base = cls.gecko_integration_branch() gecko_head = cls.gecko_integration_branch() if bug is None: bug = env.bz.new("Update web-platform-tests to %s" % wpt_head, "", "Testing", "web-platform-tests", whiteboard="[wptsync landing]") return super().new(lock, git_gecko, git_wpt, gecko_base, gecko_head, wpt_base=wpt_base, wpt_head=wpt_head, bug=bug) @classmethod def has_metadata(cls, message: bytes) -> bool: required_keys = ["wpt-head", "wpt-type"] metadata = sync_commit.get_metadata(message) return (all(item in metadata for item in required_keys) and metadata.get("wpt-type") == "landing") def unlanded_gecko_commits(self): """Get a list of gecko commits that correspond to commits which have landed on the gecko integration branch, but are not yet merged into the upstream commit we are updating to. There are two possible sources of such commits: * Unlanded PRs. These correspond to upstream syncs with status of "open" * Gecko PRs that landed between the wpt commit that we are syncing to and latest upstream master. :return: List of commits in the order in which they originally landed in gecko""" if self._unlanded_gecko_commits is None: commits = [] def on_integration_branch(commit): # Calling this continually is O(N*M) where N is the number of unlanded commits # and M is the average depth of the commit in the gecko tree # If we need a faster implementation one approach would be to store all the # commits not on the integration branch and check if this commit is in that set return self.git_gecko.is_ancestor(commit, self.git_gecko.rev_parse( self.gecko_integration_branch())) # All the commits from unlanded upstream syncs that are reachable from the # integration branch unlanded_syncs = set() for status in ["open", "wpt-merged"]: unlanded_syncs |= set(upstream.UpstreamSync.load_by_status(self.git_gecko, self.git_wpt, status)) for sync in unlanded_syncs: branch_commits = [commit.sha1 for commit in sync.gecko_commits if on_integration_branch(commit)] if branch_commits: logger.info("Commits from unlanded sync for bug %s (PR %s) will be reapplied" % (sync.bug, sync.pr)) commits.extend(branch_commits) # All the gecko commits that landed between the base sync point and master # We take the base here and then remove upstreamed commits that we are landing # as we reach them so that we can get the right diffs for the other PRs unlanded_commits = self.git_wpt.iter_commits("%s..origin/master" % self.wpt_commits.base.sha1) seen_bugs = set() for commit in unlanded_commits: wpt_commit = sync_commit.WptCommit(self.git_wpt, commit) gecko_commit = wpt_commit.metadata.get("gecko-commit") if gecko_commit: git_sha = cinnabar(self.git_gecko).hg2git(gecko_commit) commit = sync_commit.GeckoCommit(self.git_gecko, git_sha) bug_number = bug.bug_number_from_url(commit.metadata.get("bugzilla-url")) if on_integration_branch(commit): if bug_number and bug_number not in seen_bugs: logger.info("Commits from landed sync for bug %s will be reapplied" % bug_number) seen_bugs.add(bug_number) commits.append(commit.sha1) commits = set(commits) # Order the commits according to the order in which they landed in gecko ordered_commits = [] for commit in self.git_gecko.iter_commits(self.gecko_integration_branch(), paths=env.config["gecko"]["path"]["wpt"]): if commit.hexsha in commits: ordered_commits.append(commit.hexsha) commits.remove(commit.hexsha) if not commits: break self._unlanded_gecko_commits = list(reversed( [sync_commit.GeckoCommit(self.git_gecko, item) for item in ordered_commits])) return self._unlanded_gecko_commits def has_metadata_for_sync(self, sync: DownstreamSync) -> bool: for item in reversed(list(self.gecko_commits)): if (item.metadata.get("wpt-pr") == sync.pr and item.metadata.get("wpt-type") == "metadata"): return True return False @property def landing_commit(self) -> Any | None: head = self.gecko_commits.head if (head.metadata.get("wpt-type") == "landing" and head.metadata.get("wpt-head") == self.wpt_commits.head.sha1): return head return None @mut() def add_pr(self, pr_id: int, sync: DownstreamSync | UpstreamSync, wpt_commits: list[WptCommit], copy: bool = True, prev_wpt_head: str | None = None, ) -> Commit | None: if len(wpt_commits) > 1: assert all(item.pr() == pr_id for item in wpt_commits) # Assume we can always use the author of the first commit author = first_non_merge(wpt_commits).author git_work_wpt = self.wpt_worktree.get() git_work_gecko = self.gecko_worktree.get() pr = env.gh_wpt.get_pull(int(pr_id)) metadata = { "wpt-pr": pr_id, "wpt-commits": ", ".join(item.sha1 for item in wpt_commits) } message = b"""Bug %d [wpt PR %d] - %s, a=testonly Automatic update from web-platform-tests\n%s """ message = message % ((sync and sync.bug) or self.bug, pr.number, pr.title.encode("utf8"), b"\n--\n".join(item.msg for item in wpt_commits) + b"\n--") message = sync_commit.try_filter(message) upstream_changed = set() diffs = wpt_commits[-1].commit.diff(wpt_commits[0].commit.parents[0]) for diff in diffs: new_path = diff.b_path if new_path: upstream_changed.add(new_path) logger.info("Upstream files changed:\n%s" % "\n".join(sorted(upstream_changed))) # If this is originally an UpstreamSync and no new changes were introduced to the GH PR # then we can safely skip and not need to re-apply these changes. Compare the hash of # the upstreamed gecko commits against the final hash in the PR. if isinstance(sync, upstream.UpstreamSync): commit_is_local = False pr_head = sync.pr_head if sync.wpt_commits.head.sha1 == pr_head: commit_is_local = True else: # Check if we rebased locally without pushing the rebase; # this is a thing we used to do to check the PR would merge try: ref_log = sync.git_wpt.references[sync.branch_name].log() except Exception: # If we can't read the reflog just skip this pass else: commit_is_local = any(entry.newhexsha == pr_head for entry in ref_log) if commit_is_local: logger.info("Upstream sync doesn't introduce any gecko changes") return None if copy: commit = self.copy_pr(git_work_gecko, git_work_wpt, pr, wpt_commits, message, author, metadata) else: commit = self.move_pr(git_work_gecko, git_work_wpt, pr, wpt_commits, message, author, prev_wpt_head, metadata) if commit is not None: self.gecko_commits.head = commit # type: ignore return commit @mut() def copy_pr(self, git_work_gecko, git_work_wpt, pr, wpt_commits, message, author, metadata): # Ensure we have anything in a wpt submodule git_work_wpt.git.submodule("update", "--init", "--recursive") dest_path = os.path.join(git_work_gecko.working_dir, env.config["gecko"]["path"]["wpt"]) src_path = git_work_wpt.working_dir # Specific paths that should be re-checked out keep_paths = {"LICENSE", "resources/testdriver_vendor.js"} # file names that are ignored in any part of the tree ignore_files = {".git"} logger.info("Setting wpt HEAD to %s" % wpt_commits[-1].sha1) git_work_wpt.head.reference = wpt_commits[-1].commit git_work_wpt.head.reset(index=True, working_tree=True) # First remove all files so we handle deletion correctly shutil.rmtree(dest_path) ignore_paths = defaultdict(set) for name in keep_paths: src, name = os.path.split(os.path.join(src_path, name)) ignore_paths[src].add(name) def ignore_names(src, names): rv = [] for item in names: if item in ignore_files: rv.append(item) if src in ignore_paths: rv.extend(ignore_paths[src]) return rv shutil.copytree(src_path, dest_path, ignore=ignore_names) # Now re-checkout the files we don't want to change # checkout-index allows us to ignore files that don't exist git_work_gecko.git.checkout_index(*(os.path.join(env.config["gecko"]["path"]["wpt"], item) for item in keep_paths), force=True, quiet=True) allow_empty = False if not git_work_gecko.is_dirty(untracked_files=True): logger.info("PR %s didn't add any changes" % pr.number) allow_empty = True git_work_gecko.git.add(env.config["gecko"]["path"]["wpt"], no_ignore_removal=True) message = sync_commit.Commit.make_commit_msg(message, metadata) commit = git_work_gecko.index.commit(message=message, author=git.Actor._from_string(author)) logger.debug("Gecko files changed: \n%s" % "\n".join(list(commit.stats.files.keys()))) gecko_commit = sync_commit.GeckoCommit(self.git_gecko, commit.hexsha, allow_empty=allow_empty) return gecko_commit @mut() def move_pr(self, git_work_gecko: Repo, git_work_wpt: Repo, pr: AttrDict, wpt_commits: list[WptCommit], message: bytes, author: str, prev_wpt_head: str, metadata: dict[str, str], ) -> Commit | None: if prev_wpt_head is None: if wpt_commits[-1].is_merge: base = wpt_commits[-1].sha1 + "^" else: base = wpt_commits[0].sha1 + "^" else: base = self.git_wpt.git.merge_base(prev_wpt_head, wpt_commits[-1].sha1) head = sync_commit.GeckoCommit(self.git_gecko, git_work_gecko.head.commit) if (head.is_downstream and head.metadata.get("wpt-pr") == str(pr.number)): return None revish = f"{base}..{wpt_commits[-1].sha1}" logger.info("Moving wpt commits %s" % revish) return sync_commit.move_commits(self.git_wpt, revish, message, git_work_gecko, dest_prefix=env.config["gecko"]["path"]["wpt"], amend=False, metadata=metadata, rev_name="pr-%s" % pr.number, author=first_non_merge(wpt_commits).author, exclude={"resources/testdriver_vendor.js"}, allow_empty=True) @mut() def reapply_local_commits(self, gecko_commits_landed): # The local commits to apply are everything that hasn't been landed at this # point in the process commits = [item for item in self.unlanded_gecko_commits() if item.canonical_rev not in gecko_commits_landed] landing_commit = self.gecko_commits[-1] git_work_gecko = self.gecko_worktree.get() logger.debug("Reapplying commits: %s" % " ".join(item.canonical_rev for item in commits)) if not commits: return already_applied = landing_commit.metadata.get("reapplied-commits") if already_applied: already_applied = [item.strip() for item in already_applied.split(",")] else: already_applied = [] already_applied_set = set(already_applied) unapplied_gecko_commits = [item for item in commits if item.canonical_rev not in already_applied_set] try: for i, commit in enumerate(unapplied_gecko_commits): def msg_filter(_): msg = landing_commit.msg reapplied_commits = (already_applied + [commit.canonical_rev for commit in commits[:i + 1]]) metadata = {"reapplied-commits": ", ".join(reapplied_commits)} return msg, metadata logger.info(f"Reapplying {commit.sha1} - {commit.msg}") # Passing in a src_prefix here means that we only generate a patch for the # part of the commit that affects wpt, but then we need to undo it by adding # the same dest prefix commit = commit.move(git_work_gecko, msg_filter=msg_filter, src_prefix=env.config["gecko"]["path"]["wpt"], dest_prefix=env.config["gecko"]["path"]["wpt"], three_way=True, amend=True) if commit is None: break except AbortError as e: err_msg = ( f"Landing wpt failed because reapplying commits failed:\n{e.message}" ) env.bz.comment(self.bug, err_msg) raise AbortError(err_msg) @mut() def add_metadata(self, sync: DownstreamSync) -> None: logger.info("Adding metadata from downstream sync") if self.has_metadata_for_sync(sync): logger.info("Metadata already applied for PR %s" % sync.pr) return if not sync.metadata_commit or sync.metadata_commit.is_empty(): logger.info("No metadata commit available for PR %s" % sync.pr) return worktree = self.gecko_worktree.get() success = gitutils.cherry_pick(worktree, sync.metadata_commit.sha1) if not success: logger.info("Cherry-pick failed, trying again with only test-related changes") # Try to reset all metadata files that aren't related to an affected test. affected_metadata = {os.path.join(env.config["gecko"]["path"]["meta"], item) + ".ini" for items in sync.affected_tests_readonly.values() for item in items} checkout = [] status = gitutils.status(worktree) for head_path, data in status.items(): if data["code"] not in {"DD", "AU", "UD", "UA", "DU", "AA", "UU"}: # Only try to reset merge conflicts continue path = data["rename"] if data["rename"] else head_path if path not in affected_metadata: logger.debug("Resetting changes to %s" % head_path) if data["code"] == "DU": # Files that were deleted in master should just be removed worktree.git.rm(head_path) else: checkout.append(head_path) logger.debug("Resetting changes to %s" % " ".join(checkout)) try: worktree.git.checkout("HEAD", "--", *checkout) # Now try to commit again worktree.git.commit(c=sync.metadata_commit.sha1, no_edit=True) success = True except git.GitCommandError as e: if gitutils.handle_empty_commit(worktree, e): return if sync.skip: return success = False if not success: try: logger.info("Cherry-pick had merge conflicts trying to automatically resolve") status = gitutils.status(worktree) for head_path, data in status.items(): if data["code"] in {"DD", "UD", "DU"}: # Deleted by remote or local # Could do better here and have the mergetool handle this case logger.info("Removing %s which was deleted somewhere" % head_path) worktree.git.rm(head_path) if data["code"] in {"UA", "AU"}: logger.info("Adding %s which was added somewhere" % head_path) worktree.git.add(head_path) logger.info("Running mergetool") worktree.git.mergetool(tool="metamerge", env={"MOZBUILD_STATE_PATH": repos.Gecko.get_state_path(env.config, worktree.working_dir)}) worktree.git.commit(c=sync.metadata_commit.sha1, no_edit=True) worktree.git.clean(f=True) success = True except git.GitCommandError as e: if gitutils.handle_empty_commit(worktree, e): return if sync.skip: return logger.error("Failed trying to use mergetool to resolve conflicts") raise metadata_commit = sync_commit.GeckoCommit(worktree, worktree.head.commit) if metadata_commit.msg.startswith(b"Bug None"): # If the metadata commit didn't get a valid bug number for some reason, # we want to replace the placeholder bug number with the # either the sync or landing bug number, otherwise the push will be # rejected bug_number = sync.bug or self.bug new_message = b"Bug %s%s" % (str(bug_number).encode("utf8"), metadata_commit.msg[len(b"Bug None"):]) sync_commit.create_commit(worktree, new_message, amend=True) @mut() def apply_prs(self, prev_wpt_head: str, landable_commits: LandableCommits) -> None: """Main entry point to setting the commits for landing. For each upstream PR we want to create a separate commit in the gecko repository so that we are preserving a useful subset of the history. We also want to prevent divergence from upstream. So for each PR that landed upstream since our last sync, we take the following steps: 1) Copy the state of upstream at the commit where the PR landed over to the gecko repo 2) Reapply any commits that have been made to gecko on the integration branch but which are not yet landed upstream on top of the PR 3) Apply any updated metadata from the downstream sync for the PR. """ last_pr = None has_metadata = False have_prs = set() # Find the last gecko commit containing a PR if len(self.gecko_commits): head_commit = self.gecko_commits.head if TYPE_CHECKING: head = cast(GeckoCommit, head_commit) else: head = head_commit if head.is_landing: return for commit in list(self.gecko_commits): assert isinstance(commit, GeckoCommit) if commit.metadata.get("wpt-pr") is not None: last_pr = int(commit.metadata["wpt-pr"]) has_metadata = commit.metadata.get("wpt-type") == "metadata" have_prs.add(last_pr) pr_count_applied = len(have_prs) gecko_commits_landed = set() def update_gecko_landed(sync: DownstreamSync | UpstreamSync, commits: list[WptCommit]) -> None: if isinstance(sync, upstream.UpstreamSync): for commit in commits: gecko_commit = commit.metadata.get("gecko-commit") if gecko_commit: gecko_commits_landed.add(gecko_commit) unapplied_commits = [] pr_count_upstream_empty = 0 last_applied_seen = last_pr is None for i, (pr, sync, commits) in enumerate(landable_commits): if last_applied_seen: unapplied_commits.append((i, (pr, sync, commits, False))) else: prev_wpt_head = commits[-1].sha1 try: have_prs.remove(pr) except KeyError: if isinstance(sync, downstream.DownstreamSync): # This could be wrong if the changes already landed in gecko for some reason raise AbortError("Expected an existing gecko commit for PR %s, " "but not found" % (pr,)) pr_count_upstream_empty += 1 continue if pr == last_pr: last_applied_seen = True if not has_metadata: unapplied_commits.append((i, (pr, sync, commits, True))) update_gecko_landed(sync, commits) if have_prs: raise AbortError("Found unexpected gecko commit for PRs %s" % (", ".join(str(item) for item in have_prs),)) pr_count_unapplied = len(unapplied_commits) if pr_count_applied and not has_metadata: # If we have seen the commit but not the metadata it will both be in # have_prs and unapplied_commits, so avoid double counting pr_count_unapplied -= 1 if (pr_count_applied + pr_count_upstream_empty + pr_count_unapplied != len(landable_commits)): raise AbortError("PR counts don't match; got %d applied, %d unapplied %d upstream" "(total %s), expected total %d" % (pr_count_applied, pr_count_unapplied, pr_count_upstream_empty, pr_count_unapplied + pr_count_applied + pr_count_upstream_empty, len(landable_commits))) for i, (pr, sync, commits, meta_only) in unapplied_commits: logger.info("Applying PR %i of %i" % (i + 1, len(landable_commits))) update_gecko_landed(sync, commits) # If copy is set then we copy the commits and reapply in-progress upstream # syncs. This is currently always disabled, but the intent was to do this for # the first commit to ensure that the possible drift from upstream was limited. # However there were some difficulties reapplying all the right commits, so it's # disabled until this is worked out. # To reenable it change the below line to # copy = i == 0 copy = False pr_commit: Commit | None = None if not meta_only: # If we haven't applied it before then create the initial commit pr_commit = self.add_pr(pr, sync, commits, prev_wpt_head=prev_wpt_head, copy=copy) prev_wpt_head = commits[-1].sha1 if pr_commit: if copy: self.reapply_local_commits(gecko_commits_landed) if isinstance(sync, downstream.DownstreamSync): self.add_metadata(sync) @mut() def update_landing_commit(self) -> GeckoCommit: git_work = self.gecko_worktree.get() if not self.landing_commit: metadata = { "wpt-type": "landing", "wpt-head": self.wpt_commits.head.sha1 } msg = sync_commit.Commit.make_commit_msg( b"""Bug %s - [wpt-sync] Update web-platform-tests to %s, a=testonly MANUAL PUSH: wpt sync bot """ % (str(self.bug).encode("utf8"), self.wpt_commits.head.sha1.encode("utf8")), metadata) sync_commit.create_commit(git_work, msg, allow_empty=True) else: sync_commit.create_commit(git_work, self.landing_commit.msg, allow_empty=True, amend=True, no_edit=True) rv = self.gecko_commits[-1] assert isinstance(rv, GeckoCommit) return rv @mut() def update_bug_components(self) -> None: renames = self.wpt_renames() if renames is None: return gecko_work = self.gecko_worktree.get() mozbuild_path = bugcomponents.mozbuild_path(gecko_work) if not os.path.exists(mozbuild_path): return bugcomponents.update(gecko_work, renames) if gecko_work.is_dirty(path=mozbuild_path): gecko_work.git.add(mozbuild_path, all=True) self.update_landing_commit() @mut() def update_metadata(self, log_files: list, update_intermittents: bool = False) -> None: """Update the web-platform-tests metadata based on the logs generated in a try run. :param log_files: List of paths to the raw logs from the try run """ # TODO: this shares a lot of code with downstreaming meta_path = env.config["gecko"]["path"]["meta"] gecko_work = self.gecko_worktree.get() mach = Mach(gecko_work.working_dir) logger.info("Updating metadata from %s logs" % len(log_files)) args = ["--full"] if update_intermittents: args.append("--update-intermittent") args.extend(log_files) mach.wpt_update(*args) if gecko_work.is_dirty(untracked_files=True, path=meta_path): gecko_work.git.add(meta_path, all=True) self.update_landing_commit() gecko_work.git.reset(hard=True) @mut() def update_sync_point(self, sync_point: SyncPoint) -> None: """Update the in-tree record of the last sync point.""" new_sha1 = self.wpt_commits.head.sha1 if sync_point["upstream"] == new_sha1: return sync_point["upstream"] = new_sha1 gecko_work = self.gecko_worktree.get() with open(os.path.join(gecko_work.working_dir, env.config["gecko"]["path"]["meta"], "mozilla-sync"), "wb") as f: sync_point.dump(f) if gecko_work.is_dirty(): gecko_work.index.add([os.path.join(env.config["gecko"]["path"]["meta"], "mozilla-sync")]) self.update_landing_commit() @mut() def next_try_push(self, retry: bool = False) -> TryPush | None: if self.status != "open": return None latest_try_push = self.latest_try_push stability = False if latest_try_push: if latest_try_push.status != "complete": return None elif latest_try_push.stability and not retry: return None if retry: stability = latest_try_push.stability if latest_try_push is not None else False else: stability = (latest_try_push is not None and not latest_try_push.infra_fail) return trypush.TryPush.create( self._lock, self, hacks=False, stability=stability, rebuild_count=0, try_cls=trypush.TryFuzzyCommit, disable_target_task_filter=True, artifact=not stability, queries=["web-platform-tests !ccov !shippable", "web-platform-tests linux-32 shippable", "web-platform-tests mac !debug shippable"]) def try_result(self, try_push: TryPush | None = None, tasks: TryPushTasks | None = None) -> TryPushResult: """Determine whether a try push has infra failures, or an acceptable level of test passes for the current build""" if try_push is None: try_push = self.latest_try_push if try_push is None: raise ValueError("No try push found") target_success_rate = 0.5 if not try_push.stability else 0.8 if try_push.infra_fail and not try_push.accept_failures: return TryPushResult.infra_fail if tasks is None: tasks = try_push.tasks() if tasks is None: # This can happen if the taskgroup_id is not yet set return TryPushResult.pending if not tasks.complete(allow_unscheduled=True): return TryPushResult.pending if tasks.success(): return TryPushResult.success if tasks.failed_builds() and not try_push.accept_failures: return TryPushResult.infra_fail if (tasks.failure_limit_exceeded(target_success_rate) and not try_push.accept_failures): return TryPushResult.too_many_failures return TryPushResult.acceptable_failures def push(landing: LandingSync) -> None: """Push from git_work_gecko to inbound.""" success = False landing_tree = env.config["gecko"]["landing"] old_head = None err = None assert landing.bug is not None while not success: try: logger.info("Rebasing onto %s" % landing.gecko_integration_branch()) landing.gecko_rebase(landing.gecko_integration_branch()) except AbortError as e: logger.error(e) env.bz.comment(landing.bug, str(e)) raise e if old_head == landing.gecko_commits.head.sha1: err = ("Landing push failed and rebase didn't change head:%s" % ("\n%s" % err if err else "")) logger.error(err) env.bz.comment(landing.bug, err) raise AbortError(err) old_head = landing.gecko_commits.head.sha1 if not tree.is_open(landing_tree): logger.info("%s is closed" % landing_tree) raise RetryableError(AbortError("Tree is closed")) try: logger.info("Pushing landing") push_info = landing.git_gecko.remotes.mozilla.push( "{}:{}".format(landing.branch_name, landing.gecko_integration_branch().split("/", 1)[1]) ) for item in push_info: if item.flags & item.ERROR: raise AbortError(item.summary) except git.GitCommandError as e: changes = landing.git_gecko.remotes.mozilla.fetch() err = "Pushing update to remote failed:\n%s" % e if not changes: logger.error(err) env.bz.comment(landing.bug, err) raise AbortError(err) else: success = True # The landing is marked as finished when it reaches central def unlanded_with_type(git_gecko, git_wpt, wpt_head, prev_wpt_head): pr_commits = unlanded_wpt_commits_by_pr(git_gecko, git_wpt, wpt_head or prev_wpt_head, "origin/master") for pr, commits in pr_commits: if pr is None: status = LandableStatus.no_pr else: sync = load.get_pr_sync(git_gecko, git_wpt, pr, log=False) if sync is None: status = LandableStatus.no_sync elif isinstance(sync, upstream.UpstreamSync): status = LandableStatus.upstream else: assert isinstance(sync, downstream.DownstreamSync) status = sync.landable_status yield (pr, commits, status) def load_sync_point(git_gecko: Repo, git_wpt: Repo) -> SyncPoint: """Read the last sync point from the batch sync process""" pygit2_repo = pygit2_get(git_gecko) integration_sha = pygit2_repo.revparse_single(LandingSync.gecko_integration_branch()).id blob_id = pygit2_repo[integration_sha].tree["testing/web-platform/meta/mozilla-sync"].id mozilla_data = pygit2_repo[blob_id].data sync_point = SyncPoint() sync_point.loads(mozilla_data) return sync_point def unlanded_wpt_commits_by_pr(git_gecko: Repo, git_wpt: Repo, prev_wpt_head: str, wpt_head: str = "origin/master", ) -> list[tuple[int | None, list[WptCommit]]]: revish = f"{prev_wpt_head}..{wpt_head}" commits_by_pr: list[tuple[int | None, list[WptCommit]]] = [] index_by_pr: dict[int | None, int] = {} for commit in git_wpt.iter_commits(revish, reverse=True, first_parent=True): wpt_commit = sync_commit.WptCommit(git_wpt, commit.hexsha) pr = wpt_commit.pr() extra_commits = [] if pr not in index_by_pr: pr_data: tuple[int | None, list[WptCommit]] = (pr, []) # If we have a merge commit, also get the commits merged in if len(commit.parents) > 1: merged_revish = f"{commit.parents[0].hexsha}..{commit.hexsha}" for merged_commit in git_wpt.iter_commits(merged_revish, reverse=True): if merged_commit.hexsha != commit.hexsha: wpt_commit = sync_commit.WptCommit(git_wpt, merged_commit.hexsha) if wpt_commit.pr() == pr: extra_commits.append(wpt_commit) else: idx = index_by_pr[pr] pr_data = commits_by_pr.pop(idx) assert pr_data[0] == pr index_by_pr = {key: (value if value < idx else value - 1) for key, value in index_by_pr.items()} for c in extra_commits + [wpt_commit]: pr_data[1].append(c) commits_by_pr.append(pr_data) index_by_pr[pr] = len(commits_by_pr) - 1 return commits_by_pr def landable_commits(git_gecko: Repo, git_wpt: Repo, prev_wpt_head: str, wpt_head: str | None = None, include_incomplete: bool = False ) -> tuple[str, LandableCommits] | None: """Get the list of commits that are able to land. :param prev_wpt_head: The sha1 of the previous wpt commit landed to gecko. :param wpt_head: The sha1 of the latest possible commit to land to gecko, or None to use the head of the master branch :param include_incomplete: By default we don't attempt to land anything that hasn't completed a metadata update. This flag disables that and just lands everything up to the specified commit.""" if wpt_head is None: wpt_head = "origin/master" pr_commits = unlanded_wpt_commits_by_pr(git_gecko, git_wpt, prev_wpt_head, wpt_head) landable_commits = [] for pr, commits in pr_commits: last = False if not pr: # Assume this was some trivial fixup: continue first_commit = first_non_merge(commits) if not first_commit: # If we only have a merge commit just use that; it doesn't come from gecko anyway first_commit = commits[-1] def upstream_sync(bug_number): syncs = upstream.UpstreamSync.for_bug(git_gecko, git_wpt, bug_number, flat=True) for sync in syncs: if sync.merge_sha == commits[-1].sha1 and not sync.wpt_commits: # TODO: this shouldn't be mutating here with SyncLock("upstream", None) as lock: assert isinstance(lock, SyncLock) with sync.as_mut(lock): # If we merged with a merge commit, the set of commits # here will be empty sync.set_wpt_base(sync_commit.WptCommit(git_wpt, commits[0].sha1 + "~").sha1) # Only check the first commit since later ones could be added in the PR sync_revs = {item.canonical_rev for item in sync.upstreamed_gecko_commits} if any(commit.metadata.get("gecko-commit") in sync_revs for commit in commits): break else: sync = None return sync sync = None sync = load.get_pr_sync(git_gecko, git_wpt, pr) if isinstance(sync, downstream.DownstreamSync): if sync and "affected-tests" in sync.data and sync.data["affected-tests"] is None: del sync.data["affected-tests"] if not include_incomplete: if not sync: # TODO: schedule a downstream sync for this pr logger.info("PR %s has no corresponding sync" % pr) last = True elif (isinstance(sync, downstream.DownstreamSync) and sync.landable_status not in (LandableStatus.ready, LandableStatus.skip)): logger.info(f"PR {pr}: {sync.landable_status.reason_str()}") last = True if last: break assert isinstance(sync, (upstream.UpstreamSync, downstream.DownstreamSync)) landable_commits.append((pr, sync, commits)) if not landable_commits: logger.info("No new commits are landable") return None wpt_head = landable_commits[-1][2][-1].sha1 logger.info("Landing up to commit %s" % wpt_head) return wpt_head, landable_commits def current(git_gecko: Repo, git_wpt: Repo) -> LandingSync | None: landings = LandingSync.load_by_status(git_gecko, git_wpt, "open") if len(landings) > 1: raise ValueError("Multiple open landing branches") if landings: landing = landings.pop() assert isinstance(landing, LandingSync) return landing return None @entry_point("landing") def wpt_push(git_gecko: Repo, git_wpt: Repo, commits: list[str], create_missing: bool = True) -> None: prs = set() for commit_sha in commits: # This causes the PR to be recorded as a note commit = sync_commit.WptCommit(git_wpt, commit_sha) pr = commit.pr() if pr is not None and not upstream.UpstreamSync.has_metadata(commit.msg): prs.add(pr) if create_missing: for pr in prs: from . import update sync = load.get_pr_sync(git_gecko, git_wpt, pr) if not sync: # If we don't have a sync for this PR create one # It's easiest just to go via the GH API here pr_data = env.gh_wpt.get_pull(pr) update.update_pr(git_gecko, git_wpt, pr_data) @entry_point("landing") def update_landing(git_gecko: Repo, git_wpt: Repo, prev_wpt_head: Any | None = None, new_wpt_head: Any | None = None, include_incomplete: bool = False, retry: bool = False, allow_push: bool = True, accept_failures: bool = False, ) -> LandingSync | None: """Create or continue a landing of wpt commits to gecko. :param prev_wpt_head: The sha1 of the previous wpt commit landed to gecko. :param wpt_head: The sha1 of the latest possible commit to land to gecko, or None to use the head of the master branch" :param include_incomplete: By default we don't attempt to land anything that hasn't completed a metadata update. This flag disables that and just lands everything up to the specified commit. :param retry: Create a new try push for the landing even if there's an existing one :param allow_push: Allow pushing to gecko if try is complete :param accept_failures: Don't fail if an existing try push has too many failures """ landing = current(git_gecko, git_wpt) sync_point = load_sync_point(git_gecko, git_wpt) with SyncLock("landing", None) as lock: assert isinstance(lock, SyncLock) if landing is None: update_repositories(git_gecko, git_wpt) if prev_wpt_head is None: prev_wpt_head = sync_point["upstream"] landable = landable_commits(git_gecko, git_wpt, prev_wpt_head, wpt_head=new_wpt_head, include_incomplete=include_incomplete) if landable is None: return None wpt_head, commits = landable landing = LandingSync.new(lock, git_gecko, git_wpt, prev_wpt_head, wpt_head) # Set the landing to block all the bugs that will land with it blocks = [sync.bug for (pr_, sync, commits_) in commits if isinstance(sync, downstream.DownstreamSync) and sync.bug is not None] assert landing.bug is not None with env.bz.bug_ctx(landing.bug) as bug: for bug_id in blocks: bug.add_blocks(bug_id) else: if prev_wpt_head and landing.wpt_commits.base.sha1 != prev_wpt_head: raise AbortError("Existing landing base commit %s doesn't match" "supplied previous wpt head %s" % (landing.wpt_commits.base.sha1, prev_wpt_head)) elif new_wpt_head and landing.wpt_commits.head.sha1 != new_wpt_head: raise AbortError("Existing landing head commit %s doesn't match" "supplied wpt head %s" % (landing.wpt_commits.head.sha1, new_wpt_head)) head = landing.gecko_commits.head if git_gecko.is_ancestor(head.commit, git_gecko.rev_parse( env.config["gecko"]["refs"]["central"])): logger.info("Landing reached central") with landing.as_mut(lock): landing.finish() return None elif git_gecko.is_ancestor(head.commit, git_gecko.rev_parse(landing.gecko_integration_branch())): logger.info("Landing is on inbound but not yet on central") return None landable = landable_commits(git_gecko, git_wpt, landing.wpt_commits.base.sha1, landing.wpt_commits.head.sha1, include_incomplete=include_incomplete) if landable is None: raise AbortError("No new commits are landable") wpt_head, commits = landable assert wpt_head == landing.wpt_commits.head.sha1 pushed = False with landing.as_mut(lock): if landing.latest_try_push is None: landing.apply_prs(prev_wpt_head, commits) landing.update_bug_components() landing.update_sync_point(sync_point) landing.next_try_push() elif retry: try: landing.gecko_rebase(landing.gecko_landing_branch()) except AbortError: message = record_rebase_failure(landing) raise AbortError(message) with landing.latest_try_push.as_mut(lock): landing.latest_try_push.status = "complete" # type: ignore landing.next_try_push(retry=True) else: try_push = landing.latest_try_push try_result = landing.try_result() if try_push.status == "complete" and (try_result.is_ok() or accept_failures): try: landing.gecko_rebase(landing.gecko_landing_branch()) except AbortError: message = record_rebase_failure(landing) raise AbortError(message) if landing.next_try_push() is None: push_to_gecko(git_gecko, git_wpt, landing, allow_push) pushed = True elif try_result == TryPushResult.pending: logger.info("Existing try push %s is waiting for try results" % try_push.treeherder_url) else: logger.info("Existing try push %s requires manual fixup" % try_push.treeherder_url) try_notify_downstream(commits, landing_is_complete=pushed) if pushed: retrigger() return landing def retrigger() -> None: try: # Avoid circular import from . import tasks tasks.retrigger.apply_async() except OperationalError: logger.warning("Failed to retrigger blocked syncs") @entry_point("landing") @mut('try_push', 'sync') def try_push_complete(git_gecko: Repo, git_wpt: Repo, try_push: TryPush, sync: LandingSync, allow_push: bool = True, accept_failures: bool = False, tasks: Any | None = None, ) -> None: """Run after all jobs in a try push are complete. This function handles updating the metadata based on the try push, or scheduling more jobs. In the case that the metadata has been updated successfully, the try push is marked as complete. If there's an error e.g. an infrastructure failure the try push is not marked as complete; user action is required to complete the handling of the try push (either by passing in accept_failures=True to indicate that the failure is not significant or by retyring the try push in which case the existing one will be marked as complete).""" if try_push.status == "complete": logger.warning("Called try_push_complete on a completed try push") return None if accept_failures: try_push.accept_failures = True # type: ignore if tasks is None: tasks = try_push.tasks() if tasks is None: logger.error("Taskgroup id is not yet set") return None try_result = sync.try_result(tasks=tasks) if try_result == TryPushResult.pending: logger.info("Try push results are pending") return None if not try_result == TryPushResult.success: if try_result.is_failure(): if try_result == TryPushResult.infra_fail: message = record_build_failures(sync, try_push) try_push.infra_fail = True # type: ignore raise AbortError(message) elif try_result == TryPushResult.too_many_failures and not try_push.stability: message = record_too_many_failures(sync, try_push) raise AbortError(message) if not try_push.stability: update_metadata(sync, try_push) else: retriggered = tasks.retriggered_wpt_states() if not retriggered: if try_result == TryPushResult.too_many_failures: record_too_many_failures(sync, try_push) try_push.status = "complete" # type: ignore return None num_new_jobs = tasks.retrigger_failures() logger.info(f"{num_new_jobs} new tasks scheduled on try for {sync.bug}") if num_new_jobs: assert sync.bug is not None env.bz.comment(sync.bug, ("Retriggered failing web-platform-test tasks on " "try before final metadata update.")) return None update_metadata(sync, try_push, tasks) try_push.status = "complete" # type: ignore if try_result == TryPushResult.infra_fail: record_infra_fail(sync, try_push) return None update_landing(git_gecko, git_wpt, allow_push=allow_push) def needinfo_users() -> list[str]: needinfo_users = [item.strip() for item in (env.config["gecko"]["needinfo"] .get("landing", "") .split(","))] return [item for item in needinfo_users if item] def record_failure(sync: LandingSync, log_msg: str, bug_msg: str, fixup_msg: Any | None = None) -> str: if fixup_msg is None: fixup_msg = "Run `wptsync landing` with either --accept-failures or --retry" logger.error(f"Bug {sync.bug}:{log_msg}\n{fixup_msg}") sync.error = log_msg # type: ignore assert sync.bug is not None with env.bz.bug_ctx(sync.bug) as bug: bug.add_comment(f"{bug_msg}\nThis requires fixup from a wpt sync admin.") bug.needinfo(*needinfo_users()) return log_msg def record_build_failures(sync, try_push): log_msg = f"build failures in try push {try_push.treeherder_url}" bug_msg = f"Landing failed due to build failures in try push {try_push.treeherder_url}" return record_failure(sync, log_msg, bug_msg) def record_too_many_failures(sync: LandingSync, try_push: TryPush) -> str: log_msg = f"too many test failures in try push {try_push.treeherder_url}" bug_msg = "Landing failed due to too many test failures in try push {}".format( try_push.treeherder_url) return record_failure(sync, log_msg, bug_msg) def record_infra_fail(sync, try_push): log_msg = "infra failures in try push %s. " % (try_push.treeherder_url) bug_msg = "Landing failed due to infra failures in try push {}.".format( try_push.treeherder_url) return record_failure(sync, log_msg, bug_msg) def record_rebase_failure(sync): log_msg = "rebase failed" bug_msg = "Landing failed due to conficts during rebase" fixup_msg = "Resolve the conflicts in the worktree and run `wptsync landing`" return record_failure(sync, log_msg, bug_msg, fixup_msg) def update_metadata(sync: LandingSync, try_push: TryPush, tasks: TryPushTasks | None = None) -> None: if tasks is None: tasks = try_push.tasks() if tasks is None: raise AbortError("Try push has no taskgroup id set") wpt_tasks = try_push.download_logs(tasks.wpt_tasks) log_files = [] for task in wpt_tasks: for run in task.get("status", {}).get("runs", []): log = run.get("_log_paths", {}).get("wptreport.json") if log: log_files.append(log) if not log_files: logger.warning("No log files found for try push %r" % try_push) sync.update_metadata(log_files, update_intermittents=True) def push_to_gecko(git_gecko: Repo, git_wpt: Repo, sync: LandingSync, allow_push: bool = True) -> None: if not allow_push: logger.info("Landing in bug %s is ready for push.\n" "Working copy is in %s" % (sync.bug, sync.gecko_worktree.get().working_dir)) return update_repositories(git_gecko, git_wpt) push(sync) def try_notify_downstream(commits: Any, landing_is_complete: bool = False) -> None: for _, sync, _ in commits: if sync is not None: if isinstance(sync, downstream.DownstreamSync): with SyncLock.for_process(sync.process_name) as lock: assert isinstance(lock, SyncLock) with sync.as_mut(lock): try: if not sync.skip: sync.try_notify() except Exception as e: logger.error(str(e)) finally: if landing_is_complete: sync.finish() if sync.bug is not None and not sync.results_notified: env.bz.comment(sync.bug, "Test result changes from PR not available.") @entry_point("landing") def gecko_push(git_gecko: Repo, git_wpt: Repo, repository_name: str, hg_rev: str, raise_on_error: bool = False, base_rev: Any | None = None, ) -> None: rev = git_gecko.rev_parse(cinnabar(git_gecko).hg2git(hg_rev)) last_sync_point, base_commit = LandingSync.prev_gecko_commit(git_gecko, repository_name) if base_rev is None and git_gecko.is_ancestor(rev, base_commit.commit): logger.info("Last sync point moved past commit") return landed_central = repository_name == "mozilla-central" revish = f"{base_commit.sha1}..{rev.hexsha}" landing_sync = current(git_gecko, git_wpt) for commit in git_gecko.iter_commits(revish, reverse=True): gecko_commit = sync_commit.GeckoCommit(git_gecko, commit.hexsha) logger.debug("Processing commit %s" % gecko_commit.sha1) if landed_central and gecko_commit.is_landing: logger.info("Found wptsync landing in commit %s" % gecko_commit.sha1) if gecko_commit.bug is None: logger.error("Commit %s looked link a landing, but had no bug" % gecko_commit.sha1) continue syncs = LandingSync.for_bug(git_gecko, git_wpt, gecko_commit.bug, statuses=None, flat=True) if syncs: sync = syncs[0] logger.info("Found sync %s" % sync.process_name) with SyncLock("landing", None) as lock: assert isinstance(lock, SyncLock) with syncs[0].as_mut(lock): sync.finish() else: logger.error("Failed to find sync for commit") elif gecko_commit.is_backout: backed_out, _ = gecko_commit.landing_commits_backed_out() if backed_out: logger.info("Commit %s backs out wpt sync landings" % gecko_commit.sha1) for backed_out_commit in backed_out: bug = backed_out_commit.bug syncs = [] if bug is not None: syncs = LandingSync.for_bug(git_gecko, git_wpt, bug, statuses=None, flat=True) if syncs: # TODO: should really check if commit is actually part of the sync if there's >1 # TODO: reopen landing? But that affects the invariant that there is only one sync = syncs[0] logger.info("Found sync %s" % sync.process_name) with SyncLock("landing", None) as lock: assert isinstance(lock, SyncLock) with sync.as_mut(lock): sync.error = "Landing was backed out" # type: ignore else: logger.error("Failed to find sync for commit") elif gecko_commit.is_downstream: syncs = [] bug = gecko_commit.bug if bug is not None: syncs = LandingSync.for_bug(git_gecko, git_wpt, bug, statuses=None, flat=True) for sync in syncs: sync = syncs[0] with SyncLock("landing", None) as lock: assert isinstance(lock, SyncLock) with sync.as_mut(lock): sync.finish() # TODO: Locking here with SyncLock("landing", None) as lock: assert isinstance(lock, SyncLock) with last_sync_point.as_mut(lock): assert last_sync_point.commit is not None if not git_gecko.is_ancestor(rev, last_sync_point.commit.commit): last_sync_point.commit = rev.hexsha # type: ignore if landing_sync and landing_sync.status == "complete": start_next_landing() def start_next_landing(): from . import tasks tasks.land.apply_async()