in pylib/vcsreplicator/vcsreplicator/nagios.py [0:0]
def _parse_consumer_lag_args():
    """Build the command-line parser for ``check_consumer_lag`` and parse sys.argv."""
    parser = argparse.ArgumentParser()
    parser.add_argument("config", help="Path to config file to load")
    parser.add_argument(
        "--consumer-section",
        default="consumer",
        help="Config section Kafka config should be read from",
    )
    parser.add_argument(
        "--warning-lag-count",
        default=150,
        type=int,
        help="Number of messages behind after which a warning will be issued",
    )
    parser.add_argument(
        "--warning-lag-time",
        default=150.0,
        type=float,
        help="Time behind after which a warning will be issued",
    )
    parser.add_argument(
        "--critical-lag-count",
        default=600,
        type=int,
        help="Number of messages behind after which an error will be issued",
    )
    parser.add_argument(
        "--critical-lag-time",
        default=600.0,
        type=float,
        help="Time behind after which an error will be issued",
    )
    parser.add_argument(
        "--telegraf",
        action="store_true",
        help="Output data for consumption by Telegraf",
    )
    return parser.parse_args()


def _evaluate_partition_offsets(
    offsets,
    warning_lag_count,
    critical_lag_count,
    warning_lag_time,
    critical_lag_time,
):
    """Grade every partition against the lag thresholds.

    ``offsets`` maps partition -> ``(offset, available, lag_time)``, where
    ``lag_time`` is either ``None`` (could not be determined) or a number of
    seconds.

    Returns a tuple ``(exitcode, good, bad, output)``:

    * ``exitcode`` is 0 (OK), 1 (WARNING) or 2 (CRITICAL), the worst state
      seen across all partitions.
    * ``good`` counts partitions whose consumer offset has reached the
      available offset.
    * ``bad`` counts the remaining partitions.
    * ``output`` holds the per-partition detail lines, in partition order.
    """
    exitcode = 0
    good = 0
    bad = 0
    output = []
    # Clock drift is reported once per run, not once per partition.
    drift_warned = False

    for partition, (offset, available, lag_time) in sorted(offsets.items()):
        # Consumer is fully caught up.
        if offset >= available:
            good += 1
            output.append(
                "OK - partition %d is completely in sync (%d/%d)"
                % (partition, offset, available)
            )
            continue

        bad += 1

        # Grade by number of messages behind.
        lag = available - offset
        if lag >= critical_lag_count:
            exitcode = 2
            label = "CRITICAL"
        elif lag >= warning_lag_count:
            exitcode = max(exitcode, 1)
            label = "WARNING"
        else:
            label = "OK"
        output.append(
            "%s - partition %d is %d messages behind (%d/%d)"
            % (label, partition, lag, offset, available)
        )

        if lag_time is None:
            output.append(
                "WARNING - could not determine lag time for partition %d" % partition
            )
            # TODO raise warning for inability to determine lag time if persistent.
            # exitcode = max(exitcode, 1)
            continue

        # Grade by time behind.
        if lag_time >= critical_lag_time:
            exitcode = 2
            label = "CRITICAL"
        elif lag_time >= warning_lag_time:
            exitcode = max(exitcode, 1)
            label = "WARNING"
        else:
            label = "OK"
        output.append(
            "%s - partition %d is %0.3f seconds behind" % (label, partition, lag_time)
        )

        # A negative lag time means the consumer's clock is ahead of the
        # producer's timestamps: clock drift between producer and consumer.
        if lag_time < 0.0 and not drift_warned:
            exitcode = max(exitcode, 1)
            output.append(
                "WARNING - clock drift of %.3f seconds between "
                "producer and consumer; check NTP sync" % lag_time
            )
            drift_warned = True

    return exitcode, good, bad, output


def _consumer_lag_summary(exitcode, good, bad, total):
    """Return the Nagios status line (first line of output) for the check."""
    if exitcode == 2:
        return "CRITICAL - %d/%d partitions out of sync" % (bad, total)
    if exitcode:
        return "WARNING - %d/%d partitions out of sync" % (bad, total)
    if good == total:
        return "OK - %d/%d consumers completely in sync" % (good, total)
    drifted = total - good
    return "OK - %d/%d consumers out of sync but within tolerances" % (
        drifted,
        total,
    )


def check_consumer_lag():
    """Nagios-style check of how far a Kafka consumer lags behind its topic.

    Reads the consumer's topic and group from the config section named by
    ``--consumer-section``. It then compares each partition's consumed offset
    with the available offset, both as a message count and (when known) as
    a time delta in seconds.

    With ``--telegraf``, prints the offsets as sorted JSON and exits 0.

    Otherwise prints a status line followed by a blank line, then per-partition
    details, then a pointer to the ops docs. It exits with 0 (OK),
    1 (WARNING) or 2 (CRITICAL) according to the configured thresholds.

    If fetching offsets fails, a WARNING line is printed and the exception is
    re-raised, so the process terminates via the uncaught exception.
    """
    args = _parse_consumer_lag_args()

    consumer_section = args.consumer_section
    config = Config(filename=args.config)
    client = config.get_client_from_section(consumer_section, timeout=5)
    topic = config.get(consumer_section, "topic")
    group = config.get(consumer_section, "group")

    try:
        offsets = consumer_offsets_and_lag(client, topic, [group])[group]
    except Exception as e:
        # Emit a Nagios-readable first line before the traceback.
        print("WARNING - exception fetching offsets: %s" % e)
        print("")
        raise

    if args.telegraf:
        telegraf_data = create_consumer_telegraf_json(offsets)
        print(json.dumps(telegraf_data, sort_keys=True))
        sys.exit(0)

    exitcode, good, bad, output = _evaluate_partition_offsets(
        offsets,
        warning_lag_count=args.warning_lag_count,
        critical_lag_count=args.critical_lag_count,
        warning_lag_time=args.warning_lag_time,
        critical_lag_time=args.critical_lag_time,
    )

    print(_consumer_lag_summary(exitcode, good, bad, len(offsets)))
    print("")
    for line in output:
        print(line)
    print("")
    print(
        "See https://mozilla-version-control-tools.readthedocs.io/en/latest/hgmo/ops.html"
    )
    print("for details about this check.")
    sys.exit(exitcode)