in oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/CommitMitigatingTieredMergePolicy.java [365:595]
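/**
 * Selects merges for the given segments, but first applies "commit mitigation":
 * the commit rate (in docs/sec and MB/sec) is tracked via single exponential
 * smoothing, and while a rate exceeds its configured maximum (and the segment
 * count is still below {@code maxNoOfSegsForMitigation}) no merge is returned
 * at all. Otherwise the method falls through to a tiered merge selection
 * adapted from Lucene's {@code TieredMergePolicy}.
 */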
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos) throws IOException {
    int segmentSize = infos.size();
    timeSeriesCount++;
    if (timeSeriesCount % timeSeriesLength == 0) {
        // reset averages
        avgCommitRateDocs = 0d;
        avgCommitRateMB = 0d;
        avgSegs = 0d;
    }
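
    // avgSegs is a smoothed estimate of the segment count; singleExpSmoothing
    // (defined elsewhere in this class) presumably applies the standard update
    // s' = alpha * x + (1 - alpha) * s, so recent observations dominate.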
    avgSegs = singleExpSmoothing(segmentSize, avgSegs);
    log.debug("segments: current {}, average {}", segmentSize, avgSegs);
    if (verbose()) {
        message("findMerges: " + segmentSize + " segments, " + avgSegs + " average");
    }
    if (segmentSize == 0) {
        return null;
    }
    // if the number of segments exceeds the maximum, raise the maximum rates
    // so that more merges are allowed (i.e. less commit-rate mitigation)
    if (segmentSize > maxNoOfSegsForMitigation) {
        if (avgCommitRateDocs > maxCommitRateDocs) {
            double v = singleExpSmoothing(avgCommitRateDocs, maxCommitRateDocs);
            log.debug("adjusting maxCommitRateDocs from {} to {}", maxCommitRateDocs, v);
            maxCommitRateDocs = v;
        }
        if (avgCommitRateMB > maxCommitRateMB) {
            double v = singleExpSmoothing(avgCommitRateMB, maxCommitRateMB);
            log.debug("adjusting maxCommitRateMB from {} to {}", maxCommitRateMB, v);
            maxCommitRateMB = v;
        }
    }
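
    // Estimate the commit rate in docs/sec since the last invocation: the
    // doc-count delta divided by the elapsed wall-clock seconds. Note this
    // assumes successive invocations are spaced apart; a near-zero timeDelta
    // would inflate the rate.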
    long now = System.currentTimeMillis();
    double timeDelta = (now - time) / 1000d;
    double commitRate = Math.abs(docCount - infos.totalDocCount()) / timeDelta;
    time = now;
    avgCommitRateDocs = singleExpSmoothing(commitRate, avgCommitRateDocs);
    log.debug("commit rate: current {}, average {}, max {} docs/sec", commitRate, avgCommitRateDocs, maxCommitRateDocs);
    docCount = infos.totalDocCount();
    if (verbose()) {
        message(commitRate + " doc/s (max: " + maxCommitRateDocs + ", avg: " + avgCommitRateDocs + " doc/s)");
    }
    // mitigate (skip merging) only while the commit rate is high AND the
    // number of segments is still low; once too many segments pile up,
    // merge anyway to avoid hurting search performance
    if (commitRate > maxCommitRateDocs && segmentSize < maxNoOfSegsForMitigation) {
        log.debug("mitigation due to {} > {} docs/sec and segments {} < {}", commitRate, maxCommitRateDocs,
                segmentSize, maxNoOfSegsForMitigation);
        return null;
    }
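
    // From here on the selection logic closely follows Lucene's
    // TieredMergePolicy.findMerges: sort segments by size, derive the allowed
    // segment budget, then greedily score and pick merge candidates.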
    final Collection<SegmentCommitInfo> merging = writer.get().getMergingSegments();
    final Collection<SegmentCommitInfo> toBeMerged = new HashSet<SegmentCommitInfo>();
    final List<SegmentCommitInfo> infosSorted = new ArrayList<SegmentCommitInfo>(infos.asList());
    Collections.sort(infosSorted, new SegmentByteSizeDescending());

    // Compute total index bytes & print details about the index
    long totIndexBytes = 0;
    long minSegmentBytes = Long.MAX_VALUE;
    for (SegmentCommitInfo info : infosSorted) {
        final long segBytes = size(info);
        if (verbose()) {
            String extra = merging.contains(info) ? " [merging]" : "";
            if (segBytes >= maxMergedSegmentBytes / 2.0) {
                extra += " [skip: too large]";
            } else if (segBytes < floorSegmentBytes) {
                extra += " [floored]";
            }
            message(" seg=" + writer.get().segString(info) + " size=" + String.format(Locale.ROOT, "%.3f", segBytes / 1024 / 1024.) + " MB" + extra);
        }
        minSegmentBytes = Math.min(segBytes, minSegmentBytes);
        // Accum total byte size
        totIndexBytes += segBytes;
    }
    // If we have too-large segments, grace them out
    // of the maxSegmentCount:
    int tooBigCount = 0;
    while (tooBigCount < infosSorted.size() && size(infosSorted.get(tooBigCount)) >= maxMergedSegmentBytes / 2.0) {
        totIndexBytes -= size(infosSorted.get(tooBigCount));
        tooBigCount++;
    }

    minSegmentBytes = floorSize(minSegmentBytes);
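
    // Example (hypothetical numbers): with a floored minSegmentBytes of 2 MB,
    // segsPerTier = 10 and maxMergeAtOnce = 10, the loop below budgets up to
    // 10 segments of ~2 MB, then 10 of ~20 MB, then 10 of ~200 MB, and so on,
    // stopping once the remaining bytes fit within a single tier.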
    // Compute max allowed segs in the index
    long levelSize = minSegmentBytes;
    long bytesLeft = totIndexBytes;
    double allowedSegCount = 0;
    while (true) {
        final double segCountLevel = bytesLeft / (double) levelSize;
        if (segCountLevel < segsPerTier) {
            allowedSegCount += Math.ceil(segCountLevel);
            break;
        }
        allowedSegCount += segsPerTier;
        bytesLeft -= segsPerTier * levelSize;
        levelSize *= maxMergeAtOnce;
    }
    int allowedSegCountInt = (int) allowedSegCount;
    MergeSpecification spec = null;

    // Cycle to possibly select more than one merge:
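    // Each pass gathers the segments that are still eligible, re-checks the
    // MB/sec mitigation, and picks at most one merge; the loop only exits by
    // returning (spec stays null if nothing was ever selected).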
    while (true) {
        long mergingBytes = 0;
        double idxBytes = 0;

        // Gather eligible segments for merging, i.e. segments
        // not already being merged and not already picked (by a
        // prior iteration of this loop) for merging:
        final List<SegmentCommitInfo> eligible = new ArrayList<SegmentCommitInfo>();
        for (int idx = tooBigCount; idx < infosSorted.size(); idx++) {
            final SegmentCommitInfo info = infosSorted.get(idx);
            if (merging.contains(info)) {
                mergingBytes += info.sizeInBytes();
            } else if (!toBeMerged.contains(info)) {
                eligible.add(info);
            }
            idxBytes += info.sizeInBytes();
        }
        idxBytes /= 1024d * 1024d;

        final boolean maxMergeIsRunning = mergingBytes >= maxMergedSegmentBytes;

        if (verbose()) {
            message(" allowedSegmentCount=" + allowedSegCountInt + " vs count=" + infosSorted.size() + " (eligible count=" + eligible.size() + ") tooBigCount=" + tooBigCount);
        }

        if (eligible.size() == 0) {
            return spec;
        }
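
        // Estimate the index write rate in MB/sec: the growth of the total
        // index size (in MB) since the last pass, over the same timeDelta
        // measured above. Note that timeDelta is not refreshed inside this loop.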
        double deltaMB = idxBytes - this.mb;
        double mbRate = deltaMB / timeDelta;
        avgCommitRateMB = singleExpSmoothing(mbRate, avgCommitRateMB);
        log.debug("commit rate: current {}, average {}, max {} MB/sec", mbRate, avgCommitRateMB, maxCommitRateMB);
        if (verbose()) {
            message(mbRate + " MB/s (max: " + maxCommitRateMB + ", avg: " + avgCommitRateMB + " MB/s)");
        }
        this.mb = idxBytes;
        // as above, mitigate only while the segment count is below the
        // threshold, so that mitigation never hurts search performance
        if (mbRate > maxCommitRateMB && segmentSize < maxNoOfSegsForMitigation) {
            log.debug("mitigation due to {} > {} MB/sec and segments {} < {}", mbRate, maxCommitRateMB,
                    segmentSize, maxNoOfSegsForMitigation);
            return null;
        }
        if (eligible.size() >= allowedSegCountInt) {

            // OK we are over budget -- find best merge!
            MergeScore bestScore = null;
            List<SegmentCommitInfo> best = null;
            boolean bestTooLarge = false;
            long bestMergeBytes = 0;

            // Consider all merge starts:
            for (int startIdx = 0; startIdx <= eligible.size() - maxMergeAtOnce; startIdx++) {
                long totAfterMergeBytes = 0;
                final List<SegmentCommitInfo> candidate = new ArrayList<SegmentCommitInfo>();
                boolean hitTooLarge = false;
                for (int idx = startIdx; idx < eligible.size() && candidate.size() < maxMergeAtOnce; idx++) {
                    final SegmentCommitInfo info = eligible.get(idx);
                    final long segBytes = size(info);

                    if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
                        hitTooLarge = true;
                        // NOTE: we continue, so that we can try
                        // "packing" smaller segments into this merge
                        // to see if we can get closer to the max
                        // size; this in general is not perfect since
                        // this is really "bin packing" and we'd have
                        // to try different permutations.
                        continue;
                    }
                    candidate.add(info);
                    totAfterMergeBytes += segBytes;
                }
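
                // score(...) follows the tiered-policy convention that a lower
                // score is a better merge, so the selection below keeps the
                // candidate with the minimum score.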
                final MergeScore score = score(candidate, hitTooLarge, mergingBytes);

                if (verbose()) {
                    message(" maybe=" + writer.get().segString(candidate) + " score=" + score.getScore() + " " + score.getExplanation() + " tooLarge=" + hitTooLarge + " size=" + String.format(Locale.ROOT, "%.3f MB", totAfterMergeBytes / 1024. / 1024.));
                }

                // If we are already running a max sized merge
                // (maxMergeIsRunning), don't allow another max
                // sized merge to kick off:
                if ((bestScore == null || score.getScore() < bestScore.getScore()) && (!hitTooLarge || !maxMergeIsRunning)) {
                    best = candidate;
                    bestScore = score;
                    bestTooLarge = hitTooLarge;
                    bestMergeBytes = totAfterMergeBytes;
                }
            }
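
            // Register the best candidate (if any) and loop again to see
            // whether another merge can be selected; once no candidate
            // qualifies, return whatever has been collected in spec.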
            if (best != null) {
                if (spec == null) {
                    spec = new MergeSpecification();
                }
                final OneMerge merge = new OneMerge(best);
                spec.add(merge);
                toBeMerged.addAll(merge.segments);

                if (verbose()) {
                    message(" add merge=" + writer.get().segString(merge.segments) + " size=" + String.format(Locale.ROOT, "%.3f MB", bestMergeBytes / 1024. / 1024.) + " score=" + String.format(Locale.ROOT, "%.3f", bestScore.getScore()) + " " + bestScore.getExplanation() + (bestTooLarge ? " [max merge]" : ""));
                }
            } else {
                return spec;
            }
        } else {
            return spec;
        }
    }
}