def hgweb()

in pylib/vcsreplicator/vcsreplicator/bootstrap.py


def hgweb():
    """hgweb component of the vcsreplicator bootstrap procedure. Takes a
    vcsreplicator config path and the path of a JSON data file (the output
    of the hgssh bootstrap procedure) on the CLI"""
    import argparse

    # Parse CLI args
    parser = argparse.ArgumentParser()
    parser.add_argument("config", help="Path of config file to load")
    parser.add_argument(
        "input",
        help="JSON data input (output from the hgssh bootstrap procedure) file path",
    )
    parser.add_argument(
        "--workers",
        help="Number of concurrent workers to use for performing clones",
        type=int,
        default=multiprocessing.cpu_count(),
    )
    args = parser.parse_args()

    logger.info("reading hgssh JSON document")
    with open(args.input, "r") as f:
        hgssh_data = json.loads(f.read())
        logger.info("JSON document read")

    # Convert the JSON offset keys (partition numbers) to integers
    hgssh_data["offsets"] = {int(k): v for k, v in hgssh_data["offsets"].items()}
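
    # The hgssh document is assumed to look roughly like the following
    # (illustrative values only):
    #
    #   {
    #     "offsets": {"0": [15, 15], "1": [10, 14]},
    #     "repositories": ["/repos/repo-a", "/repos/repo-b"]
    #   }
    #
    # where each offsets entry maps a partition number to the
    # (start_offset, end_offset) range produced by the hgssh bootstrap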

    config = Config(filename=args.config)

    consumer_config = {
        # set this so offsets are committed to Zookeeper
        "api_version": (0, 8, 1),
        "bootstrap_servers": [
            host.strip() for host in config.c.get("consumer", "hosts").split(",")
        ],
        "client_id": config.c.get("consumer", "client_id"),
        "enable_auto_commit": False,
        "group_id": config.c.get("consumer", "group"),
        "max_partition_fetch_bytes": MAX_BUFFER_SIZE,
        "value_deserializer": value_deserializer,
    }

    topic = config.c.get("consumer", "topic")
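
    # The values read from config.c in this function are assumed to come from
    # an ini file shaped roughly like this (illustrative values only):
    #
    #   [consumer]
    #   hosts = kafka1:9092, kafka2:9092
    #   client_id = hgweb0
    #   group = hgweb
    #   topic = pushdata
    #
    #   [programs]
    #   hg = /path/to/virtualenv/bin/hg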

    topicpartitions = [
        TopicPartition(topic, partition)
        for partition, (start_offset, end_offset) in sorted(
            hgssh_data["offsets"].items()
        )
        # there is no need to do an assignment if the bootstrap message
        # range is empty
        if start_offset != end_offset
    ]
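    # For example (hypothetical offsets), {0: (15, 15), 1: (10, 14)} would
    # assign only partition 1, since partition 0 has an empty bootstrap range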

    consumer = KafkaConsumer(**consumer_config)

    # This call populates topic metadata for all topics in the cluster.
    consumer.topics()

    outputdata = collections.defaultdict(list)

    # We will remove repos from this set as we replicate them
    # Once this is an empty set we are done
    repositories_to_clone = set()
    for repo in hgssh_data["repositories"]:
        filterresult = config.filter(repo)

        if filterresult.passes_filter:
            repositories_to_clone.add(repo)
        else:
            outputdata[repo].append("filtered by rule %s" % filterresult.rule)

    extra_messages = collections.defaultdict(
        collections.deque
    )  # maps repo names to extra processing messages
    clone_futures_repo_mapping = {}  # maps cloning futures to repo names
    extra_messages_futures_repo_mapping = (
        {}
    )  # maps extra messages futures to repo names

    # Overwrite the default hglib path so handle_message_main and its
    # derivatives use the correct virtualenv
    hglib.HGPATH = config.c.get("programs", "hg")

    # Maps partitions to the list of messages within the bootstrap range
    aggregate_messages_by_topicpartition = {tp.partition: [] for tp in topicpartitions}

    # Gather all the Kafka messages within the bootstrap range for each partition
    for topicpartition in topicpartitions:
        start_offset, end_offset = hgssh_data["offsets"][topicpartition.partition]

        end_offset -= 1
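        # The hgssh range is assumed to be half-open ([start, end)), so the
        # decremented end_offset is the last offset to collect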

        # Assign the consumer to the next partition and move to the start offset
        logger.info("assigning the consumer to partition %s" % topicpartition.partition)
        consumer.assign([topicpartition])

        logger.info("seeking the consumer to offset %s" % start_offset)
        consumer.seek(topicpartition, start_offset)
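        # Record the bootstrap start offset as the group's committed position
        # before consuming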
        consumer.commit(offsets={topicpartition: OffsetAndMetadata(start_offset, "")})

        logger.info(
            "partition %s of topic %s moved to offset %s"
            % (topicpartition.partition, topicpartition.topic, start_offset)
        )

        # Get all the messages we need to process from Kafka
        for message in consumer:
            # Check if the message we are processing is within the range of
            # accepted messages. If we are in the range, add this message to
            # the list of messages on this partition. If we are at the end of
            # the range, break from the loop and move on to the next partition.
            if message.offset <= end_offset:
                aggregate_messages_by_topicpartition[message.partition].append(message)
                logger.info(
                    "message on partition %s, offset %s has been collected"
                    % (message.partition, message.offset)
                )

            consumer.commit(
                offsets={
                    TopicPartition(topic, message.partition): OffsetAndMetadata(
                        message.offset + 1, ""
                    ),
                }
            )

            if message.offset >= end_offset:
                logger.info(
                    "finished retrieving messages on partition %s" % message.partition
                )
                break

    logger.info("finished retrieving messages from Kafka")

    # Process the previously collected messages
    with futures.ThreadPoolExecutor(args.workers) as e:
        for partition, messages in sorted(aggregate_messages_by_topicpartition.items()):
            logger.info("processing messages for partition %s" % partition)
            for message in messages:
                payload = message.value

                # Ignore heartbeat messages
                if payload["name"] == "heartbeat-1":
                    continue

                if payload["path"] in repositories_to_clone:
                    # We have not yet replicated the repository for this
                    # message. If this message is not an hg-repo-sync-2
                    # message, or the sync message is not tagged with the
                    # bootstrap flag, move on to the next message. The expected
                    # upcoming bootstrap sync message will clone the data
                    # represented in this message anyway.
                    if payload["name"] != "hg-repo-sync-2" or not payload["bootstrap"]:
                        continue

                    logger.info("scheduled clone for %s" % payload["path"])

                    # Schedule the repo sync
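                    # The hg-repo-sync-2 payload carries the repo path, its
                    # requirements, hgrc contents and heads, which are handed
                    # to clone_repo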
                    clone_future = e.submit(
                        clone_repo,
                        config,
                        payload["path"],
                        payload["requirements"],
                        payload["hgrc"],
                        payload["heads"],
                    )

                    # Here we register the future against its repo name
                    clone_futures_repo_mapping[clone_future] = payload["path"]

                    # Remove the repo from the set of repos
                    # which have not been scheduled to sync
                    repositories_to_clone.remove(payload["path"])
                elif payload["path"] not in outputdata:
                    # If the repo is not in the set of repositories left to
                    # clone, and the repo is not in the outputdata object
                    # (i.e. it has not errored out by being filtered or
                    # otherwise), then we have already scheduled the repo sync
                    # and will need to process this message once the sync
                    # completes.
                    extra_messages[payload["path"]].append((config, payload))
                    logger.info(
                        "extra messages found for %s: %s total"
                        % (payload["path"], len(extra_messages[payload["path"]]))
                    )

        if repositories_to_clone:
            logger.error(
                "did not receive expected sync messages for %s" % repositories_to_clone
            )

            # Add errors to audit output
            for repo in repositories_to_clone:
                outputdata[repo].append("did not receive sync message")

        # Process clones
        remaining_clones = len(clone_futures_repo_mapping)
        for completed_future in futures.as_completed(clone_futures_repo_mapping):
            repo = clone_futures_repo_mapping[completed_future]

            exc = completed_future.exception()
            if exc:
                message = "error triggering replication of Mercurial repo %s: %s" % (
                    repo,
                    str(exc),
                )
                logger.error(message)

                # Add error to audit output
                outputdata[repo].append(message)
            else:
                logger.info("%s successfully cloned" % repo)

            remaining_clones -= 1

            logger.info("%s repositories remaining" % remaining_clones)

            # Schedule extra message processing if necessary
            if repo in extra_messages:
                logger.info("scheduling extra processing for %s" % repo)
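                # seqmap presumably applies handle_message_main to each queued
                # (config, payload) pair in order, preserving per-repo message
                # ordering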
                future = e.submit(seqmap, handle_message_main, extra_messages[repo])
                extra_messages_futures_repo_mapping[future] = repo

        # Process extra messages
        total_message_batches = len(extra_messages_futures_repo_mapping)
        for completed_future in futures.as_completed(
            extra_messages_futures_repo_mapping
        ):
            repo = extra_messages_futures_repo_mapping[completed_future]

            exc = completed_future.exception()
            if exc:
                message = "error processing extra messages for %s: %s" % (
                    repo,
                    str(exc),
                )
                logger.error(message)

                # Add error to audit output
                outputdata[repo].append(message)
            else:
                logger.info("extra processing for %s completed successfully" % repo)

            total_message_batches -= 1
            logger.info("%s batches remaining" % total_message_batches)

    logger.info("%s bootstrap process complete" % config.c.get("consumer", "group"))

    # If anything broke, dump the errors and set exit code 1
    if outputdata:
        with open("/repo/hg/hgweb_bootstrap_out.json", "w") as f:
            f.write(json.dumps(outputdata))
        return 1
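
The audit output written on failure is a JSON object mapping repository paths
to the list of error strings accumulated above. A minimal sketch of reading it
afterwards, assuming the hardcoded output path used by this function:

import json

with open("/repo/hg/hgweb_bootstrap_out.json") as f:
    failures = json.load(f)

for repo, errors in sorted(failures.items()):
    print("%s: %s" % (repo, "; ".join(errors)))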