# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""Functionality to support VCS syncing for WPT."""


from __future__ import annotations
import os
import re
import subprocess
import traceback
from collections import defaultdict
from datetime import datetime


import enum
import git
import newrelic

from . import bugcomponents
from . import gitutils
from . import log
from . import notify
from . import trypush
from . import commit as sync_commit
from .base import FrozenDict, entry_point
from .commit import GeckoCommit, WptCommit
from .env import Environment
from .errors import AbortError
from .gitutils import update_repositories
from .lock import SyncLock, mut, constructor
from .projectutil import Mach, WPT
from .sync import LandableStatus, SyncProcess
from .trypush import TryPush

from git.objects.tree import Tree
from git.repo.base import Repo
from typing import Any, List, Mapping, MutableMapping, cast, TYPE_CHECKING

# Module-wide logger and shared environment (config plus Bugzilla/GitHub handles).
logger = log.get_logger(__name__)
env = Environment()


@enum.unique
class DownstreamAction(enum.Enum):
    """The next step required to move a downstream sync forward."""
    ready = 0
    manual_fix = 1
    try_push = 2
    try_push_stability = 3
    wait_try = 4
    wait_upstream = 5
    try_rebase = 6

    def reason_str(self) -> str:
        """Return a short human-readable reason for this action ("" if none)."""
        return {DownstreamAction.ready: "",
                DownstreamAction.manual_fix: "",
                DownstreamAction.try_push: "valid try push required",
                DownstreamAction.try_push_stability: "stability try push required",
                DownstreamAction.wait_try: "waiting for try to complete",
                DownstreamAction.wait_upstream: "waiting for PR to be merged",
                DownstreamAction.try_rebase: "Need to rebase try push"}.get(self, "")


