in velox/functions/lib/KllSketch-inl.h [429:527]
void KllSketch<T, A, C>::merge(const folly::Range<Iter>& others) {
  // Merges every sketch in `others` into this sketch.
  //
  // Steps:
  //   1. Compute the combined item count and fold min/max of the non-empty
  //      inputs into this sketch.
  //   2. Insert the level-0 items of every input one by one.
  //   3. If any input has items above level 0, merge all levels >= 1 into a
  //      scratch buffer, compress it, and copy the result back.
  //
  // All scratch containers are built with `allocator_` so that temporary
  // memory goes through the same allocator as the sketch itself.
  auto newN = n_;
  for (auto& other : others) {
    if (other.n_ == 0) {
      continue;
    }
    if (newN == 0) {
      // Nothing accumulated so far: adopt the other sketch's bounds as is.
      minValue_ = other.minValue_;
      maxValue_ = other.maxValue_;
    } else {
      minValue_ = std::min(minValue_, other.minValue_, C());
      maxValue_ = std::max(maxValue_, other.maxValue_, C());
    }
    newN += other.n_;
  }
  if (newN == n_) {
    // Every input was empty; there is nothing to merge.
    return;
  }
  // Merge bottom level.
  for (auto& other : others) {
    for (uint32_t j = other.levels_[0]; j < other.levels_[1]; ++j) {
      items_[insertPosition()] = other.items_[j];
    }
  }
  // Merge higher levels.
  auto tmpNumItems = getNumRetained();
  auto provisionalNumLevels = numLevels();
  for (auto& other : others) {
    if (other.numLevels() >= 2) {
      // Count only the items stored above level 0 in `other`.
      tmpNumItems += other.levels_.back() - other.levels_[1];
      provisionalNumLevels = std::max(provisionalNumLevels, other.numLevels());
    }
  }
  if (tmpNumItems > getNumRetained()) {
    // Pass allocator_ explicitly, like the other scratch containers below;
    // otherwise the buffer would use a default-constructed allocator.
    std::vector<T, A> workbuf(tmpNumItems, allocator_);
    const uint8_t ub = 1 + detail::floorLog2(newN, 1);
    const size_t workLevelsSize = ub + 2;
    std::vector<uint32_t, AllocU32> worklevels(workLevelsSize, 0, allocator_);
    std::vector<uint32_t, AllocU32> outlevels(workLevelsSize, 0, allocator_);
    // Populate work arrays.
    worklevels[0] = 0;
    std::move(&items_[levels_[0]], &items_[levels_[1]], &workbuf[0]);
    worklevels[1] = safeLevelSize(0);
    // Merge each level; each level in every sketch is already sorted.
    for (uint8_t lvl = 1; lvl < provisionalNumLevels; ++lvl) {
      // An entry is the [current, end) range still to be consumed from one
      // sketch's level `lvl`.
      using Entry = std::pair<const T*, const T*>;
      using AllocEntry =
          typename std::allocator_traits<A>::template rebind_alloc<Entry>;
      // Arguments are swapped so that the queue's top is the range whose
      // head element comes first under C.
      auto gt = [](const Entry& x, const Entry& y) {
        return C()(*y.first, *x.first);
      };
      std::priority_queue<Entry, std::vector<Entry, AllocEntry>, decltype(gt)>
          pq(gt, allocator_);
      if (auto sz = safeLevelSize(lvl); sz > 0) {
        pq.emplace(&items_[levels_[lvl]], &items_[levels_[lvl] + sz]);
      }
      for (auto& other : others) {
        if (auto sz = other.safeLevelSize(lvl); sz > 0) {
          pq.emplace(
              &other.items_[other.levels_[lvl]],
              &other.items_[other.levels_[lvl] + sz]);
        }
      }
      int outIndex = worklevels[lvl];
      while (!pq.empty()) {
        auto [s, t] = pq.top();
        pq.pop();
        workbuf[outIndex++] = *s++;
        if (s < t) {
          // Range not exhausted yet: re-queue it with its new head.
          pq.emplace(s, t);
        }
      }
      worklevels[lvl + 1] = outIndex;
    }
    auto result = detail::generalCompress<T, C>(
        k_,
        provisionalNumLevels,
        workbuf.data(),
        worklevels.data(),
        outlevels.data(),
        isLevelZeroSorted_,
        randomBit_);
    VELOX_DCHECK_LE(result.finalNumLevels, ub);
    // Now we need to transfer the results back into "this" sketch.
    // The retained items go to the tail of items_, leaving
    // `freeSpaceAtBottom` unused slots at the front.
    items_.resize(result.finalCapacity);
    const auto freeSpaceAtBottom = result.finalCapacity - result.finalNumItems;
    std::move(
        &workbuf[outlevels[0]],
        &workbuf[outlevels[0] + result.finalNumItems],
        &items_[freeSpaceAtBottom]);
    levels_.resize(result.finalNumLevels + 1);
    // Rebase the level boundaries from workbuf positions to items_ positions.
    const auto offset = freeSpaceAtBottom - outlevels[0];
    for (unsigned lvl = 0; lvl < levels_.size(); ++lvl) {
      levels_[lvl] = outlevels[lvl] + offset;
    }
  }
  n_ = newN;
  VELOX_DCHECK_EQ(detail::sumSampleWeights(numLevels(), levels_.data()), n_);
}