in dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/DubboMergingDigest.java [348:470]
    private void merge(double[] incomingMean, double[] incomingWeight, int incomingCount,
                       List<List<Double>> incomingData, int[] incomingOrder,
                       double unmergedWeight, boolean runBackwards, double compression) {
        // when our incoming buffer fills up, we combine our existing centroids with the incoming data,
        // and then reduce the centroids by merging if possible
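        // invariant of this digest: the first and last centroids are always singletons
        // (weight == 1), so quantile interpolation can treat min and max exactly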
        assert lastUsedCell.get() <= 0 || weight[0] == 1;
        assert lastUsedCell.get() <= 0 || weight[lastUsedCell.get() - 1] == 1;
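        // append the centroids we already hold behind the incoming points so that
        // one stable sort and one sweep can merge old and new data together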
        System.arraycopy(mean, 0, incomingMean, incomingCount, lastUsedCell.get());
        System.arraycopy(weight, 0, incomingWeight, incomingCount, lastUsedCell.get());
        incomingCount += lastUsedCell.get();
        if (incomingData != null) {
            for (int i = 0; i < lastUsedCell.get(); i++) {
                assert data != null;
                incomingData.add(data.get(i));
            }
            data = new ArrayList<>();
        }
        if (incomingOrder == null) {
            incomingOrder = new int[incomingCount];
        }
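        // a stable sort keeps points with equal means in arrival order, which keeps
        // the merge deterministic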
        Sort.stableSort(incomingOrder, incomingMean, incomingCount);
        totalWeight += unmergedWeight;
        // the option to run backwards helps investigate bias in the errors
        if (runBackwards) {
            Sort.reverse(incomingOrder, 0, incomingCount);
        }
        // start by copying the smallest incoming value to the normal buffer
        lastUsedCell.set(0);
        mean[lastUsedCell.get()] = incomingMean[incomingOrder[0]];
        weight[lastUsedCell.get()] = incomingWeight[incomingOrder[0]];
        double wSoFar = 0;
        if (data != null) {
            assert incomingData != null;
            data.add(incomingData.get(incomingOrder[0]));
        }
        // incomingWeight will contain all zeros after this loop
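        // the scale function bounds how much weight a centroid may hold near a given
        // quantile; k1 is the scale value at q = 0 and wLimit is the cumulative weight
        // at which the current output centroid must be closed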
        double normalizer = scale.normalizer(compression, totalWeight);
        double k1 = scale.k(0, normalizer);
        double wLimit = totalWeight * scale.q(k1 + 1, normalizer);
        for (int i = 1; i < incomingCount; i++) {
            int ix = incomingOrder[i];
            double proposedWeight = weight[lastUsedCell.get()] + incomingWeight[ix];
            double projectedW = wSoFar + proposedWeight;
            boolean addThis;
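            // two equivalent merge criteria: bound the centroid's own weight directly
            // (useWeightLimit) or bound the cumulative weight the centroid may reach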
            if (useWeightLimit) {
                double q0 = wSoFar / totalWeight;
                double q2 = (wSoFar + proposedWeight) / totalWeight;
                addThis = proposedWeight
                        <= totalWeight * Math.min(scale.max(q0, normalizer), scale.max(q2, normalizer));
            } else {
                addThis = projectedW <= wLimit;
            }
            if (i == 1 || i == incomingCount - 1) {
                // force first and last centroid to never merge
                addThis = false;
            }
            if (addThis) {
                // next point will fit
                // so merge into existing centroid
                weight[lastUsedCell.get()] += incomingWeight[ix];
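                // standard incremental update of a weighted mean: shift the mean toward
                // the new point in proportion to its share of the combined weight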
                mean[lastUsedCell.get()] = mean[lastUsedCell.get()]
                        + (incomingMean[ix] - mean[lastUsedCell.get()]) * incomingWeight[ix] / weight[lastUsedCell.get()];
                incomingWeight[ix] = 0;
                if (data != null) {
                    while (data.size() <= lastUsedCell.get()) {
                        data.add(new ArrayList<Double>());
                    }
                    assert incomingData != null;
                    assert data.get(lastUsedCell.get()) != incomingData.get(ix);
                    data.get(lastUsedCell.get()).addAll(incomingData.get(ix));
                }
            } else {
                // didn't fit ... move to next output, copy out first centroid
                wSoFar += weight[lastUsedCell.get()];
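                // under the k-limit strategy the bound depends on where we are in the
                // distribution, so recompute it from the new starting quantile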
                if (!useWeightLimit) {
                    k1 = scale.k(wSoFar / totalWeight, normalizer);
                    wLimit = totalWeight * scale.q(k1 + 1, normalizer);
                }
                lastUsedCell.getAndIncrement();
                mean[lastUsedCell.get()] = incomingMean[ix];
                weight[lastUsedCell.get()] = incomingWeight[ix];
                incomingWeight[ix] = 0;
                if (data != null) {
                    assert incomingData != null;
                    assert data.size() == lastUsedCell.get();
                    data.add(incomingData.get(ix));
                }
            }
        }
        // points to next empty cell
        lastUsedCell.getAndIncrement();
        // sanity check: the merged centroids must account for exactly the weight seen so far
        double sum = 0;
        for (int i = 0; i < lastUsedCell.get(); i++) {
            sum += weight[i];
        }
        assert sum == totalWeight;
        if (runBackwards) {
            Sort.reverse(mean, 0, lastUsedCell.get());
            Sort.reverse(weight, 0, lastUsedCell.get());
            if (data != null) {
                Collections.reverse(data);
            }
        }
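        // re-check the singleton invariant for the boundary centroids after the merge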
        assert weight[0] == 1;
        assert weight[lastUsedCell.get() - 1] == 1;
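        // the boundary centroids are singletons, so their means are real data points
        // and can legitimately tighten the running min and max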
        if (totalWeight > 0) {
            min = Math.min(min, mean[0]);
            max = Math.max(max, mean[lastUsedCell.get() - 1]);
        }
    }
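    // A minimal usage sketch of the digest that drives merge() above. This is a
    // hedged illustration, not code from this file: it assumes the compression-only
    // constructor and the add/quantile API of the t-digest design this class follows.
    //
    //     DubboMergingDigest digest = new DubboMergingDigest(100);  // compression = 100
    //     for (double latencyMs : new double[] {3.2, 5.1, 4.7, 120.0}) {
    //         digest.add(latencyMs);  // buffered; merge() runs when the buffer fills
    //     }
    //     double p99 = digest.quantile(0.99);  // query the merged centroids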