in pylib/vcsreplicator/vcsreplicator/bootstrap.py [0:0]
def hgssh():
    """hgssh component of the vcsreplicator bootstrap procedure.

    Records the Kafka end-offsets of the replication topic before and after
    invoking ``replicatesync --bootstrap`` on every repository found under
    ``REPOS_DIR``, then emits a JSON document with the offset window and the
    wire-protocol repository paths to stdout (and optionally to ``--output``)
    for consumption by downstream tooling.

    Exits with status 1 if partition metadata for the configured topic cannot
    be retrieved.  Raises ``Exception`` if any ``replicatesync`` call fails.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("config", help="Path to config file")
    parser.add_argument(
        "hg", help="Path to hg executable for use in bootstrap process"
    )
    parser.add_argument(
        "--workers",
        help="Number of concurrent workers to use for publishing messages",
        type=int,
        default=multiprocessing.cpu_count(),
    )
    parser.add_argument("--output", help="Output file path for hgssh JSON")
    args = parser.parse_args()

    config = Config(filename=args.config)
    topic = config.c.get("replicationproducer", "topic")

    # Create consumer to gather partition offsets
    consumer_config = {
        # set this so offsets are committed to Zookeeper
        "api_version": (0, 8, 1),
        "bootstrap_servers": [
            host.strip()
            for host in config.c.get("replicationproducer", "hosts").split(",")
        ],
        # We don't actually commit but this is just for good measure
        "enable_auto_commit": False,
    }
    consumer = KafkaConsumer(**consumer_config)

    try:
        # This call populates topic metadata for all topics in the cluster.
        # Needed as missing topic metadata can cause the below call to
        # retrieve partition information to fail.
        consumer.topics()

        partitions = consumer.partitions_for_topic(topic)
        if not partitions:
            logger.critical("could not get partitions for %s", topic)
            sys.exit(1)

        # Gather the initial offsets
        topicpartitions = [
            TopicPartition(topic, partition_number)
            for partition_number in sorted(partitions)
        ]
        offsets_start = consumer.end_offsets(topicpartitions)
        logger.info("gathered initial Kafka offsets")

        # Mapping of `replicatesync` future to corresponding repo name
        replicatesync_futures = {}
        with futures.ThreadPoolExecutor(args.workers) as e:
            # Create a future which makes a `replicatesync` call
            # for each repo on hg.mo
            for repo in find_hg_repos(REPOS_DIR):
                # Create a future to call `replicatesync` for this repo
                replicatesync_args = [
                    args.hg,
                    "-R",
                    repo,
                    "replicatesync",
                    "--bootstrap",
                ]
                replicatesync_futures[
                    e.submit(subprocess.check_output, replicatesync_args)
                ] = repo
                logger.info("calling `replicatesync --bootstrap` on %s", repo)

            # Execute the futures and raise an Exception on fail
            for future in futures.as_completed(replicatesync_futures):
                repo = replicatesync_futures[future]
                exc = future.exception()
                if exc:
                    logger.error(
                        "error occurred calling `replicatesync --bootstrap` on %s: %s",
                        repo,
                        exc,
                    )
                    raise Exception(
                        "error triggering replication of Mercurial repo %s: %s"
                        % (repo, exc)
                    )
                logger.info(
                    "called `replicatesync --bootstrap` on %s successfully", repo
                )

        # Gather the final offsets
        offsets_end = consumer.end_offsets(topicpartitions)
        logger.info("gathered final Kafka offsets")
    finally:
        # The consumer was previously never closed, leaking its network
        # connections and fetch buffers; release them even on error.
        consumer.close()

    # Create map of partition numbers to (start, end) offset tuples
    offsets_combined = {
        int(topicpartition.partition): (
            offsets_start[topicpartition],
            offsets_end[topicpartition],
        )
        for topicpartition in topicpartitions
    }

    # Create JSON for processing in ansible and print to stdout.
    # Convert repo paths into their wire representations.
    output = {
        "offsets": offsets_combined,
        "repositories": sorted(
            config.get_replication_path_rewrite(repo)
            for repo in replicatesync_futures.values()
        ),
    }
    print(json.dumps(output, sort_keys=True))
    logger.info("hgssh bootstrap process complete!")

    # Send output to a file if requested
    if args.output:
        logger.info("writing output to %s", args.output)
        with open(args.output, "w") as f:
            json.dump(output, f)