sync/sync.py (711 lines of code) (raw):

from __future__ import annotations import enum import itertools import traceback from collections import defaultdict import git from . import bug from . import log from .base import ( BranchRefObject, CommitBuilder, IdentityMap, ProcessData, ProcessName, ProcessNameIndex, ) from .commit import GeckoCommit, WptCommit from .env import Environment from .errors import AbortError from .lock import MutGuard, mut, constructor from .repos import cinnabar from .worktree import Worktree from typing import (Any, Iterable, Iterator, Sequence, cast, TYPE_CHECKING) from git.repo.base import Repo from os import PathLike from typing_extensions import Literal if TYPE_CHECKING: from sync.commit import Commit from sync.index import Index from sync.lock import SyncLock from sync.trypush import TryPush try: from typing import (List, Tuple, Type, overload) except ImportError: def typing(f): return f env = Environment() logger = log.get_logger(__name__) class CommitFilter: """Filter of a range of commits""" def __init__(self) -> None: self._commits: dict[str, bool] = {} def path_filter(self) -> Sequence[PathLike]: """Path filter for the commit range, returning a list of paths that match. An empty list matches all paths.""" return [] def filter_commit(self, commit: Commit) -> bool: """Per-commit filter. :param commit: wpt_commit.Commit object :returns: A boolean indicating whether to include the commit""" if commit.sha1 not in self._commits: self._commits[commit.sha1] = self._filter_commit(commit) return self._commits[commit.sha1] def _filter_commit(self, commit: Commit) -> bool: return True def filter_commits( self, commits: Iterable[Commit], ) -> Sequence[Commit]: """Filter that applies to the set of commits that were selected by the per-commit filter. Useful for e.g. removing backouts from a set of commits. :param commits: List of wpt_commit.Commit objects :returns: List of commits to return""" return list(commits) class CommitRange: # This class should probably be generic in the commit subtype it returns # but trying that ended up with lots of errors, so for now it just specifies # Commit, meaning that users need to cast to use properies of subtypes """Range of commits in a specific repository. This is assumed to be between a tag and a branch sharing a single process_name i.e. the tag represents the base commit of the branch. TODO: Maybe just store the base branch name in the tag rather than making it an actual pointer since that works better with rebases. """ def __init__( self, repo: Repo, base: str | Commit, head_ref: BranchRefObject, commit_cls: type, commit_filter: CommitFilter, ) -> None: self.repo = repo # This ended up a little confused because these used to both be # VcsRefObjects, but now the base is stored as a ref not associated # with a process_name. This should be refactored. self._base_commit: Commit | None = None self._base = base self._head_ref = head_ref self.commit_cls = commit_cls self.commit_filter = commit_filter # Cache for the commits in this range self._commits: Sequence[Commit] = [] self._head_sha: str | None = None self._base_sha: str | None = None self._lock = None def as_mut(self, lock: SyncLock) -> MutGuard: return MutGuard(lock, self, [self._head_ref]) @property def lock_key(self) -> tuple[str, str]: return (self._head_ref.name.subtype, self._head_ref.name.obj_id) @overload def __getitem__( self, index: int ) -> Commit: pass @overload # noqa: F811 def __getitem__( self, index: slice ) -> Sequence[Commit]: pass def __getitem__( self, # noqa: F811 index: int | slice, ) -> Commit | Sequence[Commit]: return self.commits[index] def __iter__(self) -> Iterator[Commit]: yield from self.commits def __len__(self) -> int: return len(self.commits) def __contains__(self, other_commit: Any) -> bool: for commit in self: if commit == other_commit: return True return False @property def commits(self) -> Sequence[Commit]: if self._commits: if self.head.sha1 == self._head_sha and self.base.sha1 == self._base_sha: return self._commits revish = f"{self.base.sha1}..{self.head.sha1}" commits: list[Commit] = [] for git_commit in self.repo.iter_commits( revish, reverse=True, paths=self.commit_filter.path_filter() ): commit = self.commit_cls(self.repo, git_commit) if not self.commit_filter.filter_commit(commit): continue commits.append(commit) commits = self.commit_filter.filter_commits(commits) # type: ignore self._commits = commits self._head_sha = self.head.sha1 self._base_sha = self.base.sha1 return self._commits @property def files_changed(self) -> set[str]: # We avoid using diffs because that's harder to get right in the face of merges files = set() for commit in self.commits: commit_files = self.repo.git.show(commit.sha1, name_status=True, format="") for item in commit_files.splitlines(): parts = item.split("\t") for part in parts[1:]: files.add(part.strip()) return files @property def base(self) -> Commit: if self._base_commit is None: self._base_commit = self.commit_cls(self.repo, self._base) assert self._base_commit is not None return self._base_commit @base.setter # type: ignore @mut() def base(self, value: str) -> None: # Note that this doesn't actually update the stored value of the base # anywhere, unlike the head setter which will update the associated ref self._commits = [] self._base_sha = value self._base = self.commit_cls(self.repo, value) self._base_commit = None @property def head(self) -> Commit: head_commit = self._head_ref.commit assert head_commit is not None if TYPE_CHECKING: cast(Commit, head_commit) return head_commit @head.setter # type: ignore @mut() def head(self, value: Commit) -> None: self._head_ref.commit = value # type: ignore @enum.unique class LandableStatus(enum.Enum): ready = 0 no_pr = 1 upstream = 2 no_sync = 3 error = 4 missing_try_results = 5 skip = 6 def reason_str(self) -> str: return { LandableStatus.ready: "Ready", LandableStatus.no_pr: "No PR", LandableStatus.upstream: "From gecko", LandableStatus.no_sync: "No sync created", LandableStatus.error: "Error", LandableStatus.missing_try_results: "Incomplete try results", LandableStatus.skip: "Skip", }.get(self, "Unknown") class SyncPointName(metaclass=IdentityMap): """Like a process name but for pointers that aren't associated with a specific sync object, but with a general process e.g. the last update point for an upstream sync.""" def __init__(self, subtype: str, obj_id: str) -> None: self._obj_type = "sync" self._subtype = subtype self._obj_id = str(obj_id) self._lock = None @property def obj_type(self) -> str: return self._obj_type @property def subtype(self) -> str: return self._subtype @property def obj_id(self) -> str: return self._obj_id @classmethod def _cache_key(cls, subtype: str, obj_id: str) -> tuple[str, str]: return (subtype, str(obj_id)) def key(self) -> tuple[str, str]: return (self._subtype, self._obj_id) def __str__(self) -> str: return f"{self._obj_type}/{self._subtype}/{self._obj_id}" def path(self): return f"{self._obj_type}/{self._subtype}/{self._obj_id}" class SyncData(ProcessData): obj_type = "sync" class SyncProcess(metaclass=IdentityMap): obj_type: str = "sync" sync_type: str = "*" # Either "bug" or "pr" obj_id: str | None = None statuses: tuple[str, ...] = () status_transitions: list[tuple[str, str]] = [] # Can multiple syncs have the same obj_id multiple_syncs: bool = False def __init__(self, git_gecko: Repo, git_wpt: Repo, process_name: ProcessName) -> None: self._lock: SyncLock | None = None assert process_name.obj_type == self.obj_type assert process_name.subtype == self.sync_type self.git_gecko = git_gecko self.git_wpt = git_wpt self.process_name = process_name self.data = SyncData(git_gecko, process_name) self.gecko_commits = CommitRange( git_gecko, self.data["gecko-base"], BranchRefObject(git_gecko, self.process_name, commit_cls=GeckoCommit), commit_cls=GeckoCommit, commit_filter=self.gecko_commit_filter(), ) self.wpt_commits = CommitRange( git_wpt, self.data["wpt-base"], BranchRefObject(git_wpt, self.process_name, commit_cls=WptCommit), commit_cls=WptCommit, commit_filter=self.wpt_commit_filter(), ) self.gecko_worktree = Worktree(git_gecko, process_name) self.wpt_worktree = Worktree(git_wpt, process_name) # Hold onto indexes for the lifetime of the SyncProcess object self._indexes = {ProcessNameIndex(git_gecko)} @classmethod def _cache_key(cls, git_gecko: Repo, git_wpt: Repo, process_name: ProcessName) -> tuple[str, str, str, str]: return process_name.key() def as_mut(self, lock: SyncLock) -> MutGuard: return MutGuard( lock, self, [ self.data, self.gecko_commits, self.wpt_commits, self.gecko_worktree, self.wpt_worktree, ], ) @property def lock_key(self) -> tuple[str, str]: return (self.process_name.subtype, self.process_name.obj_id) def __repr__(self) -> str: return "<{} {} {}>".format( self.__class__.__name__, self.sync_type, self.process_name ) @classmethod def for_pr( cls, git_gecko: Repo, git_wpt: Repo, pr_id: str | int, ) -> SyncProcess | None: from . import index idx = index.PrIdIndex(git_gecko) process_name = idx.get((str(pr_id),)) if process_name and process_name.subtype == cls.sync_type: return cls(git_gecko, git_wpt, process_name) return None @overload # noqa: F811 @classmethod def for_bug( cls, git_gecko: Repo, git_wpt: Repo, bug: int, statuses: Iterable[str] | None, flat: Literal[True], ) -> list[SyncProcess]: pass @overload # noqa: F811 @classmethod def for_bug( cls, git_gecko: Repo, git_wpt: Repo, bug: int, statuses: Iterable[str] | None, flat: Literal[False], ) -> dict[str, set[SyncProcess]]: pass @classmethod # noqa: F811 def for_bug( cls, git_gecko: Repo, git_wpt: Repo, bug: int, statuses: Iterable[str] | None = None, flat: bool = False, ) -> dict[str, set[SyncProcess]] | list[SyncProcess]: """Get the syncs for a specific bug. :param bug: The bug number for which to find syncs. :param statuses: An optional list of sync statuses to include. Defaults to all statuses. :param flat: Return a flat list of syncs instead of a dictionary. :returns: By default a dictionary of {status: [syncs]}, but if flat is true, just returns a list of matching syncs. """ from . import index bug_str = str(bug) statuses = set(statuses) if statuses is not None else set(cls.statuses) rv = defaultdict(set) idx_key = (bug_str,) if len(statuses) == 1: idx_key == (bug_str, list(statuses)[0]) idx = index.BugIdIndex(git_gecko) process_names = idx.get(idx_key) for process_name in process_names: if process_name.subtype == cls.sync_type: sync = cls(git_gecko, git_wpt, process_name) if sync.status in statuses: rv[sync.status].add(sync) if flat: return list(itertools.chain.from_iterable(rv.values())) return rv @classmethod def load_by_obj(cls, git_gecko: Repo, git_wpt: Repo, obj_id: int, seq_id: int | None = None) -> set[SyncProcess]: process_names = ProcessNameIndex(git_gecko).get( cls.obj_type, cls.sync_type, str(obj_id) ) if seq_id is not None: process_names = {item for item in process_names if item.seq_id == seq_id} return {cls(git_gecko, git_wpt, process_name) for process_name in process_names} @classmethod def load_by_status(cls, git_gecko: Repo, git_wpt: Repo, status: str) -> set[SyncProcess]: from . import index idx = index.SyncIndex(git_gecko) key = (cls.obj_type, cls.sync_type, status) process_names = idx.get(key) rv = set() for process_name in process_names: rv.add(cls(git_gecko, git_wpt, process_name)) return rv # End of getter methods @classmethod def prev_gecko_commit( cls, git_gecko: Repo, repository_name: str, base_rev: str | None = None, ) -> tuple[BranchRefObject, GeckoCommit]: """Get the last gecko commit processed by a sync process. :param str repository_name: The name of the gecko branch being processed :param base_rev: The SHA1 for a commit to use as the previous commit, overriding the stored value :returns: Tuple of (LastSyncPoint, GeckoCommit) for the previous gecko commit. In case the base_rev override is passed in the LastSyncPoint may be pointing at a different commit to the returned gecko commit""" last_sync_point = cls.last_sync_point(git_gecko, repository_name) assert last_sync_point.commit is not None logger.info("Last sync point was %s" % last_sync_point.commit.sha1) if base_rev is None: prev_commit = last_sync_point.commit # TODO: Would be good if we could link the repo class to the commit type # in the type system assert isinstance(prev_commit, GeckoCommit) else: prev_commit = GeckoCommit(git_gecko, cinnabar(git_gecko).hg2git(base_rev)) return last_sync_point, prev_commit @classmethod def last_sync_point(cls, git_gecko: Repo, repository_name: str) -> BranchRefObject: assert "/" not in repository_name name = SyncPointName(cls.sync_type, repository_name) return BranchRefObject(git_gecko, name, commit_cls=GeckoCommit) @property def landable_status(self) -> LandableStatus: raise NotImplementedError def _output_data(self) -> list[str]: rv = [ "{}{}".format("*" if self.error else " ", self.process_name.path()), "gecko range: {}..{}".format( self.gecko_commits.base.sha1, self.gecko_commits.head.sha1 ), "wpt range: {}..{}".format( self.wpt_commits.base.sha1, self.wpt_commits.head.sha1 ), ] if self.error: rv.extend( ["ERROR:", self.error["message"] or "", self.error["stack"] or ""] ) landable_status = self.landable_status if landable_status: rv.append("Landable status: %s" % landable_status.reason_str()) for key, value in sorted(self.data.items()): if key != "error": rv.append(f"{key}: {value}") try_pushes = self.try_pushes() if try_pushes: try_pushes = sorted(try_pushes, key=lambda x: x.process_name.seq_id) rv.append("Try pushes:") for try_push in try_pushes: rv.append(f" {try_push.status} {try_push.treeherder_url}") return rv def output(self) -> str: return "\n".join(self._output_data()) def __ne__(self, other: Any) -> bool: return not self == other def set_wpt_base(self, ref: str) -> None: # This is kind of an appaling hack try: self.git_wpt.commit(ref) except Exception: raise ValueError self.data["wpt-base"] = ref self.wpt_commits._base = WptCommit(self.git_wpt, ref) @staticmethod def gecko_integration_branch() -> str: return env.config["gecko"]["refs"][env.config["gecko"]["landing"]] @staticmethod def gecko_landing_branch() -> str: return env.config["gecko"]["refs"]["central"] def gecko_commit_filter(self) -> CommitFilter: return CommitFilter() def wpt_commit_filter(self) -> CommitFilter: return CommitFilter() @property def branch_name(self) -> str: return self.process_name.path() @property def status(self) -> str: return self.data["status"] @status.setter # type: ignore @mut() def status(self, value: str) -> None: if value not in self.statuses: raise ValueError("Unrecognised status %s" % value) current = self.status if current == value: return if (current, value) not in self.status_transitions: raise ValueError( f"Tried to change status from {current} to {value}" ) from . import index index.SyncIndex(self.git_gecko).delete( index.SyncIndex.make_key(self), self.process_name ) index.BugIdIndex(self.git_gecko).delete( index.BugIdIndex.make_key(self), self.process_name ) self.data["status"] = value index.SyncIndex(self.git_gecko).insert( index.SyncIndex.make_key(self), self.process_name ) index.BugIdIndex(self.git_gecko).insert( index.BugIdIndex.make_key(self), self.process_name ) @property def bug(self) -> int | None: if self.obj_id == "bug": return int(self.process_name.obj_id) else: bug = self.data.get("bug") if bug: return int(bug) return None @bug.setter # type: ignore @mut() def bug(self, value: int) -> None: from . import index if self.obj_id == "bug": raise AttributeError("Can't set attribute") old_key = None if self.data.get("bug"): old_key = index.BugIdIndex.make_key(self) self.data["bug"] = value new_key = index.BugIdIndex.make_key(self) index.BugIdIndex(self.git_gecko).move(old_key, new_key, self.process_name) @property def pr(self) -> int | None: if self.obj_id == "pr": return int(self.process_name.obj_id) else: pr = self.data.get("pr") if pr is not None: return int(pr) return None @pr.setter # type: ignore @mut() def pr(self, value: int) -> None: from . import index if self.obj_id == "pr": raise AttributeError("Can't set attribute") old_key = None if self.data.get("pr"): old_key = index.PrIdIndex.make_key(self) self.data["pr"] = value new_key = index.PrIdIndex.make_key(self) index.PrIdIndex(self.git_gecko).move(old_key, new_key, self.process_name) @property def seq_id(self) -> int: return self.process_name.seq_id @property def last_pr_check(self) -> dict[str, str]: return self.data.get("last-pr-check", {}) @last_pr_check.setter # type: ignore @mut() def last_pr_check(self, value: dict[str, str]) -> None: if value is not None: self.data["last-pr-check"] = value else: del self.data["last-pr-check"] @property def error(self) -> dict[str, str | None] | None: return self.data.get("error") @error.setter # type: ignore @mut() def error(self, value: str | None) -> None: def encode(item: str | None) -> str | None: if item is None: return item if isinstance(item, str): return item if isinstance(item, str): return item.encode("utf8", "replace") return repr(item) if value is not None: if isinstance(value, (str, str)): message = value stack = None else: message = str(value) stack = traceback.format_exc() error = {"message": encode(message), "stack": encode(stack)} self.data["error"] = error self.set_bug_data("error") else: del self.data["error"] self.set_bug_data(None) def try_pushes(self, status: str | None = None) -> list[TryPush]: from . import trypush try_pushes = trypush.TryPush.load_by_obj( self.git_gecko, self.sync_type, int(self.process_name.obj_id) ) # I tried cast(Set[TryPush], try_pushes) here but it didn't work if status is not None: try_pushes_for_status: set[TryPush] = set() for item in try_pushes: assert isinstance(item, trypush.TryPush) if item.status == status: try_pushes_for_status.add(item) else: try_pushes_for_status = try_pushes # type: ignore return list(sorted(try_pushes_for_status, key=lambda x: x.process_name.seq_id)) def latest_busted_try_pushes(self) -> list[TryPush]: try_pushes = self.try_pushes(status="complete") busted = [] for push in reversed(try_pushes): if push.infra_fail: busted.append(push) else: break return busted @property def latest_try_push(self) -> TryPush | None: try_pushes = self.try_pushes() if try_pushes: try_pushes = sorted(try_pushes, key=lambda x: x.process_name.seq_id) return try_pushes[-1] return None def wpt_renames(self) -> dict[str, str]: renames = {} diff_blobs = self.wpt_commits.head.commit.diff( self.git_wpt.merge_base(self.data["wpt-base"], self.wpt_commits.head.sha1)[0] ) for item in diff_blobs: if item.rename_from: renames[item.rename_from] = item.rename_to return renames @classmethod @constructor( lambda args: ( args["cls"].sync_type, args["bug"] if args["cls"].obj_id == "bug" else str(args["pr"]), ) ) def new( cls, lock: SyncLock, git_gecko: Repo, git_wpt: Repo, gecko_base: str, gecko_head: str, wpt_base: str = "origin/master", wpt_head: str | None = None, bug: int | None = None, pr: int | None = None, status: str = "open", ): # type(...) -> SyncProcess # TODO: this object creation is extremely non-atomic :/ from . import index if cls.obj_id == "bug": assert bug is not None obj_id = str(bug) elif cls.obj_id == "pr": assert pr is not None obj_id = str(pr) else: raise ValueError("Invalid cls.obj_id: %s" % cls.obj_id) if wpt_head is None: wpt_head = wpt_base data = { "gecko-base": gecko_base, "wpt-base": wpt_base, "pr": pr, "bug": bug, "status": status, } # This is pretty ugly process_name = ProcessName.with_seq_id( git_gecko, cls.obj_type, cls.sync_type, obj_id ) if not cls.multiple_syncs and process_name.seq_id != 0: raise ValueError( "Tried to create new {} sync for {} {} but one already exists".format( cls.obj_id, cls.sync_type, obj_id ) ) commit_builder = CommitBuilder(git_gecko, f"Create sync {process_name}\n", env.config["sync"]["ref"]) refs = [BranchRefObject.create(lock, git_gecko, process_name, gecko_head, GeckoCommit, force=True), BranchRefObject.create(lock, git_wpt, process_name, wpt_head, WptCommit, force=True)] try: # This will commit all the data in a single commit when we exit the "with" block with commit_builder: SyncData.create(lock, git_gecko, process_name, data, commit_builder=commit_builder) sync_idx_key = (process_name, status) idxs: List[Tuple[Any, Type[Index]]] = [(sync_idx_key, index.SyncIndex)] if cls.obj_id == "bug": idxs.append(((process_name, status), index.BugIdIndex)) elif cls.obj_id == "pr": idxs.append((process_name, index.PrIdIndex)) for (key, idx_cls) in idxs: idx = idx_cls(git_gecko) idx.insert(idx.make_key(key), process_name).save(commit_builder) except Exception: for ref in refs: ref.delete() raise return cls(git_gecko, git_wpt, process_name) @mut() def finish(self, status: str = "complete") -> None: # TODO: cancel related try pushes &c. logger.info(f"Marking sync {self.process_name} as {status}") self.status = status # type: ignore self.error = None # type: ignore for worktree in [self.gecko_worktree, self.wpt_worktree]: worktree.delete() for repo in [self.git_gecko, self.git_wpt]: repo.git.worktree("prune") @mut() def gecko_rebase(self, new_base_ref: str, abort_on_fail: bool = False) -> None: new_base = GeckoCommit(self.git_gecko, new_base_ref) git_worktree = self.gecko_worktree.get() set_new_base = True try: git_worktree.git.rebase(new_base.sha1) except git.GitCommandError as e: if abort_on_fail: set_new_base = False try: git_worktree.git.rebase(abort=True) except git.GitCommandError: pass raise AbortError("Rebasing onto latest gecko failed:\n%s" % e) finally: if set_new_base: self.data["gecko-base"] = new_base_ref self.gecko_commits.base = new_base.sha1 # type: ignore @mut() def wpt_rebase(self, ref: str) -> None: assert ref in self.git_wpt.references git_worktree = self.wpt_worktree.get() git_worktree.git.rebase(ref) self.set_wpt_base(ref) @mut() def set_bug_data(self, status: str | None = None) -> None: if self.bug: whiteboard = env.bz.get_whiteboard(self.bug) if not whiteboard: return current_subtype, current_status = bug.get_sync_data(whiteboard) if current_subtype != self.sync_type or current_status != status: new_whiteboard = bug.set_sync_data(whiteboard, self.sync_type, status) env.bz.set_whiteboard(self.bug, new_whiteboard) @mut() def delete(self) -> None: from . import index for worktree in [self.gecko_worktree, self.wpt_worktree]: worktree.delete() assert self._lock is not None for try_push in self.try_pushes(): with try_push.as_mut(self._lock): try_push.delete() for git_repo, commit_cls in [ (self.git_wpt, WptCommit), (self.git_gecko, GeckoCommit), ]: BranchRefObject(git_repo, self.process_name, commit_cls=commit_cls).delete() for idx_cls in [index.SyncIndex, index.PrIdIndex, index.BugIdIndex]: key = idx_cls.make_key(self) idx = idx_cls(self.git_gecko) idx.delete(key, self.process_name).save() self.data.delete()