# sync/upstream.py (902 lines of code) (raw):
from __future__ import annotations
import enum
import os
import re
import time
import traceback
import git
import newrelic
from bugsy.errors import BugsyException
from github import GithubException
from mozautomation import commitparser
from . import log
from . import commit as sync_commit
from .base import entry_point
from .commit import GeckoCommit
from .downstream import DownstreamSync
from .errors import AbortError
from .env import Environment
from .gitutils import update_repositories, gecko_repo
from .gh import AttrDict
from .lock import SyncLock, constructor, mut
from .sync import CommitFilter, LandableStatus, SyncProcess, CommitRange
from .repos import cinnabar, pygit2_get
from typing import (Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple, Union, cast,
TYPE_CHECKING)
from git.repo.base import Repo
if TYPE_CHECKING:
from sync.base import BranchRefObject, ProcessName
from sync.commit import Commit
# Mapping from bug number (None for commits with no bug) to the Endpoints
# (or, for the None key, list of Endpoints) describing syncs to be created.
CreateSyncs = Dict[Optional[int], Union[List, "Endpoints"]]
# Mapping from bug number to (existing sync, new gecko head commit) pairs
# for syncs whose head needs moving.
UpdateSyncs = Dict[int, Tuple["UpstreamSync", GeckoCommit]]

env = Environment()

logger = log.get_logger(__name__)
class BackoutCommitFilter(CommitFilter):
    """Commit filter that accepts commits belonging to one bug, plus any
    backout that reverses a commit this filter previously accepted."""

    def __init__(self, bug_id: int) -> None:
        self.bug = bug_id
        # sha1s of commits accepted so far, used to recognise relevant backouts
        self.seen: set[str] = set()
        self._commits = {}

    def _filter_commit(self, commit: Commit) -> bool:
        assert isinstance(commit, GeckoCommit)
        # Explicitly-skipped commits and downstreaming commits never count
        if commit.metadata.get("wptsync-skip") or DownstreamSync.has_metadata(commit.msg):
            return False
        if commit.is_backout:
            backed_out, _ = commit.wpt_commits_backed_out()
            # Keep a backout when it reverses something we already accepted
            if any(item.sha1 in self.seen for item in backed_out):
                return True
        if commit.bug == self.bug:
            # Ignore commits that don't actually touch wpt files
            if commit.is_empty(env.config["gecko"]["path"]["wpt"]):
                return False
            self.seen.add(commit.sha1)
            return True
        return False

    def filter_commits(self, commits: Iterable[Commit]) -> Sequence[Commit]:
        # Drop commit/backout pairs that cancel each other out
        return remove_complete_backouts(commits)
