sync/upstream.py (902 lines of code) (raw):

from __future__ import annotations import enum import os import re import time import traceback import git import newrelic from bugsy.errors import BugsyException from github import GithubException from mozautomation import commitparser from . import log from . import commit as sync_commit from .base import entry_point from .commit import GeckoCommit from .downstream import DownstreamSync from .errors import AbortError from .env import Environment from .gitutils import update_repositories, gecko_repo from .gh import AttrDict from .lock import SyncLock, constructor, mut from .sync import CommitFilter, LandableStatus, SyncProcess, CommitRange from .repos import cinnabar, pygit2_get from typing import (Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple, Union, cast, TYPE_CHECKING) from git.repo.base import Repo if TYPE_CHECKING: from sync.base import BranchRefObject, ProcessName from sync.commit import Commit CreateSyncs = Dict[Optional[int], Union[List, "Endpoints"]] UpdateSyncs = Dict[int, Tuple["UpstreamSync", GeckoCommit]] env = Environment() logger = log.get_logger(__name__) class BackoutCommitFilter(CommitFilter): def __init__(self, bug_id: int) -> None: self.bug = bug_id self.seen: set[str] = set() self._commits = {} def _filter_commit(self, commit: Commit) -> bool: assert isinstance(commit, GeckoCommit) if commit.metadata.get("wptsync-skip"): return False if DownstreamSync.has_metadata(commit.msg): return False if commit.is_backout: commits, _ = commit.wpt_commits_backed_out() for backout_commit in commits: if backout_commit.sha1 in self.seen: return True if commit.bug == self.bug: if commit.is_empty(env.config["gecko"]["path"]["wpt"]): return False self.seen.add(commit.sha1) return True return False def filter_commits(self, commits: Iterable[Commit]) -> Sequence[Commit]: return remove_complete_backouts(commits) class UpstreamSync(SyncProcess): sync_type = "upstream" obj_id = "bug" statuses = ("open", "wpt-merged", "complete", "incomplete") status_transitions = [("open", "wpt-merged"), ("open", "complete"), ("open", "incomplete"), ("incomplete", "open"), ("wpt-merged", "complete")] multiple_syncs = True def __init__(self, git_gecko: Repo, git_wpt: Repo, process_name: ProcessName) -> None: super().__init__(git_gecko, git_wpt, process_name) self._upstreamed_gecko_commits: list[GeckoCommit] | None = None self._upstreamed_gecko_head: str | None = None @classmethod @constructor(lambda args: ("upstream", args['bug'])) def new(cls, lock: SyncLock, git_gecko: Repo, git_wpt: Repo, gecko_base: str, gecko_head: str, wpt_base: str = "origin/master", wpt_head: str | None = None, bug: str | None = None, status: str = "open", ) -> UpstreamSync: self = super().new(lock, git_gecko, git_wpt, gecko_base, gecko_head, wpt_base=wpt_base, wpt_head=wpt_head, bug=bug, status=status) with self.as_mut(lock): for commit in self.gecko_commits: commit.set_upstream_sync(self) return self @classmethod def from_pr(cls, lock: SyncLock, git_gecko: Repo, git_wpt: Repo, pr_id: int, body: str | None ) -> UpstreamSync | None: gecko_commits = [] bug = None integration_branch = None if body is None or not cls.has_metadata(body.encode("utf8", "replace")): return None commits = env.gh_wpt.get_commits(pr_id) for gh_commit in commits: commit = sync_commit.WptCommit(git_wpt, gh_commit.sha) if cls.has_metadata(commit.msg): gecko_commits.append(cinnabar(git_gecko).hg2git(commit.metadata["gecko-commit"])) commit_bug = env.bz.id_from_url(commit.metadata["bugzilla-url"]) if bug is not None and commit_bug != bug: logger.error("Got multiple bug numbers in URL from commits") break elif bug is None: bug = commit_bug if (integration_branch is not None and commit.metadata["integration_branch"] != integration_branch): logger.warning("Got multiple integration branches from commits") elif integration_branch is None: integration_branch = commit.metadata["integration_branch"] else: break if not gecko_commits: return None assert bug gecko_base = git_gecko.rev_parse("%s^" % gecko_commits[0]) gecko_head = git_gecko.rev_parse(gecko_commits[-1]) wpt_head = commits[-1].sha wpt_base = commits[0].sha return cls.new(lock, git_gecko, git_wpt, gecko_base, gecko_head, wpt_base, wpt_head, bug, pr_id) @classmethod def has_metadata(cls, message: bytes) -> bool: required_keys = ["gecko-commit", "bugzilla-url"] metadata = sync_commit.get_metadata(message) return all(item in metadata for item in required_keys) def gecko_commit_filter(self) -> BackoutCommitFilter: return BackoutCommitFilter(self.bug) @property def landable_status(self): return LandableStatus.upstream @property # type: ignore[override] def bug(self) -> int: return int(self.process_name.obj_id) @bug.setter # type: ignore @mut() def bug(self, value: int) -> None: raise AttributeError("Can't set attribute") @property def pr_status(self): return self.data.get("pr-status", "open") @pr_status.setter def pr_status(self, value): self.data["pr-status"] = value @property def merge_sha(self) -> str: return self.data.get("merge-sha", None) @merge_sha.setter def merge_sha(self, value: str | None) -> None: self.data["merge-sha"] = value @property def remote_branch(self) -> str | None: return self.data.get("remote-branch") @remote_branch.setter # type: ignore @mut() def remote_branch(self, value: str | None) -> None: if value: assert not value.startswith("refs/") self.data["remote-branch"] = value @mut() def get_or_create_remote_branch(self) -> str: if not self.remote_branch: pygit2_gecko = pygit2_get(self.git_gecko) pygit2_wpt = pygit2_get(self.git_wpt) if self.branch_name in pygit2_gecko.branches: upstream = pygit2_gecko.branches[self.branch_name].upstream if upstream: self.remote_branch = upstream.shortname # type: ignore if not self.remote_branch: count = 0 refs = pygit2_wpt.references initial_path = path = "refs/remotes/origin/gecko/%s" % self.bug while path in refs: count += 1 path = f"{initial_path}-{count}" self.remote_branch = path[len("refs/remotes/origin/"):] # type: ignore return self.remote_branch @property def upstreamed_gecko_commits(self) -> list[GeckoCommit]: if (self._upstreamed_gecko_commits is None or self._upstreamed_gecko_head != self.wpt_commits.head.sha1): self._upstreamed_gecko_commits = [ sync_commit.GeckoCommit(self.git_gecko, cinnabar(self.git_gecko).hg2git( wpt_commit.metadata["gecko-commit"])) for wpt_commit in self.wpt_commits if "gecko-commit" in wpt_commit.metadata] self._upstreamed_gecko_head = self.wpt_commits.head.sha1 return self._upstreamed_gecko_commits @mut() def update_wpt_commits(self) -> bool: if len(self.gecko_commits) == 0: return False # Find the commits that were already upstreamed. Some gecko commits may not # result in an upstream commit, if the patch has no effect. But if we find # the last commit that was previously upstreamed then all earlier ones must # also match. upstreamed_commits = {item.sha1 for item in self.upstreamed_gecko_commits} matching_commits = list(self.gecko_commits[:]) for gecko_commit in reversed(list(self.gecko_commits)): if gecko_commit.sha1 in upstreamed_commits: break matching_commits.pop() if len(matching_commits) == len(self.gecko_commits) == len(self.upstreamed_gecko_commits): return False if len(matching_commits) == 0: self.wpt_commits.head = self.wpt_commits.base # type: ignore elif len(matching_commits) < len(self.upstreamed_gecko_commits): self.wpt_commits.head = self.wpt_commits[len(matching_commits) - 1] # type: ignore # Ensure the worktree is clean wpt_work = self.wpt_worktree.get() wpt_work.git.reset(hard=True) wpt_work.git.clean(f=True, d=True, x=True) for commit in self.gecko_commits[len(matching_commits):]: commit = self.add_commit(commit) assert (len(self.wpt_commits) == len(self.upstreamed_gecko_commits)) return True def gecko_landed(self) -> bool: if not len(self.gecko_commits): return False central_commit = self.git_gecko.rev_parse(env.config["gecko"]["refs"]["central"]) landed = [self.git_gecko.is_ancestor(commit.commit, central_commit) for commit in self.gecko_commits] if not all(item == landed[0] for item in landed): logger.warning("Got some commits landed and some not for upstream sync %s" % self.branch_name) return False return landed[0] @property def repository(self) -> str: # Need to check central before landing repos head = self.gecko_commits.head repo = gecko_repo(self.git_gecko, head.commit) if repo is None: raise ValueError("Commit %s not part of any repository" % head.sha1) return repo @mut() def add_commit(self, gecko_commit: GeckoCommit) -> tuple[Commit | None, bool]: git_work = self.wpt_worktree.get() metadata = {"gecko-commit": gecko_commit.canonical_rev} if os.path.exists(os.path.join(git_work.working_dir, gecko_commit.canonical_rev + ".diff")): # If there's already a patch file here then don't try to create a new one # because we'll presumbaly fail again raise AbortError("Skipping due to existing patch") wpt_commit = gecko_commit.move(git_work, metadata=metadata, msg_filter=commit_message_filter, src_prefix=env.config["gecko"]["path"]["wpt"]) assert not git_work.is_dirty() if wpt_commit: self.wpt_commits.head = wpt_commit # type: ignore return wpt_commit, True @mut() def create_pr(self) -> int: if self.pr: return self.pr assert self.remote_branch is not None assert self.remote_branch in self.git_wpt.remotes.origin.refs while not env.gh_wpt.has_branch(self.remote_branch): logger.debug("Waiting for branch") time.sleep(1) commit_summary = self.wpt_commits[0].commit.summary if isinstance(commit_summary, bytes): commit_summary = commit_summary.decode("utf8", "ignore") msg = self.wpt_commits[0].msg.split(b"\n", 1) body = msg[1].decode("utf8", "replace") if len(msg) != 1 else "" pr_id = env.gh_wpt.create_pull( title="[Gecko{}] {}".format(" Bug %s" % self.bug if self.bug else "", commit_summary), body=body.strip(), base="master", head=self.remote_branch) self.pr = pr_id # type: ignore # TODO: add label to bug env.bz.comment(self.bug, "Created web-platform-tests PR %s for changes under " "testing/web-platform/tests" % env.gh_wpt.pr_url(pr_id)) return pr_id @mut() def push_commits(self) -> None: remote_branch = self.get_or_create_remote_branch() logger.info(f"Pushing commits from bug {self.bug} to branch {remote_branch}") push_info = self.git_wpt.remotes.origin.push("refs/heads/%s:%s" % (self.branch_name, remote_branch), force=True, set_upstream=True) for item in push_info: if item.flags & item.ERROR: raise AbortError(item.summary) def push_required(self) -> bool: return not (self.remote_branch and self.remote_branch in self.git_wpt.remotes.origin.refs and self.git_wpt.remotes.origin.refs[self.remote_branch].commit.hexsha == self.wpt_commits.head.sha1) @mut() def update_github(self) -> None: if self.pr: state = env.gh_wpt.pull_state(self.pr) if not len(self.gecko_commits): env.gh_wpt.close_pull(self.pr) elif state == "closed": pr = env.gh_wpt.get_pull(self.pr) if not pr.merged: env.gh_wpt.reopen_pull(self.pr) else: # If all the local commits are represented upstream, everything is # fine and close out the sync. Otherwise we have a problem. if len(self.upstreamed_gecko_commits) == len(self.gecko_commits): if self.status not in ("wpt-merged", "complete"): env.bz.comment(self.bug, "Upstream PR merged") self.finish() else: # It's unclear what to do in this case, so mark the sync for manual # fixup self.error = ("Upstream PR merged, " # type: ignore "but additional commits added after merge") return if not len(self.gecko_commits): return if not len(self.upstreamed_gecko_commits): return if self.push_required(): self.push_commits() if not self.pr: self.create_pr() self.set_landed_status() def set_landed_status(self) -> None: """ Set the status of the check on the GitHub commit upstream. This check is used to tell if the code has been landed into Gecko. """ if not self.pr: return landed_status = "success" if self.gecko_landed() else "failure" logger.info("Setting landed status to %s" % landed_status) # TODO - Maybe ignore errors setting the status if env.gh_wpt.get_status(self.pr, "upstream/gecko") != landed_status: env.gh_wpt.set_status(self.pr, landed_status, target_url=env.bz.bugzilla_url(self.bug), description="Landed on mozilla-central", context="upstream/gecko") @mut() def try_land_pr(self) -> bool: logger.info("Checking if sync for bug %s can land" % self.bug) if not self.status == "open": logger.info("Sync is %s" % self.status) return False if not self.gecko_landed(): logger.info("Commits are not yet landed in gecko") return False if not self.pr: logger.info("No upstream PR created") return False merge_sha = env.gh_wpt.merge_sha(self.pr) if merge_sha: logger.info("PR already merged") self.merge_sha = merge_sha self.finish("wpt-merged") return False try: self.set_landed_status() except Exception: logger.warning("Failed setting status on PR for bug %s" % self.bug) logger.info("Commit are landable; trying to land %s" % self.pr) msg = None check_status, checks = get_check_status(self.pr) if check_status not in [CheckStatus.SUCCESS, CheckStatus.PENDING]: details = ["Github PR %s" % env.gh_wpt.pr_url(self.pr)] msg = ("Can't merge web-platform-tests PR due to failing upstream checks:\n%s" % details) elif not env.gh_wpt.is_mergeable(self.pr): msg = "Can't merge web-platform-tests PR because it has merge conflicts" elif not env.gh_wpt.is_approved(self.pr): # This should be handled by the pr-bot msg = "Can't merge web-platform-tests PR because it is missing approval" else: try: merge_sha = env.gh_wpt.merge_pull(self.pr) env.bz.comment(self.bug, "Upstream PR merged by %s" % env.config["web-platform-tests"]["github"]["user"]) except GithubException as e: if isinstance(e.data, dict): err_msg = e.data.get("message", "Unknown GitHub Error") else: err_msg = e.data msg = ("Merging PR %s failed.\nMessage: %s" % (env.gh_wpt.pr_url(self.pr), err_msg)) except Exception as e: msg = ("Merging PR %s failed.\nMessage: %s" % (env.gh_wpt.pr_url(self.pr), e)) else: self.merge_sha = merge_sha self.finish("wpt-merged") return True if msg is not None: logger.error(msg) return False @mut() def finish(self, status: str = "complete") -> None: super().finish(status) if status in ("wpt-merged", "complete") and self.remote_branch: # Delete the remote branch after a merge try: self.git_wpt.remotes.origin.push(self.remote_branch, delete=True) except git.GitCommandError: pass else: self.remote_branch = None # type: ignore @property def pr_head(self) -> str | None: """ Retrieves the head of the PR ref: origin/pr/{pr_id} :return: The SHA of the head commit. """ if not self.pr: logger.error("No PR ID found for %s" % self.process_name) return None pr_ref = f'origin/pr/{self.pr}' if pr_ref not in self.git_wpt.references: # PR ref doesn't seem to exist logger.error("No ref found for %s" % pr_ref) return None ref = self.git_wpt.references[pr_ref] return ref.commit.hexsha @property def pr_commits(self) -> CommitRange: pr_head_sha = self.pr_head if not pr_head_sha: raise ValueError("Can't get PR commits as the ref head could not be found for %s" % self.process_name) pr_head = sync_commit.WptCommit(self.git_wpt, pr_head_sha) merge_bases = [] # Check if the PR Head is reachable from origin/master origin_master_sha = self.git_wpt.references['origin/master'].commit.hexsha pr_head_reachable = self.git_wpt.is_ancestor(pr_head.commit, self.git_wpt.rev_parse('origin/master')) # If not reachable, then it either hasn't landed yet, it was a Squash + Merge, # or a Rebase and merge. if not pr_head_reachable: merge_bases = self.git_wpt.merge_base(origin_master_sha, pr_head.sha1) else: if not self.merge_sha: raise ValueError('The merge SHA for %s could not be found in the UpstreamSync' % self.process_name) merge_commit = sync_commit.WptCommit(self.git_wpt, self.merge_sha) # If the commit has two parents, one of them being our pr head, it is a merge commit parents = list(merge_commit.commit.parents) if len(parents) == 2 and pr_head in parents: other_parent = parents[0] if parents[1] == pr_head.commit else parents[1] merge_bases = self.git_wpt.merge_base(pr_head.sha1, other_parent) # Not a merge commit, so just use the base we have stored else: merge_bases = [self.wpt_commits.base.commit] # Check that we found the merge base if len(merge_bases) == 0: raise ValueError("Problem determining merge base for %s" % self.process_name) else: merge_base = merge_bases[0] # Create a CommitRange object and return it base = sync_commit.WptCommit(self.git_wpt, merge_base) head_ref_dict = AttrDict({'commit': pr_head}) if TYPE_CHECKING: # This is a terrible hack. head_ref = cast(BranchRefObject, head_ref_dict) else: head_ref = head_ref_dict return CommitRange(self.git_wpt, base, head_ref, sync_commit.WptCommit, CommitFilter()) def commit_message_filter(msg: bytes) -> tuple[bytes, dict[str, str]]: metadata = {} m = commitparser.BUG_RE.match(msg) if m: bug_bytes, bug_number = m.groups()[:2] if msg.startswith(bug_bytes): prefix = re.compile(br"^%s[^\w\d\[\(]*" % bug_bytes) msg = prefix.sub(b"", msg) metadata["bugzilla-url"] = env.bz.bugzilla_url(int(bug_number)) reviewers = ", ".join(item.decode("utf8", "replace") for item in commitparser.parse_reviewers(msg)) if reviewers: metadata["gecko-reviewers"] = reviewers msg = commitparser.replace_reviewers(msg, "") msg = commitparser.strip_commit_metadata(msg) description = msg.splitlines() if description: summary = description.pop(0) summary = summary.rstrip(b"!#$%&(*+,-/:;<=>@[\\^_`{|~").rstrip() msg = summary + (b"\n" + b"\n".join(description) if description else b"") return msg, metadata def wpt_commits(git_gecko: Repo, first_commit: GeckoCommit, head_commit: GeckoCommit) -> list[GeckoCommit]: # List of syncs that have changed, so we can update them all as appropriate at the end revish = f"{first_commit.sha1}..{head_commit.sha1}" logger.info("Getting commits in range %s" % revish) commits = [sync_commit.GeckoCommit(git_gecko, item.hexsha) for item in git_gecko.iter_commits(revish, paths=env.config["gecko"]["path"]["wpt"], reverse=True, max_parents=1)] return filter_commits(commits) def filter_commits(commits: list[GeckoCommit]) -> list[GeckoCommit]: rv = [] for commit in commits: if (commit.metadata.get("wptsync-skip") or DownstreamSync.has_metadata(commit.msg) or (commit.is_backout and not commit.wpt_commits_backed_out()[0])): continue rv.append(commit) return rv def remove_complete_backouts(commits: Iterable[Commit]) -> Sequence[Commit]: """Given a list of commits, remove any commits for which a backout exists in the list""" commits_remaining: set[str] = set() for commit in commits: assert isinstance(commit, GeckoCommit) if commit.is_backout: backed_out_commits, _ = commit.wpt_commits_backed_out() backed_out = {item.sha1 for item in backed_out_commits} if backed_out.issubset(commits_remaining): commits_remaining -= backed_out continue commits_remaining.add(commit.sha1) return [item for item in commits if item.sha1 in commits_remaining] class Endpoints: def __init__(self, first: GeckoCommit) -> None: self._first: GeckoCommit = first self._second: GeckoCommit | None = None @property def base(self) -> GeckoCommit: return GeckoCommit(self._first.repo, self._first.commit.parents[0]) @property def head(self) -> GeckoCommit: if self._second is not None: return self._second return self._first @head.setter def head(self, value: GeckoCommit) -> None: self._second = value def __repr__(self): return f"<Endpoints {self.base}:{self.head}>" def updates_for_backout(git_gecko: Repo, git_wpt: Repo, commit: GeckoCommit, ) -> tuple[CreateSyncs, UpdateSyncs]: backed_out_commits, bugs = commit.wpt_commits_backed_out() backed_out_commit_shas = {item.sha1 for item in backed_out_commits} create_syncs: CreateSyncs = {None: []} update_syncs: UpdateSyncs = {} for backed_out_commit in backed_out_commits: syncs: list[SyncProcess] = [] backed_out_bug = backed_out_commit.bug if backed_out_bug: syncs = UpstreamSync.for_bug(git_gecko, git_wpt, backed_out_bug, statuses={"open", "incomplete"}, flat=True) if len(syncs) not in (0, 1): raise ValueError("Lookup of upstream syncs for bug %s returned syncs: %r" % (len(syncs), syncs)) if syncs: sync = syncs.pop() assert isinstance(sync, UpstreamSync) if commit in sync.gecko_commits: # This commit was already processed backed_out_commit_shas = set() return {}, {} if backed_out_commit in sync.upstreamed_gecko_commits: backed_out_commit_shas.remove(backed_out_commit.sha1) assert sync.bug is not None update_syncs[sync.bug] = (sync, commit) if backed_out_commit_shas: # This backout covers something other than known open syncs, so we need to # create a new sync especially for it # TODO: we should check for this already existing before we process the backout # Need to create a bug for this backout backout_bug = None for bug in bugs: open_bug_syncs = UpstreamSync.for_bug(git_gecko, git_wpt, bug, statuses={"open", "incomplete"}, flat=False) if bug not in update_syncs and not open_bug_syncs: backout_bug = bug break if backout_bug is None: # TODO: Turn create_syncs into a class new_no_bug = create_syncs[None] assert isinstance(new_no_bug, list) new_no_bug.append(Endpoints(commit)) else: create_syncs[backout_bug] = Endpoints(commit) return create_syncs, update_syncs def updated_syncs_for_push(git_gecko: Repo, git_wpt: Repo, first_commit: GeckoCommit, head_commit: GeckoCommit, ) -> tuple[CreateSyncs, UpdateSyncs] | None: # TODO: Check syncs with pushes that no longer exist on autoland all_commits = wpt_commits(git_gecko, first_commit, head_commit) if not all_commits: logger.info("No new commits affecting wpt found") return None else: logger.info("Got %i commits since the last sync point" % len(all_commits)) commits = remove_complete_backouts(all_commits) if not commits: logger.info("No commits remain after removing backout pairs") return None create_syncs: CreateSyncs = {None: []} update_syncs: UpdateSyncs = {} for commit in commits: assert isinstance(commit, GeckoCommit) if commit.upstream_sync(git_gecko, git_wpt) is not None: # This commit was already processed e.g. by a manual invocation, so skip continue if commit.is_backout: create, update = updates_for_backout(git_gecko, git_wpt, commit) create_syncs.update(create) update_syncs.update(update) if commit.is_downstream or commit.is_landing: continue else: bug = commit.bug if bug is None: continue sync: SyncProcess | None = None if bug in update_syncs: sync, _ = update_syncs[bug] else: statuses = ["open", "incomplete"] syncs = UpstreamSync.for_bug(git_gecko, git_wpt, bug, statuses=statuses, flat=True) if len(syncs) not in (0, 1): logger.warning("Lookup of upstream syncs for bug %s returned syncs: %r" % (len(syncs), syncs)) # Try to pick the most recent sync for status in statuses: status_syncs = [s for s in syncs if s.status == status] if status_syncs: status_syncs.sort(key=lambda x: int(x.process_name.obj_id)) sync = status_syncs.pop() break if syncs: sync = syncs[0] if sync: assert isinstance(sync, UpstreamSync) if commit not in sync.gecko_commits: update_syncs[bug] = (sync, commit) elif sync.pr is None: head = sync.gecko_commits.head assert isinstance(head, GeckoCommit) update_syncs[bug] = (sync, head) else: if bug is None: create_syncs[None].append(Endpoints(commit)) elif bug in create_syncs: bug_endpoints = create_syncs[bug] assert isinstance(bug_endpoints, Endpoints) bug_endpoints.head = commit else: create_syncs[bug] = Endpoints(commit) return create_syncs, update_syncs def create_syncs(lock: SyncLock, git_gecko: Repo, git_wpt: Repo, create_endpoints: dict[int | None, list | Endpoints], ) -> list[UpstreamSync]: rv = [] for bug, endpoints in create_endpoints.items(): if bug is not None: assert isinstance(endpoints, Endpoints) endpoints = [endpoints] assert isinstance(endpoints, list) for endpoint in endpoints: if bug is None: # TODO: Loading the commits doesn't work in this case, because we depend on the bug commit = sync_commit.GeckoCommit(git_gecko, endpoint.head) bug = env.bz.new("Upstream commit %s to web-platform-tests" % commit.canonical_rev, "", "Testing", "web-platform-tests", whiteboard="[wptsync upstream]") sync = UpstreamSync.new(lock, git_gecko, git_wpt, bug=bug, gecko_base=endpoint.base.sha1, gecko_head=endpoint.head.sha1, wpt_base="origin/master", wpt_head="origin/master") rv.append(sync) return rv def update_sync_heads(lock: SyncLock, syncs_by_bug: dict[int, tuple[UpstreamSync, GeckoCommit]], ) -> list[UpstreamSync]: rv = [] for bug, (sync, commit) in syncs_by_bug.items(): if sync.status not in ("open", "incomplete"): # TODO: Create a new sync with a non-zero seq-id in this case raise ValueError("Tried to modify a closed sync for bug %s with commit %s" % (bug, commit.canonical_rev)) with sync.as_mut(lock): sync.gecko_commits.head = commit # type: ignore for gecko_commit in sync.gecko_commits: assert isinstance(gecko_commit, GeckoCommit) gecko_commit.set_upstream_sync(sync) rv.append(sync) return rv def update_modified_sync(git_gecko: Repo, git_wpt: Repo, sync: UpstreamSync) -> None: assert sync._lock is not None if len(sync.gecko_commits) == 0: # In the case that there are no gecko commits, we presumably had a backout # In this case we don't touch the wpt commits, but just mark the PR # as closed. That's pretty counterintuitive, but it turns out that GitHub # will only let you reopen a closed PR if you don't change the branch head in # the meantime. So we carefully avoid touching the wpt side until something # relands and we have a chance to reopen the PR logger.info("Sync has no commits, so marking as incomplete") sync.status = "incomplete" # type: ignore if not sync.pr: logger.info("Sync was already fully applied upstream, not creating a PR") return else: sync.status = "open" # type: ignore try: sync.update_wpt_commits() except AbortError: # If we got a merge conflict and the PR doesn't exist yet then try # recreating the commits on top of the current sync point in order that # we get a PR and it's visible that it fails if not sync.pr: logger.info("Applying to origin/master failed; " "retrying with the current sync point") from .landing import load_sync_point sync_point = load_sync_point(git_gecko, git_wpt) sync.set_wpt_base(sync_point["upstream"]) try: sync.update_wpt_commits() except AbortError: # Reset the base to origin/master sync.set_wpt_base("origin/master") with env.bz.bug_ctx(sync.bug) as bug: bug.add_comment("Failed to create upstream wpt PR due to " "merge conflicts. This requires fixup from a wpt sync " "admin.") needinfo_users = [item.strip() for item in (env.config["gecko"]["needinfo"] .get("upstream", "") .split(","))] needinfo_users = [item for item in needinfo_users if item] bug.needinfo(*needinfo_users) raise sync.update_github() def update_sync_prs(lock: SyncLock, git_gecko: Repo, git_wpt: Repo, create_endpoints: dict[int | None, list | Endpoints], update_syncs: dict[int, tuple[UpstreamSync, GeckoCommit]], raise_on_error: bool = False, ) -> tuple[set[UpstreamSync], set]: pushed_syncs = set() failed_syncs = set() to_push = create_syncs(lock, git_gecko, git_wpt, create_endpoints) to_push.extend(update_sync_heads(lock, update_syncs)) for sync in to_push: with sync.as_mut(lock): try: update_modified_sync(git_gecko, git_wpt, sync) except Exception as e: sync.error = e # type: ignore if raise_on_error: raise traceback.print_exc() logger.error(e) failed_syncs.add((sync, e)) else: sync.error = None # type: ignore pushed_syncs.add(sync) return pushed_syncs, failed_syncs def try_land_syncs(lock: SyncLock, syncs: set[UpstreamSync]) -> set[UpstreamSync]: landed_syncs = set() for sync in syncs: with sync.as_mut(lock): if sync.try_land_pr(): landed_syncs.add(sync) return landed_syncs @entry_point("upstream") @mut('sync') def update_sync(git_gecko, git_wpt, sync, raise_on_error=True, repo_update=True): if sync.status in ("wpt-merged", "complete"): logger.info("Nothing to do for sync with status %s" % sync.status) return set(), set(), set() if repo_update: update_repositories(git_gecko, git_wpt) assert isinstance(sync, UpstreamSync) update_syncs = {sync.bug: (sync, sync.gecko_commits.head.sha1)} pushed_syncs, failed_syncs = update_sync_prs(sync._lock, git_gecko, git_wpt, {}, update_syncs, raise_on_error=raise_on_error) if sync not in failed_syncs: landed_syncs = try_land_syncs(sync._lock, [sync]) else: landed_syncs = set() return pushed_syncs, failed_syncs, landed_syncs @entry_point("upstream") def gecko_push(git_gecko: Repo, git_wpt: Repo, repository_name: str, hg_rev: str, raise_on_error: bool = False, base_rev: Any | None = None, ) -> tuple[set[UpstreamSync], set[UpstreamSync], set] | None: rev = git_gecko.rev_parse(cinnabar(git_gecko).hg2git(hg_rev)) last_sync_point, prev_commit = UpstreamSync.prev_gecko_commit(git_gecko, repository_name) assert last_sync_point.commit is not None if base_rev is None and git_gecko.is_ancestor(rev, last_sync_point.commit.commit): logger.info("Last sync point moved past commit") return None with SyncLock("upstream", None) as lock: assert isinstance(lock, SyncLock) updated = updated_syncs_for_push(git_gecko, git_wpt, prev_commit, sync_commit.GeckoCommit(git_gecko, rev)) if updated is None: return set(), set(), set() create_endpoints, update_syncs = updated pushed_syncs, failed_syncs = update_sync_prs(lock, git_gecko, git_wpt, create_endpoints, update_syncs, raise_on_error=raise_on_error) landable_syncs = {item for item in UpstreamSync.load_by_status(git_gecko, git_wpt, "open") if item.error is None} if TYPE_CHECKING: landable = cast(Set[UpstreamSync], landable_syncs) else: landable = landable_syncs landed_syncs = try_land_syncs(lock, landable) # TODO if not git_gecko.is_ancestor(rev, last_sync_point.commit.commit): with last_sync_point.as_mut(lock): last_sync_point.commit = rev.hexsha # type: ignore return pushed_syncs, landed_syncs, failed_syncs @enum.unique class CheckStatus(enum.Enum): SUCCESS = "success" PENDING = "pending" FAILURE = "failure" def get_check_status(pr_id): checks = env.gh_wpt.get_check_runs(pr_id) if commit_checks_pass(checks): status = CheckStatus.SUCCESS elif not commit_checks_complete(checks): status = CheckStatus.PENDING else: status = CheckStatus.FAILURE return status, checks def commit_checks_pass(checks): """Boolean indicating whether all required check runs pass""" return all(item["required"] is False or (item["status"] == "completed" and item["conclusion"] in ("success", "neutral")) for item in checks.values()) def commit_checks_complete(checks): """Boolean indicating whether all check runs are complete""" return all(item["status"] == "completed" for item in checks.values()) @entry_point("upstream") @mut('sync') def commit_check_changed(git_gecko, git_wpt, sync): landed = False if sync.status != "open": return True check_status, checks = get_check_status(sync.pr) if not checks: logger.error("No checks found for pr %s" % sync.pr) return # Record the overall status and commit so we only notify once per commit this_pr_check = {"state": check_status.value, "sha": next(iter(checks.values()))["head_sha"]} last_pr_check = sync.last_pr_check sync.last_pr_check = this_pr_check if check_status == CheckStatus.SUCCESS: sync.error = None if sync.gecko_landed(): landed = sync.try_land_pr() elif this_pr_check != last_pr_check: env.bz.comment(sync.bug, "Upstream web-platform-tests status checks passed, " "PR will merge once commit reaches central.") elif check_status == CheckStatus.FAILURE and last_pr_check != this_pr_check: details = ["Github PR %s" % env.gh_wpt.pr_url(sync.pr)] for name, check_run in checks.items(): if check_run["conclusion"] not in ("success", "neutral"): details.append("* {} ({})".format(name, check_run["url"])) details = "\n".join(details) msg = ("Can't merge web-platform-tests PR due to failing upstream checks:\n%s" % details) try: with env.bz.bug_ctx(sync.bug) as bug: bug["comment"] = msg # Do this as a seperate operation with env.bz.bug_ctx(sync.bug) as bug: commit_author = sync.gecko_commits[0].email if commit_author: bug.needinfo(commit_author) except BugsyException: msg = traceback.format_exc() logger.warning("Failed to update bug:\n%s" % msg) # Sometimes needinfos fail because emails addresses in bugzilla don't # match the commits. That's non-fatal, but record the exception here in # case something more unexpected happens newrelic.agent.record_exception() sync.error = "Checks failed" else: logger.info("Some upstream web-platform-tests status checks still pending.") return landed @entry_point("upstream") @mut('sync') def update_pr(git_gecko: Repo, git_wpt: Repo, sync: UpstreamSync, action: str, merge_sha: str | None = None, base_sha: str | None = None, merged_by: str | None = None, ) -> None: """Update the sync status for a PR event on github :param action string: Either a PR action or a PR status :param merge_sha string: SHA of the new head if the PR merged or None if it didn't""" if action == "closed": if not merge_sha and sync.pr_status != "closed": env.bz.comment(sync.bug, "Upstream PR was closed without merging") sync.pr_status = "closed" else: assert merge_sha is not None sync.merge_sha = merge_sha if not sync.wpt_commits and base_sha: sync.set_wpt_base(base_sha) if sync.status not in ("complete", "wpt-merged"): env.bz.comment(sync.bug, "Upstream PR merged by %s" % merged_by) sync.finish("wpt-merged") elif action == "reopened" or action == "open": sync.pr_status = "open"