in hgext/unifyrepo/__init__.py [0:0]
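# Imports assumed by this excerpt (the [0:0] snippet omits the module
# preamble). unifyconfig, emitfastforwardnodes, unifypushes and newpushes
# are helpers defined elsewhere in this extension and are not imported here.
import gc
import os

from mercurial.node import bin, hex
from mercurial import (
    bookmarks,
    error,
    exchange,
    extensions,
    hg,
)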
def unifyrepo(ui, settings, **opts):
"""Unify the contents of multiple source repositories using settings.
The settings file is a Mercurial config file (basically an INI file).
"""
conf = unifyconfig(settings)
# Ensure destrepo is created with generaldelta enabled.
ui.setconfig(b"format", b"usegeneraldelta", True)
ui.setconfig(b"format", b"generaldelta", True)
# Verify all source repos have the same revision 0
rev0s = set()
for source in conf.sources:
repo = hg.repository(ui, path=source["path"])
# Every source must share the same root changeset (rev 0).
node = repo[0].node()
if rev0s and node not in rev0s:
raise error.Abort(b"repository has different rev 0: %s\n" % source["name"])
# Verify pushlog exists
pushlog = getattr(repo, "pushlog", None)
if not pushlog:
raise error.Abort(
b"pushlog API not available", hint=b"is the pushlog extension loaded?"
)
rev0s.add(node)
# Prepare a ui for the staging repo; the pulls that give the stage repo
# every changeset from the source repos happen further below.
stageui = ui.copy()
# Now collect all the changeset data with pushlog info.
# node -> (when, source, rev, who, pushid)
nodepushinfo = {}
pushcount = 0
allnodes = set()
# Obtain pushlog data from each source repo. We obtain data for every node
# and filter later because we want to be sure we have the earliest known
# push data for a given node.
for source in conf.sources:
path = source["path"]
sourcerepo = hg.repository(ui, path=source["path"])
pushlog = getattr(sourcerepo, "pushlog", None)
index = sourcerepo.changelog.index
revnode = {}
for rev in sourcerepo:
# revlog.node() is too slow in a tight loop; read the node (field 7 of
# the index entry) directly from the index.
node = index[rev][7]
revnode[rev] = node
allnodes.add(node)
noderev = {v: k for k, v in revnode.items()}
localpushcount = 0
pushnodecount = 0
for pushid, who, when, nodes in pushlog.pushes():
pushcount += 1
localpushcount += 1
for node in nodes:
pushnodecount += 1
bnode = bin(node)
# There is a race between iterating the repo and querying the
# pushlog: a new changeset could be pushed between the moment we
# collect the nodes and the moment we read the pushlog. So ignore
# pushlog entries for nodes we don't know about.
if bnode not in noderev:
ui.warn(
b"pushlog entry for unknown node: %s; "
b"possible race condition?\n" % node
)
continue
rev = noderev[bnode]
if bnode not in nodepushinfo:
nodepushinfo[bnode] = (when, path, rev, who, pushid)
else:
currentwhen = nodepushinfo[bnode][0]
if when < currentwhen:
nodepushinfo[bnode] = (when, path, rev, who, pushid)
ui.write(
b"obtained pushlog info for %d/%d revisions from %d pushes from %s\n"
% (pushnodecount, len(revnode), localpushcount, source["name"])
)
# Now verify that every node in the source repos has pushlog data.
missingpl = allnodes - set(nodepushinfo.keys())
if missingpl:
raise error.Abort(
b"missing pushlog info for %d nodes: %s\n"
% (len(missingpl), b", ".join(sorted(hex(n) for n in missingpl)))
)
# Filter out changesets we aren't aggregating.
# We also use this pass to identify which nodes to bookmark.
books = {}
sourcenodes = set()
for source in conf.sources:
sourcerepo = hg.repository(ui, path=source["path"])
cl = sourcerepo.changelog
index = cl.index
sourcerevs = sourcerepo.revs(source["pullrevs"])
sourcerevs.sort()
headrevs = set(cl.headrevs())
sourceheadrevs = headrevs & set(sourcerevs)
# We /could/ allow multiple heads from each source repo. But for now
# it is easier to limit to 1 head per source.
if len(sourceheadrevs) > 1:
raise error.Abort(
b"%s has %d heads" % (source["name"], len(sourceheadrevs)),
hint=b"define pullrevs to limit what is aggregated",
)
for rev in cl:
if rev not in sourcerevs:
continue
node = index[rev][7]
sourcenodes.add(node)
if source["bookmark"]:
books[source["bookmark"]] = node
ui.write(
b"aggregating %d/%d revisions for %d heads from %s\n"
% (len(sourcerevs), len(cl), len(sourceheadrevs), source["name"])
)
nodepushinfo = {k: v for k, v in nodepushinfo.items() if k in sourcenodes}
ui.write(
b"aggregating %d/%d nodes from %d original pushes\n"
% (len(nodepushinfo), len(allnodes), pushcount)
)
# We now have accounting for every changeset. Because pulling changesets
# is a bit time consuming, it is worthwhile to minimize the number of pull
# operations. We do this by ordering all changesets by original push time
# then emitting the minimum number of "fast forward" nodes from the tip
# of each linear range inside that list.
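# Illustrative example: if three consecutive pushes extended one linear run
# of changesets A -> B -> C, pulling just C also transfers A and B, since a
# pull brings in ancestors. A separate "fast forward" node only needs to be
# emitted where the ordered list jumps to a different line of history.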
# (time, source, rev, user, pushid) -> node
inversenodeinfo = {v: k for k, v in nodepushinfo.items()}
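# Configure the destination repo: aggressive merge deltas keep manifest
# deltas compact, and maxchainlen caps how long the resulting delta chains
# are allowed to grow.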
destui = ui.copy()
destui.setconfig(b"format", b"aggressivemergedeltas", True)
destui.setconfig(b"format", b"maxchainlen", 10000)
destrepo = hg.repository(
destui, path=conf.destpath, create=not os.path.exists(conf.destpath)
)
destcl = destrepo.changelog
pullpushinfo = {k: v for k, v in inversenodeinfo.items() if not destcl.hasnode(v)}
ui.write(
b"%d/%d nodes will be pulled\n" % (len(pullpushinfo), len(inversenodeinfo))
)
# Enable aggressive merge deltas on the stage repo to minimize manifest delta
# size. This could make delta chains very long. So we may want to institute a
# delta chain cap on the destination repo. But this will ensure the stage repo
# has the most efficient/compact representation of deltas. Pulling from this
# repo will also inherit the optimal deltas, so the destination repo ends up
# with efficient deltas as well.
stageui.setconfig(b"format", b"aggressivemergedeltas", True)
stagerepo = hg.repository(
stageui, path=conf.stagepath, create=not os.path.exists(conf.stagepath)
)
for source in conf.sources:
path = source["path"]
sourcepeer = hg.peer(ui, {}, path)
ui.write(b"pulling %s into %s\n" % (path, conf.stagepath))
exchange.pull(stagerepo, sourcepeer)
pullnodes = list(emitfastforwardnodes(stagerepo, pullpushinfo))
unifiedpushes = list(unifypushes(inversenodeinfo))
ui.write(
b"consolidated into %d pulls from %d unique pushes\n"
% (len(pullnodes), len(unifiedpushes))
)
if not pullnodes:
ui.write(b"nothing to do; exiting\n")
return
stagepeer = hg.peer(ui, {}, conf.stagepath)
for node in pullnodes:
# TODO Bug 1265002 - we should update bookmarks when we pull.
# Otherwise the changesets will get replicated without a bookmark
# and any poor soul who pulls will see a nameless head.
exchange.pull(destrepo, stagepeer, heads=[node])
# For some reason there is a massive memory leak (10+ MB per
# iteration on Firefox repos) if we don't gc here.
gc.collect()
# Now that we've aggregated all the changesets in the destination repo,
# define the pushlog entries.
pushlog = getattr(destrepo, "pushlog", None)
if not pushlog:
raise error.Abort(
b"pushlog API not available", hint=b"is the pushlog extension loaded?"
)
with destrepo.lock():
with destrepo.transaction(b"pushlog") as tr:
insertpushes = list(newpushes(destrepo, unifiedpushes))
ui.write(b"inserting %d pushlog entries\n" % len(insertpushes))
pushlog.recordpushes(insertpushes, tr=tr)
# Verify that pushlog times never decrease in revision order.
destnodepushtime = {}
for push in destrepo.pushlog.pushes():
for node in push.nodes:
destnodepushtime[bin(node)] = push.when
destcl = destrepo.changelog
lastpushtime = 0
for rev in destrepo:
node = destcl.node(rev)
pushtime = destnodepushtime[node]
if pushtime < lastpushtime:
ui.warn(b"push time for %d is older than %d\n" % (rev, rev - 1))
lastpushtime = pushtime
# Write bookmarks.
ui.write(b"writing %d bookmarks\n" % len(books))
with destrepo.wlock():
with destrepo.lock():
with destrepo.transaction(b"bookmarks") as tr:
bm = bookmarks.bmstore(destrepo)
books.update(
{
book: None # delete any bookmarks not found in the update
for book in bm.keys()
if book not in books
}
)
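# bmstore.applychanges() takes an iterable of (name, node) pairs; a node of
# None removes that bookmark, which is how stale bookmarks get cleaned up.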
# Mass replacing may not be the proper strategy. But it works for
# our current use case.
bm.applychanges(destrepo, tr, books.items())
if not opts.get("skipreplicate"):
# This is a bit hacky. Pushlog and bookmarks aren't currently replicated
# via the normal hooks mechanism because we use the low-level APIs to
# write them. So, we send a replication message to sync the entire repo.
try:
vcsr = extensions.find(b"vcsreplicator")
except KeyError:
raise error.Abort(
b"vcsreplicator extension not installed; "
b"pushlog and bookmarks may not be replicated properly"
)
vcsr.replicatecommand(destrepo.ui, destrepo)
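# Typical invocation, assuming the extension registers this function as an
# `unifyrepo` command (the @command registration is not part of this
# excerpt):
#
#   hg unifyrepo /path/to/unify-settings.ini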