class UpstreamSync(SyncProcess):
    """Sync process upstreaming gecko changes under testing/web-platform/tests
    into a PR against the GitHub web-platform-tests repository.

    Each sync corresponds to one bugzilla bug; the bug number is the sync's
    object id."""

    sync_type = "upstream"
    obj_id = "bug"

    statuses = ("open", "wpt-merged", "complete", "incomplete")
    # Permitted (from-status, to-status) transitions
    status_transitions = [("open", "wpt-merged"),
                          ("open", "complete"),
                          ("open", "incomplete"),
                          ("incomplete", "open"),
                          ("wpt-merged", "complete")]

    multiple_syncs = True

    def __init__(self, git_gecko: Repo, git_wpt: Repo, process_name: ProcessName) -> None:
        super().__init__(git_gecko, git_wpt, process_name)

        # Cache for upstreamed_gecko_commits; valid while the wpt head sha1
        # still equals _upstreamed_gecko_head
        self._upstreamed_gecko_commits: list[GeckoCommit] | None = None
        self._upstreamed_gecko_head: str | None = None

    @classmethod
    @constructor(lambda args: ("upstream", args['bug']))
    def new(cls,
            lock: SyncLock,
            git_gecko: Repo,
            git_wpt: Repo,
            gecko_base: str,
            gecko_head: str,
            wpt_base: str = "origin/master",
            wpt_head: str | None = None,
            bug: str | None = None,
            status: str = "open",
            ) -> UpstreamSync:
        """Create a new upstream sync for the given gecko commit range and
        tag each gecko commit in the range as belonging to it."""
        self = super().new(lock,
                           git_gecko,
                           git_wpt,
                           gecko_base,
                           gecko_head,
                           wpt_base=wpt_base,
                           wpt_head=wpt_head,
                           bug=bug,
                           status=status)
        with self.as_mut(lock):
            for commit in self.gecko_commits:
                commit.set_upstream_sync(self)
        return self

    @classmethod
    def from_pr(cls,
                lock: SyncLock,
                git_gecko: Repo,
                git_wpt: Repo,
                pr_id: int,
                body: str | None
                ) -> UpstreamSync | None:
        """Reconstruct an upstream sync from an existing PR whose commits
        carry sync metadata (gecko-commit / bugzilla-url).

        Returns None when the PR body lacks the metadata or no gecko commits
        can be recovered from the PR's commits."""
        gecko_commits = []
        bug = None
        integration_branch = None

        if body is None or not cls.has_metadata(body.encode("utf8", "replace")):
            return None

        commits = env.gh_wpt.get_commits(pr_id)

        for gh_commit in commits:
            commit = sync_commit.WptCommit(git_wpt, gh_commit.sha)
            if cls.has_metadata(commit.msg):
                # Map the annotated hg rev back to a gecko git sha
                gecko_commits.append(cinnabar(git_gecko).hg2git(commit.metadata["gecko-commit"]))
                commit_bug = env.bz.id_from_url(commit.metadata["bugzilla-url"])
                if bug is not None and commit_bug != bug:
                    logger.error("Got multiple bug numbers in URL from commits")
                    break
                elif bug is None:
                    bug = commit_bug
                if (integration_branch is not None and
                    commit.metadata["integration_branch"] != integration_branch):
                    logger.warning("Got multiple integration branches from commits")
                elif integration_branch is None:
                    integration_branch = commit.metadata["integration_branch"]
            else:
                # Stop at the first commit without sync metadata
                break

        if not gecko_commits:
            return None

        assert bug
        gecko_base = git_gecko.rev_parse("%s^" % gecko_commits[0])
        gecko_head = git_gecko.rev_parse(gecko_commits[-1])
        wpt_head = commits[-1].sha
        wpt_base = commits[0].sha

        # NOTE(review): pr_id here lands in the `status` positional slot of
        # cls.new() (whose parameters after gecko_head are wpt_base, wpt_head,
        # bug, status) — this looks wrong; confirm against cls.new()
        return cls.new(lock, git_gecko, git_wpt, gecko_base, gecko_head,
                       wpt_base, wpt_head, bug, pr_id)

    @classmethod
    def has_metadata(cls, message: bytes) -> bool:
        """True if the commit message carries the metadata keys that mark it
        as produced by this sync."""
        required_keys = ["gecko-commit",
                         "bugzilla-url"]
        metadata = sync_commit.get_metadata(message)
        return all(item in metadata for item in required_keys)

    def gecko_commit_filter(self) -> BackoutCommitFilter:
        # Only commits for this sync's bug (plus relevant backouts) apply
        return BackoutCommitFilter(self.bug)

    @property
    def landable_status(self) -> LandableStatus:
        return LandableStatus.upstream

    @property  # type: ignore[override]
    def bug(self) -> int:
        return int(self.process_name.obj_id)

    @bug.setter  # type: ignore
    @mut()
    def bug(self, value: int) -> None:
        # The bug number is the sync's identity and can never change
        raise AttributeError("Can't set attribute")

    @property
    def pr_status(self) -> str:
        # Last-known state of the upstream PR ("open"/"closed")
        return self.data.get("pr-status", "open")

    @pr_status.setter
    def pr_status(self, value: str) -> None:
        self.data["pr-status"] = value

    @property
    def merge_sha(self) -> str | None:
        # Sha the PR was merged as, or None if not merged
        return self.data.get("merge-sha", None)

    @merge_sha.setter
    def merge_sha(self, value: str | None) -> None:
        self.data["merge-sha"] = value

    @property
    def remote_branch(self) -> str | None:
        # Name of the remote branch backing the PR (relative to origin)
        return self.data.get("remote-branch")

    @remote_branch.setter  # type: ignore
    @mut()
    def remote_branch(self, value: str | None) -> None:
        if value:
            # Stored as a short name, never a full ref path
            assert not value.startswith("refs/")
        self.data["remote-branch"] = value

    @mut()
    def get_or_create_remote_branch(self) -> str:
        """Return the remote branch name for the PR, allocating an unused
        gecko/<bug>[-<n>] name when none is recorded yet."""
        if not self.remote_branch:
            pygit2_gecko = pygit2_get(self.git_gecko)
            pygit2_wpt = pygit2_get(self.git_wpt)
            # Prefer an upstream branch already configured for our local branch
            if self.branch_name in pygit2_gecko.branches:
                upstream = pygit2_gecko.branches[self.branch_name].upstream
                if upstream:
                    self.remote_branch = upstream.shortname  # type: ignore
            if not self.remote_branch:
                # Otherwise pick the first free gecko/<bug>[-<count>] name
                count = 0
                refs = pygit2_wpt.references
                initial_path = path = "refs/remotes/origin/gecko/%s" % self.bug
                while path in refs:
                    count += 1
                    path = f"{initial_path}-{count}"
                self.remote_branch = path[len("refs/remotes/origin/"):]  # type: ignore
        return self.remote_branch

    @property
    def upstreamed_gecko_commits(self) -> list[GeckoCommit]:
        """Gecko commits corresponding to the wpt commits currently on the
        sync branch, recovered from their gecko-commit metadata."""
        if (self._upstreamed_gecko_commits is None or
            self._upstreamed_gecko_head != self.wpt_commits.head.sha1):
            self._upstreamed_gecko_commits = [
                sync_commit.GeckoCommit(self.git_gecko,
                                        cinnabar(self.git_gecko).hg2git(
                                            wpt_commit.metadata["gecko-commit"]))
                for wpt_commit in self.wpt_commits
                if "gecko-commit" in wpt_commit.metadata]
            self._upstreamed_gecko_head = self.wpt_commits.head.sha1
        return self._upstreamed_gecko_commits

    @mut()
    def update_wpt_commits(self) -> bool:
        """Rebuild the wpt branch so it mirrors the current gecko commits.

        Returns True if anything changed, False if the branches already
        matched (or there are no gecko commits)."""
        if len(self.gecko_commits) == 0:
            return False

        # Find the commits that were already upstreamed. Some gecko commits may not
        # result in an upstream commit, if the patch has no effect. But if we find
        # the last commit that was previously upstreamed then all earlier ones must
        # also match.
        upstreamed_commits = {item.sha1 for item in self.upstreamed_gecko_commits}
        matching_commits = list(self.gecko_commits[:])
        for gecko_commit in reversed(list(self.gecko_commits)):
            if gecko_commit.sha1 in upstreamed_commits:
                break
            matching_commits.pop()

        if len(matching_commits) == len(self.gecko_commits) == len(self.upstreamed_gecko_commits):
            return False

        # Rewind the wpt branch to the last still-matching commit
        if len(matching_commits) == 0:
            self.wpt_commits.head = self.wpt_commits.base  # type: ignore
        elif len(matching_commits) < len(self.upstreamed_gecko_commits):
            self.wpt_commits.head = self.wpt_commits[len(matching_commits) - 1]  # type: ignore

        # Ensure the worktree is clean
        wpt_work = self.wpt_worktree.get()
        wpt_work.git.reset(hard=True)
        wpt_work.git.clean(f=True, d=True, x=True)

        # Port the remaining gecko commits onto the wpt branch
        for commit in self.gecko_commits[len(matching_commits):]:
            commit = self.add_commit(commit)

        assert (len(self.wpt_commits) ==
                len(self.upstreamed_gecko_commits))
        return True

    def gecko_landed(self) -> bool:
        """True if every gecko commit of the sync is on central; a partially
        landed sync is logged and treated as not landed."""
        if not len(self.gecko_commits):
            return False
        central_commit = self.git_gecko.rev_parse(env.config["gecko"]["refs"]["central"])
        landed = [self.git_gecko.is_ancestor(commit.commit, central_commit)
                  for commit in self.gecko_commits]
        if not all(item == landed[0] for item in landed):
            logger.warning("Got some commits landed and some not for upstream sync %s" %
                           self.branch_name)
            return False
        return landed[0]

    @property
    def repository(self) -> str:
        """Name of the gecko repository containing the sync's head commit.

        :raises ValueError: if the head isn't part of any known repository."""
        # Need to check central before landing repos
        head = self.gecko_commits.head
        repo = gecko_repo(self.git_gecko, head.commit)
        if repo is None:
            raise ValueError("Commit %s not part of any repository" % head.sha1)
        return repo

    @mut()
    def add_commit(self, gecko_commit: GeckoCommit) -> tuple[Commit | None, bool]:
        """Port a single gecko commit onto the wpt worktree, advancing the
        wpt head when a commit is produced (the move may be a no-op).

        :raises AbortError: if a leftover patch file indicates a previous
                            failed attempt for this commit."""
        git_work = self.wpt_worktree.get()

        metadata = {"gecko-commit": gecko_commit.canonical_rev}

        if os.path.exists(os.path.join(git_work.working_dir, gecko_commit.canonical_rev + ".diff")):
            # If there's already a patch file here then don't try to create a new one
            # because we'll presumably fail again
            raise AbortError("Skipping due to existing patch")

        wpt_commit = gecko_commit.move(git_work,
                                       metadata=metadata,
                                       msg_filter=commit_message_filter,
                                       src_prefix=env.config["gecko"]["path"]["wpt"])
        assert not git_work.is_dirty()

        if wpt_commit:
            self.wpt_commits.head = wpt_commit  # type: ignore

        return wpt_commit, True

    @mut()
    def create_pr(self) -> int:
        """Create the upstream PR from the pushed remote branch and comment
        on the bug; returns the (possibly pre-existing) PR id."""
        if self.pr:
            return self.pr

        assert self.remote_branch is not None
        assert self.remote_branch in self.git_wpt.remotes.origin.refs
        # GitHub can lag behind the push; wait until the branch is visible
        while not env.gh_wpt.has_branch(self.remote_branch):
            logger.debug("Waiting for branch")
            time.sleep(1)

        commit_summary = self.wpt_commits[0].commit.summary
        if isinstance(commit_summary, bytes):
            commit_summary = commit_summary.decode("utf8", "ignore")
        # Everything after the first line of the commit message becomes the PR body
        msg = self.wpt_commits[0].msg.split(b"\n", 1)
        body = msg[1].decode("utf8", "replace") if len(msg) != 1 else ""

        pr_id = env.gh_wpt.create_pull(
            title="[Gecko{}] {}".format(" Bug %s" % self.bug if self.bug else "",
                                        commit_summary),
            body=body.strip(),
            base="master",
            head=self.remote_branch)
        self.pr = pr_id  # type: ignore
        # TODO: add label to bug
        env.bz.comment(self.bug,
                       "Created web-platform-tests PR %s for changes under "
                       "testing/web-platform/tests" %
                       env.gh_wpt.pr_url(pr_id))
        return pr_id

    @mut()
    def push_commits(self) -> None:
        """Force-push the local sync branch to the remote PR branch.

        :raises AbortError: if the push reports an error."""
        remote_branch = self.get_or_create_remote_branch()
        logger.info(f"Pushing commits from bug {self.bug} to branch {remote_branch}")
        push_info = self.git_wpt.remotes.origin.push("refs/heads/%s:%s" %
                                                     (self.branch_name, remote_branch),
                                                     force=True,
                                                     set_upstream=True)
        for item in push_info:
            if item.flags & item.ERROR:
                raise AbortError(item.summary)

    def push_required(self) -> bool:
        """True unless the remote branch already matches the local wpt head."""
        return not (self.remote_branch and
                    self.remote_branch in self.git_wpt.remotes.origin.refs and
                    self.git_wpt.remotes.origin.refs[self.remote_branch].commit.hexsha ==
                    self.wpt_commits.head.sha1)

    @mut()
    def update_github(self) -> None:
        """Reconcile the upstream PR with the local sync state: close/reopen
        the PR, push new commits, create the PR if missing, and refresh the
        landed status check."""
        if self.pr:
            state = env.gh_wpt.pull_state(self.pr)
            if not len(self.gecko_commits):
                # Everything was backed out locally, so close the PR
                env.gh_wpt.close_pull(self.pr)
            elif state == "closed":
                pr = env.gh_wpt.get_pull(self.pr)
                if not pr.merged:
                    env.gh_wpt.reopen_pull(self.pr)
                else:
                    # If all the local commits are represented upstream, everything is
                    # fine and close out the sync. Otherwise we have a problem.
                    if len(self.upstreamed_gecko_commits) == len(self.gecko_commits):
                        if self.status not in ("wpt-merged", "complete"):
                            env.bz.comment(self.bug, "Upstream PR merged")
                            self.finish()
                    else:
                        # It's unclear what to do in this case, so mark the sync for manual
                        # fixup
                        self.error = ("Upstream PR merged, "  # type: ignore
                                      "but additional commits added after merge")
                    return
        if not len(self.gecko_commits):
            return
        if not len(self.upstreamed_gecko_commits):
            return
        if self.push_required():
            self.push_commits()
        if not self.pr:
            self.create_pr()
        self.set_landed_status()

    def set_landed_status(self) -> None:
        """
        Set the status of the check on the GitHub commit upstream. This check
        is used to tell if the code has been landed into Gecko.
        """
        if not self.pr:
            return
        landed_status = "success" if self.gecko_landed() else "failure"
        logger.info("Setting landed status to %s" % landed_status)
        # TODO - Maybe ignore errors setting the status
        if env.gh_wpt.get_status(self.pr, "upstream/gecko") != landed_status:
            env.gh_wpt.set_status(self.pr,
                                  landed_status,
                                  target_url=env.bz.bugzilla_url(self.bug),
                                  description="Landed on mozilla-central",
                                  context="upstream/gecko")

    @mut()
    def try_land_pr(self) -> bool:
        """Try to merge the upstream PR once the gecko commits have landed.

        Returns True only when this call performed the merge; a PR that was
        already merged out-of-band is recorded but returns False."""
        logger.info("Checking if sync for bug %s can land" % self.bug)
        if not self.status == "open":
            logger.info("Sync is %s" % self.status)
            return False
        if not self.gecko_landed():
            logger.info("Commits are not yet landed in gecko")
            return False

        if not self.pr:
            logger.info("No upstream PR created")
            return False

        merge_sha = env.gh_wpt.merge_sha(self.pr)
        if merge_sha:
            # Someone merged the PR already; just record the fact
            logger.info("PR already merged")
            self.merge_sha = merge_sha
            self.finish("wpt-merged")
            return False

        try:
            self.set_landed_status()
        except Exception:
            # The status is informational, so failing to set it isn't fatal
            logger.warning("Failed setting status on PR for bug %s" % self.bug)

        logger.info("Commit are landable; trying to land %s" % self.pr)

        msg = None
        check_status, checks = get_check_status(self.pr)
        if check_status not in [CheckStatus.SUCCESS, CheckStatus.PENDING]:
            details = ["Github PR %s" % env.gh_wpt.pr_url(self.pr)]
            msg = ("Can't merge web-platform-tests PR due to failing upstream checks:\n%s" %
                   details)
        elif not env.gh_wpt.is_mergeable(self.pr):
            msg = "Can't merge web-platform-tests PR because it has merge conflicts"
        elif not env.gh_wpt.is_approved(self.pr):
            # This should be handled by the pr-bot
            msg = "Can't merge web-platform-tests PR because it is missing approval"
        else:
            try:
                merge_sha = env.gh_wpt.merge_pull(self.pr)
                env.bz.comment(self.bug, "Upstream PR merged by %s" %
                               env.config["web-platform-tests"]["github"]["user"])
            except GithubException as e:
                # e.data may be a dict payload or a bare message
                if isinstance(e.data, dict):
                    err_msg = e.data.get("message", "Unknown GitHub Error")
                else:
                    err_msg = e.data
                msg = ("Merging PR %s failed.\nMessage: %s" %
                       (env.gh_wpt.pr_url(self.pr), err_msg))
            except Exception as e:
                msg = ("Merging PR %s failed.\nMessage: %s" %
                       (env.gh_wpt.pr_url(self.pr), e))
            else:
                self.merge_sha = merge_sha
                self.finish("wpt-merged")
                return True
        if msg is not None:
            logger.error(msg)
        return False

    @mut()
    def finish(self, status: str = "complete") -> None:
        """Finish the sync and, on merge/completion, delete the remote
        PR branch (best-effort)."""
        super().finish(status)
        if status in ("wpt-merged", "complete") and self.remote_branch:
            # Delete the remote branch after a merge
            try:
                self.git_wpt.remotes.origin.push(self.remote_branch, delete=True)
            except git.GitCommandError:
                # The branch may already be gone; this is best-effort cleanup
                pass
            else:
                self.remote_branch = None  # type: ignore

    @property
    def pr_head(self) -> str | None:
        """
        Retrieves the head of the PR ref: origin/pr/{pr_id}
        :return: The SHA of the head commit, or None when the sync has no PR
                 or the ref doesn't exist locally.
        """
        if not self.pr:
            logger.error("No PR ID found for %s" % self.process_name)
            return None
        pr_ref = f'origin/pr/{self.pr}'
        if pr_ref not in self.git_wpt.references:
            # PR ref doesn't seem to exist
            logger.error("No ref found for %s" % pr_ref)
            return None
        ref = self.git_wpt.references[pr_ref]
        return ref.commit.hexsha

    @property
    def pr_commits(self) -> CommitRange:
        """Compute the commit range of the PR, working out the merge base for
        merge commits, squash/rebase merges, and unlanded PRs.

        :raises ValueError: when the PR head or merge base can't be found."""
        pr_head_sha = self.pr_head
        if not pr_head_sha:
            raise ValueError("Can't get PR commits as the ref head could not be found for %s" %
                             self.process_name)
        pr_head = sync_commit.WptCommit(self.git_wpt, pr_head_sha)
        merge_bases = []
        # Check if the PR Head is reachable from origin/master
        origin_master_sha = self.git_wpt.references['origin/master'].commit.hexsha
        pr_head_reachable = self.git_wpt.is_ancestor(pr_head.commit,
                                                     self.git_wpt.rev_parse('origin/master'))
        # If not reachable, then it either hasn't landed yet, it was a Squash + Merge,
        # or a Rebase and merge.
        if not pr_head_reachable:
            merge_bases = self.git_wpt.merge_base(origin_master_sha, pr_head.sha1)
        else:
            if not self.merge_sha:
                raise ValueError('The merge SHA for %s could not be found in the UpstreamSync' %
                                 self.process_name)
            merge_commit = sync_commit.WptCommit(self.git_wpt, self.merge_sha)
            # If the commit has two parents, one of them being our pr head, it is a merge commit
            # NOTE(review): `pr_head in parents` compares a WptCommit against
            # git.Commit objects — presumably WptCommit defines __eq__ for
            # this; confirm
            parents = list(merge_commit.commit.parents)
            if len(parents) == 2 and pr_head in parents:
                other_parent = parents[0] if parents[1] == pr_head.commit else parents[1]
                merge_bases = self.git_wpt.merge_base(pr_head.sha1, other_parent)
            # Not a merge commit, so just use the base we have stored
            else:
                merge_bases = [self.wpt_commits.base.commit]
        # Check that we found the merge base
        if len(merge_bases) == 0:
            raise ValueError("Problem determining merge base for %s" % self.process_name)
        else:
            merge_base = merge_bases[0]
        # Create a CommitRange object and return it
        base = sync_commit.WptCommit(self.git_wpt, merge_base)
        head_ref_dict = AttrDict({'commit': pr_head})
        if TYPE_CHECKING:
            # This is a terrible hack.
            head_ref = cast(BranchRefObject, head_ref_dict)
        else:
            head_ref = head_ref_dict
        return CommitRange(self.git_wpt, base, head_ref, sync_commit.WptCommit, CommitFilter())
