in src/com/amazon/kinesis/streaming/agent/tailing/TrackedFileRotationAnalyzer.java [161:213]
public int findCurrentOpenFileAfterTruncate() {
Preconditions.checkNotNull(currentOpenFile);
Preconditions.checkState(incoming.size() > 1); // always true after rotation by Truncate
Preconditions.checkState(hasCounterpart(incoming.get(0))); // always true for rotation by Truncate
// In case of rotation by Truncate, the current open file
// MUST have a counterpart: either the latest file, or a previously
// rotated file. If not, this is an anomaly (could happen if
// previously rotated file have been deleted).
if(hasCounterpart(currentOpenFile)) {
int counterpartIndex = getCounterpartIndex(currentOpenFile);
if(counterpartIndex == -1) {
// Should never happen really if we are this far!!
// Defensively, return -1.
return -1;
} else if(counterpartIndex == 0) {
// We were still at the top file; find the oldest un-matched file.
int lastUntracked = 1;
while(lastUntracked < incoming.size()) {
if(hasCounterpart(incoming.get(lastUntracked))) {
break;
}
++lastUntracked;
}
lastUntracked -= 1;
if(lastUntracked == incoming.size()) {
// No unmatched files were found
return -1;
} else {
// Now, depending on the situation, the current file could be any
// of the unmatched files, not necessarily the oldest. Starting
// with the assumption that it's the oldest, double check...
// TODO: Here would be the right place to compare file hashes
// to double check that's the right file being tracked.
// See #couldHaveBeenCurrentOpenFile().
for(; lastUntracked > 0 && !hasCounterpart(incoming.get(lastUntracked)); --lastUntracked) {
if(couldHaveBeenCurrentOpenFile(incoming.get(lastUntracked)))
break;
}
if(lastUntracked == 0)
// Couldn't find any good candidates
return -1;
else
return lastUntracked;
}
} else {
// The counterpart must be a previously rotated file. We should
// just keep tracking it.
return counterpartIndex;
}
}
return -1;
}