# hgssh() — extracted from pylib/vcsreplicator/vcsreplicator/bootstrap.py


def hgssh():
    """hgssh component of the vcsreplicator bootstrap procedure.

    Snapshots Kafka end-offsets for the replication topic, triggers
    `replicatesync --bootstrap` on every repo under REPOS_DIR in parallel,
    snapshots the end-offsets again, and emits a JSON document mapping each
    partition to its (start, end) offset range plus the wire-protocol paths
    of the synced repositories. The JSON is printed to stdout and optionally
    written to `--output`.

    Exits with status 1 if partition metadata for the topic cannot be
    retrieved; raises Exception if any `replicatesync` invocation fails.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("config", help="Path to config file")
    parser.add_argument("hg", help="Path to hg executable for use in bootstrap process")
    parser.add_argument(
        "--workers",
        help="Number of concurrent workers to use for publishing messages",
        type=int,
        default=multiprocessing.cpu_count(),
    )
    parser.add_argument("--output", help="Output file path for hgssh JSON")
    args = parser.parse_args()

    config = Config(filename=args.config)

    topic = config.c.get("replicationproducer", "topic")

    # Create consumer to gather partition offsets
    consumer_config = {
        # set this so offsets are committed to Zookeeper
        "api_version": (0, 8, 1),
        "bootstrap_servers": [
            host.strip()
            for host in config.c.get("replicationproducer", "hosts").split(",")
        ],
        "enable_auto_commit": False,  # We don't actually commit but this is just for good measure
    }
    consumer = KafkaConsumer(**consumer_config)

    # Ensure the consumer's network connection is released on every exit
    # path (including sys.exit below and replicatesync failures).
    try:
        # This call populates topic metadata for all topics in the cluster.
        # Needed as missing topic metadata can cause the below call to retrieve
        # partition information to fail.
        consumer.topics()

        partitions = consumer.partitions_for_topic(topic)
        if not partitions:
            logger.critical("could not get partitions for %s", topic)
            sys.exit(1)

        # Gather the initial offsets
        topicpartitions = [
            TopicPartition(topic, partition_number)
            for partition_number in sorted(partitions)
        ]
        offsets_start = consumer.end_offsets(topicpartitions)
        logger.info("gathered initial Kafka offsets")

        # Mapping of `replicatesync` future to corresponding repo name
        replicatesync_futures = {}
        with futures.ThreadPoolExecutor(args.workers) as e:
            # Create a future which makes a `replicatesync` call
            # for each repo on hg.mo
            for repo in find_hg_repos(REPOS_DIR):
                replicatesync_args = [
                    args.hg,
                    "-R",
                    repo,
                    "replicatesync",
                    "--bootstrap",
                ]
                future = e.submit(subprocess.check_output, replicatesync_args)
                replicatesync_futures[future] = repo

                logger.info("calling `replicatesync --bootstrap` on %s", repo)

            # Execute the futures and raise an Exception on fail
            for future in futures.as_completed(replicatesync_futures):
                repo = replicatesync_futures[future]

                exc = future.exception()
                if exc:
                    logger.error(
                        "error occurred calling `replicatesync --bootstrap` on %s: %s",
                        repo,
                        exc,
                    )
                    raise Exception(
                        "error triggering replication of Mercurial repo %s: %s"
                        % (repo, exc)
                    )
                logger.info(
                    "called `replicatesync --bootstrap` on %s successfully", repo
                )

        # Gather the final offsets
        offsets_end = consumer.end_offsets(topicpartitions)
        logger.info("gathered final Kafka offsets")
    finally:
        consumer.close()

    # Create map of partition numbers to (start, end) offset tuples
    offsets_combined = {
        int(topicpartition.partition): (
            offsets_start[topicpartition],
            offsets_end[topicpartition],
        )
        for topicpartition in topicpartitions
    }

    # Create JSON for processing in ansible and print to stdout
    # Convert repo paths into their wire representations
    output = {
        "offsets": offsets_combined,
        "repositories": sorted(
            config.get_replication_path_rewrite(repo)
            for repo in replicatesync_futures.values()
        ),
    }

    print(json.dumps(output, sort_keys=True))
    logger.info("hgssh bootstrap process complete!")

    # Send output to a file if requested
    if args.output:
        logger.info("writing output to %s", args.output)
        with open(args.output, "w") as f:
            json.dump(output, f)