def commit_message_filter(msg: bytes) -> tuple[bytes, dict[str, str]]:
    """Rewrite a gecko commit message for use in an upstream wpt commit.

    Strips the leading bug reference and the reviewer annotations, recording
    both in the returned metadata dict, and trims trailing punctuation from
    the summary line.

    :param msg: Original commit message as bytes
    :return: (rewritten message, metadata); metadata may contain the keys
             "bugzilla-url" and "gecko-reviewers"."""
    metadata: dict[str, str] = {}

    bug_match = commitparser.BUG_RE.match(msg)
    if bug_match:
        bug_bytes, bug_number = bug_match.groups()[:2]
        if msg.startswith(bug_bytes):
            # Drop the leading bug reference plus any separator characters
            prefix = re.compile(br"^%s[^\w\d\[\(]*" % bug_bytes)
            msg = prefix.sub(b"", msg)
        metadata["bugzilla-url"] = env.bz.bugzilla_url(int(bug_number))

    reviewers = ", ".join(name.decode("utf8", "replace")
                          for name in commitparser.parse_reviewers(msg))
    if reviewers:
        metadata["gecko-reviewers"] = reviewers
    msg = commitparser.replace_reviewers(msg, "")
    msg = commitparser.strip_commit_metadata(msg)

    lines = msg.splitlines()
    if lines:
        # Trim trailing punctuation/whitespace from the summary line
        summary = lines.pop(0).rstrip(b"!#$%&(*+,-/:;<=>@[\\^_`{|~").rstrip()
        msg = summary + (b"\n" + b"\n".join(lines) if lines else b"")

    return msg, metadata
