in src/com/amazon/kinesis/streaming/agent/tailing/SourceFileTracker.java [316:367]
boolean updateCurrentFile(TrackedFileList newSnapshot) throws IOException {
    // Compares the current snapshot with newSnapshot, reports any anomalies found by the
    // analyzer, then dispatches to exactly one handler depending on what the analyzer
    // says happened to the currently open file. The handler's boolean result (whether we
    // managed to track the current open file) is returned to the caller.
    //
    // Throws IllegalStateException if, after the handler ran, the current snapshot or
    // the current open file is still the very same reference as on entry.
    Preconditions.checkNotNull(currentSnapshot);
    Preconditions.checkNotNull(currentOpenFile);

    // Keep the entry-time references so the post-conditions below can compare against them.
    TrackedFileList previousSnapshot = currentSnapshot;
    TrackedFile previousOpenFile = currentOpenFile;

    // Build the rotation analysis and report every anomaly it found.
    TrackedFileRotationAnalyzer rotation =
            new TrackedFileRotationAnalyzer(currentSnapshot, newSnapshot, currentOpenFile);
    if (!rotation.getIncomingAnomalies().isEmpty()) {
        for (Map.Entry<TrackedFile, String> entry : rotation.getIncomingAnomalies().entrySet()) {
            onUnexpectedChangeDetected(newSnapshot, entry.getKey(), entry.getValue());
        }
    }

    // Assigned exactly once by whichever branch handles the situation.
    final boolean tracked;
    if (rotation.checkNoRotation()) {
        tracked = onNoRotationDetected(newSnapshot, rotation.getCounterpartIndex(currentOpenFile));
    } else {
        onPossibleRotationDetected(newSnapshot);
        if (rotation.allFilesHaveDisappeared()) {
            tracked = onAllFilesDisappeared(newSnapshot);
        } else if (!rotation.hasCounterpart(currentOpenFile)) {
            // The current open file has no counterpart in the new snapshot;
            // none of the supported rotation methods explains this.
            tracked = onCurrentFileNotFound(newSnapshot);
        } else if (!rotation.currentOpenFileWasTruncated()) {
            // Counterpart found and not truncated: keep tailing the same file.
            tracked = continueTailingCurrentFile(newSnapshot, rotation.getCounterpartIndex(currentOpenFile));
        } else {
            // Truncated: a non-negative index means the analyzer located the file
            // after the truncate; otherwise treat it as truncation without rotation.
            int indexAfterTruncate = rotation.findCurrentOpenFileAfterTruncate();
            tracked = indexAfterTruncate >= 0
                    ? onRotationByTruncate(newSnapshot, indexAfterTruncate)
                    : onTruncationWithoutRotation(newSnapshot);
        }
    }

    // SANITYCHECK: the handler must have replaced both references.
    // Reference identity (not equals) is what is being checked here.
    if (previousSnapshot == currentSnapshot) {
        throw new IllegalStateException("The current tracked file snapshot was not updated as expected.");
    }
    if (previousOpenFile == currentOpenFile) {
        throw new IllegalStateException("The current open file was not updated as expected.");
    }
    return tracked;
}