private void merge()

in dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/DubboMergingDigest.java [348:470]


    private void merge(double[] incomingMean, double[] incomingWeight, int incomingCount,
                       List<List<Double>> incomingData, int[] incomingOrder,
                       double unmergedWeight, boolean runBackwards, double compression) {
        // when our incoming buffer fills up, we combine our existing centroids with the incoming data,
        // and then reduce the centroids by merging if possible
        assert lastUsedCell.get() <= 0 || weight[0] == 1;
        assert lastUsedCell.get() <= 0 || weight[lastUsedCell.get() - 1] == 1;
        System.arraycopy(mean, 0, incomingMean, incomingCount, lastUsedCell.get());

        System.arraycopy(weight, 0, incomingWeight, incomingCount, lastUsedCell.get());
        incomingCount += lastUsedCell.get();

        if (incomingData != null) {
            for (int i = 0; i < lastUsedCell.get(); i++) {
                assert data != null;
                incomingData.add(data.get(i));
            }
            data = new ArrayList<>();
        }
        if (incomingOrder == null) {
            incomingOrder = new int[incomingCount];
        }
        Sort.stableSort(incomingOrder, incomingMean, incomingCount);

        totalWeight += unmergedWeight;

        // option to run backwards is to help investigate bias in errors
        if (runBackwards) {
            Sort.reverse(incomingOrder, 0, incomingCount);
        }


        // start by copying the least incoming value to the normal buffer
        lastUsedCell.set(0);
        mean[lastUsedCell.get()] = incomingMean[incomingOrder[0]];
        weight[lastUsedCell.get()] = incomingWeight[incomingOrder[0]];
        double wSoFar = 0;
        if (data != null) {
            assert incomingData != null;
            data.add(incomingData.get(incomingOrder[0]));
        }

        // weight will contain all zeros after this loop

        double normalizer = scale.normalizer(compression, totalWeight);
        double k1 = scale.k(0, normalizer);
        double wLimit = totalWeight * scale.q(k1 + 1, normalizer);
        for (int i = 1; i < incomingCount; i++) {
            int ix = incomingOrder[i];
            double proposedWeight = weight[lastUsedCell.get()] + incomingWeight[ix];
            double projectedW = wSoFar + proposedWeight;
            boolean addThis;
            if (useWeightLimit) {
                double q0 = wSoFar / totalWeight;
                double q2 = (wSoFar + proposedWeight) / totalWeight;
                addThis = proposedWeight <= totalWeight * Math.min(scale.max(q0, normalizer), scale.max(q2, normalizer));
            } else {
                addThis = projectedW <= wLimit;
            }
            if (i == 1 || i == incomingCount - 1) {
                // force last centroid to never merge
                addThis = false;
            }

            if (addThis) {
                // next point will fit
                // so merge into existing centroid
                weight[lastUsedCell.get()] += incomingWeight[ix];
                mean[lastUsedCell.get()] = mean[lastUsedCell.get()] + (incomingMean[ix] - mean[lastUsedCell.get()]) * incomingWeight[ix] / weight[lastUsedCell.get()];
                incomingWeight[ix] = 0;

                if (data != null) {
                    while (data.size() <= lastUsedCell.get()) {
                        data.add(new ArrayList<Double>());
                    }
                    assert incomingData != null;
                    assert data.get(lastUsedCell.get()) != incomingData.get(ix);
                    data.get(lastUsedCell.get()).addAll(incomingData.get(ix));
                }
            } else {
                // didn't fit ... move to next output, copy out first centroid
                wSoFar += weight[lastUsedCell.get()];
                if (!useWeightLimit) {
                    k1 = scale.k(wSoFar / totalWeight, normalizer);
                    wLimit = totalWeight * scale.q(k1 + 1, normalizer);
                }

                lastUsedCell.getAndIncrement();
                mean[lastUsedCell.get()] = incomingMean[ix];
                weight[lastUsedCell.get()] = incomingWeight[ix];
                incomingWeight[ix] = 0;

                if (data != null) {
                    assert incomingData != null;
                    assert data.size() == lastUsedCell.get();
                    data.add(incomingData.get(ix));
                }
            }
        }
        // points to next empty cell
        lastUsedCell.getAndIncrement();

        // sanity check
        double sum = 0;
        for (int i = 0; i < lastUsedCell.get(); i++) {
            sum += weight[i];
        }
        assert sum == totalWeight;
        if (runBackwards) {
            Sort.reverse(mean, 0, lastUsedCell.get());
            Sort.reverse(weight, 0, lastUsedCell.get());
            if (data != null) {
                Collections.reverse(data);
            }
        }
        assert weight[0] == 1;
        assert weight[lastUsedCell.get() - 1] == 1;

        if (totalWeight > 0) {
            min = Math.min(min, mean[0]);
            max = Math.max(max, mean[lastUsedCell.get() - 1]);
        }
    }