hgext/unifyrepo/__init__.py
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Extension for unifying the contents of related by separate repositories.
Using pushlog data, this extension can recreate a new repository that is
chronologically ordered by the push times from multiple source repositories.
In other words, it will recreate the repository as if it were a single
repository from the beginning.
The extension also has the ability to create bookmarks in the destination
repository.
This extension was created to unify the discrete Firefox repositories, which
are logically the same but existed as separate repositories for historical
reasons. For that reason, some config options and defaults may not be
appropriate for all repos.
Config Files
------------

Because the configuration for repository unification can be complicated,
a standalone config file is used to define the source repositories and
how they are integrated into the destination repository.

This config file uses the Mercurial config file parser (basically
an INI file). Sections in the config represent source repositories.
Each section can have the following entries:
path (required)
   The filesystem path of the repository.
   It must be a local filesystem path, not a URL.

pullrevs (optional)
   Revset defining which revisions to pull from this repo into the unified
   repo. By default, this is `0:tip`, which means to pull all revisions.

bookmark (optional)
   Name of bookmark to associate with the tip-most commit from this repo.
   If not defined, the section name will be used.
   By default, bookmarks will be created.

nobookmark (optional)
   If defined, no bookmark will be created for revisions coming from this
   repo.
The [GLOBAL] section defines special global settings. The following settings
can be defined:

stagepath (required)
   Local filesystem path to the repository that will hold the contents of all
   source repositories.

   Every changeset from every source repository will be pulled into this
   repo. This provides a common source for adding changesets to the
   destination repo.

destpath (required)
   Local filesystem path to the final, destination repository.
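
Example
-------

Here is a minimal illustration of the format described above. The section
names and filesystem paths are hypothetical and carry no special meaning::

  [GLOBAL]
  stagepath = /repos/unify-stage
  destpath = /repos/unified

  [repo-a]
  path = /repos/repo-a
  bookmark = repo-a-tip

  [repo-b]
  path = /repos/repo-b
  pullrevs = 0:tip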
"""
from __future__ import absolute_import
import collections
import gc
import os
from mercurial.node import (
bin,
hex,
)
from mercurial import (
bookmarks,
config,
error,
exchange,
extensions,
hg,
registrar,
)
minimumhgversion = b"4.8"
testedwith = b"4.8 4.9 5.0 5.1 5.2 5.3 5.4 5.5 5.9"
cmdtable = {}
command = registrar.command(cmdtable)


class unifyconfig(object):
def __init__(self, path):
self._c = config.config()
with open(path, "rb") as fh:
self._c.read(path, fh)
if b"GLOBAL" not in self._c:
raise error.Abort(b"config file missing GLOBAL section")
self.stagepath = self._c.get(b"GLOBAL", b"stagepath")
if not self.stagepath:
raise error.Abort(b"GLOBAL.stagepath not defined in config")
self.destpath = self._c.get(b"GLOBAL", b"destpath")
if not self.destpath:
raise error.Abort(b"GLOBAL.destpath not defined in config")
@property
def sources(self):
for name in sorted(self._c):
# uppercase names are reserved for special sections.
if name.upper() == name:
continue
bookmark = self._c.get(name, b"bookmark", name)
if self._c.hasitem(name, b"nobookmark"):
bookmark = None
path = self._c.get(name, b"path")
if not path:
raise error.Abort(b'section %s missing "path"' % name)
yield {
"name": name,
"path": path,
"pullrevs": self._c.get(name, b"pullrevs", b"0:tip"),
"bookmark": bookmark,
}


def emitfastforwardnodes(repo, pushtonode):
"""Given info about the pushes for every node, emit tip-most fast forward nodes.
Basically, emit a node if its DAG branch is different from the DAG branch of
the previous node. We can pull the emitted node to recreate a repository in
push order as if all pushes went to the same repo using the minimum number
of pull operations.
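
    For example (hypothetical nodes), if the push order is A, B, C on one
    linear branch followed by D, E on a separate branch, only C and E are
    emitted: pulling C also brings in A and B, and pulling E brings in D.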
"""
cl = repo.changelog
relevantnodes = set(pushtonode.values())
# We pre-populate the mapping of node to children by iterating the repo
# and looking at parents because this lookup is indexed and much faster.
# node -> set<node>
clchildren = collections.defaultdict(set)
for rev in repo:
node = cl.node(rev)
if node not in relevantnodes:
continue
for p in cl.parents(node):
clchildren[p].add(node)
wantedchildren = set()
lastnode = None
for key, node in sorted(pushtonode.items()):
children = clchildren[node]
# We're a DAG head. Always emit.
if not children:
yield node
wantedchildren = set()
lastnode = None
continue
# If this node is a child of the last node, keep going.
if node in wantedchildren:
lastnode = node
wantedchildren = children
continue
# We aren't a child of the last node. That means we changed DAG
# branches. So emit.
if lastnode:
yield lastnode
lastnode = node
wantedchildren = children
    # The last node in push order should always be a DAG head, so the final
    # loop iteration should have emitted it and reset lastnode.
assert lastnode is None


def unifypushes(pushtonode):
"""Given a mapping of original push info to node, create a new pushlog.
This emits tuples of (source, pushid, who, when, [nodes]).
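
    For example (hypothetical values), nodes n1, n2 and n3 recorded as push 1
    of source repo X, followed by node n4 recorded as push 2 of the same repo,
    are emitted as (X, 1, who, when, [n1, n2, n3]) and (X, 2, who, when, [n4]).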
"""
lastsource = None
lastid = None
lastwhen = None
lastwho = None
nodes = []
for (when, source, rev, who, pushid), node in sorted(pushtonode.items()):
# First time through.
if lastsource is None:
lastsource = source
lastid = pushid
lastwhen = when
lastwho = who
nodes.append(node)
continue
# Pushlog entries are unique by source repo and push id. Do not compare
# time here because it is possible for different pushes to have the same
# time (in theory) and we want to preserve these as separate events.
if lastsource == source and lastid == pushid:
nodes.append(node)
continue
yield lastsource, lastid, lastwho, lastwhen, nodes
lastsource = source
lastid = pushid
lastwhen = when
lastwho = who
nodes = [node]
if lastsource:
yield lastsource, lastid, lastwho, lastwhen, nodes


def newpushes(repo, unifiedpushes):
"""Obtain push records for the unified destination repo.
The output of this function should be fed into pushlog.recordpushes()
to insert the new push records.
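
    Each emitted record is a (pushid, user, when, nodes) tuple, where push ids
    continue from the destination repo's last push id and nodes already present
    in the destination pushlog are omitted. For example (hypothetical values),
    if the destination's last push id is 41, the first record emitted will use
    push id 42.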
"""
pushlog = repo.pushlog
lastpushid = pushlog.lastpushid()
destpushes = list(pushlog.pushes())
destpushnodes = set()
for push in destpushes:
destpushnodes |= set(bin(n) for n in push.nodes)
for source, pushid, who, when, nodes in unifiedpushes:
missing = [n for n in nodes if n not in destpushnodes]
if not missing:
continue
lastpushid += 1
yield lastpushid, who, when, missing


@command(
b"unifyrepo",
[
(
b"",
b"skipreplicate",
False,
b"Flag to disable `hg replicatesync` on successful unification",
),
],
b"unifyrepo settings",
norepo=True,
)
def unifyrepo(ui, settings, **opts):
"""Unify the contents of multiple source repositories using settings.
The settings file is a Mercurial config file (basically an INI file).
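
    A typical invocation (the settings path below is hypothetical)::

      $ hg unifyrepo /etc/mercurial/unify.ini

    The --skipreplicate flag disables the replication sync that is normally
    performed after successful unification.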
"""
conf = unifyconfig(settings)
# Ensure destrepo is created with generaldelta enabled.
ui.setconfig(b"format", b"usegeneraldelta", True)
ui.setconfig(b"format", b"generaldelta", True)
# Verify all source repos have the same revision 0
rev0s = set()
for source in conf.sources:
repo = hg.repository(ui, path=source["path"])
        # Verify rev 0 is identical across all source repos.
node = repo[0].node()
if rev0s and node not in rev0s:
raise error.Abort(b"repository has different rev 0: %s\n" % source["name"])
# Verify pushlog exists
pushlog = getattr(repo, "pushlog", None)
if not pushlog:
raise error.Abort(
b"pushlog API not available", hint=b"is the pushlog extension loaded?"
)
rev0s.add(node)
# Ensure the staging repo has all changesets from the source repos.
stageui = ui.copy()
# Now collect all the changeset data with pushlog info.
# node -> (when, source, rev, who, pushid)
nodepushinfo = {}
pushcount = 0
allnodes = set()
# Obtain pushlog data from each source repo. We obtain data for every node
# and filter later because we want to be sure we have the earliest known
# push data for a given node.
for source in conf.sources:
path = source["path"]
sourcerepo = hg.repository(ui, path=source["path"])
pushlog = getattr(sourcerepo, "pushlog", None)
index = sourcerepo.changelog.index
revnode = {}
for rev in sourcerepo:
# revlog.node() is too slow. Use the index directly.
node = index[rev][7]
revnode[rev] = node
allnodes.add(node)
noderev = {v: k for k, v in revnode.items()}
localpushcount = 0
pushnodecount = 0
for pushid, who, when, nodes in pushlog.pushes():
pushcount += 1
localpushcount += 1
for node in nodes:
pushnodecount += 1
bnode = bin(node)
                # There is a race between us iterating the repo and querying the
                # pushlog. A new changeset could be written between when we
                # read the changelog and when we query the pushlog. So ignore
                # pushlog entries for nodes we don't know about.
if bnode not in noderev:
ui.warn(
b"pushlog entry for unknown node: %s; "
b"possible race condition?\n" % node
)
continue
rev = noderev[bnode]
if bnode not in nodepushinfo:
nodepushinfo[bnode] = (when, path, rev, who, pushid)
else:
currentwhen = nodepushinfo[bnode][0]
if when < currentwhen:
nodepushinfo[bnode] = (when, path, rev, who, pushid)
ui.write(
b"obtained pushlog info for %d/%d revisions from %d pushes from %s\n"
% (pushnodecount, len(revnode), localpushcount, source["name"])
)
# Now verify that every node in the source repos has pushlog data.
missingpl = allnodes - set(nodepushinfo.keys())
if missingpl:
raise error.Abort(
b"missing pushlog info for %d nodes: %s\n"
% (len(missingpl), b", ".join(sorted(hex(n) for n in missingpl)))
)
# Filter out changesets we aren't aggregating.
# We also use this pass to identify which nodes to bookmark.
books = {}
sourcenodes = set()
for source in conf.sources:
sourcerepo = hg.repository(ui, path=source["path"])
cl = sourcerepo.changelog
index = cl.index
sourcerevs = sourcerepo.revs(source["pullrevs"])
sourcerevs.sort()
headrevs = set(cl.headrevs())
sourceheadrevs = headrevs & set(sourcerevs)
# We /could/ allow multiple heads from each source repo. But for now
# it is easier to limit to 1 head per source.
if len(sourceheadrevs) > 1:
raise error.Abort(
b"%s has %d heads" % (source["name"], len(sourceheadrevs)),
hint=b"define pullrevs to limit what is aggregated",
)
for rev in cl:
if rev not in sourcerevs:
continue
node = index[rev][7]
sourcenodes.add(node)
if source["bookmark"]:
books[source["bookmark"]] = node
ui.write(
b"aggregating %d/%d revisions for %d heads from %s\n"
% (len(sourcerevs), len(cl), len(sourceheadrevs), source["name"])
)
nodepushinfo = {k: v for k, v in nodepushinfo.items() if k in sourcenodes}
ui.write(
b"aggregating %d/%d nodes from %d original pushes\n"
% (len(nodepushinfo), len(allnodes), pushcount)
)
# We now have accounting for every changeset. Because pulling changesets
# is a bit time consuming, it is worthwhile to minimize the number of pull
# operations. We do this by ordering all changesets by original push time
# then emitting the minimum number of "fast forward" nodes from the tip
# of each linear range inside that list.
# (time, source, rev, user, pushid) -> node
inversenodeinfo = {v: k for k, v in nodepushinfo.items()}
destui = ui.copy()
destui.setconfig(b"format", b"aggressivemergedeltas", True)
destui.setconfig(b"format", b"maxchainlen", 10000)
destrepo = hg.repository(
destui, path=conf.destpath, create=not os.path.exists(conf.destpath)
)
destcl = destrepo.changelog
pullpushinfo = {k: v for k, v in inversenodeinfo.items() if not destcl.hasnode(v)}
ui.write(
b"%d/%d nodes will be pulled\n" % (len(pullpushinfo), len(inversenodeinfo))
)
# Enable aggressive merge deltas on the stage repo to minimize manifest delta
# size. This could make delta chains very long. So we may want to institute a
# delta chain cap on the destination repo. But this will ensure the stage repo
# has the most efficient/compact representation of deltas. Pulling from this
# repo will also inherit the optimal delta, so we don't need to enable
# aggressivemergedeltas on the destination repo.
stageui.setconfig(b"format", b"aggressivemergedeltas", True)
stagerepo = hg.repository(
stageui, path=conf.stagepath, create=not os.path.exists(conf.stagepath)
)
for source in conf.sources:
path = source["path"]
sourcepeer = hg.peer(ui, {}, path)
ui.write(b"pulling %s into %s\n" % (path, conf.stagepath))
exchange.pull(stagerepo, sourcepeer)
pullnodes = list(emitfastforwardnodes(stagerepo, pullpushinfo))
unifiedpushes = list(unifypushes(inversenodeinfo))
ui.write(
b"consolidated into %d pulls from %d unique pushes\n"
% (len(pullnodes), len(unifiedpushes))
)
if not pullnodes:
ui.write(b"nothing to do; exiting\n")
return
stagepeer = hg.peer(ui, {}, conf.stagepath)
for node in pullnodes:
# TODO Bug 1265002 - we should update bookmarks when we pull.
# Otherwise the changesets will get replicated without a bookmark
# and any poor soul who pulls will see a nameless head.
exchange.pull(destrepo, stagepeer, heads=[node])
# For some reason there is a massive memory leak (10+ MB per
# iteration on Firefox repos) if we don't gc here.
gc.collect()
# Now that we've aggregated all the changesets in the destination repo,
# define the pushlog entries.
pushlog = getattr(destrepo, "pushlog", None)
if not pushlog:
raise error.Abort(
b"pushlog API not available", hint=b"is the pushlog extension loaded?"
)
with destrepo.lock():
with destrepo.transaction(b"pushlog") as tr:
insertpushes = list(newpushes(destrepo, unifiedpushes))
ui.write(b"inserting %d pushlog entries\n" % len(insertpushes))
pushlog.recordpushes(insertpushes, tr=tr)
# Verify that pushlog time in revision order is always increasing.
destnodepushtime = {}
for push in destrepo.pushlog.pushes():
for node in push.nodes:
destnodepushtime[bin(node)] = push.when
destcl = destrepo.changelog
lastpushtime = 0
for rev in destrepo:
node = destcl.node(rev)
pushtime = destnodepushtime[node]
if pushtime < lastpushtime:
ui.warn(b"push time for %d is older than %d\n" % (rev, rev - 1))
lastpushtime = pushtime
# Write bookmarks.
ui.write(b"writing %d bookmarks\n" % len(books))
with destrepo.wlock():
with destrepo.lock():
with destrepo.transaction(b"bookmarks") as tr:
bm = bookmarks.bmstore(destrepo)
books.update(
{
book: None # delete any bookmarks not found in the update
for book in bm.keys()
if book not in books
}
)
# Mass replacing may not be the proper strategy. But it works for
# our current use case.
bm.applychanges(destrepo, tr, books.items())
if not opts.get("skipreplicate"):
# This is a bit hacky. Pushlog and bookmarks aren't currently replicated
# via the normal hooks mechanism because we use the low-level APIs to
# write them. So, we send a replication message to sync the entire repo.
try:
vcsr = extensions.find(b"vcsreplicator")
except KeyError:
raise error.Abort(
b"vcsreplicator extension not installed; "
b"pushlog and bookmarks may not be replicated properly"
)
vcsr.replicatecommand(destrepo.ui, destrepo)