def wpt_commits(git_gecko: Repo, first_commit: GeckoCommit,
                head_commit: GeckoCommit) -> list[GeckoCommit]:
    """Return the syncable gecko commits touching wpt in a push range.

    :param first_commit: Exclusive start of the range
    :param head_commit: Inclusive end of the range"""
    revish = f"{first_commit.sha1}..{head_commit.sha1}"
    logger.info("Getting commits in range %s" % revish)
    # Walk oldest-first, restricted to the wpt path, skipping merge commits
    found = git_gecko.iter_commits(revish,
                                   paths=env.config["gecko"]["path"]["wpt"],
                                   reverse=True,
                                   max_parents=1)
    commits = [sync_commit.GeckoCommit(git_gecko, rev.hexsha) for rev in found]
    return filter_commits(commits)
def filter_commits(commits: list[GeckoCommit]) -> list[GeckoCommit]:
    """Drop commits the upstream sync should never consider: explicitly
    skipped commits, downstreaming commits, and backouts that don't back
    out any wpt commits."""
    def _syncable(commit: GeckoCommit) -> bool:
        if commit.metadata.get("wptsync-skip"):
            return False
        if DownstreamSync.has_metadata(commit.msg):
            return False
        if commit.is_backout and not commit.wpt_commits_backed_out()[0]:
            return False
        return True

    return [commit for commit in commits if _syncable(commit)]
