public MergeSpecification findMerges()

in oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/CommitMitigatingTieredMergePolicy.java [365:595]


    /**
     * Selects segment merges in the style of Lucene's {@code TieredMergePolicy#findMerges},
     * with an additional "commit mitigation" layer: the policy tracks the observed commit
     * rate (docs/sec and MB/sec) and the segment count as single-exponentially-smoothed
     * averages across invocations, and returns {@code null} (no merges) when the current
     * commit rate exceeds the configured maximum while the index still has fewer than
     * {@code maxNoOfSegsForMitigation} segments — i.e. merging is deferred under heavy
     * write load unless segment buildup forces it.
     * <p>
     * NOTE(review): this method mutates policy state on every call
     * ({@code timeSeriesCount}, {@code avgSegs}, {@code avgCommitRateDocs},
     * {@code avgCommitRateMB}, {@code maxCommitRateDocs}, {@code maxCommitRateMB},
     * {@code time}, {@code docCount}, {@code mb}); statement order is significant,
     * including which fields are updated before the early {@code return null} paths.
     *
     * @param mergeTrigger the event that triggered the merge (not consulted here)
     * @param infos        the current segments of the index
     * @return the merges to perform, or {@code null} when the index is empty or when
     *         merging is mitigated due to a high commit rate
     * @throws IOException if segment sizes cannot be read
     */
    public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos) throws IOException {
        int segmentSize = infos.size();
        timeSeriesCount++;

        // Periodically restart the smoothed averages so that old load history does
        // not dominate the current estimates.
        if (timeSeriesCount % timeSeriesLength == 0) {
            // reset averages
            avgCommitRateDocs = 0d;
            avgCommitRateMB = 0d;
            avgSegs = 0d;
        }
        // Fold the current segment count into the smoothed average.
        avgSegs = singleExpSmoothing(segmentSize, avgSegs);

        log.debug("segments: current {}, average {}", segmentSize, avgSegs);

        if (verbose()) {
            message("findMerges: " + segmentSize + " segments, " + avgSegs + " average");
        }
        if (segmentSize == 0) {
            return null;
        }

        // if no. of segments exceeds the maximum, adjust the maximum rates to allow more merges (less commit/rate mitigation)
        // (the thresholds are raised toward the observed averages via exponential smoothing,
        // so mitigation progressively gives way to merging as segments pile up)
        if (segmentSize > maxNoOfSegsForMitigation) {
            if (avgCommitRateDocs > maxCommitRateDocs) {
                double v = singleExpSmoothing(avgCommitRateDocs, maxCommitRateDocs);
                log.debug("adjusting maxCommitRateDocs from {} to {}", maxCommitRateDocs, v);
                maxCommitRateDocs = v;
            }
            if (avgCommitRateMB > maxCommitRateMB) {
                double v = singleExpSmoothing(avgCommitRateMB, maxCommitRateMB);
                log.debug("adjusting maxCommitRateMB from {} to {}", maxCommitRateMB, v);
                maxCommitRateMB = v;
            }
        }

        // Elapsed wall-clock seconds since the previous invocation ('time' holds the
        // previous System.currentTimeMillis() value).
        // NOTE(review): if two invocations land in the same millisecond, timeDelta is 0
        // and the rates below become Infinity/NaN; also the very first call (time
        // presumably 0) yields a near-zero rate — TODO confirm both are acceptable.
        long now = System.currentTimeMillis();
        double timeDelta = (now / 1000d) - (time / 1000d);
        // Commit rate in docs/sec: absolute change in total doc count since last call.
        double commitRate = Math.abs(docCount - infos.totalDocCount()) / timeDelta;
        time = now;

        avgCommitRateDocs = singleExpSmoothing(commitRate, avgCommitRateDocs);

        log.debug("commit rate: current {}, average {}, max {} docs/sec", commitRate, avgCommitRateDocs, maxCommitRateDocs);

        // Remember the current doc count for the next invocation's delta.
        docCount = infos.totalDocCount();

        if (verbose()) {
            message(commitRate + "doc/s (max: " + maxCommitRateDocs + ", avg: " + avgCommitRateDocs + " doc/s)");
        }

        // do not mitigate if there're too many segments to avoid affecting performance
        if (commitRate > maxCommitRateDocs && segmentSize < maxNoOfSegsForMitigation) {
            log.debug("mitigation due to {} > {} docs/sec and segments {} < {})", commitRate, maxCommitRateDocs,
                    segmentSize, maxNoOfSegsForMitigation);
            return null;
        }

        // From here on this largely mirrors TieredMergePolicy.findMerges: size-sort the
        // segments, budget an allowed segment count, then repeatedly pick the
        // best-scoring merge candidate until under budget or nothing is eligible.
        final Collection<SegmentCommitInfo> merging = writer.get().getMergingSegments();
        final Collection<SegmentCommitInfo> toBeMerged = new HashSet<SegmentCommitInfo>();

        final List<SegmentCommitInfo> infosSorted = new ArrayList<SegmentCommitInfo>(infos.asList());
        Collections.sort(infosSorted, new SegmentByteSizeDescending());

        // Compute total index bytes & print details about the index
        long totIndexBytes = 0;
        long minSegmentBytes = Long.MAX_VALUE;
        for (SegmentCommitInfo info : infosSorted) {
            final long segBytes = size(info);
            if (verbose()) {
                String extra = merging.contains(info) ? " [merging]" : "";
                if (segBytes >= maxMergedSegmentBytes / 2.0) {
                    extra += " [skip: too large]";
                } else if (segBytes < floorSegmentBytes) {
                    extra += " [floored]";
                }
                message("  seg=" + writer.get().segString(info) + " size=" + String.format(Locale.ROOT, "%.3f", segBytes / 1024 / 1024.) + " MB" + extra);
            }

            minSegmentBytes = Math.min(segBytes, minSegmentBytes);
            // Accum total byte size
            totIndexBytes += segBytes;
        }

        // If we have too-large segments, grace them out
        // of the maxSegmentCount:
        // (infosSorted is size-descending, so the over-large segments form a prefix)
        int tooBigCount = 0;
        while (tooBigCount < infosSorted.size() && size(infosSorted.get(tooBigCount)) >= maxMergedSegmentBytes / 2.0) {
            totIndexBytes -= size(infosSorted.get(tooBigCount));
            tooBigCount++;
        }

        minSegmentBytes = floorSize(minSegmentBytes);

        // Compute max allowed segs in the index: segsPerTier segments per tier, with
        // each tier's segment size growing by a factor of maxMergeAtOnce.
        long levelSize = minSegmentBytes;
        long bytesLeft = totIndexBytes;
        double allowedSegCount = 0;
        while (true) {
            final double segCountLevel = bytesLeft / (double) levelSize;
            if (segCountLevel < segsPerTier) {
                allowedSegCount += Math.ceil(segCountLevel);
                break;
            }
            allowedSegCount += segsPerTier;
            bytesLeft -= segsPerTier * levelSize;
            levelSize *= maxMergeAtOnce;
        }
        int allowedSegCountInt = (int) allowedSegCount;

        MergeSpecification spec = null;

        // Cycle to possibly select more than one merge:
        while (true) {

            long mergingBytes = 0;
            double idxBytes = 0;

            // Gather eligible segments for merging, ie segments
            // not already being merged and not already picked (by
            // prior iteration of this loop) for merging:
            final List<SegmentCommitInfo> eligible = new ArrayList<SegmentCommitInfo>();
            for (int idx = tooBigCount; idx < infosSorted.size(); idx++) {
                final SegmentCommitInfo info = infosSorted.get(idx);
                if (merging.contains(info)) {
                    mergingBytes += info.sizeInBytes();
                } else if (!toBeMerged.contains(info)) {
                    eligible.add(info);
                }
                // idxBytes accumulates the size of every non-over-large segment,
                // whether merging or eligible.
                idxBytes += info.sizeInBytes();
            }
            // Convert to megabytes for the MB/sec rate computation below.
            idxBytes /= 1024d * 1024d;

            final boolean maxMergeIsRunning = mergingBytes >= maxMergedSegmentBytes;

            if (verbose()) {
                message("  allowedSegmentCount=" + allowedSegCountInt + " vs count=" + infosSorted.size() + " (eligible count=" + eligible.size() + ") tooBigCount=" + tooBigCount);
            }

            if (eligible.size() == 0) {
                return spec;
            }

            // MB/sec growth rate: index size delta since the last measurement ('mb'
            // holds the previously recorded size in MB) over the same timeDelta.
            // NOTE(review): 'mb' is overwritten a few lines down inside this loop, so on
            // the second and later iterations the delta is ~0 — presumably the
            // mitigation check is only meaningful on the first pass; verify.
            double bytes = idxBytes - this.mb;
            double mbRate = bytes / timeDelta;

            avgCommitRateMB = singleExpSmoothing(mbRate, avgCommitRateMB);

            log.debug("commit rate: current {}, average {}, max {} MB/sec", mbRate, avgCommitRateMB, maxCommitRateMB);

            if (verbose()) {
                message(mbRate + "mb/s (max: " + maxCommitRateMB + ", avg: " + avgCommitRateMB + " MB/s)");
            }

            this.mb = idxBytes;

            // do not mitigate if there're too many segments to avoid affecting performance
            if (mbRate > maxCommitRateMB && segmentSize < maxNoOfSegsForMitigation) {
                log.debug("mitigation due to {} > {} MB/sec and segments {} < {})", mbRate, maxCommitRateMB,
                        segmentSize, maxNoOfSegsForMitigation);
                return null;
            }

            if (eligible.size() >= allowedSegCountInt) {

                // OK we are over budget -- find best merge!
                MergeScore bestScore = null;
                List<SegmentCommitInfo> best = null;
                boolean bestTooLarge = false;
                long bestMergeBytes = 0;

                // Consider all merge starts:
                for (int startIdx = 0; startIdx <= eligible.size() - maxMergeAtOnce; startIdx++) {

                    long totAfterMergeBytes = 0;

                    // Greedily take up to maxMergeAtOnce segments from startIdx onward,
                    // skipping any that would push the merged size past the cap.
                    final List<SegmentCommitInfo> candidate = new ArrayList<SegmentCommitInfo>();
                    boolean hitTooLarge = false;
                    for (int idx = startIdx; idx < eligible.size() && candidate.size() < maxMergeAtOnce; idx++) {
                        final SegmentCommitInfo info = eligible.get(idx);
                        final long segBytes = size(info);

                        if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
                            hitTooLarge = true;
                            // NOTE: we continue, so that we can try
                            // "packing" smaller segments into this merge
                            // to see if we can get closer to the max
                            // size; this in general is not perfect since
                            // this is really "bin packing" and we'd have
                            // to try different permutations.
                            continue;
                        }
                        candidate.add(info);
                        totAfterMergeBytes += segBytes;
                    }

                    // Lower score is better (see the comparison below).
                    final MergeScore score = score(candidate, hitTooLarge, mergingBytes);
                    if (verbose()) {
                        message("  maybe=" + writer.get().segString(candidate) + " score=" + score.getScore() + " " + score.getExplanation() + " tooLarge=" + hitTooLarge + " size=" + String.format(Locale.ROOT, "%.3f MB", totAfterMergeBytes / 1024. / 1024.));
                    }

                    // If we are already running a max sized merge
                    // (maxMergeIsRunning), don't allow another max
                    // sized merge to kick off:
                    if ((bestScore == null || score.getScore() < bestScore.getScore()) && (!hitTooLarge || !maxMergeIsRunning)) {
                        best = candidate;
                        bestScore = score;
                        bestTooLarge = hitTooLarge;
                        bestMergeBytes = totAfterMergeBytes;
                    }
                }

                if (best != null) {
                    if (spec == null) {
                        spec = new MergeSpecification();
                    }
                    final OneMerge merge = new OneMerge(best);
                    spec.add(merge);
                    // Mark the chosen segments so later iterations don't pick them again.
                    for (SegmentCommitInfo info : merge.segments) {
                        toBeMerged.add(info);
                    }
                    if (verbose()) {
                        message("  add merge=" + writer.get().segString(merge.segments) + " size=" + String.format(Locale.ROOT, "%.3f MB", bestMergeBytes / 1024. / 1024.) + " score=" + String.format(Locale.ROOT, "%.3f", bestScore.getScore()) + " " + bestScore.getExplanation() + (bestTooLarge ? " [max merge]" : ""));
                    }
                } else {
                    // No admissible candidate (e.g. only max-sized merges while one is
                    // already running): return what we have so far.
                    return spec;
                }
            } else {
                // Under budget: return the merges selected so far (possibly null).
                return spec;
            }
        }
    }