public QuantileSummary merge()

in flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/QuantileSummary.java [170:223]


    public QuantileSummary merge(QuantileSummary other) {
        Preconditions.checkState(
                headBuffer.isEmpty(), "Current buffer needs to be compressed before merge.");
        Preconditions.checkState(
                other.headBuffer.isEmpty(), "Other buffer needs to be compressed before merge.");

        if (other.count == 0) {
            return shallowCopy();
        } else if (count == 0) {
            return other.shallowCopy();
        } else {
            List<StatsTuple> mergedSampled = new ArrayList<>();
            double mergedRelativeError = Math.max(relativeError, other.relativeError);
            long mergedCount = count + other.count;
            long additionalSelfDelta =
                    Double.valueOf(Math.floor(2 * other.relativeError * other.count)).longValue();
            long additionalOtherDelta =
                    Double.valueOf(Math.floor(2 * relativeError * count)).longValue();

            int selfIdx = 0;
            int otherIdx = 0;
            while (selfIdx < sampled.size() && otherIdx < other.sampled.size()) {
                StatsTuple selfSample = sampled.get(selfIdx);
                StatsTuple otherSample = other.sampled.get(otherIdx);
                StatsTuple nextSample;
                long additionalDelta = 0;
                if (selfSample.value < otherSample.value) {
                    nextSample = selfSample;
                    if (otherIdx > 0) {
                        additionalDelta = additionalSelfDelta;
                    }
                    selfIdx++;
                } else {
                    nextSample = otherSample;
                    if (selfIdx > 0) {
                        additionalDelta = additionalOtherDelta;
                    }
                    otherIdx++;
                }
                nextSample = nextSample.shallowCopy();
                nextSample.delta = nextSample.delta + additionalDelta;
                mergedSampled.add(nextSample);
            }
            IntStream.range(selfIdx, sampled.size())
                    .forEach(i -> mergedSampled.add(sampled.get(i)));
            IntStream.range(otherIdx, other.sampled.size())
                    .forEach(i -> mergedSampled.add(other.sampled.get(i)));

            List<StatsTuple> comp =
                    compressInternal(mergedSampled, 2 * mergedRelativeError * mergedCount);
            return new QuantileSummary(
                    mergedRelativeError, compressThreshold, comp, mergedCount, true);
        }
    }