private void emitStatus()

in src/com/amazon/kinesis/streaming/agent/tailing/FileTailer.java [395:431]


    /**
     * Logs a snapshot of this tailer's progress and how far behind it is.
     *
     * <p>Reads the current metrics via {@link #getMetrics()} and then:
     * <ol>
     *   <li>if {@code flow.logEmitInternalMetrics()} is enabled, logs the whole
     *       metrics map as JSON at INFO;</li>
     *   <li>logs a progress line (records parsed/transformed/skipped/sent and
     *       bytes consumed) at INFO;</li>
     *   <li>logs a "bytes behind" message whose level depends on the lag:
     *       WARN at or above {@code Metrics.BYTES_BEHIND_WARN_LEVEL}, INFO at or
     *       above {@code Metrics.BYTES_BEHIND_INFO_LEVEL} (or whenever
     *       {@code agentContext.logEmitInternalMetrics()} is enabled), DEBUG for
     *       any other non-zero lag, and nothing when the lag is zero.</li>
     * </ol>
     *
     * <p>This method never throws: any exception raised while gathering or
     * formatting the status is caught and logged at ERROR.
     */
    private void emitStatus() {
        try {
            Map<String, Object> metrics = getMetrics();
            if (flow.logEmitInternalMetrics()) {
                // A serialization failure must not prevent the summary lines below
                // from being logged, hence the dedicated inner try/catch.
                try {
                    ObjectMapper mapper = new ObjectMapper();
                    LOGGER.info("{}: File Tailer Status: {}", serviceName(), mapper.writeValueAsString(metrics));
                } catch (JsonProcessingException e) {
                    LOGGER.error("{}: Failed when emitting file tailer status metrics.", serviceName(), e);
                }
            }

            // Fallback value handed to Metrics.getMetric for the AtomicLong-typed counters.
            AtomicLong zero = new AtomicLong(0);
            long bytesBehind = Metrics.getMetric(metrics, Metrics.FILE_TAILER_BYTES_BEHIND_METRIC, 0L);
            int filesBehind = Metrics.getMetric(metrics, Metrics.FILE_TAILER_FILES_BEHIND_METRIC, 0);
            long bytesConsumed = Metrics.getMetric(metrics, Metrics.PARSER_TOTAL_BYTES_CONSUMED_METRIC, zero).get();
            long recordsParsed = Metrics.getMetric(metrics, Metrics.PARSER_TOTAL_RECORDS_PARSED_METRIC, zero).get();
            long recordsProcessed = Metrics.getMetric(metrics, Metrics.PARSER_TOTAL_RECORDS_PROCESSED_METRIC, zero).get();
            long recordsSkipped = Metrics.getMetric(metrics, Metrics.PARSER_TOTAL_RECORDS_SKIPPED_METRIC, zero).get();
            long recordsSent = Metrics.getMetric(metrics, Metrics.SENDER_TOTAL_RECORDS_SENT_METRIC, zero).get();
            LOGGER.info("{}: Tailer Progress: Tailer has parsed {} records ({} bytes), transformed {} records, skipped {} records, and has successfully sent {} records to destination.",
                    serviceName(), recordsParsed, bytesConsumed, recordsProcessed, recordsSkipped, recordsSent);

            // Divide in floating point so the sub-KiB remainder is not truncated, and
            // use "%.2f" (two decimals); "%02f" only sets a minimum width of 2 and
            // still prints the default six fraction digits.
            double megabytesBehind = bytesBehind / (1024.0 * 1024.0);
            String msg = String.format("%s: Tailer is %.2f MB (%d bytes) behind.", serviceName(),
                    megabytesBehind, bytesBehind);
            if (filesBehind > 0) {
                msg += String.format(" There are %d file(s) newer than current file(s) being tailed.", filesBehind);
            }

            // Escalate the log level with the size of the lag; stay silent when fully caught up
            // unless internal-metrics logging is enabled on the agent context.
            if (bytesBehind >= Metrics.BYTES_BEHIND_WARN_LEVEL) {
                LOGGER.warn(msg);
            } else if (bytesBehind >= Metrics.BYTES_BEHIND_INFO_LEVEL || agentContext.logEmitInternalMetrics()) {
                LOGGER.info(msg);
            } else if (bytesBehind > 0) {
                LOGGER.debug(msg);
            }
        } catch (Exception e) {
            // Status emission is best-effort: never let a reporting failure propagate to the caller.
            LOGGER.error("{}: Failed while emitting tailer status.", serviceName(), e);
        }
    }