in src/com/amazon/kinesis/streaming/agent/tailing/AbstractParser.java [156:189]
private boolean setCurrentFile(TrackedFile file, boolean resetParsing) {
try {
Preconditions.checkArgument(file == null || file.isOpen());
if (resetParsing || file == null)
stopParsing("Parsing is reset by caller.");
if (file != null) {
long newOffset = file.getChannel().position();
if (currentFile == null || resetParsing) {
logger.info("{}: Opening {} for parsing.", name, file.getPath());
currentBufferFileEnded = true;
} else {
// if the offset changed, the file must have rotated and we can't continue with the current buffer
currentBufferFileEnded = currentBufferFileEnded || (currentFileChannelOffset != newOffset);
if (!sameAsCurrentFile(file)) {
logger.info("{}: Continuing to parse {}.", name, file.getPath());
}
logger.debug("{}: Old offset: {}, new offset: {}", name, currentFileChannelOffset, newOffset);
}
currentFile = file;
currentFileChannel = currentFile.getChannel();
currentFileChannelOffset = currentFileChannel.position();
headerLinesToSkip = currentFileChannelOffset == 0 ? flow.getSkipHeaderLines() : 0;
fileFooterPattern = flow.getFileFooterPattern();
return true;
} else {
return false;
}
} catch(IOException e) {
logger.error("{}: Failed when setting current file to {} (reset={}).", name, file, resetParsing, e);
stopParsing("Unhandled error.");
totalUndhandledErrors.incrementAndGet();
return false;
}
}