in src/com/amazon/kinesis/streaming/agent/Agent.java [349:391]
/**
 * Logs a status report built from the current metrics snapshot.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>If {@code agentContext.logEmitInternalMetrics()} is true and the snapshot is
 *       non-empty, the whole snapshot is serialized to JSON and logged at INFO.</li>
 *   <li>A "Progress" line with the parsed/sent counters and the uptime is logged at INFO.</li>
 *   <li>A "Tailing is ... behind" line is logged at WARN, INFO or DEBUG depending on how
 *       many bytes behind the agent is; nothing is logged when it is 0 bytes behind and
 *       internal-metrics logging is off.</li>
 * </ol>
 *
 * <p>If the snapshot is {@code null} or has no {@code "Agent"} entry, a warning is logged
 * and the progress/tailing lines are skipped. Any other exception is caught and logged at
 * ERROR; this method does not propagate exceptions.
 */
private void emitStatus() {
    try {
        Map<String, Map<String, Object>> metrics = getMetrics();
        if (agentContext.logEmitInternalMetrics() && metrics != null && !metrics.isEmpty()) {
            try {
                ObjectMapper mapper = new ObjectMapper();
                logger.info("{}: Status: {}", serviceName(), mapper.writeValueAsString(metrics));
            } catch (JsonProcessingException e) {
                // A serialization failure must not prevent the progress lines below.
                logger.error("{}: Failed when emitting status metrics.", serviceName(), e);
            }
        }

        // Guard against a missing snapshot or missing "Agent" section instead of relying on
        // the catch-all below to absorb a NullPointerException.
        Map<String, Object> agentMetrics = (metrics == null) ? null : metrics.get("Agent");
        if (agentMetrics == null) {
            logger.warn("{}: Agent metrics are unavailable; skipping progress report.", serviceName());
            return;
        }

        // NOTE: This log line is parsed by scripts in support/benchmarking.
        // If it's modified the scripts need to be modified as well.
        logger.info("{}: Progress: {} records parsed ({} bytes), and {} records sent successfully to destinations. Uptime: {}ms",
                serviceName(),
                agentMetrics.get("TotalRecordsParsed"),
                agentMetrics.get("TotalBytesConsumed"),
                agentMetrics.get("TotalRecordsSent"),
                uptime.elapsed(TimeUnit.MILLISECONDS));

        // Log a message if we're far behind in tailing the input.
        // Values are read through Number so that any boxed numeric type is accepted; a
        // missing or non-numeric value is treated as 0 rather than throwing on the cast.
        Object rawBytesBehind = agentMetrics.get("TotalBytesBehind");
        Object rawFilesBehind = agentMetrics.get("TotalFilesBehind");
        long bytesBehind = (rawBytesBehind instanceof Number) ? ((Number) rawBytesBehind).longValue() : 0L;
        int filesBehind = (rawFilesBehind instanceof Number) ? ((Number) rawFilesBehind).intValue() : 0;

        // NOTE: This log line is parsed by scripts in support/benchmarking.
        // If it's modified the scripts need to be modified as well.
        // NOTE(review): "%02f" is flag '0' with width 2 and default precision (six decimals);
        // "%.2f" may have been intended. Also, "bytesBehind / 1024" is integer division, so
        // the MB figure is truncated to whole KiB. Both are left unchanged because the
        // output format is consumed by external scripts -- confirm before altering.
        String msg = String.format("%s: Tailing is %02f MB (%d bytes) behind.", serviceName(),
                bytesBehind / 1024 / 1024.0, bytesBehind);
        if (filesBehind > 0) {
            msg += String.format(" There are %d file(s) newer than current file(s) being tailed.", filesBehind);
        }

        if (bytesBehind >= Metrics.BYTES_BEHIND_WARN_LEVEL) {
            logger.warn(msg);
        } else if (bytesBehind >= Metrics.BYTES_BEHIND_INFO_LEVEL || agentContext.logEmitInternalMetrics()) {
            logger.info(msg);
        } else if (bytesBehind > 0) {
            logger.debug(msg);
        }
    } catch (Exception e) {
        // Status reporting is best-effort: never let a failure here escape to the caller.
        logger.error("{}: Failed while emitting agent status.", serviceName(), e);
    }
}