in sync/command.py [0:0]
def do_try_push_add(git_gecko: Repo,
                    git_wpt: Repo,
                    try_rev: str,
                    stability: bool = False,
                    sync_type: str | None = None,
                    sync_id: int | None = None,
                    rebuild_count: int | None = None,
                    **kwargs: Any
                    ) -> None:
    """Create a TryPush for a sync from an existing try revision.

    The sync is resolved either from the current path (when ``sync_type`` is
    None) or by looking it up from ``sync_type`` and ``sync_id``.
    ``TryPush.create`` is then called with a stand-in try class whose
    ``push()`` simply returns ``try_rev``.

    Args:
        git_gecko: Gecko repository handle, passed to the sync lookups.
        git_wpt: web-platform-tests repository handle, passed to the sync
            lookups.
        try_rev: Revision returned by the stand-in try class's ``push()``.
        stability: Forwarded to ``TryPush.create``.
        sync_type: ``"downstream"``, ``"landing"``, or None to resolve the
            sync from the current path.
        sync_id: Identifier passed to ``DownstreamSync.for_pr`` or
            ``LandingSync.for_bug``; required whenever ``sync_type`` is given.
        rebuild_count: Forwarded to ``TryPush.create``.
        **kwargs: Accepted and ignored.

    Raises:
        ValueError: If the lookup completed but produced no sync.
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably to avoid circular imports -- confirm.
    from . import downstream
    from . import landing
    from . import trypush

    sync = None
    if sync_type is None:
        sync = sync_from_path(git_gecko, git_wpt)
        if sync is None:
            logger.error("No sync type supplied and no sync for current path")
            return
    elif sync_id is None:
        logger.error("A sync id is required when a sync type is supplied")
        return
    elif sync_type == "downstream":
        sync = downstream.DownstreamSync.for_pr(git_gecko,
                                                git_wpt,
                                                sync_id)
    elif sync_type == "landing":
        syncs = landing.LandingSync.for_bug(git_gecko,
                                            git_wpt,
                                            sync_id,
                                            statuses=None,
                                            flat=True)
        # Only the first returned sync is used; an empty result leaves
        # `sync` as None and is reported by the check below.
        if syncs:
            sync = syncs[0]
    else:
        logger.error("Invalid sync type %s", sync_type)
        return

    if not sync:
        raise ValueError(
            f"No sync found for sync type {sync_type!r} and id {sync_id!r}")

    class FakeTry:
        """Stand-in try class: a no-op context manager whose push() returns try_rev."""

        def __init__(self, *_args, **_kwargs):
            pass

        def __enter__(self):
            return self

        def __exit__(self, *args):
            pass

        def push(self):
            # No push is performed here; the caller-supplied revision is
            # handed back as the result.
            return try_rev

    with SyncLock.for_process(sync.process_name) as lock:
        # Narrows the lock type before it is handed to as_mut()/create().
        assert isinstance(lock, SyncLock)
        with sync.as_mut(lock):
            # The return value is not needed; it is deliberately not bound to
            # a local so the imported `trypush` module is not shadowed.
            trypush.TryPush.create(lock,
                                   sync,
                                   None,
                                   stability=stability,
                                   try_cls=FakeTry,
                                   rebuild_count=rebuild_count,
                                   check_open=False)

    print("Now run an update for the sync")