sync/update.py (298 lines of code) (raw):
from __future__ import annotations
from . import downstream
from . import landing
from . import log
from . import tc
from . import trypush
from . import upstream
from .env import Environment
from .load import get_bug_sync, get_pr_sync
from .lock import SyncLock
from .gitutils import update_repositories
from .errors import AbortError
from .repos import cinnabar
from git import Repo
from github.PullRequest import PullRequest
from typing import Any, Iterable, TYPE_CHECKING
if TYPE_CHECKING:
from sync.sync import SyncProcess
from sync.trypush import TryPush
# Shared environment object. Functions in this module read configuration from
# env.config and look up pull requests / check runs through env.gh_wpt.
env = Environment()
# Module-level logger named after this module.
logger = log.get_logger(__name__)
def handle_sync(task: str, body: dict[str, Any]) -> None:
    """Dispatch ``body`` to the handler registered under ``task``.

    An unknown task name is logged as an error and nothing else happens.
    If the handler raises, ``body`` is logged at error level and the
    exception propagates to the caller.
    """
    # Imported inside the function rather than at module level.
    # NOTE(review): presumably to avoid an import cycle with .tasks -- confirm.
    from .tasks import get_handlers, setup

    registry = get_handlers()
    if task not in registry:
        logger.error("No handler for %s" % task)
        return

    logger.info("Running task %s" % task)
    git_gecko, git_wpt = setup()
    try:
        registry[task](git_gecko, git_wpt, body)
    except Exception:
        # Record the payload that triggered the failure, then re-raise.
        logger.error(body)
        raise