class DownstreamSync(SyncProcess):
    """Sync process tracking a single upstream wpt PR landing into Gecko."""

    sync_type = "downstream"
    # Process identity is keyed on the upstream PR number.
    obj_id = "pr"
    statuses = ("open", "complete")
    status_transitions = [("open", "complete"),
                          ("complete", "open")]  # Unfortunately, if a backout occurs

    @classmethod
    @constructor(lambda args: ("downstream", str(args["pr_id"])))
    def new(cls,
            lock: SyncLock,
            git_gecko: Repo,
            git_wpt: Repo,
            wpt_base: str,
            pr_id: int,
            pr_title: str,
            pr_body: str,
            ) -> DownstreamSync:
        """Create a new downstream sync for an upstream PR and file its bug.

        The gecko base and head both start at the landing branch; the wpt
        head tracks the PR's remote ref.
        """
        # TODO: add PR link to the comment
        sync = super().new(lock,
                           git_gecko,
                           git_wpt,
                           pr=pr_id,
                           gecko_base=DownstreamSync.gecko_landing_branch(),
                           gecko_head=DownstreamSync.gecko_landing_branch(),
                           wpt_base=wpt_base,
                           wpt_head="origin/pr/%s" % pr_id)

        with sync.as_mut(lock):
            sync.create_bug(git_wpt, pr_id, pr_title, pr_body)
        return sync

    def make_bug_comment(self, git_wpt: Repo, pr_id: int, pr_title: str,
                         pr_body: str | None) -> str:
        """Build the initial Bugzilla comment text describing the upstream PR.

        Includes a link to the PR and a quoted copy of its title and body.
        """
        pr_msg = env.gh_wpt.cleanup_pr_body(pr_body)
        # TODO: Ensure we have the right set of commits before geting here
        author = self.wpt_commits[0].author if self.wpt_commits else b""

        msg = ["Sync web-platform-tests PR %s into mozilla-central"
               " (this bug is closed when the sync is complete)." % pr_id,
               "",
               "PR: %s" % env.gh_wpt.pr_url(pr_id),
               # Blank separator line before the upstream details.
               "",
               "Details from upstream follow.",
               "",
               "%s wrote:" % author.decode("utf8", "ignore"),
               ">  %s" % pr_title]
        if pr_msg:
            msg.append(">  ")
            msg.extend(">  %s" % line for line in pr_msg.split("\n"))
        return "\n".join(msg)

    @classmethod
    def has_metadata(cls, message: bytes) -> bool:
        """Return True if the commit message carries the full sync metadata."""
        metadata = sync_commit.get_metadata(message)
        return "wpt-commits" in metadata and "wpt-pr" in metadata

    @property
    def landable_status(self) -> LandableStatus:
        """Landing status: skip/ready/error, else missing try results."""
        if self.skip:
            return LandableStatus.skip
        if self.metadata_ready:
            return LandableStatus.ready
        if self.error:
            return LandableStatus.error
        return LandableStatus.missing_try_results

    @property
    def pr_head(self) -> WptCommit:
        """Head commit of the PR (the origin/pr/<id> ref).

        Usually this equals self.wpt_commits.head, but when the PR is
        rebased onto master the PR head can differ from the commit that
        actually merged (unless the merge was a fast-forward).
        """
        ref = "origin/pr/%s" % self.pr
        return sync_commit.WptCommit(self.git_wpt, ref)

    @SyncProcess.error.setter  # type: ignore
    @mut()
    def error(self, value: str | None) -> Any | None:
        # Mirror the error state onto the upstream PR as a label so upstream
        # contributors can see the sync is blocked.
        if self.pr:
            if value is not None:
                env.gh_wpt.add_labels(self.pr, "mozilla:gecko-blocked")
            elif self.error is not None:
                # Clearing a previously-set error removes the label again.
                env.gh_wpt.remove_labels(self.pr, "mozilla:gecko-blocked")
        return SyncProcess.error.fset(self, value)  # type: ignore

    @property
    def pr_status(self):
        # Last recorded state of the upstream PR; defaults to "open".
        return self.data.get("pr-status", "open")

    @pr_status.setter  # type: ignore
    @mut()
    def pr_status(self, value):
        self.data["pr-status"] = value

    @property
    def notify_bugs(self) -> FrozenDict:
        # Bugs filed for result notifications, exposed as an immutable mapping.
        return FrozenDict(**self.data.get("notify-bugs", {}))

    @notify_bugs.setter  # type: ignore
    @mut()
    def notify_bugs(self, value: FrozenDict) -> None:
        self.data["notify-bugs"] = value.as_dict()

    @property
    def next_action(self) -> DownstreamAction:
        """Work out the next action for the sync based on the current status.

        Returns a DownstreamAction indicating the next step to take."""

        if self.data.get("force-metadata-ready"):
            # This is mostly for testing
            return DownstreamAction.ready
        if self.skip:
            return DownstreamAction.ready
        if self.error:
            # An errored sync gets one automatic rebase attempt before a
            # human has to step in.
            if self.tried_to_rebase is False:
                return DownstreamAction.try_rebase
            return DownstreamAction.manual_fix

        latest_try_push = self.latest_valid_try_push
        # A try push without a taskgroup id hasn't been picked up (or failed
        # in infra) yet.
        if (latest_try_push and not latest_try_push.taskgroup_id):
            if latest_try_push.status == "open":
                return DownstreamAction.wait_try
            elif latest_try_push.infra_fail:
                if self.tried_to_rebase is False:
                    return DownstreamAction.try_rebase
                return DownstreamAction.manual_fix

        assert self.pr is not None
        pr = env.gh_wpt.get_pull(self.pr)
        if pr.merged:  # Wait till PR is merged to do anything
            if not latest_try_push:
                # No try push yet: schedule whichever kind is required.
                if self.requires_stability_try:
                    logger.debug("Sync for PR %s requires a stability try push" % self.pr)
                    return DownstreamAction.try_push_stability
                elif self.requires_try:
                    return DownstreamAction.try_push
                else:
                    return DownstreamAction.ready

            if latest_try_push.status != "complete":
                return DownstreamAction.wait_try

            if self.requires_stability_try and not latest_try_push.stability:
                return DownstreamAction.try_push_stability

            # If we have infra failure, flag for human intervention. Retrying stability
            # runs would be very costly
            if latest_try_push.infra_fail:
                tasks = latest_try_push.tasks()
                if tasks is None:
                    return DownstreamAction.manual_fix

                # Check if we had any successful tests
                if tasks.has_completed_tests():
                    return DownstreamAction.ready
                else:
                    return DownstreamAction.manual_fix

            return DownstreamAction.ready
        else:
            return DownstreamAction.wait_upstream

    @property
    def metadata_ready(self) -> bool:
        """True when no further action is required before landing."""
        action = self.next_action
        return action == DownstreamAction.ready

    @property
    def results_notified(self) -> bool:
        # Whether result notifications have already been sent for this sync.
        return self.data.get("results-notified", False)

    @results_notified.setter  # type: ignore
    @mut()
    def results_notified(self, value):
        self.data["results-notified"] = value

    @property
    def skip(self) -> bool:
        # Whether this sync should be skipped entirely (e.g. reverted PRs).
        return self.data.get("skip", False)

    @skip.setter  # type: ignore
    @mut()
    def skip(self, value: bool) -> None:
        self.data["skip"] = value

    @property
    def tried_to_rebase(self) -> bool:
        # Whether an automatic rebase has already been attempted for this sync.
        return self.data.get("tried_to_rebase", False)

    @tried_to_rebase.setter  # type: ignore
    @mut()
    def tried_to_rebase(self, value: bool) -> None:
        self.data["tried_to_rebase"] = value

    @mut()
    def try_rebase(self) -> None:
        """Rebase the sync onto the landing branch, recording the attempt.

        If the rebase turns out to be a no-op (the base commit is
        unchanged) the tried_to_rebase flag is restored to its previous
        value so the attempt doesn't count.
        """
        logger.info("Rebasing onto %s" % self.gecko_landing_branch())
        had_tried = self.tried_to_rebase
        self.tried_to_rebase = True

        base_before = self.gecko_commits.base.sha1
        self.gecko_rebase(self.gecko_landing_branch(), abort_on_fail=True)
        if self.gecko_commits.base.sha1 == base_before:
            self.tried_to_rebase = had_tried

    @property
    def wpt(self) -> WPT:
        """A WPT helper rooted at a clean checkout of the wpt worktree.

        Any local modifications in the worktree are discarded first.
        """
        git_work = self.wpt_worktree.get()
        git_work.git.reset(hard=True)
        # A single-argument os.path.join is a no-op; use the path directly.
        return WPT(git_work.working_dir)

    @property
    def requires_try(self) -> bool:
        """True when the sync isn't skipped and has gecko commits to test."""
        if self.skip:
            return False
        return len(self.gecko_commits) > 0

    @property
    def requires_stability_try(self) -> bool:
        """True when a try push is needed and tests are affected."""
        if not self.requires_try:
            return False
        return self.has_affected_tests_readonly

    @property
    def latest_valid_try_push(self) -> TryPush | None:
        """Try push for the current head of the PR, if any.

        In legacy cases we don't store the wpt-head for the try push
        so we always assume that any try push is valid"""

        latest_try_push = self.latest_try_push
        if latest_try_push is None:
            return None

        if len(self.gecko_commits) == 0:
            # Something most likely is not correct, but we can't fix it here.
            return latest_try_push
        if self.metadata_commit is not None:
            if len(self.gecko_commits) == 1:
                # Apparently we only have a metadata commit and the actual change got rebased away
                # In this case the metadata commit is probably wrong, but we can't fix that here
                return latest_try_push
            # Ignore the trailing metadata commit when comparing gecko heads.
            gecko_head = self.gecko_commits[-2]
        else:
            gecko_head = self.gecko_commits[-1]

        # Check if the try push is for the current PR head
        if (latest_try_push.wpt_head and
            latest_try_push.wpt_head not in (self.pr_head.sha1, self.wpt_commits.head.sha1)):
            logger.info("Got more commits since latest try push")
            return None
        # The try push head may be either the last "real" commit or the
        # metadata commit that sits on top of it.
        if latest_try_push.gecko_head not in {gecko_head.sha1,
                                              self.metadata_commit.sha1
                                              if self.metadata_commit else None}:
            logger.info("Gecko commits changed since latest try push")
            return None

        return latest_try_push

    @mut()
    def try_paths(self) -> Mapping[str, list[str]]:
        """Return a mapping of {test_type: path} for tests that should be run on try.

        Paths are relative to the gecko root"""
        affected_tests = self.affected_tests()
        base_path = env.config["gecko"]["path"]["wpt"]
        # Filter out paths that aren't in the head.
        # This can happen if the files were moved in a previous PR that we haven't yet
        # merged
        affected_paths = {}

        head_tree = self.gecko_commits.head.commit.tree

        def contains(tree: Tree, path: str) -> bool:
            # Walk the git tree one component at a time; indexing a Tree
            # with a missing name raises KeyError.
            path_parts = path.split(os.path.sep)
            for part in path_parts:
                try:
                    tree = tree[part]
                except KeyError:
                    return False
            return True

        for test_type, wpt_paths in affected_tests.items():
            paths = []
            for path in wpt_paths:
                gecko_path = os.path.join(base_path, path)
                if contains(head_tree, gecko_path):
                    paths.append(gecko_path)
            if paths:
                affected_paths[test_type] = paths

        if not affected_paths:
            # Default to just running infra tests
            infra_path = os.path.join(base_path, "infrastructure/")
            affected_paths = {
                "testharness": [infra_path],
                "reftest": [infra_path],
                "wdspec": [infra_path],
            }
        return affected_paths

    @mut()
    def next_try_push(self, try_cls: type = trypush.TryFuzzyCommit) -> TryPush | None:
        """Schedule a new try push for the sync, if required.

        A stability try push will only be scheduled if the upstream PR is
        approved, which we check directly from GitHub. Therefore returning
        None is not an indication that the sync is ready to land, just that
        there's no further action at this time.
        """
        if self.skip or self.status != "open":
            return None

        self.update_commits()
        # Ensure affected tests is up to date
        self.affected_tests()

        action = self.next_action
        if action == DownstreamAction.try_rebase:
            # Rebase first, then recompute what's needed on the new base.
            self.try_rebase()
            action = self.next_action
        if action == DownstreamAction.try_push:
            return TryPush.create(self._lock,
                                  self,
                                  affected_tests=self.try_paths(),
                                  stability=False,
                                  hacks=False,
                                  try_cls=try_cls)
        elif action == DownstreamAction.try_push_stability:
            return TryPush.create(self._lock,
                                  self,
                                  affected_tests=self.try_paths(),
                                  stability=True,
                                  hacks=False,
                                  try_cls=try_cls)
        return None

    @mut()
    def create_bug(self, git_wpt: Repo, pr_id: int, pr_title: str, pr_body: str | None) -> None:
        """File the Bugzilla bug tracking this sync, unless one already exists."""
        if self.bug is not None:
            return
        comment = self.make_bug_comment(git_wpt, pr_id, pr_title, pr_body)
        summary = f"[wpt-sync] Sync PR {pr_id} - {pr_title}"
        if len(summary) > 255:
            # Truncate to fit Bugzilla's 255-character summary limit,
            # ending with an ellipsis.
            summary = summary[:254] + "\u2026"
        bug = env.bz.new(summary=summary,
                         comment=comment,
                         product="Testing",
                         component="web-platform-tests",
                         whiteboard="[wptsync downstream]",
                         priority="P4",
                         url=env.gh_wpt.pr_url(pr_id))
        self.bug = bug  # type: ignore

    @mut()
    def update_wpt_commits(self) -> None:
        """Update the set of commits in the PR from the latest upstream."""
        if not self.wpt_commits.head or self.wpt_commits.head.sha1 != self.pr_head.sha1:
            self.wpt_commits.head = self.pr_head  # type: ignore

        if (len(self.wpt_commits) == 0 and
            self.git_wpt.is_ancestor(self.wpt_commits.head.commit,
                                     self.git_wpt.rev_parse("origin/master"))):
            # The commits landed on master so we need to change the commit
            # range to not use origin/master as a base
            base_commit = None
            assert isinstance(self.wpt_commits.head, sync_commit.WptCommit)
            assert self.wpt_commits.head.pr() == self.pr
            # Walk back from the head until we hit a commit from a different
            # PR; that commit becomes the new base.
            for commit in self.git_wpt.iter_commits(self.wpt_commits.head.sha1):
                wpt_commit = sync_commit.WptCommit(self.git_wpt, commit)
                if wpt_commit.pr() != self.pr:
                    base_commit = wpt_commit
                    break
            assert base_commit is not None
            self.data["wpt-base"] = base_commit.sha1
            self.wpt_commits.base = base_commit  # type: ignore

    @mut()
    def update_github_check(self) -> None:
        """Create or update the GitHub check run reporting sync status.

        Failures to update the check are logged but never propagated, so a
        GitHub problem can't break the sync itself.
        """
        if not env.config["web-platform-tests"]["github"]["checks"]["enabled"]:
            return
        title = "gecko/sync"
        head_sha = self.wpt_commits.head.sha1

        # TODO: maybe just get this from GitHub rather than store it
        existing = self.data.get("check")
        check_id = None
        if existing is not None:
            # Only reuse the stored check id if it's for the current head.
            if existing.get("sha1") == head_sha:
                check_id = existing.get("id")

        assert self.bug is not None
        url = env.bz.bugzilla_url(self.bug)
        external_id = str(self.bug)

        # For now hardcode the status at completed
        status = "completed"
        conclusion = "neutral"

        completed_at = datetime.now()

        # The GitHub Checks API output object takes "title"/"summary"/"text";
        # the markdown body belongs under "text".
        output = {"title": "Gecko sync for PR %s" % self.pr,
                  "summary": "Gecko sync status: %s" % self.landable_status.reason_str(),
                  "text": self.build_check_text(head_sha)}

        try:
            logger.info("Generating GH check status")
            resp = env.gh_wpt.set_check(title, check_id=check_id, commit_sha=head_sha, url=url,
                                        external_id=external_id, status=status, started_at=None,
                                        conclusion=conclusion, completed_at=completed_at,
                                        output=output)
            self.data["check"] = {"id": resp["id"], "sha1": head_sha}
        except AssertionError:
            raise
        except Exception:
            # Just log errors trying to update the check status, but otherwise don't fail
            # (traceback is already imported at module level).
            newrelic.agent.record_exception()
            logger.error("Creating PR status check failed")
            logger.error(traceback.format_exc())

    def build_check_text(self, commit_sha: str) -> str:
        """Build the markdown body for the GitHub check for *commit_sha*.

        Lists the try pushes (newest first) whose wpt head matches the
        given sha, plus any current sync error.
        """
        text = """

        # Summary

        [Bugzilla](%(bug_link)s)

        # Try pushes
        %(try_push_section)s

        %(error_section)s
        """
        try_pushes = [try_push for try_push in
                      sorted(self.try_pushes(), key=lambda x: -x.process_name.seq_id)
                      if try_push.wpt_head == commit_sha]
        if not try_pushes:
            try_push_section = "No current try pushes"
        else:
            items = []
            for try_push in try_pushes:
                link_str = "Try push" + (" (stability)" if try_push.stability else "")
                items.append(" * [{}]({}): {}{}".format(link_str,
                                                        try_push.treeherder_url,
                                                        try_push.status,
                                                        " infra-fail" if try_push.infra_fail
                                                        else ""))
            # Join once after all items are collected, not per iteration.
            try_push_section = "\n".join(items)

        error_section = "# Errors:\n ```%s```" % self.error if self.error else ""
        assert self.bug is not None
        return text % {"bug_link": env.bz.bugzilla_url(self.bug),
                       "try_push_section": try_push_section,
                       "error_section": error_section}

    def files_changed(self) -> set[str]:
        """Return the set of wpt-relative paths changed by this sync."""
        # TODO: Would be nice to do this from mach with a gecko worktree
        changed = self.wpt.files_changed().decode("utf8", "replace")
        return set(changed.split("\n"))

    @property
    def metadata_commit(self) -> GeckoCommit | None:
        """The trailing wpt metadata commit, or None if there isn't one."""
        commits = self.gecko_commits
        if len(commits) == 0:
            return None
        last = commits[-1]
        if last.metadata.get("wpt-type") != "metadata":
            return None
        assert isinstance(last, GeckoCommit)
        return last

    @mut()
    def ensure_metadata_commit(self) -> GeckoCommit:
        """Return the metadata commit, creating or restoring it if needed.

        If a metadata commit sha was stashed in self.data["metadata-commit"]
        it is cherry-picked back; otherwise a new (possibly empty) metadata
        commit is created on the gecko worktree.
        """
        if self.metadata_commit:
            return self.metadata_commit

        git_work = self.gecko_worktree.get()

        if "metadata-commit" in self.data:
            gitutils.cherry_pick(git_work, self.data["metadata-commit"])
            # We only care about the cached value inside this function, so
            # remove it now so we don't have to maintain it
            del self.data["metadata-commit"]
        else:
            assert all(item.metadata.get("wpt-type") != "metadata" for item in self.gecko_commits)
            metadata = {
                "wpt-pr": str(self.pr),
                "wpt-type": "metadata"
            }
            msg = sync_commit.Commit.make_commit_msg(
                b"Bug %s [wpt PR %s] - Update wpt metadata, a=testonly" %
                (str(self.bug).encode("utf8") if self.bug is not None else b"None",
                 str(self.pr).encode("utf8") if self.pr is not None else b"None"),
                metadata)
            sync_commit.create_commit(git_work, msg, allow_empty=True)
        commit = git_work.commit("HEAD")
        return sync_commit.GeckoCommit(self.git_gecko, commit.hexsha)

    @mut()
    def set_bug_component(self, files_changed: set[str]) -> None:
        """Point the sync bug at the Bugzilla component owning the changed files."""
        default_component = ("Testing", "web-platform-tests")
        component = bugcomponents.get(self.gecko_worktree.get(),
                                      files_changed,
                                      default=default_component)
        env.bz.set_component(self.bug, *component)

    @mut()
    def move_metadata(self, renames: dict[str, str]) -> None:
        """Move .ini metadata files to follow renamed test files.

        No-op when there are no renames.
        """
        if not renames:
            return

        self.ensure_metadata_commit()

        gecko_work = self.gecko_worktree.get()
        metadata_base = env.config["gecko"]["path"]["meta"]
        for old_path, new_path in renames.items():
            old_meta_path = os.path.join(metadata_base,
                                         old_path + ".ini")
            # Only move metadata that actually exists for the old test path.
            if os.path.exists(os.path.join(gecko_work.working_dir,
                                           old_meta_path)):
                new_meta_path = os.path.join(metadata_base,
                                             new_path + ".ini")
                dir_name = os.path.join(gecko_work.working_dir,
                                        os.path.dirname(new_meta_path))
                if not os.path.exists(dir_name):
                    os.makedirs(dir_name)
                # Record the move in the git index so it lands in the
                # metadata commit.
                gecko_work.index.move((old_meta_path, new_meta_path))

        self._commit_metadata()

    @mut()
    def update_bug_components(self, renames: dict[str, str]) -> None:
        """Rewrite bug-component metadata for renamed files.

        No-op when there are no renames.
        """
        if not renames:
            return

        self.ensure_metadata_commit()
        bugcomponents.update(self.gecko_worktree.get(), renames)
        self._commit_metadata()

    @mut()
    def _commit_metadata(self, amend: bool = True) -> None:
        # Fold any pending worktree changes into the metadata commit.
        # NOTE(review): the `amend` parameter is only consulted in the error
        # path below; the commit call itself always passes amend=True —
        # confirm whether amend=False was ever intended to be supported.
        assert self.metadata_commit
        gecko_work = self.gecko_worktree.get()
        if gecko_work.is_dirty():
            logger.info("Updating metadata commit")
            try:
                gecko_work.git.commit(amend=True, no_edit=True)
            except git.GitCommandError as e:
                # git exits 1 suggesting --allow-empty when amending leaves
                # the commit empty; in that case drop the commit entirely.
                if amend and e.status == 1 and "--allow-empty" in e.stdout:
                    logger.warning("Amending commit made it empty, resetting")
                    gecko_work.git.reset("HEAD^")

    @mut()
    def update_commits(self) -> bool:
        """Refresh the gecko commits for this sync from the upstream PR.

        Tries three strategies in order (plain apply, rebase-and-apply,
        apply with upstream dependents) and raises the last error if all
        fail. Returns True if the gecko commits changed, False otherwise.
        The sync's error flag is always updated to reflect the outcome.
        """
        exception = None
        try:
            self.update_wpt_commits()

            # Check if this sync reverts some unlanded earlier PR and if so mark both
            # as skip and don't try to apply the commits here
            reverts = self.reverts_syncs()
            if reverts:
                all_open = all(item.status == "open" for item in reverts)
                for revert_sync in reverts:
                    if revert_sync.status == "open":
                        logger.info("Skipping sync for PR %s because it is later reverted" %
                                    revert_sync.pr)
                        with SyncLock.for_process(revert_sync.process_name) as revert_lock:
                            assert isinstance(revert_lock, SyncLock)
                            with revert_sync.as_mut(revert_lock):
                                revert_sync.skip = True  # type: ignore
                # TODO: If this commit reverts some closed syncs, then set the metadata
                # commit of this commit to the revert of the metadata commit from that
                # sync
                if all_open:
                    logger.info("Sync was a revert of other open syncs, skipping")
                    self.skip = True  # type: ignore
                    return False

            old_gecko_head = self.gecko_commits.head.sha1
            logger.debug(f"PR {self.pr} gecko HEAD was {old_gecko_head}")

            # Strategy 1: apply the wpt commits on top of the current state.
            def plain_apply() -> bool:
                logger.info("Applying on top of the current commits")
                self.wpt_to_gecko_commits()
                return True

            # Strategy 2: rebase onto the latest integration branch first.
            def rebase_apply() -> bool:
                logger.info("Applying with a rebase onto latest integration branch")
                new_base = self.gecko_integration_branch()
                gecko_work = self.gecko_worktree.get()
                reset_head = "HEAD"
                if (len(self.gecko_commits) > 0 and
                    self.gecko_commits[0].metadata.get("wpt-type") == "dependent"):
                    # If we have any dependent commits first reset to the new
                    # head. This prevents conflicts if the dependents already
                    # landed
                    # TODO: Actually check if they landed?
                    reset_head = new_base
                gecko_work.git.reset(reset_head, hard=True)
                self.gecko_rebase(new_base, abort_on_fail=True)
                self.wpt_to_gecko_commits()
                return True

            # Strategy 3: also pull in unlanded upstream commits touching the
            # same files, since the PR may depend on them.
            def dependents_apply() -> bool:
                logger.info("Applying with upstream dependents")
                dependencies = self.unlanded_commits_same_files()
                if dependencies:
                    logger.info("Found dependencies:\n%s" %
                                "\n".join(item.msg.splitlines()[0].decode("utf8", "replace")
                                          for item in dependencies))
                    self.wpt_to_gecko_commits(dependencies)
                    assert self.bug is not None
                    env.bz.comment(self.bug,
                                   "PR %s applied with additional changes from upstream: %s"
                                   % (self.pr, ", ".join(item.sha1 for item in dependencies)))
                    return True
                return False

            # Try each strategy in turn; keep the last error if all fail.
            error = None
            for fn in [plain_apply, rebase_apply, dependents_apply]:
                try:
                    if fn():
                        error = None
                        break
                    else:
                        logger.error("Applying with %s was a no-op" % fn.__name__)
                except Exception as e:
                    import traceback
                    error = e
                    logger.error("Applying with %s errored" % fn.__name__)
                    logger.error(traceback.format_exc())

            if error is not None:
                raise error

            logger.debug(f"PR {self.pr} gecko HEAD now {self.gecko_commits.head.sha1}")
            if old_gecko_head == self.gecko_commits.head.sha1:
                logger.info("Gecko commits did not change for PR %s" % self.pr)
                return False

            # If we have a metadata commit already, ensure it's applied now
            if "metadata-commit" in self.data:
                self.ensure_metadata_commit()

            renames = self.wpt_renames()
            self.move_metadata(renames)
            self.update_bug_components(renames)

            files_changed = self.files_changed()
            self.set_bug_component(files_changed)
        except Exception as e:
            exception = e
            raise
        finally:
            # If we managed to apply all the commits without error, reset the error flag
            # otherwise update it with the current exception
            self.error = exception
        return True

    @mut()
    def wpt_to_gecko_commits(self, dependencies: list[WptCommit] | None = None) -> None:
        """Create a patch based on wpt branch, apply it to corresponding gecko branch.

        If there is a commit with wpt-type metadata, this function will remove it. The
        sha1 will be stashed in self.data["metadata-commit"] so it can be restored next time
        we call ensure_metadata_commit()
        """
        # The logic here is that we can retain any dependent commits as long as we have
        # at least the set in the dependencies array, followed by the gecko commits created
        # from the wpt_commits, interspersed with any number of manifest commits,
        # followed by zero or one metadata commits

        # Each expected commit is (wpt sha1, WptCommit or None, is_dependency).
        if dependencies:
            expected_commits: list[tuple[str, WptCommit | None, bool]] = [
                (item.sha1, item, True)
                for item in dependencies]
        else:
            # If no dependencies are supplied, retain the ones that we alredy have, if any
            expected_commits = []
            for commit in self.gecko_commits:
                assert isinstance(commit, sync_commit.GeckoCommit)
                if commit.metadata.get("wpt-type") == "dependency":
                    expected_commits.append((commit.metadata["wpt-commit"], None, True))
                else:
                    break

        # Expect all the new commits
        for commit in self.wpt_commits:
            assert isinstance(commit, WptCommit)
            if not commit.is_merge:
                expected_commits.append((commit.sha1, commit, False))

        # Existing gecko commits that came from wpt (dependencies or plain).
        existing = [
            commit for commit in self.gecko_commits
            if commit.metadata.get("wpt-commit") and
            commit.metadata.get("wpt-type") in ("dependency", None)]
        if TYPE_CHECKING:
            existing_commits = cast(List[GeckoCommit], existing)
        else:
            existing_commits = existing

        # Count the leading run of existing commits that already match the
        # expected sequence; those can be kept as-is.
        retain_commits = 0
        for gecko_commit, (wpt_sha1, _, _) in zip(existing_commits, expected_commits):
            if gecko_commit.metadata.get("wpt-commit") != wpt_sha1:
                break
            retain_commits += 1

        keep_commits = existing_commits[:retain_commits]
        maybe_add_commits = expected_commits[retain_commits:]

        # Strip out any leading commits that come from currently applied dependencies that are
        # not being retained
        strip_count = 0
        for _, wpt_commit, _ in maybe_add_commits:
            if wpt_commit is not None:
                break
            strip_count += 1
        add_commits = maybe_add_commits[strip_count:]

        if len(keep_commits) == len(existing_commits) and not add_commits:
            logger.info("Commits did not change")
            return

        logger.info("Keeping %i existing commits; adding %i new commits" % (len(keep_commits),
                                                                            len(add_commits)))

        if self.metadata_commit:
            # If we have a metadata commit, store it in self.data["metadata-commit"]
            # remove it when updating commits, and reapply it when we next call
            # ensure_metadata_commit
            self.data["metadata-commit"] = self.metadata_commit.sha1

        # Work out where to reset the gecko branch before re-applying.
        reset_head = None
        if not keep_commits:
            reset_head = self.data["gecko-base"]
        elif len(keep_commits) < len(existing_commits):
            reset_head = keep_commits[-1]
        elif ("metadata-commit" in self.data and
              self.gecko_commits[-1].metadata.get("wpt-type") == "metadata"):
            # Everything is kept except the trailing metadata commit.
            reset_head = self.gecko_commits[-2]

        # Clear the set of affected tests since there are updates
        del self.data["affected-tests"]

        gecko_work = self.gecko_worktree.get()

        if reset_head:
            self.gecko_commits.head = reset_head  # type: ignore
        gecko_work.git.reset(hard=True)

        # Re-apply each remaining wpt commit onto the gecko worktree.
        for _, wpt_commit, is_dependency in add_commits:
            assert wpt_commit is not None
            logger.info("Moving commit %s" % wpt_commit.sha1)
            if is_dependency:
                metadata = {
                    "wpt-type": "dependency",
                    "wpt-commit": wpt_commit.sha1
                }
                msg_filter = None
            else:
                metadata = {
                    "wpt-pr": str(self.pr),
                    "wpt-commit": wpt_commit.sha1
                }
                msg_filter = self.message_filter

            wpt_commit.move(gecko_work,
                            dest_prefix=env.config["gecko"]["path"]["wpt"],
                            msg_filter=msg_filter,
                            metadata=metadata,
                            patch_fallback=True)

    def unlanded_commits_same_files(self) -> list[WptCommit]:
        """Return upstream wpt commits, not yet landed in gecko, that touch
        the same files as this sync.

        Walks origin/master from the last landing sync point, oldest first,
        restricted to the files this sync changed, and stops at the first
        commit belonging to this PR."""
        from . import landing

        point = landing.load_sync_point(self.git_gecko, self.git_wpt)
        revish = "%s..%s" % (point["upstream"], "origin/master")
        touched = self.wpt_commits.files_changed
        rv = []
        for git_commit in self.git_wpt.iter_commits(revish,
                                                    reverse=True,
                                                    paths=list(touched)):
            candidate = sync_commit.WptCommit(self.git_wpt, git_commit)
            # Compare by PR rather than by commit sha: we always work with the
            # commits on the PR branch, not the merged commits.  Using the GH
            # API to find the merge commit wouldn't help either, since we'd
            # still not know the pre-merge commits, which are what we need.
            if candidate.pr() == self.pr:
                break
            rv.append(candidate)
        return rv

    def message_filter(self, msg: bytes) -> tuple[bytes, dict[str, str]]:
        """Rewrite a wpt commit message for landing in gecko.

        The summary line gains the bug and PR references plus a=testonly,
        and SKIP_BMO_CHECK is inserted before the original body.  Returns
        the rewritten message and an (empty) extra-metadata dict."""
        filtered = sync_commit.try_filter(msg)
        # partition keeps everything after the first newline as the body,
        # and yields an empty body for single-line messages.
        summary, _, body = filtered.partition(b"\n")
        new_msg = b"Bug %s [wpt PR %s] - %s, a=testonly\n\nSKIP_BMO_CHECK\n%s" % (
            str(self.bug).encode("utf8"),
            str(self.pr).encode("utf8"),
            summary,
            body)
        return new_msg, {}

    @mut()
    def affected_tests(self, revish: Any | None = None) -> Mapping[str, list[str]]:
        """Return the tests affected by this sync's changes, keyed by test type.

        Computed once via `wpt tests-affected` and cached in self.data; if the
        subprocess fails, an empty mapping is returned and nothing is cached,
        so a later call retries."""
        # TODO? support files, harness changes -- don't want to update metadata
        if "affected-tests" in self.data:
            return self.data["affected-tests"]

        by_type: MutableMapping[str, list[str]] = defaultdict(list)
        logger.info("Updating MANIFEST.json")
        self.wpt.manifest()
        cmd_args = ["--show-type", "--new"]
        if revish:
            cmd_args.append(revish)
        logger.info("Getting a list of tests affected by changes.")
        try:
            output = self.wpt.tests_affected(*cmd_args)
        except subprocess.CalledProcessError:
            logger.error("Calling wpt tests-affected failed")
            return by_type
        if output:
            # Each output line is "<path>\t<test type>"
            for raw_line in output.strip().split(b"\n"):
                text = raw_line.decode("utf8", "replace")
                path, test_type = text.strip().split("\t")
                by_type[test_type].append(path)
        self.data["affected-tests"] = by_type
        return self.data["affected-tests"]

    @property
    def affected_tests_readonly(self) -> Mapping[str, list[str]]:
        """Non-mutating accessor for the cached affected-tests data.

        Returns an empty mapping (with a warning) when the data has not yet
        been computed by affected_tests()."""
        if "affected-tests" in self.data:
            return self.data["affected-tests"]
        logger.warning("Trying to get affected tests before it's set")
        return {}

    @property
    def has_affected_tests_readonly(self) -> bool:
        """Whether this sync is known to affect any tests.

        Conservatively reports True (with a warning) when the affected-tests
        data has not yet been computed by affected_tests()."""
        if "affected-tests" in self.data:
            return bool(self.data["affected-tests"])
        logger.warning("Trying to get affected tests before it's set")
        return True

    @mut()
    def update_metadata(self, log_files, stability=False):
        """Run `mach wpt-update` over wptreport logs and commit metadata changes.

        :param log_files: paths of wptreport.json files to feed to mach.
        :param stability: if True, run a stability/intermittents update.
        :returns: list of test ids that the update disabled."""
        meta_path = env.config["gecko"]["path"]["meta"]
        gecko_work = self.gecko_worktree.get()

        mach = Mach(gecko_work.working_dir)

        cmd_args = []
        if stability:
            # Older mach versions spell this "--stability <message>"; newer
            # ones use "--update-intermittent".  Sniff the help text to pick.
            help_text = mach.wpt_update("--help").decode("utf8")
            if "--stability " in help_text:
                cmd_args.extend(["--stability", "wpt-sync Bug %s" % self.bug])
            else:
                cmd_args.append("--update-intermittent")
        cmd_args.extend(log_files)

        logger.debug("Updating metadata")
        output = mach.wpt_update(*cmd_args)

        # Collect tests the update reported as disabled.
        marker = b"disabled:"
        disabled = [line[len(marker):].decode("utf8", "replace").strip()
                    for line in output.split(b"\n")
                    if line.startswith(marker)]

        if gecko_work.is_dirty(untracked_files=True, path=meta_path):
            self.ensure_metadata_commit()
            gecko_work.git.add(meta_path, all=True)
            self._commit_metadata()

        return disabled

    @mut()
    def try_notify(self, force: bool = False) -> None:
        """Post a results notification for this sync to its bug.

        No-op if results were already notified (unless force is set), if the
        sync has no bug, or if no tests are affected.  On success, marks the
        sync as notified and cleans up logs for all its try pushes."""
        def record_event(name: str) -> None:
            # All notification events carry the same identifying params.
            newrelic.agent.record_custom_event(name, params={
                "sync_bug": self.bug,
                "sync_pr": self.pr
            })

        record_event("try_notify")

        if self.results_notified and not force:
            return

        if not self.bug:
            logger.error("Sync for PR %s has no associated bug" % self.pr)
            return

        if not self.affected_tests():
            logger.debug("PR %s doesn't have affected tests so skipping results notification" %
                         self.pr)
            record_event("try_notify_no_affected")
            return

        logger.info("Trying to generate results notification for PR %s" % self.pr)

        results = notify.results.for_sync(self)
        if not results:
            # TODO handle errors here better, perhaps
            logger.error("Failed to get results notification for PR %s" % self.pr)
            record_event("try_notify_failed")
            return

        message, truncated = notify.msg.for_results(results)

        with env.bz.bug_ctx(self.bug) as bug:
            if truncated:
                # Message too long for a comment; attach it instead and use
                # the truncated form as the comment body.
                bug.add_attachment(data=message.encode("utf8"),
                                   file_name="wpt-results.md",
                                   summary="Notable wpt changes",
                                   is_markdown=True,
                                   comment=truncated)
            else:
                env.bz.comment(self.bug, message, is_markdown=True)

        bugs = notify.bugs.for_sync(self, results)
        notify.bugs.update_metadata(self, bugs)

        self.results_notified = True  # type: ignore

        with SyncLock.for_process(self.process_name) as lock:
            assert isinstance(lock, SyncLock)
            for try_push in self.try_pushes():
                with try_push.as_mut(lock):
                    try_push.cleanup_logs()

    def reverts_syncs(self) -> set[DownstreamSync]:
        """Return a set containing the previous syncs reverted by this one, if any

        A previous sync counts as reverted only if every commit in this sync
        is a revert, and between them they revert all of that sync's commits
        (or its squashed merge commit).  Any non-revert commit, or a revert
        of something we can't resolve to a sync, makes the answer empty."""
        revert_re = re.compile(b"This reverts commit ([0-9A-Fa-f]+)")
        # Map of reverted sync -> sha1s of its wpt commits not yet matched by
        # a revert in this sync; a sync is fully reverted once its set empties.
        unreverted_commits = defaultdict(set)
        for commit in self.wpt_commits:
            if not commit.msg.startswith(b"Revert "):
                # If not everything is a revert then return
                return set()
            revert_shas = revert_re.findall(commit.msg)
            if len(revert_shas) == 0:
                return set()
            # Just use the first match for now
            sha = revert_shas[0].decode("ascii")
            try:
                # Reassign the hash here, in case a short hash was used for reverting.
                sha = str(self.git_wpt.rev_parse(sha))
            except (ValueError, git.BadName):
                # Commit isn't in this repo (could be upstream)
                return set()
            pr = env.gh_wpt.pr_for_commit(sha)
            if pr is None:
                return set()
            sync = DownstreamSync.for_pr(self.git_gecko, self.git_wpt, pr)
            if sync is None:
                return set()
            assert isinstance(sync, DownstreamSync)
            if sync not in unreverted_commits:
                # Ensure we have the latest commits for the reverted sync
                with SyncLock.for_process(sync.process_name) as revert_lock:
                    assert isinstance(revert_lock, SyncLock)
                    with sync.as_mut(revert_lock):
                        sync.update_wpt_commits()
                unreverted_commits[sync] = {item.sha1 for item in sync.wpt_commits}
            if sha in unreverted_commits[sync]:
                unreverted_commits[sync].remove(sha)
            # If the commit is not part of the sync, check if the PR was squashed and then reverted,
            # in that case all commits of the sync should be reverted.
            elif sha == env.gh_wpt.merge_sha(pr):
                unreverted_commits[sync] = set()

        # Only syncs whose every tracked commit was matched count as reverted.
        rv = {sync for sync, unreverted in unreverted_commits.items()
              if not unreverted}
        return rv