def remove_complete_backouts(commits: Iterable[Commit]) -> Sequence[Commit]:
    """Given a list of commits, remove any commits for which a backout exists
    in the list, together with the backout itself."""
    remaining: set[str] = set()
    for commit in commits:
        assert isinstance(commit, GeckoCommit)
        if commit.is_backout:
            backed_out_commits, _ = commit.wpt_commits_backed_out()
            backed_out = {item.sha1 for item in backed_out_commits}
            # Only cancel the backout when everything it reverts is still in
            # the candidate set; otherwise keep it like a normal commit
            if backed_out <= remaining:
                remaining -= backed_out
                continue
        remaining.add(commit.sha1)
    # Preserve the original commit ordering
    return [item for item in commits if item.sha1 in remaining]
class Endpoints:
    """Records the first and last gecko commits of a range from which a new
    sync will be created."""

    def __init__(self, first: GeckoCommit) -> None:
        self._start: GeckoCommit = first
        self._end: GeckoCommit | None = None

    @property
    def base(self) -> GeckoCommit:
        # The base is the parent of the first commit in the range
        return GeckoCommit(self._start.repo, self._start.commit.parents[0])

    @property
    def head(self) -> GeckoCommit:
        return self._end if self._end is not None else self._start

    @head.setter
    def head(self, value: GeckoCommit) -> None:
        self._end = value

    def __repr__(self) -> str:
        return f"<Endpoints {self.base}:{self.head}>"
def updates_for_backout(git_gecko: Repo,
                        git_wpt: Repo,
                        commit: GeckoCommit,
                        ) -> tuple[CreateSyncs, UpdateSyncs]:
    """Work out the sync changes required to apply a gecko backout commit.

    :param commit: The backout commit
    :return: (create_syncs, update_syncs) mappings; both empty if the backout
             was already processed by an existing sync.
    :raises ValueError: if multiple open syncs exist for a backed-out bug."""
    backed_out_commits, bugs = commit.wpt_commits_backed_out()
    backed_out_commit_shas = {item.sha1 for item in backed_out_commits}

    create_syncs: CreateSyncs = {None: []}
    update_syncs: UpdateSyncs = {}

    for backed_out_commit in backed_out_commits:
        syncs: list[SyncProcess] = []
        backed_out_bug = backed_out_commit.bug
        if backed_out_bug:
            syncs = UpstreamSync.for_bug(git_gecko,
                                         git_wpt,
                                         backed_out_bug,
                                         statuses={"open", "incomplete"},
                                         flat=True)
        if len(syncs) not in (0, 1):
            # Report the bug number (previously the sync count was passed
            # into the %s slot labelled "bug")
            raise ValueError("Lookup of upstream syncs for bug %s returned syncs: %r" %
                             (backed_out_bug, syncs))
        if syncs:
            sync = syncs.pop()
            assert isinstance(sync, UpstreamSync)
            if commit in sync.gecko_commits:
                # This commit was already processed
                return {}, {}
            if backed_out_commit in sync.upstreamed_gecko_commits:
                # The backout is fully accounted for by this open sync
                backed_out_commit_shas.remove(backed_out_commit.sha1)
                assert sync.bug is not None
                update_syncs[sync.bug] = (sync, commit)

    if backed_out_commit_shas:
        # This backout covers something other than known open syncs, so we need to
        # create a new sync especially for it
        # TODO: we should check for this already existing before we process the backout

        # Need to create a bug for this backout
        backout_bug = None
        for bug in bugs:
            open_bug_syncs = UpstreamSync.for_bug(git_gecko,
                                                  git_wpt,
                                                  bug,
                                                  statuses={"open", "incomplete"},
                                                  flat=False)
            if bug not in update_syncs and not open_bug_syncs:
                backout_bug = bug
                break
        if backout_bug is None:
            # TODO: Turn create_syncs into a class
            new_no_bug = create_syncs[None]
            assert isinstance(new_no_bug, list)
            new_no_bug.append(Endpoints(commit))
        else:
            create_syncs[backout_bug] = Endpoints(commit)

    return create_syncs, update_syncs
