def do_status()

in sync/command.py [0:0]


def do_status(git_gecko: Repo,
              git_wpt: Repo,
              obj_type: str,
              sync_type: str,
              obj_id: int,
              new_status: str,
              seq_id: int | None = None,
              old_status: int | None = None,
              **kwargs: Any
              ) -> None:
    from . import upstream
    from . import downstream
    from . import landing
    from . import trypush
    objs: Iterable[TryPush | SyncProcess] = []
    if obj_type == "try":
        try_pushes = trypush.TryPush.load_by_obj(git_gecko,
                                                 sync_type,
                                                 obj_id,
                                                 seq_id=seq_id)
        if TYPE_CHECKING:
            objs = cast(Set[trypush.TryPush], try_pushes)
        else:
            objs = try_pushes
    else:
        if sync_type == "upstream":
            cls: type[SyncProcess] = upstream.UpstreamSync
        if sync_type == "downstream":
            cls = downstream.DownstreamSync
        if sync_type == "landing":
            cls = landing.LandingSync
        objs = cls.load_by_obj(git_gecko,
                               git_wpt,
                               obj_id,
                               seq_id=seq_id)

    if old_status is not None:
        objs = {item for item in objs if item.status == old_status}

    if not objs:
        logger.error("No matching syncs found")

    for obj in objs:
        logger.info(f"Setting status of {obj.process_name} to {new_status}")
        with SyncLock.for_process(obj.process_name) as lock:
            assert isinstance(lock, SyncLock)
            with obj.as_mut(lock):
                obj.status = new_status  # type: ignore