def unifyrepo(ui, settings, **opts)

in hgext/unifyrepo/__init__.py


def unifyrepo(ui, settings, **opts):
    """Unify the contents of multiple source repositories using settings.

    The settings file is a Mercurial config file (basically an INI file).
    """
    conf = unifyconfig(settings)

    # Ensure destrepo is created with generaldelta enabled.
    ui.setconfig(b"format", b"usegeneraldelta", True)
    ui.setconfig(b"format", b"generaldelta", True)

    # Verify all source repos have the same revision 0
    rev0s = set()
    for source in conf.sources:
        repo = hg.repository(ui, path=source["path"])

        # Verify rev 0 matches across all source repos.
        node = repo[0].node()
        if rev0s and node not in rev0s:
            raise error.Abort(b"repository has different rev 0: %s" % source["name"])

        # Verify pushlog exists
        pushlog = getattr(repo, "pushlog", None)
        if not pushlog:
            raise error.Abort(
                b"pushlog API not available", hint=b"is the pushlog extension loaded?"
            )

        rev0s.add(node)

    # Prepare a ui for the staging repo, which must end up containing all
    # changesets from the source repos (the pull into it happens below).

    stageui = ui.copy()

    # Now collect all the changeset data with pushlog info.
    # node -> (when, source, rev, who, pushid)
    nodepushinfo = {}
    pushcount = 0
    allnodes = set()

    # Obtain pushlog data from each source repo. We obtain data for every node
    # and filter later because we want to be sure we have the earliest known
    # push data for a given node.
    for source in conf.sources:
        path = source["path"]
        sourcerepo = hg.repository(ui, path=path)
        pushlog = getattr(sourcerepo, "pushlog", None)

        index = sourcerepo.changelog.index
        revnode = {}
        for rev in sourcerepo:
            # revlog.node() is too slow. Use the index directly.
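            # Each index entry is a tuple; element 7 is the binary node.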
            node = index[rev][7]
            revnode[rev] = node
            allnodes.add(node)

        noderev = {v: k for k, v in revnode.items()}

        localpushcount = 0
        pushnodecount = 0
        for pushid, who, when, nodes in pushlog.pushes():
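            # Each push unpacks as (pushid, who, when, nodes), where nodes
            # are hex node strings (hence the bin() conversion below).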
            pushcount += 1
            localpushcount += 1
            for node in nodes:
                pushnodecount += 1
                bnode = bin(node)

                # There is a race between us iterating the repo and querying the
                # pushlog: a new changeset could land between when we collect
                # the nodes and when we query the pushlog. So ignore pushlog
                # entries for nodes we don't know about.
                if bnode not in noderev:
                    ui.warn(
                        b"pushlog entry for unknown node: %s; "
                        b"possible race condition?\n" % node
                    )
                    continue

                rev = noderev[bnode]

                if bnode not in nodepushinfo:
                    nodepushinfo[bnode] = (when, path, rev, who, pushid)
                else:
                    currentwhen = nodepushinfo[bnode][0]
                    if when < currentwhen:
                        nodepushinfo[bnode] = (when, path, rev, who, pushid)

        ui.write(
            b"obtained pushlog info for %d/%d revisions from %d pushes from %s\n"
            % (pushnodecount, len(revnode), localpushcount, source["name"])
        )

    # Now verify that every node in the source repos has pushlog data.
    missingpl = allnodes - set(nodepushinfo.keys())
    if missingpl:
        raise error.Abort(
            b"missing pushlog info for %d nodes: %s\n"
            % (len(missingpl), b", ".join(sorted(hex(n) for n in missingpl)))
        )

    # Filter out changesets we aren't aggregating.
    # We also use this pass to identify which nodes to bookmark.
    books = {}
    sourcenodes = set()
    for source in conf.sources:
        sourcerepo = hg.repository(ui, path=source["path"])
        cl = sourcerepo.changelog
        index = cl.index

        sourcerevs = sourcerepo.revs(source["pullrevs"])
        sourcerevs.sort()
        headrevs = set(cl.headrevs())
        sourceheadrevs = headrevs & set(sourcerevs)

        # We /could/ allow multiple heads from each source repo. But for now
        # it is easier to limit to 1 head per source.
        if len(sourceheadrevs) > 1:
            raise error.Abort(
                b"%s has %d heads" % (source["name"], len(sourceheadrevs)),
                hint=b"define pullrevs to limit what is aggregated",
            )

        for rev in cl:
            if rev not in sourcerevs:
                continue

            node = index[rev][7]
            sourcenodes.add(node)
            if source["bookmark"]:
                books[source["bookmark"]] = node
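                # Later revs overwrite earlier ones; since revs iterate in
                # ascending order, the bookmark lands on the highest selected
                # rev, which is the single head permitted above.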

        ui.write(
            b"aggregating %d/%d revisions for %d heads from %s\n"
            % (len(sourcerevs), len(cl), len(sourceheadrevs), source["name"])
        )

    nodepushinfo = {k: v for k, v in nodepushinfo.items() if k in sourcenodes}

    ui.write(
        b"aggregating %d/%d nodes from %d original pushes\n"
        % (len(nodepushinfo), len(allnodes), pushcount)
    )

    # We now have accounting for every changeset. Because pulling changesets
    # is time-consuming, it is worthwhile to minimize the number of pull
    # operations. We do this by ordering all changesets by original push time
    # and then emitting the minimum number of "fast forward" nodes, one from
    # the tip of each linear range inside that list.
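    # For example, if time-ordered pushes cover revs 1, 2, 3, 7, 8 and the
    # runs 1-3 and 7-8 are each linear, pulling just the heads of revs 3 and
    # 8 transfers all five changesets in two pull operations.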

    # (time, source, rev, user, pushid) -> node
    inversenodeinfo = {v: k for k, v in nodepushinfo.items()}
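    # Inverting the dict is lossless: rev differs between any two nodes from
    # the same source and path differs across sources, so no two nodes share
    # a push-info tuple.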

    destui = ui.copy()
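    # Aggressive merge deltas keep manifests compact; a generous maxchainlen
    # caps the long delta chains they can produce (see the staging repo
    # comment below).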
    destui.setconfig(b"format", b"aggressivemergedeltas", True)
    destui.setconfig(b"format", b"maxchainlen", 10000)

    destrepo = hg.repository(
        destui, path=conf.destpath, create=not os.path.exists(conf.destpath)
    )
    destcl = destrepo.changelog
    pullpushinfo = {k: v for k, v in inversenodeinfo.items() if not destcl.hasnode(v)}

    ui.write(
        b"%d/%d nodes will be pulled\n" % (len(pullpushinfo), len(inversenodeinfo))
    )

    # Enable aggressive merge deltas on the stage repo to minimize manifest delta
    # size. This could make delta chains very long, which is why we cap the
    # delta chain length on the destination repo above. This ensures the stage
    # repo has the most efficient/compact representation of deltas, and pulls
    # from it inherit those optimal deltas.
    stageui.setconfig(b"format", b"aggressivemergedeltas", True)

    stagerepo = hg.repository(
        stageui, path=conf.stagepath, create=not os.path.exists(conf.stagepath)
    )

    for source in conf.sources:
        path = source["path"]
        sourcepeer = hg.peer(ui, {}, path)
        ui.write(b"pulling %s into %s\n" % (path, conf.stagepath))
        exchange.pull(stagerepo, sourcepeer)

    pullnodes = list(emitfastforwardnodes(stagerepo, pullpushinfo))
    unifiedpushes = list(unifypushes(inversenodeinfo))

    ui.write(
        b"consolidated into %d pulls from %d unique pushes\n"
        % (len(pullnodes), len(unifiedpushes))
    )

    if not pullnodes:
        ui.write(b"nothing to do; exiting\n")
        return

    stagepeer = hg.peer(ui, {}, conf.stagepath)

    for node in pullnodes:
        # TODO Bug 1265002 - we should update bookmarks when we pull.
        # Otherwise the changesets will get replicated without a bookmark
        # and any poor soul who pulls will see a nameless head.
        exchange.pull(destrepo, stagepeer, heads=[node])
        # For some reason there is a massive memory leak (10+ MB per
        # iteration on Firefox repos) if we don't gc here.
        gc.collect()

    # Now that we've aggregated all the changesets in the destination repo,
    # define the pushlog entries.
    pushlog = getattr(destrepo, "pushlog", None)
    if not pushlog:
        raise error.Abort(
            b"pushlog API not available", hint=b"is the pushlog extension loaded?"
        )

    with destrepo.lock():
        with destrepo.transaction(b"pushlog") as tr:
            insertpushes = list(newpushes(destrepo, unifiedpushes))
            ui.write(b"inserting %d pushlog entries\n" % len(insertpushes))
            pushlog.recordpushes(insertpushes, tr=tr)

    # Verify that pushlog time in revision order is always increasing.
    destnodepushtime = {}
    for push in destrepo.pushlog.pushes():
        for node in push.nodes:
            destnodepushtime[bin(node)] = push.when

    destcl = destrepo.changelog
    lastpushtime = 0
    for rev in destrepo:
        node = destcl.node(rev)
        pushtime = destnodepushtime[node]

        if pushtime < lastpushtime:
            ui.warn(b"push time for %d is older than %d\n" % (rev, rev - 1))

        lastpushtime = pushtime

    # Write bookmarks.
    ui.write(b"writing %d bookmarks\n" % len(books))

    with destrepo.wlock():
        with destrepo.lock():
            with destrepo.transaction(b"bookmarks") as tr:
                bm = bookmarks.bmstore(destrepo)
                books.update(
                    {
                        book: None  # delete any bookmarks not found in the update
                        for book in bm.keys()
                        if book not in books
                    }
                )
                # Mass replacing may not be the proper strategy. But it works for
                # our current use case.
                bm.applychanges(destrepo, tr, books.items())

    if not opts.get("skipreplicate"):
        # This is a bit hacky. Pushlog and bookmarks aren't currently replicated
        # via the normal hooks mechanism because we use the low-level APIs to
        # write them. So, we send a replication message to sync the entire repo.
        try:
            vcsr = extensions.find(b"vcsreplicator")
        except KeyError:
            raise error.Abort(
                b"vcsreplicator extension not installed; "
                b"pushlog and bookmarks may not be replicated properly"
            )

        vcsr.replicatecommand(destrepo.ui, destrepo)
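
Assuming the extension registers unifyrepo as a command taking the settings
path as its argument (plus options such as --skipreplicate, read from opts
above), an invocation might look like:

    hg unifyrepo /path/to/unify.ini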