def updated_syncs_for_push(git_gecko: Repo,
                           git_wpt: Repo,
                           first_commit: GeckoCommit,
                           head_commit: GeckoCommit,
                           ) -> tuple[CreateSyncs, UpdateSyncs] | None:
    """Work out which upstream syncs a gecko push should create or update.

    :param first_commit: Exclusive start of the pushed range
    :param head_commit: Inclusive end of the pushed range
    :return: (create_endpoints, update_syncs), or None if no wpt-affecting
             commits remain after filtering."""
    # TODO: Check syncs with pushes that no longer exist on autoland
    all_commits = wpt_commits(git_gecko, first_commit, head_commit)
    if not all_commits:
        logger.info("No new commits affecting wpt found")
        return None
    else:
        logger.info("Got %i commits since the last sync point" % len(all_commits))

    commits = remove_complete_backouts(all_commits)
    if not commits:
        logger.info("No commits remain after removing backout pairs")
        return None

    create_syncs: CreateSyncs = {None: []}
    update_syncs: UpdateSyncs = {}

    for commit in commits:
        assert isinstance(commit, GeckoCommit)
        if commit.upstream_sync(git_gecko, git_wpt) is not None:
            # This commit was already processed e.g. by a manual invocation, so skip
            continue
        elif commit.is_backout:
            # Backouts are fully handled here; using elif keeps them from
            # falling through to the per-bug handling below, which would
            # process them a second time
            create, update = updates_for_backout(git_gecko, git_wpt, commit)
            create_syncs.update(create)
            update_syncs.update(update)
        elif commit.is_downstream or commit.is_landing:
            continue
        else:
            bug = commit.bug
            if bug is None:
                continue
            sync: SyncProcess | None = None
            if bug in update_syncs:
                sync, _ = update_syncs[bug]
            else:
                statuses = ["open", "incomplete"]
                syncs = UpstreamSync.for_bug(git_gecko, git_wpt, bug, statuses=statuses,
                                             flat=True)
                if len(syncs) not in (0, 1):
                    # Log the bug number (previously the sync count was passed
                    # into the %s slot labelled "bug")
                    logger.warning("Lookup of upstream syncs for bug %s returned syncs: %r" %
                                   (bug, syncs))
                    # Try to pick the most recent sync
                    # NOTE(review): obj_id is the bug number, shared by all
                    # these syncs; presumably a sequence id was intended as
                    # the sort key — confirm
                    for status in statuses:
                        status_syncs = [item for item in syncs if item.status == status]
                        if status_syncs:
                            status_syncs.sort(key=lambda x: int(x.process_name.obj_id))
                            sync = status_syncs.pop()
                            break
                elif syncs:
                    # elif: don't clobber the selection made above in the
                    # multiple-syncs case
                    sync = syncs[0]
            if sync:
                assert isinstance(sync, UpstreamSync)
                if commit not in sync.gecko_commits:
                    update_syncs[bug] = (sync, commit)
                elif sync.pr is None:
                    # Re-record the existing head so the sync gets pushed/PR'd
                    head = sync.gecko_commits.head
                    assert isinstance(head, GeckoCommit)
                    update_syncs[bug] = (sync, head)
            else:
                # bug is known to be non-None here (checked above), so no
                # separate no-bug branch is needed
                if bug in create_syncs:
                    bug_endpoints = create_syncs[bug]
                    assert isinstance(bug_endpoints, Endpoints)
                    bug_endpoints.head = commit
                else:
                    create_syncs[bug] = Endpoints(commit)

    return create_syncs, update_syncs
def create_syncs(lock: SyncLock,
                 git_gecko: Repo,
                 git_wpt: Repo,
                 create_endpoints: dict[int | None, list | Endpoints],
                 ) -> list[UpstreamSync]:
    """Create a new upstream sync for each endpoint range.

    :param create_endpoints: Mapping from bug number to the Endpoints for that
                             bug; the special key None maps to a list of
                             Endpoints for commits with no associated bug, each
                             of which gets a freshly filed bug.
    :return: The list of newly created syncs."""
    rv = []
    for bug, endpoints in create_endpoints.items():
        if bug is not None:
            assert isinstance(endpoints, Endpoints)
            endpoints = [endpoints]
        assert isinstance(endpoints, list)
        for endpoint in endpoints:
            # Use a per-endpoint variable; reusing the loop variable `bug`
            # made every subsequent no-bug endpoint silently share the first
            # endpoint's newly created bug
            endpoint_bug = bug
            if endpoint_bug is None:
                # TODO: Loading the commits doesn't work in this case, because we depend on the bug
                commit = sync_commit.GeckoCommit(git_gecko, endpoint.head)
                endpoint_bug = env.bz.new("Upstream commit %s to web-platform-tests" %
                                          commit.canonical_rev,
                                          "",
                                          "Testing",
                                          "web-platform-tests",
                                          whiteboard="[wptsync upstream]")
            sync = UpstreamSync.new(lock,
                                    git_gecko,
                                    git_wpt,
                                    bug=endpoint_bug,
                                    gecko_base=endpoint.base.sha1,
                                    gecko_head=endpoint.head.sha1,
                                    wpt_base="origin/master",
                                    wpt_head="origin/master")
            rv.append(sync)
    return rv
def update_sync_heads(lock: SyncLock,
                      syncs_by_bug: dict[int, tuple[UpstreamSync, GeckoCommit]],
                      ) -> list[UpstreamSync]:
    """Move each sync's gecko head to the given commit and re-tag the commits
    now in the range as belonging to the sync.

    :raises ValueError: if any sync is no longer open/incomplete."""
    updated = []
    for bug, (sync, commit) in syncs_by_bug.items():
        if sync.status not in ("open", "incomplete"):
            # TODO: Create a new sync with a non-zero seq-id in this case
            raise ValueError("Tried to modify a closed sync for bug %s with commit %s" %
                             (bug, commit.canonical_rev))
        with sync.as_mut(lock):
            sync.gecko_commits.head = commit  # type: ignore
            # Record the owning sync on every commit now in the range
            for gecko_commit in sync.gecko_commits:
                assert isinstance(gecko_commit, GeckoCommit)
                gecko_commit.set_upstream_sync(sync)
        updated.append(sync)
    return updated
