def check_consumer_lag()

in pylib/vcsreplicator/vcsreplicator/nagios.py [0:0]


def _lag_status(value, warning_threshold, critical_threshold):
    """Grade a single lag measurement against a pair of thresholds.

    Returns an ``(exit code, label)`` tuple: ``(2, "CRITICAL")`` when
    ``value`` is at or above ``critical_threshold``, ``(1, "WARNING")`` when
    it is at or above ``warning_threshold``, and ``(0, "OK")`` otherwise.
    The critical threshold is tested first, so it wins if both match.
    """
    if value >= critical_threshold:
        return 2, "CRITICAL"
    if value >= warning_threshold:
        return 1, "WARNING"
    return 0, "OK"


def _evaluate_partition_lag(
    offsets,
    warning_lag_count,
    warning_lag_time,
    critical_lag_count,
    critical_lag_time,
):
    """Grade every partition in ``offsets`` and build the detail lines.

    ``offsets`` maps a partition number to an
    ``(offset, available, lag_time)`` tuple. ``lag_time`` is reported in
    seconds and may be ``None`` when it could not be determined.

    Returns ``(exitcode, good, bad, output)`` where ``exitcode`` is the worst
    status seen (0, 1 or 2), ``good``/``bad`` count the partitions that are
    fully caught up / behind, and ``output`` is the list of per-partition
    report lines in partition order. This function performs no I/O.
    """
    exitcode = 0
    good = 0
    bad = 0
    output = []
    # The clock drift warning is emitted at most once per run, for the first
    # lagging partition that shows a negative lag time.
    drift_warned = False

    for partition, (offset, available, lag_time) in sorted(offsets.items()):
        # Consumer is fully caught up.
        if offset >= available:
            good += 1
            output.append(
                "OK - partition %d is completely in sync (%d/%d)"
                % (partition, offset, available)
            )
            continue

        bad += 1
        lag = available - offset
        code, label = _lag_status(lag, warning_lag_count, critical_lag_count)
        # A later, healthier measurement must never downgrade the status.
        exitcode = max(exitcode, code)
        output.append(
            "%s - partition %d is %d messages behind (%d/%d)"
            % (label, partition, lag, offset, available)
        )

        if lag_time is None:
            output.append(
                "WARNING - could not determine lag time for partition %d" % partition
            )
            # TODO raise warning for inability to determine lag time if persistent.
            # Deliberately informational only: the exit code is not raised.
            continue

        code, label = _lag_status(lag_time, warning_lag_time, critical_lag_time)
        exitcode = max(exitcode, code)
        output.append(
            "%s - partition %d is %0.3f seconds behind" % (label, partition, lag_time)
        )

        # Clock drift between producer and consumer.
        if lag_time < 0.0 and not drift_warned:
            exitcode = max(exitcode, 1)
            output.append(
                "WARNING - clock drift of %.3f seconds between "
                "producer and consumer; check NTP sync" % lag_time
            )
            drift_warned = True

    return exitcode, good, bad, output


def check_consumer_lag():
    """Command-line check reporting how far a Kafka consumer group lags.

    Parses arguments from ``sys.argv``, loads the config file, fetches the
    consumer offsets for the configured topic and group, and then either:

    * with ``--telegraf``: prints the offsets as a JSON document and exits 0;
    * otherwise: prints a one-line summary, the per-partition details and a
      documentation pointer, then exits with 0 (OK), 1 (WARNING) or
      2 (CRITICAL) according to the worst partition.

    Always terminates the process via ``sys.exit``. If fetching offsets
    fails, a WARNING line is printed and the exception is re-raised.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("config", help="Path to config file to load")
    parser.add_argument(
        "--consumer-section",
        default="consumer",
        help="Config section Kafka config should be read from",
    )
    parser.add_argument(
        "--warning-lag-count",
        default=150,
        type=int,
        help="Number of messages behind after which a warning will be issued",
    )
    parser.add_argument(
        "--warning-lag-time",
        default=150.0,
        type=float,
        help="Time behind after which a warning will be issued",
    )
    parser.add_argument(
        "--critical-lag-count",
        default=600,
        type=int,
        help="Number of messages behind after which an error will be issued",
    )
    parser.add_argument(
        "--critical-lag-time",
        default=600.0,
        type=float,
        help="Time behind after which an error will be issued",
    )
    parser.add_argument(
        "--telegraf",
        action="store_true",
        help="Output data for consumption by Telegraf",
    )

    args = parser.parse_args()

    consumer_section = args.consumer_section

    config = Config(filename=args.config)
    client = config.get_client_from_section(consumer_section, timeout=5)
    topic = config.get(consumer_section, "topic")
    group = config.get(consumer_section, "group")

    try:
        offsets = consumer_offsets_and_lag(client, topic, [group])[group]
    except Exception as e:
        # Emit a status line first, then re-raise so the failure is not
        # swallowed and the traceback remains available.
        print("WARNING - exception fetching offsets: %s" % e)
        print("")
        raise

    if args.telegraf:
        telegraf_data = create_consumer_telegraf_json(offsets)
        print(json.dumps(telegraf_data, sort_keys=True))
        sys.exit(0)

    exitcode, good, bad, output = _evaluate_partition_lag(
        offsets,
        warning_lag_count=args.warning_lag_count,
        warning_lag_time=args.warning_lag_time,
        critical_lag_count=args.critical_lag_count,
        critical_lag_time=args.critical_lag_time,
    )

    # The first printed line is the overall summary for the worst status.
    if exitcode == 2:
        print("CRITICAL - %d/%d partitions out of sync" % (bad, len(offsets)))
    elif exitcode:
        print("WARNING - %d/%d partitions out of sync" % (bad, len(offsets)))
    elif good == len(offsets):
        print("OK - %d/%d consumers completely in sync" % (good, len(offsets)))
    else:
        drifted = len(offsets) - good
        print(
            "OK - %d/%d consumers out of sync but within tolerances"
            % (drifted, len(offsets))
        )

    print("")
    for m in output:
        print(m)

    print("")
    print(
        "See https://mozilla-version-control-tools.readthedocs.io/en/latest/hgmo/ops.html"
    )
    print("for details about this check.")

    sys.exit(exitcode)