in src/com/amazon/kinesis/streaming/agent/tailing/TrackedFileRotationAnalyzer.java [82:133]
/**
 * Tests the hypothesis that no rotation happened between the current and
 * the incoming snapshots.
 *
 * <p>The hypothesis is accepted only when every one of these holds:
 * <ul>
 *   <li>each incoming file has a counterpart which {@code isSameAs} it and
 *       has an equal path;</li>
 *   <li>for the counterpart that is {@code currentOpenFile} (compared by
 *       reference), the incoming size and last-modified time are greater
 *       than or equal to the current ones; for every other counterpart
 *       both values are exactly equal;</li>
 *   <li>the incoming snapshot is not larger than the current one and, when
 *       it is smaller, the current files without a counterpart are exactly
 *       those at index {@code incoming.size()} and beyond (the tail);</li>
 *   <li>{@code currentOpenFile}, when not {@code null}, has a counterpart;</li>
 *   <li>{@code getIncomingAnomalies()} is empty.</li>
 * </ul>
 *
 * @return {@code true} if all checks pass; {@code false} as soon as any
 *         one of them fails
 */
public boolean checkNoRotation() {
    for (TrackedFile candidate : incoming) {
        // An incoming file without a counterpart contradicts the hypothesis.
        if (!hasCounterpart(candidate)) {
            return false;
        }
        TrackedFile match = getCounterpart(candidate);

        // The counterpart must be the same file and sit at the same path.
        boolean sameFileSamePath = candidate.isSameAs(match)
                && candidate.getPath().equals(match.getPath());
        if (!sameFileSamePath) {
            return false;
        }

        boolean attributesConsistent;
        if (match == currentOpenFile) { // reference comparison is deliberate
            // The open file may have been appended to: size and
            // last-modified time must not have gone backwards.
            attributesConsistent = candidate.getSize() >= match.getSize()
                    && candidate.getLastModifiedTime() >= match.getLastModifiedTime();
        } else {
            // Every other file must be untouched: identical size and
            // last-modified time.
            attributesConsistent = candidate.getSize() == match.getSize()
                    && candidate.getLastModifiedTime() == match.getLastModifiedTime();
        }
        if (!attributesConsistent) {
            return false;
        }
    }

    int incomingCount = incoming.size();
    int currentCount = current.size();

    // The incoming snapshot may never hold more files than the current one.
    if (incomingCount > currentCount) {
        return false;
    }

    // When files disappeared, only the tail of the current snapshot may be
    // lacking counterparts (older files deleted by rotation): positions
    // below incomingCount must have one, positions from incomingCount
    // onwards must not.
    if (incomingCount < currentCount) {
        for (int idx = 0; idx < currentCount; idx++) {
            boolean counterpartExpected = idx < incomingCount;
            if (hasCounterpart(current.get(idx)) != counterpartExpected) {
                return false;
            }
        }
    }

    // The file currently open, if any, must still have a counterpart.
    if (currentOpenFile != null && !hasCounterpart(currentOpenFile)) {
        return false;
    }

    // Finally, no anomalies may have been reported for the incoming
    // snapshot; if none were, the hypothesis holds.
    return getIncomingAnomalies().isEmpty();
}