def update_modified_sync(git_gecko: Repo, git_wpt: Repo, sync: UpstreamSync) -> None:
    """Re-apply the current gecko state of a sync to its wpt branch and PR.

    Rebuilds the wpt commits and then reconciles the PR via
    sync.update_github(). If applying to origin/master conflicts and there is
    no PR yet, retries on the current sync point; if that also fails, comments
    on the bug, needinfos the configured admins, and re-raises."""
    assert sync._lock is not None
    if len(sync.gecko_commits) == 0:
        # In the case that there are no gecko commits, we presumably had a backout
        # In this case we don't touch the wpt commits, but just mark the PR
        # as closed. That's pretty counterintuitive, but it turns out that GitHub
        # will only let you reopen a closed PR if you don't change the branch head in
        # the meantime. So we carefully avoid touching the wpt side until something
        # relands and we have a chance to reopen the PR
        logger.info("Sync has no commits, so marking as incomplete")
        sync.status = "incomplete"  # type: ignore
        if not sync.pr:
            logger.info("Sync was already fully applied upstream, not creating a PR")
            return
    else:
        sync.status = "open"  # type: ignore
        try:
            sync.update_wpt_commits()
        except AbortError:
            # If we got a merge conflict and the PR doesn't exist yet then try
            # recreating the commits on top of the current sync point in order that
            # we get a PR and it's visible that it fails
            # NOTE(review): when a PR already exists the AbortError is
            # swallowed here and we fall through to update_github() — confirm
            # that's intended
            if not sync.pr:
                logger.info("Applying to origin/master failed; "
                            "retrying with the current sync point")
                from .landing import load_sync_point
                sync_point = load_sync_point(git_gecko, git_wpt)
                sync.set_wpt_base(sync_point["upstream"])
                try:
                    sync.update_wpt_commits()
                except AbortError:
                    # Reset the base to origin/master
                    sync.set_wpt_base("origin/master")
                    with env.bz.bug_ctx(sync.bug) as bug:
                        bug.add_comment("Failed to create upstream wpt PR due to "
                                        "merge conflicts. This requires fixup from a wpt sync "
                                        "admin.")
                        # Needinfo the configured upstream admins (comma-separated)
                        needinfo_users = [item.strip() for item in
                                          (env.config["gecko"]["needinfo"]
                                           .get("upstream", "")
                                           .split(","))]
                        needinfo_users = [item for item in needinfo_users if item]
                        bug.needinfo(*needinfo_users)
                    raise

    sync.update_github()
def update_sync_prs(lock: SyncLock,
                    git_gecko: Repo,
                    git_wpt: Repo,
                    create_endpoints: dict[int | None, list | Endpoints],
                    update_syncs: dict[int, tuple[UpstreamSync, GeckoCommit]],
                    raise_on_error: bool = False,
                    ) -> tuple[set[UpstreamSync], set]:
    """Create/update syncs and push each one's changes to its PR.

    :return: (pushed_syncs, failed_syncs) where failed_syncs contains
             (sync, exception) pairs."""
    pushed_syncs: set[UpstreamSync] = set()
    failed_syncs: set = set()

    # Gather everything that needs pushing: brand-new syncs plus existing
    # syncs whose heads moved
    to_push = create_syncs(lock, git_gecko, git_wpt, create_endpoints)
    to_push.extend(update_sync_heads(lock, update_syncs))

    for sync in to_push:
        with sync.as_mut(lock):
            try:
                update_modified_sync(git_gecko, git_wpt, sync)
            except Exception as e:
                sync.error = e  # type: ignore
                if raise_on_error:
                    raise
                traceback.print_exc()
                logger.error(e)
                failed_syncs.add((sync, e))
            else:
                sync.error = None  # type: ignore
                pushed_syncs.add(sync)

    return pushed_syncs, failed_syncs
def try_land_syncs(lock: SyncLock, syncs: set[UpstreamSync]) -> set[UpstreamSync]:
    """Attempt to merge the upstream PR of each sync; return those that
    actually landed."""
    landed: set[UpstreamSync] = set()
    for sync in syncs:
        with sync.as_mut(lock):
            if sync.try_land_pr():
                landed.add(sync)
    return landed
@entry_point("upstream")
@mut('sync')
def update_sync(git_gecko, git_wpt, sync, raise_on_error=True, repo_update=True):
    """Manually (re)run the update for a single upstream sync.

    :param repo_update: Whether to fetch the gecko/wpt repositories first
    :return: (pushed_syncs, failed_syncs, landed_syncs) sets."""
    if sync.status in ("wpt-merged", "complete"):
        logger.info("Nothing to do for sync with status %s" % sync.status)
        return set(), set(), set()

    if repo_update:
        update_repositories(git_gecko, git_wpt)

    assert isinstance(sync, UpstreamSync)
    # NOTE(review): update_sync_heads expects (sync, GeckoCommit) values and
    # uses commit.canonical_rev in its error path, but a sha1 string is passed
    # here — confirm the head setter accepts a plain sha
    update_syncs = {sync.bug: (sync, sync.gecko_commits.head.sha1)}
    pushed_syncs, failed_syncs = update_sync_prs(sync._lock,
                                                 git_gecko,
                                                 git_wpt,
                                                 {},
                                                 update_syncs,
                                                 raise_on_error=raise_on_error)

    # NOTE(review): failed_syncs holds (sync, error) tuples, so this
    # membership test is on the tuples rather than the syncs — confirm
    if sync not in failed_syncs:
        landed_syncs = try_land_syncs(sync._lock, [sync])
    else:
        landed_syncs = set()

    return pushed_syncs, failed_syncs, landed_syncs