def construct_event(name: str, payload: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
    """Return an event dict with ``event`` and ``payload`` keys plus any extras.

    Extra keyword arguments become additional top-level keys; a keyword
    named ``event`` or ``payload`` overrides the corresponding base entry.
    ``payload`` is stored by reference, not copied.
    """
    return {"event": name, "payload": payload, **kwargs}
def schedule_pr_task(action: str, pr: PullRequest, repo_update: bool = True) -> None:
    """Build a ``pull_request`` event for ``pr`` and run it via the "github" task.

    :param action: Value stored under the payload's ``action`` key.
    :param pr: Pull request whose number and raw data go into the payload.
    :param repo_update: Stored in the event under ``_wptsync["repo_update"]``.
    """
    payload = {
        "action": action,
        "number": pr.number,
        "pull_request": pr.raw_data,
    }
    event = construct_event("pull_request", payload, _wptsync={"repo_update": repo_update})
    logger.info(f"Action {action} for pr {pr.number}")
    handle_sync("github", event)
def schedule_check_run_task(head_sha: str, name: str, check_run: dict[str, Any],
                            repo_update: bool = True) -> None:
    """Build a completed ``check_run`` event and run it via the "github" task.

    The event's check-run data is a shallow copy of ``check_run`` with the
    ``required`` key removed and ``name`` / ``head_sha`` set from the
    arguments; ``check_run`` itself is not modified.

    :raises KeyError: If ``check_run`` has no ``required`` key.
    """
    run_data = check_run.copy()
    # No default is given, so a missing "required" key raises KeyError.
    run_data.pop("required")
    run_data.update(name=name, head_sha=head_sha)

    event = construct_event(
        "check_run",
        {"action": "completed", "check_run": run_data},
        _wptsync={"repo_update": repo_update},
    )
    logger.info("Status changed for commit %s" % head_sha)
    handle_sync("github", event)
def update_for_status(pr: PullRequest, repo_update: bool = True) -> None:
    """Schedule a check-run task for the first required check run of ``pr``.

    Check runs are fetched with ``env.gh_wpt.get_check_runs(pr.number)`` and
    scanned in iteration order. Only the first entry whose ``required`` value
    is truthy is scheduled; if there is none, nothing happens.

    :param pr: Pull request whose check runs are inspected.
    :param repo_update: Forwarded to :func:`schedule_check_run_task`.
    """
    for name, check_run in env.gh_wpt.get_check_runs(pr.number).items():
        if check_run["required"]:
            # Forward repo_update: previously the argument was accepted but
            # dropped here, so the scheduled task always used the default.
            schedule_check_run_task(pr.head.sha, name, check_run,
                                    repo_update=repo_update)
            return
def update_for_action(pr: PullRequest, action: str, repo_update: bool = True) -> None:
    """Run a ``pull_request`` event with the given ``action`` via the "github" task.

    :param pr: Pull request whose number and raw data go into the payload.
    :param action: Value stored under the payload's ``action`` key.
    :param repo_update: Stored in the event under ``_wptsync["repo_update"]``.
    """
    wptsync_options = {"repo_update": repo_update}
    event = construct_event(
        "pull_request",
        {"action": action, "number": pr.number, "pull_request": pr.raw_data},
        _wptsync=wptsync_options,
    )
    logger.info(f"Running action {action} for PR {pr.number}")
    handle_sync("github", event)
def convert_rev(git_gecko: Repo, rev: str) -> tuple[str, str]:
    """Return ``(git_rev, hg_rev)`` for a revision given in either form.

    ``rev`` is first tried as an hg revision (``hg2git``); if that raises
    ``ValueError`` it is tried as a git revision (``git2hg``).

    :raises ValueError: If neither conversion accepts ``rev``.
    """
    try:
        return cinnabar(git_gecko).hg2git(rev), rev
    except ValueError:
        # This was probably a git rev
        try:
            return rev, cinnabar(git_gecko).git2hg(rev)
        except ValueError:
            raise ValueError(f"{rev} is not a valid git or hg rev")
def update_push(git_gecko: Repo, git_wpt: Repo, rev: str, base_rev: str | None = None,
                processes: list[str] | None = None) -> None:
    """Build a ``push`` event for ``rev`` and run it via the "push" task.

    The routing key is chosen by ancestry: "mozilla-central" if the commit is
    an ancestor of the configured central ref, otherwise
    "integration/autoland" if it is an ancestor of the configured autoland
    ref.

    :param git_gecko: Gecko repository used for revision conversion and
                      ancestry checks.
    :param git_wpt: Not used by this function.
    :param rev: Git or hg revision of the push head.
    :param base_rev: Optional git or hg revision; its hg form is stored under
                     ``_wptsync["base_rev"]``.
    :param processes: Optional list stored under ``_wptsync["processes"]``.
    :raises ValueError: If ``rev`` or ``base_rev`` cannot be converted, or if
                        the commit is on neither the central nor the autoland
                        ref.
    """
    git_rev, hg_rev = convert_rev(git_gecko, rev)
    git_rev_commit = git_gecko.rev_parse(git_rev)

    hg_rev_base: str | None = None
    if base_rev is not None:
        _, hg_rev_base = convert_rev(git_gecko, base_rev)

    refs = env.config["gecko"]["refs"]
    if git_gecko.is_ancestor(git_rev_commit, git_gecko.rev_parse(refs["central"])):
        routing_key = "mozilla-central"
    elif git_gecko.is_ancestor(git_rev_commit, refs["autoland"]):
        routing_key = "integration/autoland"
    else:
        # Without this branch routing_key would be unbound below and the
        # function would die with an UnboundLocalError instead of a usable
        # message.
        raise ValueError(
            f"{rev} is not an ancestor of {refs['central']} or {refs['autoland']}")

    kwargs: dict[str, Any] = {"_wptsync": {}}
    if hg_rev_base is not None:
        kwargs["_wptsync"]["base_rev"] = hg_rev_base
    if processes is not None:
        kwargs["_wptsync"]["processes"] = processes

    event = construct_event("push", {"data": {"heads": [hg_rev]}},
                            _meta={"routing_key": routing_key},
                            **kwargs)
    handle_sync("push", event)
def update_pr(git_gecko: Repo, git_wpt: Repo, pr: PullRequest, force_rebase: bool = False,
              repo_update: bool = True) -> None:
    """Bring the sync associated with ``pr`` up to date with the PR's state.

    The sync is looked up with ``get_pr_sync``. A sync whose status is
    "complete" is left alone. Otherwise one of three paths is taken:

    * No sync: try to create an ``UpstreamSync`` from the PR under the
      "upstream" lock and update it. If none is created and the PR is open
      or merged, schedule an "opened" PR task followed by a status update.
    * ``DownstreamSync``: under the sync's process lock, optionally rebase,
      make sure wpt commits and a bug exist, then either schedule a "push"
      task (PR head differs from the sync's head) or refresh try-push
      taskgroup/task data, and finally run a status update (unmerged) or a
      "closed" action (merged).
    * ``UpstreamSync``: under the sync's process lock, update the sync from
      the PR, try to land it, and for a merged PR either finish the sync or
      mark it "wpt-merged".

    :param git_gecko: Gecko repository.
    :param git_wpt: web-platform-tests repository.
    :param pr: Pull request to process.
    :param force_rebase: For a ``DownstreamSync``, rebase the gecko commits
                         before any other processing.
    :param repo_update: Forwarded to the scheduled PR / status / action tasks.
    """
    sync = get_pr_sync(git_gecko, git_wpt, pr.number)
    if sync and sync.status == "complete":
        logger.info("Sync already landed")
        return
    if sync:
        logger.info("sync status %s" % sync.landable_status)
    # Loaded unconditionally, but only the UpstreamSync branch below reads it.
    sync_point = landing.load_sync_point(git_gecko, git_wpt)
    if not sync:
        # If this looks like something that came from gecko, create
        # a corresponding sync
        with SyncLock("upstream", None) as lock:
            assert isinstance(lock, SyncLock)
            upstream_sync = upstream.UpstreamSync.from_pr(lock,
                                                          git_gecko,
                                                          git_wpt,
                                                          pr.number,
                                                          pr.body)
            if upstream_sync is not None:
                with upstream_sync.as_mut(lock):
                    assert isinstance(lock, SyncLock)
                    # NOTE(review): the last argument here is pr.merged, while
                    # the UpstreamSync branch below passes a merge SHA or None
                    # in the same position -- confirm which one
                    # upstream.update_pr expects.
                    upstream.update_pr(git_gecko,
                                       git_wpt,
                                       upstream_sync,
                                       pr.state,
                                       pr.merged)
            else:
                # No upstream sync was created. A PR that is neither open nor
                # merged (i.e. closed without merging) needs no further work.
                if pr.state != "open" and not pr.merged:
                    return
                schedule_pr_task("opened", pr, repo_update=repo_update)
                update_for_status(pr, repo_update=repo_update)
    elif isinstance(sync, downstream.DownstreamSync):
        with SyncLock.for_process(sync.process_name) as lock:
            assert isinstance(lock, SyncLock)
            with sync.as_mut(lock):
                if force_rebase:
                    central = git_gecko.rev_parse(sync.gecko_landing_branch())
                    commit = git_gecko.rev_parse(sync.gecko_commits.base.sha1)
                    # Check if the current central is already an ancestor of the commit
                    if git_gecko.is_ancestor(central, commit):
                        sync.gecko_rebase(sync.gecko_integration_branch())
                    else:
                        sync.gecko_rebase(sync.gecko_landing_branch())
                if len(sync.wpt_commits) == 0:
                    sync.update_wpt_commits()
                # Create a bug unless the PR was closed without being merged.
                if not sync.bug and not (pr.state == "closed" and not pr.merged):
                    sync.create_bug(git_wpt, pr.number, pr.title, pr.body)
                if pr.state == "open" or pr.merged:
                    if pr.head.sha != sync.wpt_commits.head:
                        # Upstream has different commits, so run a push handler
                        schedule_pr_task("push", pr, repo_update=repo_update)
                    elif sync.latest_valid_try_push:
                        logger.info("Treeherder url %s" % sync.latest_valid_try_push.treeherder_url)
                        # Fill in a missing taskgroup id first; the two checks
                        # that follow re-read it.
                        if not sync.latest_valid_try_push.taskgroup_id:
                            update_taskgroup_ids(git_gecko, git_wpt,
                                                 sync.latest_valid_try_push)
                        if (sync.latest_valid_try_push.taskgroup_id and
                                not sync.latest_valid_try_push.status == "complete"):
                            update_tasks(git_gecko, git_wpt, sync=sync)
                        if not sync.latest_valid_try_push.taskgroup_id:
                            logger.info("Try push doesn't have a complete decision task")
                            return
                if not pr.merged:
                    update_for_status(pr, repo_update=repo_update)
                else:
                    update_for_action(pr, "closed", repo_update=repo_update)
    elif isinstance(sync, upstream.UpstreamSync):
        with SyncLock.for_process(sync.process_name) as lock:
            assert isinstance(lock, SyncLock)
            with sync.as_mut(lock):
                merge_sha = pr.merge_commit_sha if pr.merged else None
                upstream.update_pr(git_gecko, git_wpt, sync, pr.state, merge_sha)
                sync.try_land_pr()
                if merge_sha:
                    if git_wpt.is_ancestor(git_wpt.rev_parse(merge_sha),
                                           git_wpt.rev_parse(sync_point["upstream"])):
                        # This sync already landed, so it should be finished
                        sync.finish()
                    else:
                        # Merged upstream, but the merge commit is not an
                        # ancestor of the sync point's "upstream" revision.
                        if sync.status != "complete":
                            sync.status = "wpt-merged"  # type: ignore
def update_bug(git_gecko: Repo, git_wpt: Repo, bug: int) -> None:
    """Update every ``UpstreamSync`` associated with ``bug``.

    Syncs are processed grouped by status, in the order of
    ``UpstreamSync.statuses``; each non-empty group is handled under its own
    "upstream" lock. Syncs of any other type are skipped with a warning.

    :raises ValueError: If no sync is found for ``bug``.
    """
    syncs_by_status = get_bug_sync(git_gecko, git_wpt, bug)
    if not syncs_by_status:
        raise ValueError("No sync for bug %s" % bug)

    for status in upstream.UpstreamSync.statuses:
        group = syncs_by_status.get(status)
        if group:
            with SyncLock("upstream", None) as lock:
                assert isinstance(lock, SyncLock)
                for item in group:
                    if not isinstance(item, upstream.UpstreamSync):
                        logger.warning("Can't update sync %s" % item)
                        continue
                    with item.as_mut(lock):
                        upstream.update_sync(git_gecko, git_wpt, item)
def update_from_github(git_gecko: Repo, git_wpt: Repo, sync_classes: list[type[SyncProcess]],
                       statuses: list[str] | None = None) -> None:
    """Re-run :func:`update_pr` for every PR-backed sync of the given classes.

    Repositories are updated once up front. For each class and each requested
    status, syncs are loaded with ``load_by_status``; a status other than
    ``"*"`` that the class does not list in ``statuses`` is skipped, as are
    syncs without a PR.

    :param sync_classes: Sync classes to load syncs from.
    :param statuses: Statuses to load; defaults to ``["*"]``.
    """
    wanted = ["*"] if statuses is None else statuses
    update_repositories(git_gecko, git_wpt)

    for sync_cls in sync_classes:
        for status in wanted:
            if status == "*" or status in sync_cls.statuses:
                for item in sync_cls.load_by_status(git_gecko, git_wpt, status):
                    if not item.pr:
                        continue
                    logger.info("Updating sync for PR %s" % item.pr)
                    update_pr(git_gecko, git_wpt, env.gh_wpt.get_pull(item.pr))
def update_taskgroup_ids(git_gecko: Repo, git_wpt: Repo,
                         try_push: TryPush | None = None) -> None:
    """Look up the taskgroup for try pushes that do not have one yet.

    With ``try_push`` given only that push is considered; otherwise all try
    pushes returned by ``TryPush.load_all`` are. For each push lacking a
    taskgroup id, the taskgroup for its try revision is fetched and, if the
    reported state is "completed", "failed" or "exception", a "decision-task"
    message is run through :func:`handle_sync`.
    """
    candidates = trypush.TryPush.load_all(git_gecko) if try_push is None else [try_push]

    for candidate in candidates:
        if candidate.taskgroup_id:
            continue

        logger.info("Setting taskgroup id for try push %s" % candidate)
        if candidate.try_rev is None:
            logger.warning("Try push %s has no associated revision" %
                           candidate.process_name)
            continue

        taskgroup_id, state, runs = tc.get_taskgroup_id("try", candidate.try_rev)
        logger.info("Got taskgroup id %s" % taskgroup_id)
        if state not in ("completed", "failed", "exception"):
            logger.warning("Not setting taskgroup id because decision task is in state %s" %
                           state)
            continue

        task_status = {
            "taskId": taskgroup_id,
            "taskGroupId": taskgroup_id,
            "state": state,
            "runs": runs,
        }
        handle_sync("decision-task", {
            "status": task_status,
            "task": {"tags": {"kind": "decision-task"}},
            # Index of the last entry in ``runs``.
            "runId": len(runs) - 1,
            "version": 1,
        })
def update_tasks(git_gecko: Repo, git_wpt: Repo, pr_id: int | None = None,
                 sync: SyncProcess | None = None) -> None:
    """Run the "taskgroup" task for the latest try push of the selected syncs.

    Selection, in priority order:

    * ``sync`` given: only that sync.
    * ``pr_id`` given: the single ``DownstreamSync`` for that PR. If there is
      none, an error is logged and the function returns.
    * otherwise: every ``DownstreamSync`` with status "open", plus the
      current landing if there is one.

    Syncs whose latest try push is missing or has no taskgroup id are
    skipped.
    """
    # The leading space keeps the message from reading "update_tasksfor PR N".
    logger.info("Running update_tasks%s" % (" for PR %s" % pr_id if pr_id else ""))

    syncs: Iterable[SyncProcess] = []
    if not sync:
        if pr_id is not None:
            pr_syncs = downstream.DownstreamSync.load_by_obj(git_gecko, git_wpt, pr_id)
            if not pr_syncs:
                logger.error("No sync for pr_id %s" % pr_id)
                return
            assert len(pr_syncs) == 1
            syncs = [pr_syncs.pop()]
        else:
            current_landing = landing.current(git_gecko, git_wpt)
            # NOTE(review): .add() implies load_by_status returns a set-like
            # collection -- confirm.
            open_syncs = downstream.DownstreamSync.load_by_status(git_gecko, git_wpt, "open")
            if current_landing is not None:
                open_syncs.add(current_landing)
            syncs = open_syncs
    else:
        syncs = [sync]

    # A distinct loop name avoids rebinding the ``sync`` parameter.
    for item in syncs:
        try_push = item.latest_try_push
        if try_push and try_push.taskgroup_id:
            try:
                handle_sync("taskgroup", {"taskGroupId": try_push.taskgroup_id})
            except AbortError:
                # Ignored so the remaining syncs are still processed.
                pass
def retrigger(git_gecko: Repo, git_wpt: Repo, unlandable_prs: list[tuple[int, list[Any], str]],
              rebase: bool = False) -> list[int]:
    """Retrigger each unlandable PR whose status allows it.

    Entries whose status is ``ready``, ``skip``, ``upstream`` or ``no_pr``
    are left alone; every other entry is passed to :func:`do_retrigger`.

    :param unlandable_prs: ``(pr_id, commits, status)`` tuples.
    :param rebase: Forwarded to :func:`do_retrigger`.
    :returns: The truthy values returned by :func:`do_retrigger`, i.e. the
              ids of PRs that could not be retriggered.
    """
    from .sync import LandableStatus

    not_retriggerable = (LandableStatus.ready,
                         LandableStatus.skip,
                         LandableStatus.upstream,
                         LandableStatus.no_pr)

    # Select everything first, then retrigger.
    selected = []
    for pr_id, commits, status in unlandable_prs:
        if status not in not_retriggerable:
            selected.append((pr_id, commits, status))

    outcomes = (do_retrigger(git_gecko, git_wpt, pr_data, rebase=rebase)
                for pr_data in selected)
    return [failed_pr for failed_pr in outcomes if failed_pr]
def do_retrigger(git_gecko: Repo, git_wpt: Repo, pr_data: tuple[int, list[Any], str],
                 rebase: bool = False) -> int | None:
    """Re-run :func:`update_pr` for one PR, reporting failure by return value.

    :param pr_data: ``(pr_id, commits, status)``; ``commits`` is not used.
    :param rebase: Passed to :func:`update_pr` as ``force_rebase``.
    :returns: ``None`` on success, or the PR id if the PR could not be
              fetched or updating it raised an exception.
    """
    pr_id, _, status = pr_data
    try:
        logger.info(f"Retriggering {pr_id} (status {status})")
        pr = env.gh_wpt.get_pull(pr_id)
        if pr is None:
            logger.warning(f"Could not fetch PR {pr_id}; not retriggering")
            return pr_id
        update_pr(git_gecko, git_wpt, pr, repo_update=False, force_rebase=rebase)
    except Exception:
        # Failure is still reported through the return value, but the
        # traceback is logged so the cause is not lost.
        logger.exception(f"Retriggering {pr_id} failed")
        return pr_id
    return None