@entry_point("downstream")
def new_wpt_pr(git_gecko: Repo, git_wpt: Repo, pr_data: Mapping[str, Any],
               raise_on_error: bool = True, repo_update: bool = True) -> None:
    """Start a new downstream sync for a newly-opened wpt PR.

    Idempotent: does nothing if a sync already exists for the PR.  Raises
    ValueError for PRs opened by the wpt bot itself (those are upstreamings)."""
    bot_user = env.config["web-platform-tests"]["github"]["user"]
    if pr_data["user"]["login"] == bot_user:
        raise ValueError("Tried to create a downstream sync for a PR created "
                         "by the wpt bot")
    if repo_update:
        update_repositories(git_gecko, git_wpt)

    pr_number = pr_data["number"]
    if DownstreamSync.for_pr(git_gecko, git_wpt, pr_number):
        # A sync already exists for this PR
        return
    base_ref = "origin/%s" % pr_data["base"]["ref"]

    with SyncLock("downstream", str(pr_number)) as lock:
        sync = DownstreamSync.new(lock,
                                  git_gecko,
                                  git_wpt,
                                  base_ref,
                                  pr_number,
                                  pr_data["title"],
                                  pr_data["body"] or "")
        with sync.as_mut(lock):
            try:
                sync.update_commits()
                sync.update_github_check()
            except Exception as e:
                sync.error = e
                if raise_on_error:
                    raise
                traceback.print_exc()
                logger.error(e)
            # Now wait for the status to change before we take any actions


