lib/maths/common/CQuantileSketch.cc (637 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <maths/common/CQuantileSketch.h>
#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CPersistUtils.h>
#include <core/RestoreMacros.h>
#include <maths/common/CChecksum.h>
#include <maths/common/COrderings.h>
#include <boost/operators.hpp>
#include <algorithm>
#include <cmath>
#include <limits>
#include <numeric>
#include <optional>
#include <random>
#if defined(__SSE__)
#include <xmmintrin.h>
#define ml_unaligned_load_128 _mm_loadu_ps
#define ml_unaligned_store_128 _mm_storeu_ps
#define ml_minimum_128 _mm_min_ps
#define ml_subtract_128 _mm_sub_ps
#define ml_multiply_128 _mm_mul_ps
#define ml_shuffle_mask(w, x, y, z) _MM_SHUFFLE(w, x, y, z)
#define ml_shuffle_128 _mm_shuffle_ps
#define ml_rotate_128(x) _mm_shuffle_ps(x, x, _MM_SHUFFLE(0, 3, 2, 1))
#elif defined(__ARM_NEON__)
#include <type_traits>
#include <arm_neon.h>
// clang-format off
#define ml_unaligned_load_128(x) vld1q_f32(x)
#define ml_unaligned_store_128(x, y) vst1q_f32(x, y)
#define ml_minimum_128(x, y) vminq_f32(x, y)
#define ml_subtract_128(x, y) vsubq_f32(x, y)
#define ml_multiply_128(x, y) vmulq_f32(x, y)
#define ml_shuffle_mask(w, x, y, z) \
std::integral_constant<int, (((w) << 6) | ((x) << 4) | ((y) << 2) | (z))>{}
// clang-format on
template<typename MASK>
inline __attribute__((always_inline)) auto
ml_shuffle_128(float32x4_t a, float32x4_t b, MASK) {
float32x4_t result;
result = vmovq_n_f32(vgetq_lane_f32(a, MASK::value & 0x3));
result = vsetq_lane_f32(vgetq_lane_f32(a, (MASK::value >> 2) & 0x3), result, 1);
result = vsetq_lane_f32(vgetq_lane_f32(b, (MASK::value >> 4) & 0x3), result, 2);
result = vsetq_lane_f32(vgetq_lane_f32(b, (MASK::value >> 6) & 0x3), result, 3);
return result;
}
inline __attribute__((always_inline)) auto ml_rotate_128(float32x4_t x) {
float32x2_t x21{vget_high_f32(vextq_f32(x, x, 3))};
float32x2_t x03{vget_low_f32(vextq_f32(x, x, 3))};
return vcombine_f32(x21, x03);
}
#else
// clang-format off
#define ml_unaligned_load_128(x) \
std::array<float, 4>{*(x), *((x) + 1), *((x) + 2), *((x) + 3)};
#define ml_unaligned_store_128(x, y) \
*(x) = (y)[0]; \
*((x) + 1) = (y)[1]; \
*((x) + 2) = (y)[2]; \
*((x) + 3) = (y)[3]
#define ml_minimum_128(x, y) \
std::array<float, 4>{ \
std::min((x)[0], (y)[0]), std::min((x)[1], (y)[1]), \
std::min((x)[2], (y)[2]), std::min((x)[3], (y)[3]) \
}
#define ml_subtract_128(x, y) \
std::array<float, 4>{ \
(x)[0] - (y)[0], (x)[1] - (y)[1], (x)[2] - (y)[2], (x)[3] - (y)[3] \
}
#define ml_multiply_128(x, y) \
std::array<float, 4>{ \
(x)[0] * (y)[0], (x)[1] * (y)[1], \
(x)[2] * (y)[2], (x)[3] * (y)[3] \
}
#define ml_shuffle_mask(w, x, y, z) (((w) << 6) | ((x) << 4) | ((y) << 2) | (z))
#define ml_shuffle_128(x, y, mask) \
std::array<float, 4>{ \
(x)[(mask) & 0x3], (x)[((mask) >> 2) & 0x3], \
(y)[((mask) >> 4) & 0x3], (y)[((mask) >> 6) & 0x3] \
}
#define ml_rotate_128(x) \
std::array<float, 4>{(x)[1], (x)[2], (x)[3], (x)[0]};
// clang-format on
#endif
namespace ml {
namespace maths {
namespace common {
namespace {
using TFloatFloatPr = CQuantileSketch::TFloatFloatPr;
using TFloatFloatPrVec = CQuantileSketch::TFloatFloatPrVec;
//! \brief An iterator over just the unique knot values.
// clang-format off
class CUniqueIterator : private boost::addable2<CUniqueIterator, std::ptrdiff_t,
boost::subtractable2<CUniqueIterator, std::ptrdiff_t,
boost::equality_comparable<CUniqueIterator>>> {
// clang-format on
public:
CUniqueIterator(TFloatFloatPrVec& knots, std::size_t i)
: m_Knots(&knots), m_I(i) {}
bool operator==(const CUniqueIterator& rhs) const {
return m_I == rhs.m_I && m_Knots == rhs.m_Knots;
}
TFloatFloatPr& operator*() const { return (*m_Knots)[m_I]; }
TFloatFloatPr* operator->() const { return &(*m_Knots)[m_I]; }
const CUniqueIterator& operator++() {
CFloatStorage x{(*m_Knots)[m_I].first};
std::ptrdiff_t n{static_cast<std::ptrdiff_t>(m_Knots->size())};
while (++m_I < n && (*m_Knots)[m_I].first == x) {
}
return *this;
}
const CUniqueIterator& operator--() {
CFloatStorage x{(*m_Knots)[m_I].first};
while (--m_I >= 0 && (*m_Knots)[m_I].first == x) {
}
return *this;
}
const CUniqueIterator& operator+=(std::ptrdiff_t i) {
while (--i >= 0) {
this->operator++();
}
return *this;
}
const CUniqueIterator& operator-=(std::ptrdiff_t i) {
while (--i >= 0) {
this->operator--();
}
return *this;
}
std::ptrdiff_t index() const { return m_I; }
private:
TFloatFloatPrVec* m_Knots;
std::ptrdiff_t m_I;
};
std::ptrdiff_t previousDifferent(TFloatFloatPrVec& knots, std::size_t index) {
CUniqueIterator previous{knots, index};
--previous;
return previous.index();
}
std::ptrdiff_t nextDifferent(TFloatFloatPrVec& knots, std::size_t index) {
CUniqueIterator next{knots, index};
++next;
return next.index();
}
std::size_t fastSketchSize(double reductionFactor, std::size_t size) {
size = static_cast<std::size_t>(
static_cast<double>(size) * CQuantileSketch::REDUCTION_FACTOR / reductionFactor + 0.5);
return size + (3 - (size + 1) % 3) % 3;
}
const auto EPS = static_cast<double>(std::numeric_limits<float>::epsilon());
const core::TPersistenceTag UNSORTED_TAG{"a", "unsorted"};
const core::TPersistenceTag KNOTS_TAG{"b", "knots"};
const core::TPersistenceTag COUNT_TAG{"c", "count"};
}
CQuantileSketch::CQuantileSketch(const TFloatVec& centres, const TFloatVec& counts)
: m_MaxSize{std::max(2 * centres.size(), MINIMUM_MAX_SIZE)},
m_Count{std::accumulate(counts.begin(), counts.end(), 0.0)} {
m_Knots.reserve(centres.size());
for (std::size_t i = 0; i < centres.size(); ++i) {
m_Knots.emplace_back(centres[i], counts[i]);
}
}
CQuantileSketch::CQuantileSketch(std::size_t size)
: m_MaxSize(std::max(size, MINIMUM_MAX_SIZE)) {
m_Knots.reserve(m_MaxSize + 1);
}
CQuantileSketch::~CQuantileSketch() = default;
bool CQuantileSketch::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
do {
const std::string& name = traverser.name();
RESTORE_BUILT_IN(UNSORTED_TAG, m_Unsorted)
RESTORE(KNOTS_TAG, core::CPersistUtils::fromString(traverser.value(), m_Knots))
RESTORE_BUILT_IN(COUNT_TAG, m_Count)
} while (traverser.next());
return true;
}
void CQuantileSketch::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(UNSORTED_TAG, m_Unsorted);
inserter.insertValue(KNOTS_TAG, core::CPersistUtils::toString(m_Knots));
inserter.insertValue(COUNT_TAG, m_Count, core::CIEEE754::E_SinglePrecision);
}
const CQuantileSketch& CQuantileSketch::operator+=(const CQuantileSketch& rhs) {
m_Knots.insert(m_Knots.end(), rhs.m_Knots.begin(), rhs.m_Knots.end());
m_Unsorted = m_Knots.size();
m_Count += rhs.m_Count;
this->reduce(m_MaxSize + 1);
m_Knots.shrink_to_fit();
LOG_TRACE(<< "knots = " << m_Knots);
return *this;
}
void CQuantileSketch::add(double x, double n) {
++m_Unsorted;
m_Knots.emplace_back(x, n);
m_Count += n;
if (m_Knots.size() > m_MaxSize) {
this->fastReduce();
}
}
void CQuantileSketch::age(double factor) {
for (auto& knot : m_Knots) {
knot.second *= factor;
}
m_Count *= factor;
}
bool CQuantileSketch::cdf(double x_, double& result, TOptionalInterpolation interpolation) const {
result = 0.0;
if (m_Knots.empty()) {
LOG_ERROR(<< "No values added to quantile sketch");
return false;
}
if (m_Unsorted > 0 || m_Knots.size() > this->fastReduceTargetSize()) {
// It is critical to match the size selected by the call to reduce in
// the add method or the unmerged values can cause issues estimating
// the cdf close to 0 or 1.
const_cast<CQuantileSketch*>(this)->reduce(this->fastReduceTargetSize());
}
CFloatStorage x = x_;
std::ptrdiff_t n = m_Knots.size();
if (n == 1) {
result = x < m_Knots[0].first ? 0.0 : (x > m_Knots[0].first ? 1.0 : 0.5);
return true;
}
std::ptrdiff_t k = std::lower_bound(m_Knots.begin(), m_Knots.end(), x,
COrderings::SFirstLess()) -
m_Knots.begin();
LOG_TRACE(<< "k = " << k);
// This must make the same assumptions as quantile regarding the distribution
// of values for each histogram bucket. See that function for more details.
switch (interpolation.value_or(this->cdfAndQuantileInterpolation())) {
case E_Linear: {
if (k == 0) {
double xl = m_Knots[0].first;
double xr = m_Knots[1].first;
double f = m_Knots[0].second / m_Count;
LOG_TRACE(<< "xl = " << xl << ", xr = " << xr << ", f = " << f);
result = f * std::max(x - 1.5 * xl + 0.5 * xr, 0.0) / (xr - xl);
} else if (k == n) {
double xl = m_Knots[n - 2].first;
double xr = m_Knots[n - 1].first;
double f = m_Knots[n - 1].second / m_Count;
LOG_TRACE(<< "xl = " << xl << ", xr = " << xr << ", f = " << f);
result = 1.0 - f * std::max(1.5 * xr - 0.5 * xl - x, 0.0) / (xr - xl);
} else {
double xl = m_Knots[k - 1].first;
double xr = m_Knots[k].first;
bool left = (2 * k < n);
bool loc = (2.0 * x < xl + xr);
double partial = 0.0;
for (std::ptrdiff_t i = left ? 0 : (loc ? k : k + 1),
m = left ? (loc ? k - 1 : k) : n;
i < m; ++i) {
partial += m_Knots[i].second;
}
partial = (left ? (partial + (loc ? 1.0 * m_Knots[k - 1].second : 0.0))
: (partial + (loc ? 0.0 : 1.0 * m_Knots[k].second))) /
m_Count;
double dn{0.5 * m_Knots[loc ? k - 1 : k].second / m_Count};
LOG_TRACE(<< "left = " << left << ", loc = " << loc << ", partial = " << partial
<< ", xl = " << xl << ", xr = " << xr << ", dn = " << dn);
result = left ? partial + dn * (2.0 * x - xl - xr) / (xr - xl)
: 1.0 - partial - dn * (xl + xr - 2.0 * x) / (xr - xl);
LOG_TRACE(<< "result = " << result << " "
<< dn * (2.0 * x - xl - xr) / (xr - xl));
}
return true;
}
case E_PiecewiseConstant: {
if (k == 0) {
double f = m_Knots[0].second / m_Count;
result = x < m_Knots[0].first ? 0.0 : 0.5 * f;
} else if (k == n) {
double f = m_Knots[n - 1].second / m_Count;
result = x > m_Knots[0].first ? 1.0 : 1.0 - 0.5 * f;
} else {
bool left = (2 * k < n);
double partial = x < m_Knots[0].first ? 0.0 : 0.5 * m_Knots[0].second;
for (std::ptrdiff_t i = left ? 0 : k + 1, m = left ? k : n; i < m; ++i) {
partial += m_Knots[i].second;
}
partial /= m_Count;
LOG_TRACE(<< "left = " << left << ", partial = " << partial);
result = left ? partial : 1.0 - partial;
}
return true;
}
}
return true;
}
bool CQuantileSketch::minimum(double& result) const {
if (m_Knots.empty()) {
LOG_ERROR(<< "No values added to quantile sketch");
return false;
}
result = m_Knots[0].first;
return true;
}
bool CQuantileSketch::maximum(double& result) const {
if (m_Knots.empty()) {
LOG_ERROR(<< "No values added to quantile sketch");
return false;
}
result = m_Knots.back().first;
return true;
}
bool CQuantileSketch::mad(double& result) const {
if (m_Knots.empty()) {
LOG_ERROR(<< "No values added to quantile sketch");
return false;
}
double median;
quantile(E_Linear, m_Knots, m_Count, 50.0, median);
LOG_TRACE(<< "median = " << median);
TFloatFloatPrVec knots(m_Knots);
std::for_each(knots.begin(), knots.end(), [median](TFloatFloatPr& knot) {
knot.first = std::fabs(knot.first - median);
});
std::sort(knots.begin(), knots.end(), COrderings::SFirstLess());
LOG_TRACE(<< "knots = " << knots);
quantile(E_Linear, knots, m_Count, 50.0, result);
return true;
}
bool CQuantileSketch::quantile(double percentage,
double& result,
TOptionalInterpolation interpolation) const {
if (m_Knots.empty()) {
LOG_ERROR(<< "No values added to quantile sketch");
return false;
}
if (m_Unsorted > 0 || m_Knots.size() > this->fastReduceTargetSize()) {
// It is critical to match the size selected by the call to reduce in
// the add method or the unmerged values can cause issues estimating
// percentiles close to 0 or 100.
const_cast<CQuantileSketch*>(this)->reduce(this->fastReduceTargetSize());
}
if (percentage < 0.0 || percentage > 100.0) {
LOG_ERROR(<< "Invalid percentile " << percentage);
return false;
}
quantile(interpolation.value_or(this->cdfAndQuantileInterpolation()),
m_Knots, m_Count, percentage, result);
return true;
}
double CQuantileSketch::count() const {
return m_Count;
}
std::uint64_t CQuantileSketch::checksum(std::uint64_t seed) const {
seed = CChecksum::calculate(seed, m_MaxSize);
seed = CChecksum::calculate(seed, m_Knots);
return CChecksum::calculate(seed, m_Count);
}
void CQuantileSketch::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CQuantileSketch");
core::memory_debug::dynamicSize("m_Knots", m_Knots, mem);
}
std::size_t CQuantileSketch::memoryUsage() const {
return core::memory::dynamicSize(m_Knots);
}
std::size_t CQuantileSketch::staticSize() const {
return sizeof(*this);
}
bool CQuantileSketch::checkInvariants() const {
if (m_Knots.size() > m_MaxSize) {
LOG_ERROR(<< "Too many knots: " << m_Knots.size() << " > " << m_MaxSize);
return false;
}
if (m_Unsorted > m_Knots.size()) {
LOG_ERROR(<< "Invalid unsorted count: " << m_Unsorted << "/" << m_Knots.size());
return false;
}
if (std::is_sorted(m_Knots.begin(), m_Knots.end() - m_Unsorted) == false) {
LOG_ERROR(<< "Unordered knots: "
<< core::CContainerPrinter::print(m_Knots.begin(), m_Knots.end() - m_Unsorted));
return false;
}
for (std::size_t i = 1; i + m_Unsorted < m_Knots.size(); ++i) {
if (m_Knots[i].first == m_Knots[i - 1].first) {
LOG_ERROR(<< "Duplicate values: " << m_Knots[i - 1] << " and " << m_Knots[i]);
return false;
}
}
double count = 0.0;
for (const auto& knot : m_Knots) {
count += knot.second;
}
if (std::fabs(m_Count - count) > 10.0 * EPS * m_Count) {
LOG_ERROR(<< "Count mismatch: error " << std::fabs(m_Count - count) << "/" << m_Count);
return false;
}
return true;
}
std::string CQuantileSketch::print() const {
return core::CContainerPrinter::print(m_Knots);
}
void CQuantileSketch::quantile(EInterpolation interpolation,
const TFloatFloatPrVec& knots,
double count,
double percentage,
double& result) {
// For linear interpolation we have to make some assumptions about how the
// merged bucket values are distributed.
//
// We make the following assumptions:
// 1. The bucket centre bisects (in weight) the values it represents,
// 2. The bucket start and end point are the midpoints between the bucket
// centre and the preceding and succeeding bucket centres, respectively,
// 3. Values are uniformly distributed on the intervals between the bucket
// endpoints and its centre.
//
// Assumption 1 is consistent with how bucket centres are computed on merge
// and assumption 2 is consistent with how we decide to merge buckets. This
// scheme also has the highly desireable property that if the sketch contains
// the raw data, i.e. that the number of distinct values is less than the
// sketch size, it computes quantiles exactly.
std::size_t n = knots.size();
percentage /= 100.0;
double partial = 0.0;
double cutoff = percentage * count;
for (std::size_t i = 0; i < n; ++i) {
partial += knots[i].second;
if (partial >= cutoff - count * EPS) {
switch (interpolation) {
case E_Linear:
if (n == 1) {
result = knots[0].first;
} else {
double xa = i == 0 ? 2.0 * knots[0].first - knots[1].first
: static_cast<double>(knots[i - 1].first);
double xb = knots[i].first;
double xc = i + 1 == n ? 2.0 * xb - xa
: static_cast<double>(knots[i + 1].first);
double nb = knots[i].second;
partial -= 0.5 * nb;
result = xb + (cutoff - partial) * (cutoff - partial < 0.0
? (xb - xa) / nb
: (xc - xb) / nb);
}
return;
case E_PiecewiseConstant:
if (i + 1 == n || partial > cutoff + count * EPS) {
result = knots[i].first;
} else {
result = (knots[i].first + knots[i + 1].first) / 2.0;
}
return;
}
}
}
result = knots[n - 1].first;
}
CQuantileSketch::EInterpolation CQuantileSketch::cdfAndQuantileInterpolation() const {
// If the number of knots is less than the target size for reduce we must
// never have combined any distinct values into a single bucket and the
// quantile and empircal cdf are computed exactly using piecewise constant
// interpolation.
return m_Knots.size() < this->fastReduceTargetSize() ? E_PiecewiseConstant : E_Linear;
}
std::size_t CQuantileSketch::fastReduceTargetSize() const {
return static_cast<std::size_t>(REDUCTION_FACTOR * static_cast<double>(m_MaxSize) + 1.0);
}
void CQuantileSketch::fastReduce() {
this->reduce(this->fastReduceTargetSize());
}
void CQuantileSketch::reduce(std::size_t target) {
this->order();
this->deduplicate(m_Knots.begin(), m_Knots.end());
if (m_Knots.size() > target) {
TFloatFloatPrVec mergeCosts;
TSizeVec mergeCandidates;
TBoolVec stale(m_Knots.size(), false);
mergeCosts.reserve(m_Knots.size());
mergeCandidates.reserve(m_Knots.size());
CPRNG::CXorOShiro128Plus rng(static_cast<std::uint64_t>(m_Count));
std::uniform_real_distribution<double> u01{0.0, 1.0};
for (std::size_t i = 0; i + 3 < m_Knots.size(); ++i) {
mergeCosts.emplace_back(mergeCost(m_Knots[i + 1], m_Knots[i + 2]), u01(rng));
mergeCandidates.push_back(i);
}
LOG_TRACE(<< "merge costs = " << mergeCosts);
this->reduceWithSuppliedCosts(target, mergeCosts, mergeCandidates, stale);
}
}
void CQuantileSketch::reduceWithSuppliedCosts(std::size_t target,
TFloatFloatPrVec& mergeCosts,
TSizeVec& mergeCandidates,
TBoolVec& stale) {
auto mergeCostGreater = [&mergeCosts](std::size_t lhs, std::size_t rhs) {
return COrderings::lexicographicalCompare(
-mergeCosts[lhs].first, mergeCosts[lhs].second,
-mergeCosts[rhs].first, mergeCosts[rhs].second);
};
std::make_heap(mergeCandidates.begin(), mergeCandidates.end(), mergeCostGreater);
std::size_t numberToMerge{m_Knots.size() - target};
while (numberToMerge > 0) {
LOG_TRACE(<< "merge candidates = " << mergeCandidates);
std::size_t l{mergeCandidates.front() + 1};
std::pop_heap(mergeCandidates.begin(), mergeCandidates.end(), mergeCostGreater);
mergeCandidates.pop_back();
LOG_TRACE(<< "stale = " << stale);
if (stale[l] == false) {
std::size_t r{static_cast<std::size_t>(nextDifferent(m_Knots, l))};
LOG_TRACE(<< "Merging " << l << " and " << r
<< ", cost = " << mergeCosts[l - 1].first);
// Note that mergeCosts[l - 1].second isn't truly random because
// it is used for selecting the merge order, but it's good enough.
auto mergedKnot = this->mergedKnot(l, r);
// Find the points that have been merged with xl and xr if any.
std::ptrdiff_t ll{previousDifferent(m_Knots, l)};
std::ptrdiff_t rr{nextDifferent(m_Knots, r) - 1};
std::fill_n(m_Knots.begin() + ll + 1, rr - ll, mergedKnot);
stale[ll] = true;
stale[rr] = true;
--numberToMerge;
LOG_TRACE(<< "merged = " << mergedKnot);
LOG_TRACE(<< "right = " << m_Knots[rr + 1]);
} else {
CUniqueIterator ll(m_Knots, l);
CUniqueIterator rr{ll};
++rr;
mergeCosts[l - 1].first = mergeCost(*ll, *rr);
mergeCandidates.push_back(l - 1);
std::push_heap(mergeCandidates.begin(), mergeCandidates.end(), mergeCostGreater);
stale[l] = false;
}
}
m_Knots.erase(std::unique(m_Knots.begin(), m_Knots.end()), m_Knots.end());
LOG_TRACE(<< "final = " << m_Knots);
}
void CQuantileSketch::order() {
if (m_Unsorted > 0) {
std::sort(m_Knots.end() - m_Unsorted, m_Knots.end(), COrderings::SFirstLess());
// Deduplicate points before merging.
std::size_t removed{m_Knots.size()};
this->deduplicate(m_Knots.end() - m_Unsorted, m_Knots.end());
removed -= m_Knots.size();
m_Unsorted -= removed;
std::inplace_merge(m_Knots.begin(), m_Knots.end() - m_Unsorted,
m_Knots.end(), COrderings::SFirstLess());
}
m_Unsorted = 0;
}
void CQuantileSketch::deduplicate(TFloatFloatPrVecItr begin, TFloatFloatPrVecItr end) {
// Deduplicate new values.
for (auto i = begin + 1; i <= end; ++i, ++begin) {
if (begin != i - 1) {
*begin = *(i - 1);
}
CFloatStorage x{begin->first};
for (/**/; i != end && i->first == x; ++i) {
begin->second += i->second;
}
}
m_Knots.erase(begin, end);
LOG_TRACE(<< "de-duplicated = " << m_Knots);
}
CQuantileSketch::TFloatFloatPr CQuantileSketch::mergedKnot(std::size_t l, std::size_t r) const {
auto[xl, nl] = m_Knots[l];
auto[xr, nr] = m_Knots[r];
return {(nl * xl + nr * xr) / (nl + nr), nl + nr};
}
double CQuantileSketch::mergeCost(const TFloatFloatPr& l, const TFloatFloatPr& r) {
// Interestingly, minimizing the approximation error (area between
// curve before and after merging) produces good summary for the
// piecewise constant objective, but a very bad summary for the linear
// objective. Basically, an empirically good strategy is to target
// the piecewise objective when sketching and then perform unbiased
// linear interpolation by linearly interpolating between the mid-
// points of the steps when computing quantiles.
auto[xl, nl] = l;
auto[xr, nr] = r;
return std::min(nl, nr) * (xr - xl);
}
CFastQuantileSketch::CFastQuantileSketch(std::size_t size,
CPRNG::CXorOShiro128Plus rng,
TOptionalDouble reductionFraction)
: CQuantileSketch{fastSketchSize(reductionFraction.value_or(REDUCTION_FACTOR), size)},
m_ReductionFactor{reductionFraction.value_or(REDUCTION_FACTOR)} {
size = this->maxSize();
LOG_TRACE(<< "size = " << size);
m_Tiebreakers.resize(size + 1);
m_MergeCosts.reserve(size - 1);
m_MergeCandidates.reserve(size - 2);
std::uniform_real_distribution<double> u01{0.0, 1.0};
std::generate_n(m_Tiebreakers.begin(), size + 1, [&] { return u01(rng); });
}
std::uint64_t CFastQuantileSketch::checksum(std::uint64_t seed) const {
seed = this->CQuantileSketch::checksum(seed);
return CChecksum::calculate(seed, m_ReductionFactor);
}
void CFastQuantileSketch::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CFastQuantileSketch");
core::memory_debug::dynamicSize("m_Knots", this->knots(), mem);
core::memory_debug::dynamicSize("m_Tiebreakers", m_Tiebreakers, mem);
core::memory_debug::dynamicSize("m_MergeCosts", m_MergeCosts, mem);
core::memory_debug::dynamicSize("m_MergeCandidates", m_MergeCandidates, mem);
}
std::size_t CFastQuantileSketch::memoryUsage() const {
std::size_t mem{this->CQuantileSketch::memoryUsage()};
mem += core::memory::dynamicSize(m_MergeCosts);
mem += core::memory::dynamicSize(m_Tiebreakers);
mem += core::memory::dynamicSize(m_MergeCandidates);
return mem;
}
std::size_t CFastQuantileSketch::staticSize() const {
return sizeof(*this);
}
std::size_t CFastQuantileSketch::fastReduceTargetSize() const {
return static_cast<std::size_t>(
m_ReductionFactor * static_cast<double>(this->maxSize()) + 1.0);
}
void CFastQuantileSketch::fastReduce() {
this->order();
if (this->knots().size() > this->fastReduceTargetSize()) {
TFloatFloatPrVec knots{std::move(this->knots())};
std::size_t numberToMerge{knots.size() - this->fastReduceTargetSize()};
this->computeMergeCosts(knots);
this->computeMergeCandidates(numberToMerge);
m_MergeCandidates[numberToMerge] = static_cast<std::uint32_t>(knots.size() - 1);
LOG_TRACE(<< "merge candidates = " << m_MergeCandidates);
LOG_TRACE(<< "knots before merge = " << knots);
std::uint32_t back{m_MergeCandidates[0] + 1};
std::uint32_t next{back};
for (std::size_t i = 1; i <= numberToMerge; ++i) {
std::uint32_t l{next};
std::uint32_t r{l + 1};
for (next = m_MergeCandidates[i] + 1; next == r;
next = m_MergeCandidates[++i] + 1, ++r) {
}
LOG_TRACE(<< "left = " << l << ", right = " << r << ", next = " << next);
CFloatStorage centre{0.0};
CFloatStorage count{0.0};
for (std::uint32_t j = l; j <= r; ++j) {
auto[x, n] = knots[j];
centre += n * x;
count += n;
}
centre /= count;
knots[back] = TFloatFloatPr{centre, count};
LOG_TRACE(<< "merged knot = " << knots[back]);
for (std::size_t j = back + 1, k = r + 1; k < next; ++j, ++k) {
knots[j] = knots[k];
}
back += next - r;
LOG_TRACE(<< "knots = " << knots << ", back = " << back);
}
knots.resize(back);
LOG_TRACE(<< "knots after merge = " << knots);
this->knots() = std::move(knots);
}
}
void CFastQuantileSketch::computeMergeCosts(TFloatFloatPrVec& knots) {
// This is equivalent to:
//
// for (std::size_t i = 0; i + 3 < knots.size(); ++i) {
// m_MergeCosts[i] = mergeCost(knots[i + 1], knots[i + 2]);
// }
//
// If we let the compiler do its thing on this it works out about 3X slower
// than the following vectorised version. Note that the latency of the loads
// are the dominant cost so we choose to compute three values for two loads
// rather than four values for three loads.
// The following loop requires that knots size is a multiple of three.
std::size_t n{knots.size()};
knots.resize(3 * ((knots.size() + 2) / 3));
m_MergeCosts.resize(knots.size() - 2);
for (std::size_t i = 0; i + 3 < knots.size(); i += 3) {
auto knots12 = ml_unaligned_load_128(&knots[i + 1].first.cstorage());
auto knots34 = ml_unaligned_load_128(&knots[i + 3].first.cstorage());
auto centres1 = ml_shuffle_128(knots12, knots34, ml_shuffle_mask(2, 0, 2, 0));
auto counts1 = ml_shuffle_128(knots12, knots34, ml_shuffle_mask(3, 1, 3, 1));
auto centres2 = ml_rotate_128(centres1);
auto counts2 = ml_rotate_128(counts1);
auto countsMin = ml_minimum_128(counts1, counts2);
auto centresDiff = ml_subtract_128(centres2, centres1);
auto costs = ml_multiply_128(countsMin, centresDiff);
ml_unaligned_store_128(&m_MergeCosts[i].storage(), costs);
}
m_MergeCosts.resize(n - 3);
knots.resize(n);
LOG_TRACE(<< "merge costs = " << m_MergeCosts << ", merge candidates = " << m_MergeCandidates);
}
void CFastQuantileSketch::computeMergeCandidates(std::size_t numberToMerge) {
// This is pseudo greedy unlike CQuantileSketch which is greedy: we
// simply ignore the fact that we have slightly different costs after
// each merge. This allows us to avoid using a heap altogether which
// dominates the computational cost of reduce.
m_MergeCandidates.resize(m_MergeCosts.size());
std::iota(m_MergeCandidates.begin(), m_MergeCandidates.end(), 0);
std::nth_element(m_MergeCandidates.begin(), m_MergeCandidates.begin() + numberToMerge,
m_MergeCandidates.end(), [&](auto lhs, auto rhs) {
return m_MergeCosts[lhs] == m_MergeCosts[rhs]
? m_Tiebreakers[lhs] < m_Tiebreakers[rhs]
: m_MergeCosts[lhs] < m_MergeCosts[rhs];
});
std::sort(m_MergeCandidates.begin(), m_MergeCandidates.begin() + numberToMerge);
}
}
}
}