in quantiles/include/quantiles_sketch_impl.hpp [1092:1163]
void quantiles_sketch<T, C, A>::downsampling_merge(quantiles_sketch& tgt, FwdSk&& src) {
if (src.get_k() % tgt.get_k() != 0) {
throw std::invalid_argument("src.get_k() is not a multiple of tgt.get_k()");
}
if (src.is_empty()) {
return;
}
const uint16_t downsample_factor = src.get_k() / tgt.get_k();
const uint8_t lg_sample_factor = count_trailing_zeros_in_u32(downsample_factor);
const uint64_t new_n = src.get_n() + tgt.get_n();
// move items from src's base buffer
for (uint16_t i = 0; i < src.base_buffer_.size(); ++i) {
tgt.update(conditional_forward<FwdSk>(src.base_buffer_[i]));
}
// check (after moving raw items) if we need to extend levels array
const uint8_t levels_needed = compute_levels_needed(tgt.get_k(), new_n);
if (levels_needed > tgt.levels_.size()) {
tgt.levels_.reserve(levels_needed);
while (tgt.levels_.size() < levels_needed) {
Level empty_level(tgt.allocator_);
empty_level.reserve(tgt.get_k());
tgt.levels_.push_back(std::move(empty_level));
}
}
Level down_buf(tgt.allocator_);
down_buf.reserve(tgt.get_k());
Level scratch_buf(tgt.allocator_);
scratch_buf.reserve(2 * tgt.get_k());
uint64_t src_pattern = src.bit_pattern_;
for (uint8_t src_lvl = 0; src_pattern != 0; ++src_lvl, src_pattern >>= 1) {
if ((src_pattern & 1) > 0) {
down_buf.clear();
scratch_buf.clear();
// zip with stride, leaving input buffer intact
zip_buffer_with_stride(src.levels_[src_lvl], down_buf, downsample_factor);
// propagate-carry
in_place_propagate_carry(src_lvl + lg_sample_factor,
down_buf, scratch_buf,
false, tgt);
// update n_ at the end
}
}
tgt.n_ = new_n;
if ((tgt.get_n() / (2 * tgt.get_k())) != tgt.bit_pattern_) {
throw std::logic_error("Failed internal consistency check after downsampling_merge()");
}
// update min and max items
// can't just check is_empty() since min and max might not have been set if
// there were no base buffer items added via update()
if (!tgt.min_item_) {
tgt.min_item_.emplace(conditional_forward<FwdSk>(*src.min_item_));
} else {
if (tgt.comparator_(*src.min_item_, *tgt.min_item_))
*tgt.min_item_ = conditional_forward<FwdSk>(*src.min_item_);
}
if (!tgt.max_item_) {
tgt.max_item_.emplace(conditional_forward<FwdSk>(*src.max_item_));
} else {
if (tgt.comparator_(*tgt.max_item_, *src.max_item_))
*tgt.max_item_ = conditional_forward<FwdSk>(*src.max_item_);
}
}