def update_landing()

in sync/landing.py [0:0]


def update_landing(git_gecko: Repo,
                   git_wpt: Repo,
                   prev_wpt_head: Any | None = None,
                   new_wpt_head: Any | None = None,
                   include_incomplete: bool = False,
                   retry: bool = False,
                   allow_push: bool = True,
                   accept_failures: bool = False,
                   ) -> LandingSync | None:
    """Create or continue a landing of wpt commits to gecko.

    :param prev_wpt_head: The sha1 of the previous wpt commit landed to gecko.
    :param wpt_head: The sha1 of the latest possible commit to land to gecko,
                     or None to use the head of the master branch"
    :param include_incomplete: By default we don't attempt to land anything that
                               hasn't completed a metadata update. This flag disables
                               that and just lands everything up to the specified commit.
    :param retry: Create a new try push for the landing even if there's an existing one
    :param allow_push: Allow pushing to gecko if try is complete
    :param accept_failures: Don't fail if an existing try push has too many failures """
    landing = current(git_gecko, git_wpt)
    sync_point = load_sync_point(git_gecko, git_wpt)

    with SyncLock("landing", None) as lock:
        assert isinstance(lock, SyncLock)
        if landing is None:
            update_repositories(git_gecko, git_wpt)
            if prev_wpt_head is None:
                prev_wpt_head = sync_point["upstream"]

            landable = landable_commits(git_gecko, git_wpt,
                                        prev_wpt_head,
                                        wpt_head=new_wpt_head,
                                        include_incomplete=include_incomplete)
            if landable is None:
                return None
            wpt_head, commits = landable
            landing = LandingSync.new(lock, git_gecko, git_wpt, prev_wpt_head, wpt_head)

            # Set the landing to block all the bugs that will land with it
            blocks = [sync.bug for (pr_, sync, commits_) in commits
                      if isinstance(sync, downstream.DownstreamSync) and
                      sync.bug is not None]
            assert landing.bug is not None
            with env.bz.bug_ctx(landing.bug) as bug:
                for bug_id in blocks:
                    bug.add_blocks(bug_id)
        else:
            if prev_wpt_head and landing.wpt_commits.base.sha1 != prev_wpt_head:
                raise AbortError("Existing landing base commit %s doesn't match"
                                 "supplied previous wpt head %s" % (landing.wpt_commits.base.sha1,
                                                                    prev_wpt_head))
            elif new_wpt_head and landing.wpt_commits.head.sha1 != new_wpt_head:
                raise AbortError("Existing landing head commit %s doesn't match"
                                 "supplied wpt head %s" % (landing.wpt_commits.head.sha1,
                                                           new_wpt_head))
            head = landing.gecko_commits.head
            if git_gecko.is_ancestor(head.commit,
                                     git_gecko.rev_parse(
                                         env.config["gecko"]["refs"]["central"])):
                logger.info("Landing reached central")
                with landing.as_mut(lock):
                    landing.finish()
                return None
            elif git_gecko.is_ancestor(head.commit,
                                       git_gecko.rev_parse(landing.gecko_integration_branch())):
                logger.info("Landing is on inbound but not yet on central")
                return None

            landable = landable_commits(git_gecko,
                                        git_wpt,
                                        landing.wpt_commits.base.sha1,
                                        landing.wpt_commits.head.sha1,
                                        include_incomplete=include_incomplete)
            if landable is None:
                raise AbortError("No new commits are landable")
            wpt_head, commits = landable
            assert wpt_head == landing.wpt_commits.head.sha1

        pushed = False

        with landing.as_mut(lock):
            if landing.latest_try_push is None:
                landing.apply_prs(prev_wpt_head, commits)

                landing.update_bug_components()

                landing.update_sync_point(sync_point)

                landing.next_try_push()
            elif retry:
                try:
                    landing.gecko_rebase(landing.gecko_landing_branch())
                except AbortError:
                    message = record_rebase_failure(landing)
                    raise AbortError(message)

                with landing.latest_try_push.as_mut(lock):
                    landing.latest_try_push.status = "complete"  # type: ignore
                landing.next_try_push(retry=True)
            else:
                try_push = landing.latest_try_push
                try_result = landing.try_result()
                if try_push.status == "complete" and (try_result.is_ok() or
                                                      accept_failures):
                    try:
                        landing.gecko_rebase(landing.gecko_landing_branch())
                    except AbortError:
                        message = record_rebase_failure(landing)
                        raise AbortError(message)

                    if landing.next_try_push() is None:
                        push_to_gecko(git_gecko, git_wpt, landing, allow_push)
                        pushed = True
                elif try_result == TryPushResult.pending:
                    logger.info("Existing try push %s is waiting for try results" %
                                try_push.treeherder_url)
                else:
                    logger.info("Existing try push %s requires manual fixup" %
                                try_push.treeherder_url)

        try_notify_downstream(commits, landing_is_complete=pushed)

        if pushed:
            retrigger()
    return landing