def check_aggregator_lag()

in pylib/vcsreplicator/vcsreplicator/nagios.py [0:0]


def check_aggregator_lag():
    """Check the lag of an aggregator daemon.

    Command-line entry point. Parses ``sys.argv`` for the path to a config
    file plus optional ``--warning-count`` / ``--critical-count`` thresholds
    and a ``--telegraf`` flag, fetches consumed/acked offsets for the
    aggregator's monitored topic, and prints a Nagios-style report.

    The process always terminates via ``sys.exit``:

    * ``0`` -- no messages behind, or fewer than ``--warning-count``; also
      used unconditionally after emitting ``--telegraf`` JSON.
    * ``1`` -- total messages behind is at least ``--warning-count``.
    * ``2`` -- total messages behind is at least ``--critical-count``.

    If fetching the offsets raises, a ``WARNING`` line is printed and the
    exception is re-raised unchanged.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("config", help="Path to config file to load")
    parser.add_argument(
        "--warning-count",
        default=40,
        type=int,
        help="Total number of messages behind after which a warning will be issued",
    )
    parser.add_argument(
        "--critical-count",
        default=100,
        type=int,
        help="Total number of messages behind after which a "
        "critical will be reported",
    )
    parser.add_argument(
        "--telegraf",
        action="store_true",
        help="Output data for consumption by Telegraf",
    )

    args = parser.parse_args()

    def classify(behind):
        """Return ``(exit_code, label)`` for a positive backlog size."""
        if behind >= args.critical_count:
            return 2, "CRITICAL"
        if behind >= args.warning_count:
            return 1, "WARNING"
        return 0, "OK"

    config = Config(filename=args.config)
    client = config.get_client_from_section("aggregator", timeout=5)
    monitor_topic = config.get("aggregator", "monitor_topic")
    groups_path = config.get("aggregator", "monitor_groups_file")
    ack_group = config.get("aggregator", "ack_group")

    try:
        groups = read_consumer_groups(groups_path)
        consumed, acked, counts = get_aggregation_counts(
            client, monitor_topic, groups, ack_group
        )
    except Exception as e:
        # Emit a status line for the monitoring system before letting the
        # original exception (and its traceback) propagate.
        print("WARNING - exception fetching data: %s" % e)
        print("")
        raise

    if args.telegraf:
        telegraf_data = create_aggregator_telegraf_json(consumed, acked)
        print(json.dumps(telegraf_data, sort_keys=True))
        sys.exit(0)

    message_count = sum(counts.values())

    output = []

    # Overall summary line; it alone determines the process exit code.
    if not message_count:
        exit_code = 0
        output.append("OK - aggregator has copied all fully replicated messages")
    else:
        exit_code, label = classify(message_count)
        output.append(
            "%s - %d messages from %d partitions behind"
            % (label, message_count, len(counts))
        )

    # Per-partition detail lines. Each partition is classified on its own
    # backlog so a label never leaks over from the summary or from the
    # previously reported partition.
    output.append("")
    for partition in sorted(consumed.keys()):
        consume_offset = consumed[partition]
        acked_offset = acked[partition]
        unacked = consume_offset - acked_offset

        if unacked <= 0:
            output.append(
                "%s - partition %d is completely in sync (%d/%d)"
                % ("OK", partition, acked_offset, consume_offset)
            )
            continue

        # Partitions behind by less than the warning threshold are "OK".
        _, label = classify(unacked)
        output.append(
            "%s - partition %d is %d messages behind (%d/%d)"
            % (label, partition, unacked, acked_offset, consume_offset)
        )

    for line in output:
        print(line)

    print("")
    print(
        "See https://mozilla-version-control-tools.readthedocs.io/en/latest/hgmo/ops.html"
    )
    print("for details about this check.")

    sys.exit(exit_code)