void KllSketch::merge()

in velox/functions/lib/KllSketch-inl.h [429:527]


void KllSketch<T, A, C>::merge(const folly::Range<Iter>& others) {
  // Merges every sketch in `others` into this sketch.
  //
  // Steps:
  //  1. Fold the inputs' min/max values and counts into this sketch, skipping
  //     empty inputs. If every input is empty, return without changes.
  //  2. Re-insert each input's level-0 items into this sketch's level 0 via
  //     insertPosition().
  //  3. If any input has items above level 0, k-way merge every level
  //     (level >= 1 of each sketch is assumed sorted, as the merge relies on)
  //     into a scratch buffer. Then compact it with detail::generalCompress
  //     and copy the result back into items_/levels_, leaving the free space
  //     at the bottom of items_.
  //  4. Set n_ to the combined count.
  auto newN = n_;
  for (auto& other : others) {
    if (other.n_ == 0) {
      continue;
    }
    if (newN == 0) {
      // First non-empty contribution: there is no current min/max to compare.
      minValue_ = other.minValue_;
      maxValue_ = other.maxValue_;
    } else {
      minValue_ = std::min(minValue_, other.minValue_, C());
      maxValue_ = std::max(maxValue_, other.maxValue_, C());
    }
    newN += other.n_;
  }
  if (newN == n_) {
    // Every input was empty; nothing to merge.
    return;
  }
  // Merge bottom level.
  for (auto& other : others) {
    for (uint32_t j = other.levels_[0]; j < other.levels_[1]; ++j) {
      items_[insertPosition()] = other.items_[j];
    }
  }
  // Merge higher levels.
  auto tmpNumItems = getNumRetained();
  auto provisionalNumLevels = numLevels();
  for (auto& other : others) {
    if (other.numLevels() >= 2) {
      tmpNumItems += other.levels_.back() - other.levels_[1];
      provisionalNumLevels = std::max(provisionalNumLevels, other.numLevels());
    }
  }
  if (tmpNumItems > getNumRetained()) {
    // The scratch buffer uses the sketch's allocator, like the level arrays
    // and the priority queue below. A default-constructed A would bypass it,
    // and would not compile for allocators without a default constructor.
    std::vector<T, A> workbuf(tmpNumItems, allocator_);
    const uint8_t ub = 1 + detail::floorLog2(newN, 1);
    const size_t workLevelsSize = ub + 2;
    std::vector<uint32_t, AllocU32> worklevels(workLevelsSize, 0, allocator_);
    std::vector<uint32_t, AllocU32> outlevels(workLevelsSize, 0, allocator_);
    // Populate work arrays. Ranges are formed with data() + offset instead of
    // &v[i], because an end offset may equal v.size(). Indexing there with
    // operator[] is undefined behavior.
    worklevels[0] = 0;
    std::move(
        items_.data() + levels_[0], items_.data() + levels_[1], workbuf.data());
    worklevels[1] = safeLevelSize(0);

    // Each heap entry is a [current, end) cursor into one sorted level.
    // `gt` orders by the current element reversed, so the
    // std::priority_queue (a max-heap) pops the smallest element first.
    using Entry = std::pair<const T*, const T*>;
    using AllocEntry =
        typename std::allocator_traits<A>::template rebind_alloc<Entry>;
    auto gt = [](const Entry& x, const Entry& y) {
      return C()(*y.first, *x.first);
    };
    // Merge each level, each level in all sketches are already sorted.
    for (uint8_t lvl = 1; lvl < provisionalNumLevels; ++lvl) {
      std::priority_queue<Entry, std::vector<Entry, AllocEntry>, decltype(gt)>
          pq(gt, allocator_);
      if (auto sz = safeLevelSize(lvl); sz > 0) {
        const T* begin = items_.data() + levels_[lvl];
        pq.emplace(begin, begin + sz);
      }
      for (auto& other : others) {
        if (auto sz = other.safeLevelSize(lvl); sz > 0) {
          const T* begin = other.items_.data() + other.levels_[lvl];
          pq.emplace(begin, begin + sz);
        }
      }
      // Level offsets are uint32_t; keep the write cursor the same type.
      uint32_t outIndex = worklevels[lvl];
      while (!pq.empty()) {
        auto [s, t] = pq.top();
        pq.pop();
        workbuf[outIndex++] = *s++;
        if (s < t) {
          pq.emplace(s, t);
        }
      }
      worklevels[lvl + 1] = outIndex;
    }
    auto result = detail::generalCompress<T, C>(
        k_,
        provisionalNumLevels,
        workbuf.data(),
        worklevels.data(),
        outlevels.data(),
        isLevelZeroSorted_,
        randomBit_);
    VELOX_DCHECK_LE(result.finalNumLevels, ub);
    // Now we need to transfer the results back into "this" sketch. Retained
    // items are placed at the top of items_, leaving the free space at the
    // bottom.
    items_.resize(result.finalCapacity);
    const auto freeSpaceAtBottom = result.finalCapacity - result.finalNumItems;
    auto* compacted = workbuf.data() + outlevels[0];
    std::move(
        compacted,
        compacted + result.finalNumItems,
        items_.data() + freeSpaceAtBottom);
    levels_.resize(result.finalNumLevels + 1);
    // Shift every level boundary by the same amount the items were moved.
    const auto offset = freeSpaceAtBottom - outlevels[0];
    for (unsigned lvl = 0; lvl < levels_.size(); ++lvl) {
      levels_[lvl] = outlevels[lvl] + offset;
    }
  }
  n_ = newN;
  VELOX_DCHECK_EQ(detail::sumSampleWeights(numLevels(), levels_.data()), n_);
}