def run_cli()

in pylib/vcsreplicator/vcsreplicator/consumer.py [0:0]


def run_cli(message_handler):
    """Command line interface to consumer.

    Parses the process command line, loads the config file named by the
    ``config`` positional argument and then runs one of the following modes,
    checked in this order:

    * ``--wait-for-no-lag``: poll the consumer group's offsets until no
      partition has messages left to consume, then exit with status 0.
    * ``--wait-for-n N``: fetch messages until N have been received, printing
      one line per message, then exit with status 0.
    * ``--dump``: fetch messages until none is returned, print the collected
      payloads as YAML and exit with status 0.
    * ``--skip``: fetch one message and commit its partition without
      processing it. Exits 0 on success, 1 if no message was available.
    * otherwise: configure logging to stdout and hand control to
      ``consume()``; ``--onetime`` is forwarded to it.

    ``--partition`` and ``--start-from`` affect every mode except
    ``--wait-for-no-lag``, which exits before the consumer is created.

    ``message_handler`` is the message processing callable to be used when
    messages are acted upon.
    """
    import argparse
    import yaml

    # Reopen stdout line-buffered (buffering=1) so output shows up promptly
    # even when stdout is not a terminal.
    sys.stdout = os.fdopen(sys.stdout.fileno(), "w", 1)

    parser = argparse.ArgumentParser()
    parser.add_argument("config", help="Path to config file to load")
    parser.add_argument(
        "--dump", action="store_true", help="Dump available messages and exit"
    )
    parser.add_argument(
        "--onetime", action="store_true", help="Process a single message and exit"
    )
    parser.add_argument(
        "--start-from", type=int, help="Start N records from the beginning"
    )
    parser.add_argument(
        "--partition",
        type=int,
        help="Partition to fetch from. Defaults to all partitions.",
    )
    parser.add_argument(
        "--skip",
        action="store_true",
        help="Skip the consuming of the next message then exit",
    )
    parser.add_argument(
        "--wait-for-no-lag",
        action="store_true",
        help="Wait for consumer lag to be 0 messages and exit",
    )
    parser.add_argument(
        "--wait-for-n",
        type=int,
        help="Wait for N messages to become available then exit",
    )

    args = parser.parse_args()

    config = Config(filename=args.config)

    # hglib will use 'hg' which relies on PATH being correct. Since we're
    # running from a virtualenv, PATH may not be set unless the virtualenv
    # is activated. Overwrite the hglib defaults with a value from the config.
    hglib.HGPATH = config.hg_path

    client = config.get_client_from_section("consumer", timeout=30)
    topic = config.get("consumer", "topic")
    group = config.get("consumer", "group")
    poll_timeout = config.c.getfloat("consumer", "poll_timeout")
    wait_for_topic(client, topic, 30)

    if args.wait_for_no_lag:
        while True:
            offsets = consumer_offsets_and_lag(client, topic, [group])
            # Each partition maps to (offset, available, lag_time); a
            # partition lags while more messages are available than consumed.
            lagging = any(
                available - offset > 0
                for offset, available, _lag_time in offsets[group].values()
            )
            if not lagging:
                sys.exit(0)

            time.sleep(0.1)

    # ``None`` is passed through to Consumer when no partition was requested
    # (the CLI help documents that as "all partitions").
    partitions = None
    if args.partition is not None:
        partitions = [args.partition]

    consumer = Consumer(client, group, topic, partitions)

    # NOTE(review): these are truthiness checks, so ``--start-from 0`` and
    # ``--wait-for-n 0`` behave as if the option was not given. Confirm this
    # is intended before changing it.
    if args.start_from:
        consumer.seek(args.start_from, 0)

    if args.wait_for_n:
        left = args.wait_for_n
        while left > 0:
            m = consumer.get_message()
            if not m:
                # Nothing available yet; keep asking until N were seen.
                continue

            # Messages are (partition, message, payload) tuples.
            print("got a %s message" % payload_log_display(m[2]))

            left -= 1

        sys.exit(0)

    if args.dump:
        messages = []
        while True:
            m = consumer.get_message()
            if not m:
                break
            messages.append(m[2])

        print(yaml.safe_dump(messages, default_flow_style=False).rstrip())
        sys.exit(0)

    if args.skip:
        r = consumer.get_message()
        if not r:
            print("no message available; nothing to skip")
            sys.exit(1)

        partition, message, payload = r
        # Committing without handling the message is what skips it.
        consumer.commit(partitions=[partition])
        print("skipped message in partition %d for group %s" % (partition, group))
        sys.exit(0)

    # Send all log records to stdout.
    root = logging.getLogger()
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter("%(name)s %(message)s")
    # Use UTC for any time formatting done by this formatter.
    formatter.converter = time.gmtime
    handler.setFormatter(formatter)
    root.addHandler(handler)

    # Logger.warn() is a deprecated alias that was removed in Python 3.13;
    # Logger.warning() is the supported spelling.
    if not args.onetime:
        logger.warning(
            "starting consumer for topic=%s group=%s partitions=%s",
            topic,
            group,
            partitions or "all",
        )
    try:
        consume(
            config,
            consumer,
            message_handler,
            onetime=args.onetime,
            timeout=poll_timeout,
        )
        if not args.onetime:
            logger.warning("process exiting gracefully")
    except BaseException:
        # BaseException is deliberate: interrupts and SystemExit are also
        # logged before being re-raised unchanged.
        logger.error("exiting main consume loop with error")
        raise