in ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java [44:128]
List<LogSegment> evict(long[] followerNextIndices, long safeEvictIndex,
long lastAppliedIndex, LogSegmentList segments, int maxCachedSegments);
class CacheInvalidationPolicyDefault implements CacheInvalidationPolicy {
@Override
public List<LogSegment> evict(long[] followerNextIndices,
long safeEvictIndex, long lastAppliedIndex,
LogSegmentList segments, final int maxCachedSegments) {
try(AutoCloseableLock readLock = segments.readLock()) {
return evictImpl(followerNextIndices, safeEvictIndex, lastAppliedIndex, segments);
}
}
private List<LogSegment> evictImpl(long[] followerNextIndices,
long safeEvictIndex, long lastAppliedIndex,
LogSegmentList segments) {
List<LogSegment> result = new ArrayList<>();
int safeIndex = segments.size() - 1;
for (; safeIndex >= 0; safeIndex--) {
LogSegment segment = segments.get(safeIndex);
// a segment's cache can be invalidated only if it's close and all its
// entries have been flushed to the local disk and the local disk
// segment is also closed.
if (!segment.isOpen() && segment.getEndIndex() <= safeEvictIndex) {
break;
}
}
if (followerNextIndices == null || followerNextIndices.length == 0) {
// no followers, determine the eviction based on lastAppliedIndex
// first scan from the oldest segment to the one that is right before
// lastAppliedIndex. All these segment's cache can be invalidated.
int j = 0;
for (; j <= safeIndex; j++) {
LogSegment segment = segments.get(j);
if (segment.getEndIndex() > lastAppliedIndex) {
break;
}
if (segment.hasCache()) {
result.add(segment);
}
}
// if there is no cache invalidation target found, pick a segment that
// later (but not now) the state machine will consume
if (result.isEmpty()) {
for (int i = safeIndex; i >= j; i--) {
LogSegment s = segments.get(i);
if (s.getStartIndex() > lastAppliedIndex && s.hasCache()) {
result.add(s);
break;
}
}
}
} else {
// this peer is the leader with followers. determine the eviction based
// on followers' next indices and the local lastAppliedIndex.
Arrays.sort(followerNextIndices);
// segments covering index minToRead will still be loaded. Thus we first
// try to evict cache for segments before minToRead.
final long minToRead = Math.min(followerNextIndices[0], lastAppliedIndex);
int j = 0;
for (; j <= safeIndex; j++) {
LogSegment s = segments.get(j);
if (s.getEndIndex() >= minToRead) {
break;
}
if (s.hasCache()) {
result.add(s);
}
}
// if there is no eviction target, continue the scanning and evict
// the one that is not being read currently.
if (result.isEmpty()) {
for (; j <= safeIndex; j++) {
LogSegment s = segments.get(j);
if (Arrays.stream(followerNextIndices).noneMatch(s::containsIndex)
&& !s.containsIndex(lastAppliedIndex) && s.hasCache()) {
result.add(s);
break;
}
}
}
}
return result;
}
}