in sync/command.py [0:0]
def do_landing(git_gecko: Repo,
               git_wpt: Repo,
               wpt_head: str | None = None,
               prev_wpt_head: str | None = None,
               include_incomplete: bool = False,
               accept_failures: bool = False,
               retry: bool = False,
               push: bool = True,
               **kwargs: Any
               ) -> None:
    """Progress the current landing, or fall back to ``landing.update_landing``.

    The current landing is looked up with ``landing.current``. What happens
    next depends on whether that landing has a try push:

    * No current landing, or no try push on it: call
      ``landing.update_landing``.
    * A try push exists: take the ``"landing"`` ``SyncLock``, make sure the
      try push has a taskgroup id, and then
        - if ``retry`` is set, call ``landing.update_landing``;
        - if the try push status is ``"open"``, compute the try result and
          either log that results are still pending or call
          ``landing.try_push_complete``;
        - for any other status, call ``landing.update_landing``.

    :param git_gecko: Repository handle forwarded to the landing helpers.
    :param git_wpt: Repository handle forwarded to the landing helpers.
    :param wpt_head: Forwarded to ``landing.update_landing``.
    :param prev_wpt_head: Forwarded to ``landing.update_landing``.
    :param include_incomplete: Forwarded to ``landing.update_landing``.
    :param accept_failures: Forwarded to both ``landing.update_landing`` and
        ``landing.try_push_complete``.
    :param retry: When true and a try push exists, skip inspecting its
        results and call ``landing.update_landing`` directly; also forwarded
        to that call.
    :param push: Forwarded to ``landing.try_push_complete`` as ``allow_push``.
    :param kwargs: Accepted and ignored by this function.
    """
    # Imported at call time, as in the original; presumably to avoid import
    # cycles or import cost at module load -- TODO confirm.
    from . import errors
    from . import landing
    from . import update

    current_landing = landing.current(git_gecko, git_wpt)

    def update_landing() -> None:
        """Call ``landing.update_landing`` with this command's arguments."""
        landing.update_landing(git_gecko,
                               git_wpt,
                               prev_wpt_head,
                               wpt_head,
                               include_incomplete,
                               retry=retry,
                               accept_failures=accept_failures)

    # Guard clause: with no landing in progress, or no try push on it,
    # there is nothing to inspect.
    if not (current_landing and current_landing.latest_try_push):
        update_landing()
        return

    with SyncLock("landing", None) as lock:
        # Narrow the type of the context manager's result for type checkers.
        assert isinstance(lock, SyncLock)
        try_push = current_landing.latest_try_push
        logger.info("Found try push %s", try_push.treeherder_url)

        if try_push.taskgroup_id is None:
            update.update_taskgroup_ids(git_gecko, git_wpt, try_push)
            assert try_push.taskgroup_id is not None

        with try_push.as_mut(lock), current_landing.as_mut(lock):
            if retry:
                update_landing()
            elif try_push.status == "open":
                tasks = try_push.tasks()
                try_result = current_landing.try_result(tasks=tasks)
                if try_result == landing.TryPushResult.pending:
                    # Report the bug of the landing being processed.
                    # ``landing`` is the imported module, so the bug must be
                    # read from ``current_landing``.
                    logger.info("Landing in bug %s is waiting for try results",
                                current_landing.bug)
                else:
                    try:
                        landing.try_push_complete(git_gecko,
                                                  git_wpt,
                                                  try_push,
                                                  current_landing,
                                                  allow_push=push,
                                                  accept_failures=accept_failures,
                                                  tasks=tasks)
                    except errors.AbortError:
                        # Don't need to raise an error here because
                        # the logging is the important part
                        return
            else:
                update_landing()