in src/com/amazon/kinesis/streaming/agent/tailing/FileTailer.java [338:379]
/**
 * Refreshes the file tracker when needed and points the record parser at the
 * file tracker's current open file.
 *
 * <p>Nothing is refreshed or updated unless {@code isRunning()} is true. The
 * file tracker is refreshed when {@code forceRefresh} is set, when at least
 * {@code maxTimeBetweenFileTrackerRefreshMillis} have elapsed since its last
 * refresh, or when {@code fileTracker.mustRefresh()} says so. The parser is
 * then updated if a refresh happened, or if the parser is not currently
 * parsing while the tracker has an open file.
 *
 * <p>If {@code fileTracker.refresh()} returns {@code false}, the parser is
 * switched to the new file via {@code switchParsingToFile}; otherwise it
 * continues via {@code continueParsingWithFile}.
 *
 * @param forceRefresh if {@code true}, refresh the file tracker regardless of
 *        the time elapsed since the last refresh
 * @return the value of {@code parser.isParsing()} once any update is done
 * @throws IOException presumably raised by the file tracker refresh or by the
 *         parser when changing files (not visible from this method alone)
 */
protected synchronized boolean updateRecordParser(boolean forceRefresh) throws IOException {
    if (isRunning()) {
        boolean resetParsing = false;
        boolean refreshed = false;
        long elapsedSinceLastRefresh = System.currentTimeMillis() - fileTracker.getLastRefreshTimestamp();
        // Refresh sparingly to save CPU cycles and unnecessary IO...
        if (forceRefresh
                || elapsedSinceLastRefresh >= maxTimeBetweenFileTrackerRefreshMillis
                || fileTracker.mustRefresh()) {
            LOGGER.trace("{} is refreshing current tailed file.", serviceName());
            // The elapsed time is only logged once a previous refresh timestamp exists.
            if (fileTracker.getLastRefreshTimestamp() > 0) {
                LOGGER.trace("{}: Time since last refresh: {}", serviceName(), Duration.millis(elapsedSinceLastRefresh));
            }
            // A false return from refresh() means parsing has to be reset below.
            resetParsing = !fileTracker.refresh();
            refreshed = true;
        }
        // Only update the parser if something changed
        if (refreshed || (!parser.isParsing() && fileTracker.getCurrentOpenFile() != null)) {
            TrackedFile currentFile = parser.getCurrentFile();
            // May be null here: this branch is also entered after a refresh,
            // whether or not the tracker ended up with an open file.
            TrackedFile newFile = fileTracker.getCurrentOpenFile();
            if (LOGGER.isDebugEnabled()) {
                if (currentFile != null) {
                    if (newFile == null) {
                        // Guard: dereferencing newFile below would throw a
                        // NullPointerException from logging-only code.
                        LOGGER.debug("{}: No open file to tail after refresh.\nOld file: {}",
                                serviceName(), currentFile);
                    } else if (newFile.getId().equals(currentFile.getId())
                            && newFile.getPath().equals(currentFile.getPath())) {
                        LOGGER.trace("{}: Continuing to tail current file {}.", serviceName(), currentFile.getPath());
                    } else {
                        LOGGER.debug("{}: Switching files. Rotation might have happened.\nOld file: {}\nNew file: {}",
                                serviceName(), currentFile, newFile);
                    }
                } else if (newFile != null) {
                    LOGGER.debug("{}: Found file to tail: {}", serviceName(), newFile);
                }
            }
            if (resetParsing) {
                // Only warn when a file was actually being parsed before the reset.
                if (currentFile != null) {
                    LOGGER.warn("{}: Parsing was reset when switching to new file {} (from {})!", serviceName(), newFile, currentFile);
                }
                parser.switchParsingToFile(newFile);
            } else {
                parser.continueParsingWithFile(newFile);
            }
        }
    }
    return parser.isParsing();
}