in sync/command.py [0:0]
def do_status(git_gecko: Repo,
              git_wpt: Repo,
              obj_type: str,
              sync_type: str,
              obj_id: int,
              new_status: str,
              seq_id: int | None = None,
              old_status: int | None = None,
              **kwargs: Any
              ) -> None:
    """Set the status of every matching try push or sync process.

    The matching objects are loaded with ``load_by_obj``, optionally
    filtered down to those whose current status equals ``old_status``, and
    then each one has ``new_status`` assigned while the ``SyncLock`` for its
    process is held.

    Args:
        git_gecko: Repository handed to the ``load_by_obj`` loaders.
        git_wpt: Repository handed to the sync loaders; not used when
            ``obj_type`` is ``"try"``.
        obj_type: ``"try"`` loads try pushes; any other value loads sync
            processes.
        sync_type: ``"upstream"``, ``"downstream"`` or ``"landing"`` when
            loading sync processes. For try pushes it is forwarded to
            ``TryPush.load_by_obj`` unchanged.
        obj_id: Identifier forwarded to ``load_by_obj``.
        new_status: Value assigned to ``status`` on each matching object.
        seq_id: Optional sequence id forwarded to ``load_by_obj``.
        old_status: When not ``None``, only objects whose ``status`` equals
            this value are updated.
        **kwargs: Accepted and ignored.

    Raises:
        ValueError: If sync processes are requested and ``sync_type`` is not
            one of the recognised values.
    """
    # Imported inside the function rather than at module level
    # (presumably to avoid an import cycle -- TODO confirm).
    from . import upstream
    from . import downstream
    from . import landing
    from . import trypush

    objs: Iterable[TryPush | SyncProcess] = []
    if obj_type == "try":
        try_pushes = trypush.TryPush.load_by_obj(git_gecko,
                                                 sync_type,
                                                 obj_id,
                                                 seq_id=seq_id)
        # The cast exists purely for the type checker; at runtime the loaded
        # value is used as-is.
        if TYPE_CHECKING:
            objs = cast(Set[trypush.TryPush], try_pushes)
        else:
            objs = try_pushes
    else:
        # Select the sync class with an exhaustive chain so an unrecognised
        # sync_type fails with a clear message instead of an
        # UnboundLocalError on ``cls``.
        cls: type[SyncProcess]
        if sync_type == "upstream":
            cls = upstream.UpstreamSync
        elif sync_type == "downstream":
            cls = downstream.DownstreamSync
        elif sync_type == "landing":
            cls = landing.LandingSync
        else:
            raise ValueError(f"Unknown sync type {sync_type!r}")
        objs = cls.load_by_obj(git_gecko,
                               git_wpt,
                               obj_id,
                               seq_id=seq_id)

    if old_status is not None:
        # NOTE(review): old_status is annotated as int while new_status is
        # str; confirm which type ``status`` actually holds.
        objs = {item for item in objs if item.status == old_status}

    if not objs:
        logger.error("No matching syncs found")

    for obj in objs:
        logger.info(f"Setting status of {obj.process_name} to {new_status}")
        with SyncLock.for_process(obj.process_name) as lock:
            # Narrows the type of ``lock`` for the as_mut() call below.
            assert isinstance(lock, SyncLock)
            with obj.as_mut(lock):
                obj.status = new_status  # type: ignore