in flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/QuantileSummary.java [296:325]
private QuantileSummary insertHeadBuffer() {
if (headBuffer.isEmpty()) {
return this;
}
long newCount = count;
List<StatsTuple> newSamples = new ArrayList<>();
List<Double> sorted = headBuffer.stream().sorted().collect(Collectors.toList());
int cursor = 0;
for (int i = 0; i < sorted.size(); i++) {
while (cursor < sampled.size() && sampled.get(cursor).value <= sorted.get(i)) {
newSamples.add(sampled.get(cursor));
cursor++;
}
long delta = Double.valueOf(Math.floor(2.0 * relativeError * count)).longValue();
if (newSamples.isEmpty() || (cursor == sampled.size() && i == sorted.size() - 1)) {
delta = 0;
}
StatsTuple tuple = new StatsTuple(sorted.get(i), 1L, delta);
newSamples.add(tuple);
newCount++;
}
for (int i = cursor; i < sampled.size(); i++) {
newSamples.add(sampled.get(i));
}
return new QuantileSummary(relativeError, compressThreshold, newSamples, newCount, false);
}