sync/command.py
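# Command-line front end for wpt-sync: get_parser() defines one subcommand per
# do_* handler below, and main() sets up the gecko and wpt repositories before
# dispatching to the selected handler.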

from __future__ import annotations
import argparse
import itertools
import json
import os
import re
import subprocess
import traceback

import git

from . import listen
from .phab import listen as phablisten
from . import log
from .tasks import setup
from .env import Environment
from .gitutils import update_repositories
from .load import get_syncs
from .lock import RepoLock, SyncLock

from typing import Any, Iterable, Set, cast, TYPE_CHECKING
from git.repo.base import Repo

if TYPE_CHECKING:
    from sync.sync import SyncProcess
    from sync.trypush import TryPush

logger = log.get_logger(__name__)
env = Environment()

# HACK for docker
if "SHELL" not in os.environ:
    os.environ["SHELL"] = "/bin/bash"


def get_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()
    parser.add_argument("--pdb", action="store_true", help="Run in pdb")
    parser.add_argument("--profile", action="store",
                        help="Run in profile, dump stats to specified filename")
    parser.add_argument("--config", action="append", help="Set a config option")

    parser_update = subparsers.add_parser("update",
                                          help="Update the local state by reading from GH + etc.")
    parser_update.add_argument("--sync-type", nargs="*", help="Type of sync to update",
                               choices=["upstream", "downstream"])
    parser_update.add_argument("--status", nargs="*",
                               help="Statuses of syncs to update e.g. open")
    parser_update.set_defaults(func=do_update)

    parser_update_tasks = subparsers.add_parser("update-tasks",
                                                help="Update the state of try pushes")
    parser_update_tasks.add_argument("pr_id", nargs="?", type=int,
                                     help="Downstream PR id for sync to update")
    parser_update_tasks.set_defaults(func=do_update_tasks)

    parser_list = subparsers.add_parser("list", help="List all in-progress syncs")
    parser_list.add_argument("sync_type", nargs="*", help="Type of sync to list")
    parser_list.add_argument("--error", action="store_true",
                             help="List only syncs with errors")
    parser_list.set_defaults(func=do_list)

    parser_detail = subparsers.add_parser("detail", help="Show details of a single sync")
    parser_detail.add_argument("sync_type", help="Type of sync")
    parser_detail.add_argument("obj_id", type=int, help="Bug or PR id for the sync")
    parser_detail.set_defaults(func=do_detail)

    parser_landing = subparsers.add_parser("landing", help="Trigger the landing code")
    parser_landing.add_argument("--prev-wpt-head", help="First commit to use as the base")
    parser_landing.add_argument("--wpt-head", help="wpt commit to land to")
    parser_landing.add_argument("--no-push", dest="push", action="store_false", default=True,
                                help="Don't actually push anything to gecko")
    parser_landing.add_argument("--include-incomplete", action="store_true", default=False,
                                help="Consider PRs with incomplete syncs as landable.")
    parser_landing.add_argument("--accept-failures", action="store_true", default=False,
                                help="Consider the latest try push a success even if it has "
                                "more than the allowed number of failures")
    parser_landing.add_argument("--retry", action="store_true", default=False,
                                help="Rebase onto latest central and do another try push")
    parser_landing.set_defaults(func=do_landing)

    parser_fetch = subparsers.add_parser("repo-config", help="Configure repo.")
    parser_fetch.set_defaults(func=do_configure_repos)
    parser_fetch.add_argument('repo', choices=['gecko', 'web-platform-tests', 'wpt-metadata'])
    parser_fetch.add_argument('config_file', help="Path to git config file to copy.")
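    # Usage sketch (hypothetical invocation; the installed entry-point name
    # may differ):
    #   wptsync repo-config gecko /path/to/gitconfig
    # copies the given git config file into the managed gecko clone.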
    parser_listen = subparsers.add_parser("listen", help="Start pulse listener")
    parser_listen.set_defaults(func=do_start_listener)

    parser_listen = subparsers.add_parser("phab-listen", help="Start phabricator listener")
    parser_listen.set_defaults(func=do_start_phab_listener)

    parser_pr = subparsers.add_parser("pr", help="Update the downstreaming for a specific PR")
    parser_pr.add_argument("pr_ids", default=None, type=int, nargs="*", help="PR numbers")
    parser_pr.add_argument("--rebase", default=False, action="store_true",
                           help="Force the PR to be rebased onto the integration branch")
    parser_pr.set_defaults(func=do_pr)

    parser_bug = subparsers.add_parser("bug", help="Update the upstreaming for a specific bug")
    parser_bug.add_argument("bug", default=None, nargs="?", help="Bug number")
    parser_bug.set_defaults(func=do_bug)

    parser_push = subparsers.add_parser("push", help="Run the push handler")
    parser_push.add_argument("--base-rev", help="Base revision for push or landing")
    parser_push.add_argument("--rev", help="Revision pushed")
    parser_push.add_argument("--process", dest="processes", action="append",
                             choices=["landing", "upstream"], default=None,
                             help="Select process to run on push (default: landing, upstream)")
    parser_push.set_defaults(func=do_push)

    parser_delete = subparsers.add_parser("delete", help="Delete a sync by bug number or pr")
    parser_delete.add_argument("sync_type", choices=["downstream", "upstream", "landing"],
                               help="Type of sync to delete")
    parser_delete.add_argument("obj_ids", nargs="+", type=int,
                               help="Bug or PR id for the sync(s)")
    parser_delete.add_argument("--seq-id", default=None, type=int, help="Sync sequence id")
    parser_delete.add_argument("--all", dest="delete_all", action="store_true",
                               help="Delete all matches, not just most recent")
    parser_delete.add_argument("--try", dest="try_push", action="store_true",
                               help="Delete try pushes for a sync")
    parser_delete.set_defaults(func=do_delete)

    parser_worktree = subparsers.add_parser("worktree", help="Create worktree for a sync")
    parser_worktree.add_argument("sync_type", help="Type of sync")
    parser_worktree.add_argument("obj_id", type=int, help="Bug or PR id for the sync")
    parser_worktree.add_argument("worktree_type", choices=["gecko", "wpt"],
                                 help="Repo type of worktree")
    parser_worktree.set_defaults(func=do_worktree)

    parser_status = subparsers.add_parser("status", help="Set the status of a Sync or Try push")
    parser_status.add_argument("obj_type", choices=["try", "sync"], help="Object type")
    parser_status.add_argument("sync_type", choices=["downstream", "upstream", "landing"],
                               help="Sync type")
    parser_status.add_argument("obj_id", type=int, help="Object id (pr number or bug)")
    parser_status.add_argument("new_status", help="Status to set")
    parser_status.add_argument("--old-status", help="Current status")
    parser_status.add_argument("--seq-id", default=None, type=int, help="Sequence number")
    parser_status.set_defaults(func=do_status)

    parser_test = subparsers.add_parser("test", help="Run the tests with pytest")
    parser_test.add_argument("--no-flake8", dest="flake8", action="store_false", default=True,
                             help="Don't run flake8")
    parser_test.add_argument("--no-pytest", dest="pytest", action="store_false", default=True,
                             help="Don't run pytest")
    parser_test.add_argument("--no-mypy", dest="mypy", action="store_false",
                             help="Don't run mypy")
    parser_test.add_argument("args", nargs="*", help="Arguments to pass to pytest")
    parser_test.set_defaults(func=do_test)

    parser_cleanup = subparsers.add_parser("cleanup", help="Run the cleanup code")
    parser_cleanup.set_defaults(func=do_cleanup)
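    # Usage sketch (hypothetical invocation): "wptsync status sync downstream
    # 12345 open --old-status error" resets the downstream sync for PR 12345
    # from error back to open.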
help="Mark the sync for a PR as skip so that " "it doesn't have to complete before a landing") parser_skip.add_argument("pr_ids", type=int, nargs="*", help="PR ids for which to skip") parser_skip.set_defaults(func=do_skip) parser_notify = subparsers.add_parser("notify", help="Try to perform results notification " "for specified PRs") parser_notify.add_argument("pr_ids", nargs="*", type=int, help="PR ids for which to notify " "(tries to use the PR for the current working directory " "if not specified)") parser_notify.add_argument("--force", action="store_true", help="Run even if the sync is already marked as notified") parser_notify.set_defaults(func=do_notify) parser_landable = subparsers.add_parser("landable", help="Display commits from upstream " "that are able to land") parser_landable.add_argument("--prev-wpt-head", help="First commit to use as the base") parser_landable.add_argument("--quiet", action="store_false", dest="include_all", default=True, help="Only print the first PR with an error") parser_landable.add_argument("--blocked", action="store_true", dest="blocked", default=False, help="Only print unlandable PRs that are blocking") parser_landable.add_argument("--retrigger", action="store_true", default=False, help="Try to update all unlanded PRs that aren't Ready " "(requires --all)") parser_landable.add_argument("--include-incomplete", action="store_true", default=False, help="Consider PRs with incomplete syncs as landable.") parser_landable.set_defaults(func=do_landable) parser_retrigger = subparsers.add_parser("retrigger", help="Retrigger syncs that are not read") parser_retrigger.add_argument("--no-upstream", action="store_false", default=True, dest="upstream", help="Don't retrigger upstream syncs") parser_retrigger.add_argument("--no-downstream", action="store_false", default=True, dest="downstream", help="Don't retrigger downstream syncs") parser_retrigger.add_argument("--rebase", default=False, action="store_true", help="Force downstream syncs to be rebased onto the " "integration branch") parser_retrigger.set_defaults(func=do_retrigger) parser_try_push_add = subparsers.add_parser("add-try", help="Add a try push to an existing sync") parser_try_push_add.add_argument("try_rev", help="Revision on try") parser_try_push_add.add_argument("sync_type", nargs="?", choices=["downstream", "landing"], help="Revision on try") parser_try_push_add.add_argument("sync_id", nargs="?", type=int, help="PR id for downstream sync or bug number " "for upstream sync") parser_try_push_add.add_argument("--stability", action="store_true", help="Push is stability try push") parser_try_push_add.add_argument("--rebuild-count", default=None, type=int, help="Rebuild count") parser_try_push_add.set_defaults(func=do_try_push_add) parser_download_logs = subparsers.add_parser("download-logs", help="Download logs for a given try push") parser_download_logs.add_argument("--log-path", help="Destination path for the logs") parser_download_logs.add_argument("taskgroup_id", help="id of the taskgroup (decision task)") parser_download_logs.set_defaults(func=do_download_logs) parser_bugupdate = subparsers.add_parser("bug-update", help="Run the bug update task") parser_bugupdate.set_defaults(func=do_bugupdate) parser_build_index = subparsers.add_parser("build-index", help="Build indexes") parser_build_index.add_argument("index_name", nargs="*", help="Index names to rebuild (default all)") parser_build_index.set_defaults(func=do_build_index) parser_migrate = subparsers.add_parser("migrate", help="Migrate to latest 
    parser_migrate = subparsers.add_parser("migrate",
                                           help="Migrate to latest data storage format")
    parser_migrate.set_defaults(func=do_migrate)

    return parser


def sync_from_path(git_gecko: Repo, git_wpt: Repo) -> SyncProcess | None:
    from . import base
    git_work = git.Repo(os.curdir)
    branch = git_work.active_branch.name
    parts = branch.split("/")
    if not parts[0] == "sync":
        return None
    if parts[1] == "downstream":
        from . import downstream
        cls: type[SyncProcess] = downstream.DownstreamSync
    elif parts[1] == "upstream":
        from . import upstream
        cls = upstream.UpstreamSync
    elif parts[1] == "landing":
        from . import landing
        cls = landing.LandingSync
    else:
        raise ValueError
    process_name = base.ProcessName.from_path(branch)
    if process_name is None:
        return None
    return cls(git_gecko, git_wpt, process_name)


def do_list(git_gecko: Repo, git_wpt: Repo, sync_type: list[str], error: bool = False,
            **kwargs: Any) -> None:
    from . import downstream
    from . import landing
    from . import upstream
    syncs: list[SyncProcess] = []

    def filter_sync(sync):
        if error:
            return sync.error is not None and sync.status == "open"
        return True

    for cls in [upstream.UpstreamSync, downstream.DownstreamSync, landing.LandingSync]:
        if not sync_type or cls.sync_type in sync_type:
            syncs.extend(item for item in cls.load_by_status(git_gecko, git_wpt, "open")
                         if filter_sync(item))

    for sync in syncs:
        extra = []
        if isinstance(sync, downstream.DownstreamSync):
            try_push = sync.latest_try_push
            if try_push:
                if try_push.try_rev:
                    extra.append("https://treeherder.mozilla.org/#/jobs?repo=try&revision=%s" %
                                 try_push.try_rev)
                if try_push.taskgroup_id:
                    extra.append(try_push.taskgroup_id)
        error_data = sync.error
        error_msg = ""
        if error_data is not None:
            msg = error_data["message"]
            if msg is not None:
                error_msg = ("ERROR: %s" % msg.split("\n", 1)[0])
        print("%s %s %s bug:%s PR:%s %s%s" % ("*" if sync.error else " ",
                                              sync.sync_type,
                                              sync.status,
                                              sync.bug,
                                              sync.pr,
                                              " ".join(extra),
                                              error_msg))


def do_detail(git_gecko: Repo, git_wpt: Repo, sync_type: str, obj_id: int,
              **kwargs: Any) -> None:
    syncs = get_syncs(git_gecko, git_wpt, sync_type, obj_id)
    for sync in syncs:
        print(sync.output())
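# do_landing drives the landing flow: if the current landing has an in-flight
# try push it waits for results or completes the push; otherwise it creates or
# updates the landing.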
def do_landing(git_gecko: Repo,
               git_wpt: Repo,
               wpt_head: str | None = None,
               prev_wpt_head: str | None = None,
               include_incomplete: bool = False,
               accept_failures: bool = False,
               retry: bool = False,
               push: bool = True,
               **kwargs: Any
               ) -> None:
    from . import errors
    from . import landing
    from . import update

    current_landing = landing.current(git_gecko, git_wpt)

    def update_landing():
        landing.update_landing(git_gecko, git_wpt, prev_wpt_head, wpt_head,
                               include_incomplete, retry=retry,
                               accept_failures=accept_failures)

    if current_landing and current_landing.latest_try_push:
        with SyncLock("landing", None) as lock:
            assert isinstance(lock, SyncLock)
            try_push = current_landing.latest_try_push
            logger.info("Found try push %s" % try_push.treeherder_url)
            if try_push.taskgroup_id is None:
                update.update_taskgroup_ids(git_gecko, git_wpt, try_push)
                assert try_push.taskgroup_id is not None
            with try_push.as_mut(lock), current_landing.as_mut(lock):
                if retry:
                    update_landing()
                elif try_push.status == "open":
                    tasks = try_push.tasks()
                    try_result = current_landing.try_result(tasks=tasks)
                    if try_result == landing.TryPushResult.pending:
                        logger.info("Landing in bug %s is waiting for try results" %
                                    current_landing.bug)
                    else:
                        try:
                            landing.try_push_complete(git_gecko, git_wpt, try_push,
                                                      current_landing, allow_push=push,
                                                      accept_failures=accept_failures,
                                                      tasks=tasks)
                        except errors.AbortError:
                            # Don't need to raise an error here because
                            # the logging is the important part
                            return
                else:
                    update_landing()
    else:
        update_landing()


def do_update(git_gecko: Repo,
              git_wpt: Repo,
              sync_type: list[str] | None = None,
              status: list[str] | None = None,
              **kwargs: Any
              ) -> None:
    from . import downstream
    from . import update
    from . import upstream
    sync_classes: list[type] = []
    if not sync_type:
        sync_type = ["upstream", "downstream"]
    for key in sync_type:
        sync_classes.append({"upstream": upstream.UpstreamSync,
                             "downstream": downstream.DownstreamSync}[key])
    update.update_from_github(git_gecko, git_wpt, sync_classes, status)


def do_update_tasks(git_gecko: Repo, git_wpt: Repo, pr_id: int | None, **kwargs: Any) -> None:
    from . import update
    update.update_taskgroup_ids(git_gecko, git_wpt)
    update.update_tasks(git_gecko, git_wpt, pr_id)


def do_pr(git_gecko: Repo,
          git_wpt: Repo,
          pr_ids: list[int],
          rebase: bool = False,
          **kwargs: Any) -> None:
    from . import update
    if not pr_ids:
        sync = sync_from_path(git_gecko, git_wpt)
        if not sync:
            logger.error("No PR id supplied and no sync for current path")
            return
        if not sync.pr:
            logger.error("Sync for current path has no PR")
            return
        pr_ids = [sync.pr]
    for pr_id in pr_ids:
        pr = env.gh_wpt.get_pull(pr_id)
        if pr is None:
            logger.error("PR %s not found" % pr_id)
            continue
        update_repositories(git_gecko, git_wpt)
        update.update_pr(git_gecko, git_wpt, pr, rebase)


def do_bug(git_gecko: Repo, git_wpt: Repo, bug: int, **kwargs: Any) -> None:
    from . import update
    if bug is None:
        sync = sync_from_path(git_gecko, git_wpt)
        if sync is None:
            logger.error("No bug supplied and no sync for current path")
            return
        bug = sync.bug
        if bug is None:
            logger.error("Sync for current path has no bug")
            return
    update.update_bug(git_gecko, git_wpt, bug)


def do_push(git_gecko: Repo,
            git_wpt: Repo,
            rev: str | None = None,
            base_rev: str | None = None,
            processes: list[str] | None = None,
            **kwargs: Any
            ) -> None:
    from . import update
    if rev is None:
        rev = git_gecko.commit(env.config["gecko"]["refs"]["mozilla-inbound"]).hexsha
    update.update_push(git_gecko, git_wpt, rev, base_rev=base_rev, processes=processes)
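# do_delete removes sync (or, with --try, try-push) refs; unless --all is
# given, only the match with the highest sequence id is deleted.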
def do_delete(git_gecko: Repo, git_wpt: Repo, sync_type: str, obj_ids: list[int],
              try_push: bool = False, delete_all: bool = False, seq_id: int | None = None,
              **kwargs) -> None:
    from . import trypush
    objs: Iterable[Any] = []
    for obj_id in obj_ids:
        logger.info(f"{sync_type} {obj_id}")
        if try_push:
            objs = trypush.TryPush.load_by_obj(git_gecko, sync_type, obj_id, seq_id=seq_id)
        else:
            objs = get_syncs(git_gecko, git_wpt, sync_type, obj_id, seq_id=seq_id)
        if not delete_all and objs:
            objs = sorted(objs, key=lambda x: -int(x.process_name.seq_id))[:1]
        for obj in objs:
            with SyncLock.for_process(obj.process_name) as lock:
                assert isinstance(lock, SyncLock)
                with obj.as_mut(lock):
                    obj.delete()


def do_worktree(git_gecko: Repo,
                git_wpt: Repo,
                sync_type: str,
                obj_id: int,
                worktree_type: str,
                **kwargs: Any
                ) -> None:
    attr_name = worktree_type + "_worktree"
    syncs = get_syncs(git_gecko, git_wpt, sync_type, obj_id)
    for sync in syncs:
        with SyncLock.for_process(sync.process_name) as lock:
            assert isinstance(lock, SyncLock)
            with sync.as_mut(lock):
                getattr(sync, attr_name).get()


def do_start_listener(git_gecko: Repo, git_wpt: Repo, **kwargs: Any) -> None:
    listen.run_pulse_listener(env.config)


def do_start_phab_listener(git_gecko: Repo, git_wpt: Repo, **kwargs: Any) -> None:
    phablisten.run_phabricator_listener(env.config)


def do_configure_repos(git_gecko: Repo, git_wpt: Repo, repo: str, config_file: str,
                       **kwargs: Any) -> None:
    from . import repos
    r = repos.wrappers[repo](env.config)
    with RepoLock(r.repo()):
        r.configure(os.path.abspath(os.path.normpath(config_file)))


def do_status(git_gecko: Repo,
              git_wpt: Repo,
              obj_type: str,
              sync_type: str,
              obj_id: int,
              new_status: str,
              seq_id: int | None = None,
              old_status: str | None = None,
              **kwargs: Any
              ) -> None:
    from . import upstream
    from . import downstream
    from . import landing
    from . import trypush
    objs: Iterable[TryPush | SyncProcess] = []
    if obj_type == "try":
        try_pushes = trypush.TryPush.load_by_obj(git_gecko, sync_type, obj_id, seq_id=seq_id)
        if TYPE_CHECKING:
            objs = cast(Set[trypush.TryPush], try_pushes)
        else:
            objs = try_pushes
    else:
        if sync_type == "upstream":
            cls: type[SyncProcess] = upstream.UpstreamSync
        elif sync_type == "downstream":
            cls = downstream.DownstreamSync
        elif sync_type == "landing":
            cls = landing.LandingSync
        objs = cls.load_by_obj(git_gecko, git_wpt, obj_id, seq_id=seq_id)
    if old_status is not None:
        objs = {item for item in objs if item.status == old_status}
    if not objs:
        logger.error("No matching syncs found")
    for obj in objs:
        logger.info(f"Setting status of {obj.process_name} to {new_status}")
        with SyncLock.for_process(obj.process_name) as lock:
            assert isinstance(lock, SyncLock)
            with obj.as_mut(lock):
                obj.status = new_status  # type: ignore


def do_test(**kwargs: Any) -> None:
    if kwargs.pop("flake8", True):
        logger.info("Running flake8")
        cmd = ["flake8"]
        subprocess.check_call(cmd, cwd="/app/wpt-sync/sync/")
        subprocess.check_call(cmd, cwd="/app/wpt-sync/test/")
    if kwargs.pop("mypy", True):
        logger.info("Running mypy")
        cmd = ["mypy", "sync"]
        subprocess.check_call(cmd, cwd="/app/wpt-sync/")
    if kwargs.pop("pytest", True):
        args = kwargs["args"]
        if not any(item.startswith("test") for item in args):
            args.append("test")
        logger.info("Running pytest")
        cmd = ["pytest", "-s", "-v", "-p", "no:cacheprovider"] + args
        subprocess.check_call(cmd, cwd="/app/wpt-sync/")


def do_cleanup(git_gecko: Repo, git_wpt: Repo, **kwargs: Any) -> None:
    from .tasks import cleanup
    cleanup()
def do_skip(git_gecko: Repo, git_wpt: Repo, pr_ids: list[int], **kwargs: Any) -> None:
    from . import downstream
    if not pr_ids:
        sync = sync_from_path(git_gecko, git_wpt)
        if sync is None:
            logger.error("No pr_id supplied and no sync for current path")
            return
        syncs = {sync.pr: sync}
    else:
        syncs = {}
        for pr_id in pr_ids:
            sync = downstream.DownstreamSync.for_pr(git_gecko, git_wpt, pr_id)
            if sync is not None:
                assert sync.pr is not None
                syncs[sync.pr] = sync
    for sync_pr_id, sync in syncs.items():
        if not isinstance(sync, downstream.DownstreamSync):
            logger.error("PR %s is for an upstream sync" % sync_pr_id)
            continue
        with SyncLock.for_process(sync.process_name) as lock:
            assert isinstance(lock, SyncLock)
            with sync.as_mut(lock):
                sync.skip = True  # type: ignore


def do_notify(git_gecko: Repo, git_wpt: Repo, pr_ids: list[int], force: bool = False,
              **kwargs: Any) -> None:
    from . import downstream
    if not pr_ids:
        sync = sync_from_path(git_gecko, git_wpt)
        if sync is None:
            logger.error("No pr_id supplied and no sync for current path")
            return
        syncs = {sync.pr: sync}
    else:
        syncs = {}
        for pr_id in pr_ids:
            sync = downstream.DownstreamSync.for_pr(git_gecko, git_wpt, pr_id)
            if sync is not None:
                assert sync.pr is not None
                syncs[sync.pr] = sync
    for sync_pr_id, sync in syncs.items():
        if sync is None:
            logger.error("No active sync for PR %s" % sync_pr_id)
        elif not isinstance(sync, downstream.DownstreamSync):
            logger.error("PR %s is for an upstream sync" % sync_pr_id)
            continue
        else:
            with SyncLock.for_process(sync.process_name) as lock:
                assert isinstance(lock, SyncLock)
                with sync.as_mut(lock):
                    sync.try_notify(force=force)
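# do_landable prints the wpt head the next landing would use and, for each
# unlandable PR, the reason it is blocked (pending try results, errors, etc.).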
def do_landable(git_gecko: Repo,
                git_wpt: Repo,
                prev_wpt_head: str | None = None,
                include_incomplete: bool = False,
                include_all: bool = True,
                retrigger: bool = False,
                blocked: bool = True,
                **kwargs: Any
                ) -> None:
    from . import update
    from .sync import LandableStatus
    from .downstream import DownstreamAction, DownstreamSync
    from .landing import current, load_sync_point, landable_commits, unlanded_with_type

    current_landing = current(git_gecko, git_wpt)
    if current_landing:
        print("Current landing will update head to %s" %
              current_landing.wpt_commits.head.sha1)
        prev_wpt_head = current_landing.wpt_commits.head.sha1
    else:
        sync_point = load_sync_point(git_gecko, git_wpt)
        print("Last sync was to commit %s" % sync_point["upstream"])
        prev_wpt_head = sync_point["upstream"]
    landable = landable_commits(git_gecko, git_wpt, prev_wpt_head,
                                include_incomplete=include_incomplete)
    if landable is None:
        print("Next landing will not add any new commits")
        wpt_head = None
    else:
        wpt_head, commits = landable
        print("Next landing will update wpt head to %s, adding %i new PRs" %
              (wpt_head, len(commits)))

    if include_all or retrigger:
        unlandable = unlanded_with_type(git_gecko, git_wpt, wpt_head, prev_wpt_head)
        count = 0
        for pr, _, status in unlandable:
            count += 1
            if blocked and status in (LandableStatus.ready,
                                      LandableStatus.upstream,
                                      LandableStatus.skip):
                continue
            msg = status.reason_str()
            if status == LandableStatus.missing_try_results:
                sync = DownstreamSync.for_pr(git_gecko, git_wpt, pr)
                assert isinstance(sync, DownstreamSync)
                next_action = sync.next_action
                reason = next_action.reason_str()
                if next_action == DownstreamAction.wait_try:
                    latest_try_push = sync.latest_try_push
                    assert latest_try_push is not None
                    reason = "{} {}".format(reason, latest_try_push.treeherder_url)
                elif next_action == DownstreamAction.manual_fix:
                    latest_try_push = sync.latest_try_push
                    assert latest_try_push is not None
                    reason = "Manual fixup required {}".format(latest_try_push.treeherder_url)
                msg = f"{msg} ({reason})"
            elif status == LandableStatus.error:
                sync = DownstreamSync.for_pr(git_gecko, git_wpt, pr)
                assert sync is not None
                if sync.error:
                    err_msg = sync.error["message"] or ""
                    err_msg = err_msg.splitlines()[0] if err_msg else err_msg
                    msg = f"{msg} ({err_msg})"
            print(f"{pr}: {msg}")
        print("%i PRs are unlandable" % count)
        if retrigger:
            errors = update.retrigger(git_gecko, git_wpt, unlandable)
            if errors:
                print("The following PRs have errors:\n%s" % "\n".join(
                    str(item) for item in errors))


def do_retrigger(git_gecko: Repo, git_wpt: Repo, upstream: bool = False,
                 downstream: bool = False, rebase: bool = False, **kwargs: Any) -> None:
    from . import errors
    from . import update
    from . import upstream as upstream_sync
    from .landing import current, load_sync_point, unlanded_with_type

    update_repositories(git_gecko, git_wpt)
    if upstream:
        print("Retriggering upstream syncs with errors")
        for sync in upstream_sync.UpstreamSync.load_by_status(git_gecko, git_wpt, "open"):
            if sync.error:
                with SyncLock.for_process(sync.process_name) as lock:
                    assert isinstance(lock, SyncLock)
                    with sync.as_mut(lock):
                        try:
                            upstream_sync.update_sync(git_gecko, git_wpt, sync,
                                                      repo_update=False)
                        except errors.AbortError as e:
                            print("Update failed:\n%s" % e)

    if downstream:
        print("Retriggering downstream syncs on master")
        current_landing = current(git_gecko, git_wpt)
        if current_landing is None:
            sync_point = load_sync_point(git_gecko, git_wpt)
            prev_wpt_head = sync_point["upstream"]
        else:
            prev_wpt_head = current_landing.wpt_commits.head.sha1
        unlandable = unlanded_with_type(git_gecko, git_wpt, None, prev_wpt_head)
        pr_errors = update.retrigger(git_gecko, git_wpt, unlandable, rebase=rebase)
        if pr_errors:
            print("The following PRs have errors:\n%s" %
                  "\n".join(str(item) for item in pr_errors))


def do_try_push_add(git_gecko: Repo,
                    git_wpt: Repo,
                    try_rev: str,
                    stability: bool = False,
                    sync_type: str | None = None,
                    sync_id: int | None = None,
                    rebuild_count: int | None = None,
                    **kwargs: Any
                    ) -> None:
    from . import downstream
    from . import landing
    from . import trypush

    sync = None
    if sync_type is None:
        sync = sync_from_path(git_gecko, git_wpt)
        if sync is None:
            logger.error("No sync type supplied and no sync for current path")
            return
    else:
        if sync_id is None:
            logger.error("A sync id is required when a sync type is supplied")
            return
        if sync_type == "downstream":
            sync = downstream.DownstreamSync.for_pr(git_gecko, git_wpt, sync_id)
        elif sync_type == "landing":
            syncs = landing.LandingSync.for_bug(git_gecko, git_wpt, sync_id,
                                                statuses=None, flat=True)
            if syncs:
                sync = syncs[0]
        else:
            logger.error("Invalid sync type %s" % sync_type)
            return
    if not sync:
        raise ValueError

    class FakeTry:
        def __init__(self, *_args, **_kwargs):
            pass

        def __enter__(self):
            return self

        def __exit__(self, *args):
            pass

        def push(self):
            return try_rev

    with SyncLock.for_process(sync.process_name) as lock:
        assert isinstance(lock, SyncLock)
        with sync.as_mut(lock):
            trypush.TryPush.create(lock, sync, None, stability=stability,
                                   try_cls=FakeTry, rebuild_count=rebuild_count,
                                   check_open=False)
    print("Now run an update for the sync")
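# do_download_logs fetches the wptreport.json logs for a taskgroup into
# --log-path, or into a fresh temporary directory when no path is given.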
def do_download_logs(git_gecko: Repo,
                     git_wpt: Repo,
                     log_path: str | None,
                     taskgroup_id: str,
                     **kwargs: Any
                     ) -> None:
    from . import tc
    from . import trypush
    import tempfile

    if log_path is None:
        log_path = tempfile.mkdtemp()
    taskgroup_id = tc.normalize_task_id(taskgroup_id)
    tasks = tc.TaskGroup(taskgroup_id)
    tasks.refresh()
    try_tasks = trypush.TryPushTasks(tasks)
    try_tasks.wpt_tasks.download_logs(os.path.join(log_path, taskgroup_id),
                                      ["wptreport.json"])


def do_bugupdate(git_gecko: Repo, git_wpt: Repo, **kwargs: Any) -> None:
    from . import handlers
    handlers.BugUpdateHandler(env.config)(git_gecko, git_wpt, {})


def do_build_index(git_gecko: Repo, git_wpt: Repo, index_name: list[str],
                   **kwargs: Any) -> None:
    from . import index
    if not index_name:
        index_names = None
    else:
        index_names = set(index_name)
    for idx_cls in index.indicies:
        if index_names and idx_cls.name not in index_names:
            continue
        print("Building %s index" % idx_cls.name)
        idx = idx_cls(git_gecko)
        idx.build(git_gecko, git_wpt)


def do_migrate(git_gecko, git_wpt, **kwargs):
    assert False, "Running this is probably a bad idea"
    # Migrate refs from the refs/<type>/<subtype>/<status>/<obj_id>[/<seq_id>] format
    # to refs/<type>/<subtype>/<obj_id>/<seq_id>
    from collections import defaultdict
    from . import base
    import pygit2
    git2_gecko = pygit2.Repository(git_gecko.working_dir)
    git2_wpt = pygit2.Repository(git_wpt.working_dir)

    repo_map = {git_gecko: git2_gecko, git_wpt: git2_wpt}
    rev_repo_map = {value: key for key, value in repo_map.items()}

    special = {}

    sync_ref = re.compile("^refs/"
                          "(?P<reftype>[^/]+)/"
                          "(?P<obj_type>[^/]+)/"
                          "(?P<subtype>[^/]+)/"
                          "(?P<status>[^0-9/]+)/"
                          "(?P<obj_id>[0-9]+)"
                          "(?:/(?P<seq_id>[0-9]*))?$")

    print("Updating refs")
    seen = defaultdict(list)
    total_refs = 0
    processing_refs = 0
    for ref in itertools.chain(git_gecko.refs, git_wpt.refs):
        git2_repo = repo_map[ref.repo]
        ref = git2_repo.lookup_reference(ref.path)
        total_refs += 1
        if ref.name in special:
            continue
        m = sync_ref.match(ref.name)
        if not m:
            continue
        if m.group("reftype") not in ("heads", "syncs"):
            continue
        if m.group("obj_type") not in ("sync", "try"):
            continue
        processing_refs += 1
        assert m.group("subtype") in ("upstream", "downstream", "landing")
        assert int(m.group("obj_id")) > 0
        new_ref = "refs/{}/{}/{}/{}/{}".format(m.group("reftype"),
                                               m.group("obj_type"),
                                               m.group("subtype"),
                                               m.group("obj_id"),
                                               m.group("seq_id") or "0")
        seen[(git2_repo, new_ref)].append((ref, m.group("status")))

    duplicate = {}
    delete = set()
    for (repo, new_ref), refs in seen.items():
        if len(refs) > 1:
            # If we have multiple /syncs/ refs, but only one /heads/ ref,
            # use the corresponding one
            if new_ref.startswith("refs/syncs/"):
                has_head = set()
                no_head = set()
                for ref, status in refs:
                    if "refs/heads/%s" % ref.name[len("refs/syncs/"):] in repo.references:
                        has_head.add((ref.name, status))
                    else:
                        no_head.add((ref.name, status))
                if len(has_head) == 1:
                    print("  Using {} from {}".format(list(has_head)[0][0],
                                                      " ".join(ref.name for ref, _ in refs)))
                    refs[:] = list(has_head)
                    delete |= {(repo, ref_name) for ref_name, _ in no_head}
        if len(refs) > 1:
            # If we have a later status, prefer that over an earlier one
            matches = {ref.name: sync_ref.match(ref.name) for ref, _ in refs}
            by_status = {matches[ref.name].group("status"): (ref, status)
                         for (ref, status) in refs}
            for target_status in ["complete", "wpt-merged", "incomplete", "infra-fail"]:
                if target_status in by_status:
                    print("  Using {} from {}".format(by_status[target_status][0].name,
                                                      " ".join(ref.name for ref, _ in refs)))
                    delete |= {(repo, ref.name) for ref, status in refs
                               if ref != by_status[target_status]}
                    refs[:] = [by_status[target_status]]
        if len(refs) > 1:
            duplicate[(repo, new_ref)] = refs
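    # Any collisions that survive deduplication are reported and abort the
    # migration before any refs are rewritten.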
    if duplicate:
        print("  ERROR! Got duplicate %s source refs" % len(duplicate))
        for (repo, new_ref), refs in duplicate.items():
            print("  {} {}: {}".format(repo.working_dir, new_ref,
                                       " ".join(ref.name for ref, _ in refs)))
        return

    for (repo, new_ref), refs in seen.items():
        ref, _ = refs[0]
        if ref.name.startswith("refs/syncs/sync/"):
            if "refs/heads/%s" % ref.name[len("refs/syncs/"):] not in repo.references:
                # Try with the post-migration head
                m = sync_ref.match(ref.name)
                ref_path = "refs/heads/{}/{}/{}/{}".format(m.group("obj_type"),
                                                           m.group("subtype"),
                                                           m.group("obj_id"),
                                                           m.group("seq_id"))
                if ref_path not in repo.references:
                    print("  Missing head %s" % (ref.name))

    created = 0
    for i, ((repo, new_ref), refs) in enumerate(seen.items()):
        assert len(refs) == 1
        ref, status = refs[0]
        print("Updating %s" % ref.name)
        print("  Moving %s to %s %d/%d" % (ref.name, new_ref, i + 1, len(seen)))
        if "/syncs/" in ref.name:
            ref_obj = ref.peel().id
            data = json.loads(ref.peel().tree["data"].data)
            if data.get("status") != status:
                with base.CommitBuilder(rev_repo_map[repo], "Add status",
                                        ref=ref.name) as commit:
                    now_ref_obj = ref.peel().id
                    if ref_obj != now_ref_obj:
                        data = json.loads(ref.peel().tree["data"].data)
                    data["status"] = status
                    commit.add_tree({"data": json.dumps(data)})
                print("Making commit")
                commit = commit.get().sha1
            else:
                commit = ref.peel().id
        else:
            commit = ref.peel().id
        print("  Got commit %s" % commit)
        if new_ref not in repo.references:
            print(f"  Rename {ref.name} {new_ref}")
            repo.references.create(new_ref, commit)
            created += 1
        else:
            print("  %s already exists" % new_ref)
        delete.add((repo, ref.name))

    for repo, ref_name in delete:
        print("  Deleting %s" % ref_name)
        repo.references.delete(ref_name)

    print("%s total refs" % total_refs)
    print("%s refs to process" % processing_refs)
    print("%s refs to create" % created)
    print("%s refs to delete" % len(delete))

    print("Moving to single history")
    # Migrate from refs/syncs/ to paths
    sync_ref = re.compile("^refs/"
                          "syncs/"
                          "(?P<obj_type>[^/]*)/"
                          "(?P<subtype>[^/]*)/"
                          "(?P<obj_id>[^/]*)/"
                          "(?P<seq_id>[0-9]*)$")
    delete = set()
    initial_ref = git.Reference(git_gecko, "refs/syncs/data")
    if initial_ref.is_valid():
        existing_paths = {item.path for item in initial_ref.commit.tree.traverse()}
    else:
        existing_paths = set()
    for ref in git_gecko.refs:
        m = sync_ref.match(ref.path)
        if not m:
            continue
        path = "{}/{}/{}/{}".format(m.group("obj_type"),
                                    m.group("subtype"),
                                    m.group("obj_id"),
                                    m.group("seq_id"))
        if path not in existing_paths:
            with base.CommitBuilder(git_gecko,
                                    "Migrate %s to single ref for data" % ref.path,
                                    ref="refs/syncs/data") as commit:
                data = json.load(ref.commit.tree["data"].data_stream)
                print(f"  Moving path {path}")
                tree = {path: json.dumps(data)}
                commit.add_tree(tree)
        delete.add(ref.path)
    git2_repo = repo_map[git_gecko]
    for ref_name in delete:
        git2_repo.references.delete(ref_name)


def set_config(opts: list[str]) -> None:
    for opt in opts:
        keys, value = opt.split("=", 1)
        key_parts = keys.split(".")
        target = env.config
        for key in key_parts[:-1]:
            target = target[key]
        logger.info("Setting config option %s from %s to %s" %
                    (keys, target[key_parts[-1]], value))
        target[key_parts[-1]] = value


def main() -> None:
    parser = get_parser()
    args = parser.parse_args()
    if args.profile:
        import cProfile
        prof = cProfile.Profile()
        prof.enable()
    if args.config:
        set_config(args.config)

    try:
        func_name = args.func.__name__
    except AttributeError:
        func_name = None

    if func_name == "do_test":
        def func(**kwargs):
            return do_test(**kwargs)
    else:
        git_gecko, git_wpt = setup()

        def func(**kwargs):
            return args.func(git_gecko, git_wpt, **kwargs)
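    # vars(args) forwards every parsed option, including the dispatch fields
    # themselves; handlers absorb the extras via **kwargs.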
    try:
        func(**vars(args))
    except Exception:
        if args.pdb:
            traceback.print_exc()
            import pdb
            pdb.post_mortem()
        else:
            raise
    finally:
        if args.profile:
            prof.dump_stats(args.profile)
            prof.print_stats()


if __name__ == "__main__":
    main()