@entry_point("downstream")
@mut("try_push", "sync")
def try_push_complete(git_gecko: Repo, git_wpt: Repo, try_push: TryPush, sync: DownstreamSync) -> None:
    """Process the results of a (possibly) finished try push for a sync.

    Validates the try tasks, records infra/build failures on the push and the
    sync, updates expectation metadata from failing wpt runs, marks the push
    complete, and schedules the next try push.  Finally updates the GitHub
    check and, if the sync is now landable, posts the results notification.
    """
    if not try_push.taskgroup_id:
        logger.error("No taskgroup id set for try push")
        return

    if not try_push.status == "complete":
        # Ensure we don't have some old set of tasks

        tasks = try_push.tasks()
        if not tasks.complete(allow_unscheduled=True):
            # Not all tasks have finished; try again on a later event.
            logger.info("Try push %s is not complete" % try_push.treeherder_url)
            return
        logger.info("Try push %s is complete" % try_push.treeherder_url)
        try:
            if not tasks.validate():
                # The taskgroup looks broken (infra problem, not test results).
                try_push.infra_fail = True
                if len(sync.latest_busted_try_pushes()) > 5:
                    message = ("Too many busted try pushes. "
                               "Check the try results for infrastructure issues.")
                    sync.error = message
                    env.bz.comment(sync.bug, message)
                    try_push.status = "complete"
                    raise AbortError(message)
            elif len(tasks.failed_builds()):
                message = ("Try push had build failures")
                sync.error = message
                env.bz.comment(sync.bug, message)
                try_push.status = "complete"
                try_push.infra_fail = True
                raise AbortError(message)
            else:
                logger.info(f"Try push {try_push!r} for PR {sync.pr} complete")
                disabled = []
                if tasks.has_failures():
                    if sync.affected_tests():
                        # Collect wptreport logs from the wpt tasks and use
                        # them to update expectation metadata.
                        log_files = []
                        wpt_tasks = try_push.download_logs(tasks.wpt_tasks)
                        for task in wpt_tasks:
                            for run in task.get("status", {}).get("runs", []):
                                # NOTE: "log" here shadows the module-level log import
                                log = run.get("_log_paths", {}).get("wptreport.json")
                                if log:
                                    log_files.append(log)
                        if not log_files:
                            raise ValueError("No log files found for try push %r" % try_push)
                        disabled = sync.update_metadata(log_files, stability=try_push.stability)
                    else:
                        env.bz.comment(sync.bug, ("The PR was not expected to affect any tests, "
                                                  "but the try push wasn't a success. "
                                                  "Check the try results for infrastructure "
                                                  "issues"))
                        # TODO: consider marking the push an error here so that we can't
                        # land without manual intervention

                if try_push.stability and disabled:
                    logger.info("The following tests were disabled:\n%s" % "\n".join(disabled))
                    # TODO notify relevant people about test expectation changes, stability
                    env.bz.comment(sync.bug, ("The following tests were disabled "
                                              "based on stability try push:\n %s" %
                                              "\n".join(disabled)))

            try_push.status = "complete"
            sync.next_try_push()
        finally:
            # Always reflect the latest state on the GitHub check, even when
            # we bail out via AbortError above.
            sync.update_github_check()
    else:
        # Push already processed; just make sure follow-up actions happen.
        sync.next_try_push()
        sync.update_github_check()

    if sync.metadata_commit is not None and len(sync.gecko_commits) == 1:
        # Apparently we only have a metadata commit and the actual change got rebased away
        # In this case the metadata commit is probably wrong,
        # and we just want to skip this sync.
        sync.skip = True

    if sync.landable_status == LandableStatus.ready:
        sync.try_notify()