@entry_point("upstream")
def gecko_push(git_gecko: Repo,
               git_wpt: Repo,
               repository_name: str,
               hg_rev: str,
               raise_on_error: bool = False,
               base_rev: Any | None = None,
               ) -> tuple[set[UpstreamSync], set[UpstreamSync], set] | None:
    """Handle a push to a gecko repository: create/update upstream syncs for
    the new commits, try to land open syncs, and advance the sync point.

    :param hg_rev: hg revision of the push head
    :return: (pushed_syncs, landed_syncs, failed_syncs), or None if the sync
             point has already moved past the pushed revision."""
    rev = git_gecko.rev_parse(cinnabar(git_gecko).hg2git(hg_rev))
    last_sync_point, prev_commit = UpstreamSync.prev_gecko_commit(git_gecko,
                                                                 repository_name)

    assert last_sync_point.commit is not None
    if base_rev is None and git_gecko.is_ancestor(rev, last_sync_point.commit.commit):
        # Already processed a later revision; nothing to do
        logger.info("Last sync point moved past commit")
        return None

    with SyncLock("upstream", None) as lock:
        assert isinstance(lock, SyncLock)
        updated = updated_syncs_for_push(git_gecko,
                                         git_wpt,
                                         prev_commit,
                                         sync_commit.GeckoCommit(git_gecko, rev))

        if updated is None:
            return set(), set(), set()

        create_endpoints, update_syncs = updated

        pushed_syncs, failed_syncs = update_sync_prs(lock,
                                                     git_gecko,
                                                     git_wpt,
                                                     create_endpoints,
                                                     update_syncs,
                                                     raise_on_error=raise_on_error)

        # Try to land every open, error-free sync, not just the ones touched
        # by this push
        landable_syncs = {item for item in UpstreamSync.load_by_status(git_gecko, git_wpt, "open")
                          if item.error is None}
        if TYPE_CHECKING:
            landable = cast(Set[UpstreamSync], landable_syncs)
        else:
            landable = landable_syncs
        landed_syncs = try_land_syncs(lock, landable)

        # TODO
        # Advance the recorded sync point unless it's already ahead of rev
        if not git_gecko.is_ancestor(rev, last_sync_point.commit.commit):
            with last_sync_point.as_mut(lock):
                last_sync_point.commit = rev.hexsha  # type: ignore

    return pushed_syncs, landed_syncs, failed_syncs
@enum.unique
class CheckStatus(enum.Enum):
    """Aggregate state of the GitHub check runs on a PR head."""
    SUCCESS = "success"
    PENDING = "pending"
    FAILURE = "failure"
def get_check_status(pr_id):
    """Return the aggregate CheckStatus for a PR along with its check runs."""
    checks = env.gh_wpt.get_check_runs(pr_id)
    if commit_checks_pass(checks):
        return CheckStatus.SUCCESS, checks
    if not commit_checks_complete(checks):
        return CheckStatus.PENDING, checks
    # All runs completed but at least one required run failed
    return CheckStatus.FAILURE, checks
def commit_checks_pass(checks):
    """Boolean indicating whether all required check runs pass"""
    for run in checks.values():
        # Non-required runs never block
        if run["required"] is False:
            continue
        if run["status"] != "completed":
            return False
        if run["conclusion"] not in ("success", "neutral"):
            return False
    return True
def commit_checks_complete(checks):
    """Boolean indicating whether all check runs are complete"""
    return not any(item["status"] != "completed" for item in checks.values())
@entry_point("upstream")
@mut('sync')
def commit_check_changed(git_gecko, git_wpt, sync):
    """Handle a change in the GitHub check-run state for a sync's PR.

    On overall success, tries to land the PR (or notifies the bug that it will
    merge once the commits reach central). On failure, comments on the bug
    with the failing runs and needinfos the patch author.

    :return: Whether the PR landed as a result of this call (True when the
             sync is no longer open)."""
    landed = False
    if sync.status != "open":
        return True
    check_status, checks = get_check_status(sync.pr)
    if not checks:
        logger.error("No checks found for pr %s" % sync.pr)
        # NOTE(review): bare return yields None where other paths return a
        # bool — confirm callers don't distinguish
        return
    # Record the overall status and commit so we only notify once per commit
    this_pr_check = {"state": check_status.value,
                     "sha": next(iter(checks.values()))["head_sha"]}
    last_pr_check = sync.last_pr_check
    sync.last_pr_check = this_pr_check
    if check_status == CheckStatus.SUCCESS:
        sync.error = None
        if sync.gecko_landed():
            landed = sync.try_land_pr()
        elif this_pr_check != last_pr_check:
            env.bz.comment(sync.bug,
                           "Upstream web-platform-tests status checks passed, "
                           "PR will merge once commit reaches central.")
    elif check_status == CheckStatus.FAILURE and last_pr_check != this_pr_check:
        # Build a bug comment listing every failing check run
        details = ["Github PR %s" % env.gh_wpt.pr_url(sync.pr)]
        for name, check_run in checks.items():
            if check_run["conclusion"] not in ("success", "neutral"):
                details.append("* {} ({})".format(name, check_run["url"]))
        details = "\n".join(details)
        msg = ("Can't merge web-platform-tests PR due to failing upstream checks:\n%s" %
               details)
        try:
            with env.bz.bug_ctx(sync.bug) as bug:
                bug["comment"] = msg
            # Do this as a separate operation
            with env.bz.bug_ctx(sync.bug) as bug:
                commit_author = sync.gecko_commits[0].email
                if commit_author:
                    bug.needinfo(commit_author)
        except BugsyException:
            msg = traceback.format_exc()
            logger.warning("Failed to update bug:\n%s" % msg)
            # Sometimes needinfos fail because emails addresses in bugzilla don't
            # match the commits. That's non-fatal, but record the exception here in
            # case something more unexpected happens
            newrelic.agent.record_exception()
        sync.error = "Checks failed"
    else:
        logger.info("Some upstream web-platform-tests status checks still pending.")
    return landed
@entry_point("upstream")
@mut('sync')
def update_pr(git_gecko: Repo,
              git_wpt: Repo,
              sync: UpstreamSync,
              action: str,
              merge_sha: str | None = None,
              base_sha: str | None = None,
              merged_by: str | None = None,
              ) -> None:
    """Update the sync status for a PR event on github
    :param action string: Either a PR action or a PR status
    :param merge_sha string: SHA of the new head if the PR merged or None if it didn't"""
    if action == "closed":
        if not merge_sha and sync.pr_status != "closed":
            # Closed without merging
            env.bz.comment(sync.bug, "Upstream PR was closed without merging")
            sync.pr_status = "closed"
        else:
            # NOTE(review): a repeated close event with no merge_sha while
            # pr_status is already "closed" reaches this branch and trips the
            # assert — confirm events can't be redelivered
            assert merge_sha is not None
            sync.merge_sha = merge_sha
            if not sync.wpt_commits and base_sha:
                # With no local wpt commits we need the base to compute ranges
                sync.set_wpt_base(base_sha)
            if sync.status not in ("complete", "wpt-merged"):
                env.bz.comment(sync.bug, "Upstream PR merged by %s" % merged_by)
                sync.finish("wpt-merged")
    elif action == "reopened" or action == "open":
        sync.pr_status = "open"