void quantiles_sketch::downsampling_merge()

in quantiles/include/quantiles_sketch_impl.hpp [1092:1163]


// Merges src into tgt for the case where src.get_k() is an exact multiple of
// tgt.get_k().
//
// Steps, in order:
//   1. every item in src's base buffer is handed to tgt.update();
//   2. tgt.levels_ is extended (if needed) to hold
//      compute_levels_needed(tgt.get_k(), combined n) levels;
//   3. each src level whose bit is set in src.bit_pattern_ is passed through
//      zip_buffer_with_stride() and then in_place_propagate_carry() into tgt;
//   4. tgt.n_ is set to the combined count and cross-checked against
//      tgt.bit_pattern_;
//   5. tgt's min and max items are updated from src's.
//
// Items taken from src are wrapped in conditional_forward<FwdSk>; whether that
// copies or moves presumably depends on how src was passed -- confirm against
// conditional_forward.
//
// Throws std::invalid_argument when src.get_k() is not a multiple of
// tgt.get_k(), and std::logic_error when the final consistency check fails.
// An empty src returns before tgt is modified.
void quantiles_sketch<T, C, A>::downsampling_merge(quantiles_sketch& tgt, FwdSk&& src) {
  const bool k_is_compatible = (src.get_k() % tgt.get_k()) == 0;
  if (!k_is_compatible) {
    throw std::invalid_argument("src.get_k() is not a multiple of tgt.get_k()");
  }
  if (src.is_empty()) return;

  // ratio between the two k values and its trailing-zero count
  // (the latter equals log2 of the ratio only if the ratio is a power of two -- TODO confirm k constraints)
  const uint16_t stride = src.get_k() / tgt.get_k();
  const uint8_t level_shift = count_trailing_zeros_in_u32(stride);

  // must be captured before the update() calls below, which go through tgt's own update path
  const uint64_t combined_n = tgt.get_n() + src.get_n();

  // feed src's base buffer items through the regular update path
  uint16_t idx = 0;
  while (idx < src.base_buffer_.size()) {
    tgt.update(conditional_forward<FwdSk>(src.base_buffer_[idx]));
    ++idx;
  }

  // now that the raw items are in, extend the levels array if it is too short
  const uint8_t required_levels = compute_levels_needed(tgt.get_k(), combined_n);
  if (tgt.levels_.size() < required_levels) {
    tgt.levels_.reserve(required_levels);
    do {
      Level fresh_level(tgt.allocator_);
      fresh_level.reserve(tgt.get_k());
      tgt.levels_.push_back(std::move(fresh_level));
    } while (tgt.levels_.size() < required_levels);
  }

  // working buffers, reused for every populated src level
  Level down_buf(tgt.allocator_);
  down_buf.reserve(tgt.get_k());

  Level scratch_buf(tgt.allocator_);
  scratch_buf.reserve(2 * tgt.get_k());

  // walk src's bit pattern from the lowest bit upward; a set bit marks a level to merge
  uint8_t src_lvl = 0;
  for (uint64_t remaining_bits = src.bit_pattern_; remaining_bits != 0; remaining_bits >>= 1, ++src_lvl) {
    if ((remaining_bits & 1) == 0) continue;

    down_buf.clear();
    scratch_buf.clear();

    // zip with stride into down_buf (the src level is expected to stay intact)
    zip_buffer_with_stride(src.levels_[src_lvl], down_buf, stride);

    // carry the downsampled buffer into tgt, starting at the shifted level;
    // n_ is deliberately left alone here and assigned once after the loop
    in_place_propagate_carry(src_lvl + level_shift,
                             down_buf, scratch_buf,
                             false, tgt);
  }

  tgt.n_ = combined_n;
  const auto expected_pattern = tgt.get_n() / (2 * tgt.get_k());
  if (expected_pattern != tgt.bit_pattern_) {
    throw std::logic_error("Failed internal consistency check after downsampling_merge()");
  }

  // Refresh min and max. Testing is_empty() is not enough: min and max may
  // still be unset if no base buffer items were added via update().
  if (!tgt.min_item_) {
    tgt.min_item_.emplace(conditional_forward<FwdSk>(*src.min_item_));
  } else if (tgt.comparator_(*src.min_item_, *tgt.min_item_)) {
    *tgt.min_item_ = conditional_forward<FwdSk>(*src.min_item_);
  }

  if (!tgt.max_item_) {
    tgt.max_item_.emplace(conditional_forward<FwdSk>(*src.max_item_));
  } else if (tgt.comparator_(*tgt.max_item_, *src.max_item_)) {
    *tgt.max_item_ = conditional_forward<FwdSk>(*src.max_item_);
  }
}