in flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/QuantileSummary.java [170:223]
/**
 * Merges this summary with another summary and returns the result as a new, compressed summary.
 *
 * <p>Both summaries must already be compressed, i.e. their head buffers must be empty. If {@code
 * other} holds no values, a shallow copy of this summary is returned. If this summary holds no
 * values, a shallow copy of {@code other} is returned. Otherwise the two sample lists are merged
 * in value order. The result uses:
 *
 * <ul>
 *   <li>the larger of the two relative errors,
 *   <li>the sum of the two counts,
 *   <li>this summary's {@code compressThreshold}.
 * </ul>
 *
 * <p>The merged samples are then compressed with threshold {@code 2 * mergedRelativeError *
 * mergedCount}.
 *
 * <p>A sample that lies between samples of the other summary gets its {@code delta} widened by
 * {@code floor(2 * relativeError * count)} of that other summary. Its rank relative to the other
 * summary's values is only known up to that bound. Samples that come before every sample of the
 * other summary, or after every one of them, keep their original statistics.
 *
 * @param other the summary to merge with; must have an empty head buffer.
 * @return a new summary covering the values of both inputs.
 * @throws IllegalStateException if either summary still has values in its head buffer.
 */
public QuantileSummary merge(QuantileSummary other) {
    Preconditions.checkState(
            headBuffer.isEmpty(), "Current buffer needs to be compressed before merge.");
    Preconditions.checkState(
            other.headBuffer.isEmpty(), "Other buffer needs to be compressed before merge.");
    if (other.count == 0) {
        return shallowCopy();
    } else if (count == 0) {
        return other.shallowCopy();
    }

    double mergedRelativeError = Math.max(relativeError, other.relativeError);
    long mergedCount = count + other.count;
    // Extra rank uncertainty a sample inherits when it is interleaved with the other summary's
    // samples: floor(2 * eps * n) of that other summary. A primitive cast gives the same value as
    // the boxed Double.longValue() conversion, without the allocation.
    long additionalSelfDelta = (long) Math.floor(2 * other.relativeError * other.count);
    long additionalOtherDelta = (long) Math.floor(2 * relativeError * count);

    // Every sample of both inputs lands in this list before compression, so its size is known.
    List<StatsTuple> mergedSampled = new ArrayList<>(sampled.size() + other.sampled.size());

    // Two-way merge of the sample lists (expected to be sorted ascending by value) until one
    // side is exhausted.
    int selfIdx = 0;
    int otherIdx = 0;
    while (selfIdx < sampled.size() && otherIdx < other.sampled.size()) {
        StatsTuple selfSample = sampled.get(selfIdx);
        StatsTuple otherSample = other.sampled.get(otherIdx);
        StatsTuple nextSample;
        long additionalDelta = 0;
        if (selfSample.value < otherSample.value) {
            nextSample = selfSample;
            // Widen only once a sample of `other` precedes this one, i.e. it is interleaved.
            if (otherIdx > 0) {
                additionalDelta = additionalSelfDelta;
            }
            selfIdx++;
        } else {
            // Equal values are resolved in favour of `other`.
            nextSample = otherSample;
            if (selfIdx > 0) {
                additionalDelta = additionalOtherDelta;
            }
            otherIdx++;
        }
        // Adjust a shallow copy so the widened delta is not written into the input's tuple.
        nextSample = nextSample.shallowCopy();
        nextSample.delta += additionalDelta;
        mergedSampled.add(nextSample);
    }

    // At most one tail is non-empty. Its samples lie beyond every sample of the other summary,
    // so they are appended unchanged (shared by reference, not copied).
    mergedSampled.addAll(sampled.subList(selfIdx, sampled.size()));
    mergedSampled.addAll(other.sampled.subList(otherIdx, other.sampled.size()));

    List<StatsTuple> comp =
            compressInternal(mergedSampled, 2 * mergedRelativeError * mergedCount);
    // NOTE(review): the trailing `true` presumably marks the result as already compressed;
    // confirm against the constructor.
    return new QuantileSummary(mergedRelativeError, compressThreshold, comp, mergedCount, true);
}