List evict()

in ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java [44:128]


  /**
   * Selects log segments as cache eviction targets.
   *
   * @param followerNextIndices the next indices of the followers; the default
   *        implementation in this file treats {@code null} or an empty array
   *        as "no followers" and then decides based on
   *        {@code lastAppliedIndex} only
   * @param safeEvictIndex the default implementation only considers segments
   *        up to the newest one that is not open and whose end index is at
   *        most this value
   * @param lastAppliedIndex the last applied index; the default implementation
   *        prefers segments ending at or before it
   * @param segments the segments to select from; the default implementation
   *        holds the list's read lock while selecting
   * @param maxCachedSegments NOTE(review): not used by the default
   *        implementation in this file; presumably an upper bound on the
   *        number of cached segments for other policies -- confirm
   * @return the selected segments; the default implementation only returns
   *         segments for which {@code hasCache()} is true, and may return an
   *         empty list
   */
  List<LogSegment> evict(long[] followerNextIndices, long safeEvictIndex,
      long lastAppliedIndex, LogSegmentList segments, int maxCachedSegments);

  /**
   * Default policy. Eviction candidates are limited to the positions up to the
   * newest segment that is not open and ends at or before the safe evict
   * index. Among those, segments that nobody needs to read any more are
   * preferred; if there is none, a single fallback segment is picked.
   */
  class CacheInvalidationPolicyDefault implements CacheInvalidationPolicy {
    /**
     * Selects the eviction targets while holding the read lock of
     * {@code segments}.
     *
     * @param followerNextIndices the followers' next indices; {@code null} or
     *        empty means there are no followers. The array is not modified.
     * @param safeEvictIndex only positions up to the newest non-open segment
     *        whose end index is at most this value are considered
     * @param lastAppliedIndex the local last applied index
     * @param segments the segments to select from
     * @param maxCachedSegments not used by this policy
     * @return the segments (all having cache) selected for eviction; possibly
     *         empty, never {@code null}
     */
    @Override
    public List<LogSegment> evict(long[] followerNextIndices,
        long safeEvictIndex, long lastAppliedIndex,
        LogSegmentList segments, final int maxCachedSegments) {
      // the lock variable is only needed so that try-with-resources releases it
      try(AutoCloseableLock readLock = segments.readLock()) {
        return evictImpl(followerNextIndices, safeEvictIndex, lastAppliedIndex, segments);
      }
    }

    /**
     * Implements the selection; the caller is expected to hold the read lock
     * of {@code segments}.
     *
     * @param followerNextIndices the followers' next indices; {@code null} or
     *        empty means there are no followers. The array is not modified.
     * @param safeEvictIndex upper bound on the end index of the newest
     *        candidate segment
     * @param lastAppliedIndex the local last applied index
     * @param segments the segments to select from
     * @return the segments selected for eviction; possibly empty
     */
    private List<LogSegment> evictImpl(long[] followerNextIndices,
        long safeEvictIndex, long lastAppliedIndex,
        LogSegmentList segments) {
      final List<LogSegment> result = new ArrayList<>();
      // scan from the newest segment backwards to find the last candidate
      // position; safeIndex ends up as -1 when there is no candidate at all.
      int safeIndex = segments.size() - 1;
      for (; safeIndex >= 0; safeIndex--) {
        final LogSegment segment = segments.get(safeIndex);
        // a segment's cache can be invalidated only if it's closed and all its
        // entries have been flushed to the local disk and the local disk
        // segment is also closed.
        if (!segment.isOpen() && segment.getEndIndex() <= safeEvictIndex) {
          break;
        }
      }
      if (followerNextIndices == null || followerNextIndices.length == 0) {
        // no followers, determine the eviction based on lastAppliedIndex
        // first scan from the oldest segment to the one that is right before
        // lastAppliedIndex. All these segments' caches can be invalidated.
        int j = 0;
        for (; j <= safeIndex; j++) {
          final LogSegment segment = segments.get(j);
          if (segment.getEndIndex() > lastAppliedIndex) {
            break;
          }
          if (segment.hasCache()) {
            result.add(segment);
          }
        }
        // if there is no cache invalidation target found, pick a segment that
        // later (but not now) the state machine will consume; scanning from
        // the newest candidate picks the one that will be consumed last.
        if (result.isEmpty()) {
          for (int i = safeIndex; i >= j; i--) {
            final LogSegment s = segments.get(i);
            if (s.getStartIndex() > lastAppliedIndex && s.hasCache()) {
              result.add(s);
              break;
            }
          }
        }
      } else {
        // this peer is the leader with followers. determine the eviction based
        // on followers' next indices and the local lastAppliedIndex.
        // Only the smallest next index is needed, so compute it directly
        // instead of sorting: sorting would reorder the caller's array.
        // The array is non-empty in this branch, so orElse is never taken.
        final long minFollowerNextIndex = Arrays.stream(followerNextIndices)
            .min().orElse(lastAppliedIndex);
        // segments covering index minToRead will still be loaded. Thus we first
        // try to evict cache for segments before minToRead.
        final long minToRead = Math.min(minFollowerNextIndex, lastAppliedIndex);
        int j = 0;
        for (; j <= safeIndex; j++) {
          final LogSegment s = segments.get(j);
          if (s.getEndIndex() >= minToRead) {
            break;
          }
          if (s.hasCache()) {
            result.add(s);
          }
        }
        // if there is no eviction target, continue the scanning and evict
        // the one that is not being read currently.
        if (result.isEmpty()) {
          for (; j <= safeIndex; j++) {
            final LogSegment s = segments.get(j);
            if (Arrays.stream(followerNextIndices).noneMatch(s::containsIndex)
                && !s.containsIndex(lastAppliedIndex) && s.hasCache()) {
              result.add(s);
              break;
            }
          }
        }
      }
      return result;
    }
  }