@entry_point("downstream")
@mut("sync")
def update_pr(git_gecko: Repo,
              git_wpt: Repo,
              sync: DownstreamSync,
              action: str,
              merge_sha: str,
              base_sha: str,
              merged_by: str | None = None,
              ) -> None:
    """Update a sync in response to an upstream PR state change.

    Closing without a merge resolves the bug INVALID and finishes the sync;
    closing with a merge records the wpt base and schedules a try push;
    (re)opening restores open status and reopens a resolved bug.  Any
    exception is recorded on the sync before being re-raised."""
    try:
        if action == "closed":
            if not merge_sha:
                # PR was abandoned upstream without merging.
                sync.pr_status = "closed"  # type: ignore
                if sync.bug:
                    env.bz.set_status(sync.bug, "RESOLVED", "INVALID")
                sync.finish()
            else:
                # We are storing the wpt base as a reference
                sync.data["wpt-base"] = base_sha
                sync.next_try_push()
        elif action in ("reopened", "open"):
            sync.status = "open"  # type: ignore
            sync.pr_status = "open"  # type: ignore
            sync.next_try_push()
            assert sync.bug is not None
            if sync.bug:
                status = env.bz.get_status(sync.bug)
                if status is not None and status[0] == "RESOLVED":
                    env.bz.set_status(sync.bug, "REOPENED")
        sync.update_github_check()
    except Exception as e:
        sync.error = e
        raise
