lib/maths/common/CQDigest.cc (792 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <maths/common/CQDigest.h>
#include <core/CLogger.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/RestoreMacros.h>
#include <maths/common/CChecksum.h>
#include <boost/math/distributions/beta.hpp>
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int_distribution.hpp>
#include <algorithm>
#include <cmath>
#include <functional>
#include <iterator>
#include <limits>
#include <sstream>
#include <tuple>
namespace ml {
namespace maths {
namespace common {
namespace {
std::string EMPTY_STRING;
}
const std::string CQDigest::K_TAG("a");
const std::string CQDigest::N_TAG("b");
const std::string CQDigest::NODE_TAG("c");
CQDigest::CQDigest(std::uint64_t k, double decayRate)
: m_K(k), m_N(0), m_Root(nullptr),
m_NodeAllocator(static_cast<std::size_t>(3 * m_K + 2)), m_DecayRate(decayRate) {
m_Root = &m_NodeAllocator.create(CNode(0, 1, 0, 0));
}
void CQDigest::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(K_TAG, m_K);
inserter.insertValue(N_TAG, m_N);
// Note the tree is serialized flat in pre-order.
m_Root->persistRecursive(NODE_TAG, inserter);
}
bool CQDigest::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
std::size_t nodeCount = 0;
do {
const std::string& name = traverser.name();
RESTORE_BUILT_IN(K_TAG, m_K)
RESTORE_BUILT_IN(N_TAG, m_N)
if (name == NODE_TAG) {
CNode node;
if (traverser.traverseSubLevel([&node](auto& traverser_) {
return node.acceptRestoreTraverser(traverser_);
}) == false) {
LOG_ERROR(<< "Failed to restore NODE_TAG, got " << traverser.value());
traverser.setBadState();
}
if (nodeCount++ == 0) {
m_Root = &m_NodeAllocator.create(node);
} else {
m_Root->insert(m_NodeAllocator, node);
}
continue;
}
} while (traverser.next());
this->checkRestoredInvariants();
return true;
}
void CQDigest::checkRestoredInvariants() const {
VIOLATES_INVARIANT_NO_EVALUATION(m_Root, ==, nullptr);
// This check on invariants is proving unreliable as it
// fails on occasion, see ml-cpp#1728 for details.
// Disabling the check pending investigation.
// if (this->checkInvariants() == false) {
// LOG_ABORT(<< "Invariance check failed for Q Digest");
// }
}
void CQDigest::add(std::uint32_t value, std::uint64_t n) {
LOG_TRACE(<< "Adding = " << value);
m_N += n;
CNode* expanded = m_Root->expand(m_NodeAllocator, value);
if (expanded != nullptr) {
m_Root = expanded;
}
// If we already have the leaf node then incrementing a
// leaf node count can't cause us to violate any constraints
// so there is no need to compress unless we incremented
// floor(n/k), in which case we need to compress the whole
// tree. Otherwise, we can get away with just compressing
// the path from the leaf to the root.
CNode& leaf = m_Root->insert(m_NodeAllocator, CNode(value, value, n, n));
if ((expanded != nullptr) || (m_N / m_K) != ((m_N - n) / m_K)) {
// Compress the whole tree.
this->compress();
} else if (leaf.count() == n) {
// Compress the path to the new leaf.
TNodePtrVec compress(1u, &leaf);
this->compress(compress);
}
}
void CQDigest::merge(const CQDigest& digest) {
TNodePtrVec nodes;
digest.m_Root->postOrder(nodes);
CNode* expanded = m_Root->expand(m_NodeAllocator, digest.m_Root->max());
if (expanded != nullptr) {
m_Root = expanded;
}
for (const auto& node : nodes) {
m_N += node->count();
m_Root->insert(m_NodeAllocator, *node);
}
// Compress the whole tree.
this->compress();
}
void CQDigest::propagateForwardsByTime(double time) {
if (time < 0.0) {
LOG_ERROR(<< "Can't propagate quantiles backwards in time");
return;
}
double alpha = std::exp(-m_DecayRate * time);
m_N = m_Root->age(alpha);
// Compress the whole tree.
this->compress();
}
bool CQDigest::scale(double factor) {
using TUInt32UInt32UInt64Tr = std::tuple<std::uint32_t, std::uint32_t, std::uint64_t>;
using TUInt32UInt32UInt64TrVec = std::vector<TUInt32UInt32UInt64Tr>;
if (factor <= 0.0) {
LOG_ERROR(<< "Scaling factor must be positive");
return false;
}
if (factor == 1.0) {
// Nothing to do.
return true;
}
if (m_N == 0) {
// Nothing to do.
return true;
}
// Get a sketch of the current q-digest.
TNodePtrVec nodes;
m_Root->postOrder(nodes);
std::sort(nodes.begin(), nodes.end(), SLevelLess());
TUInt32UInt32UInt64TrVec sketch;
sketch.reserve(nodes.size());
for (const auto& node : nodes) {
sketch.emplace_back(node->min(), node->max(), node->count());
}
// Start again from scratch.
this->clear();
// Reinsert the scaled summary values.
boost::random::mt11213b generator;
for (std::size_t i = 0; i < sketch.size(); ++i) {
const TUInt32UInt32UInt64Tr& node = sketch[i];
std::uint32_t min = std::get<0>(node);
std::uint32_t max = std::get<1>(node);
std::uint32_t span = max - min + 1;
std::uint64_t count = std::get<2>(node) / span;
std::uint64_t remainder = std::get<2>(node) - count * span;
LOG_TRACE(<< "min = " << min << ", max = " << max
<< ", count = " << count << ", remainder = " << remainder);
if (count > 0) {
for (std::uint32_t j = 0; j < span; ++j) {
this->add(static_cast<std::uint32_t>(
factor * static_cast<double>(min + j) + 0.5),
count);
}
}
if (remainder > 0) {
boost::random::uniform_int_distribution<std::uint32_t> uniform(0, span - 1);
for (std::uint64_t j = 0; j < remainder; ++j) {
this->add(static_cast<std::uint32_t>(
factor * static_cast<double>(min + uniform(generator)) + 0.5));
}
}
}
return true;
}
void CQDigest::clear() {
// Release all current nodes.
TNodePtrVec nodes;
m_Root->postOrder(nodes);
for (const auto& node : nodes) {
m_N -= node->count();
}
// Reset root to its initial state and sanity check total count.
m_Root = &m_NodeAllocator.create(CNode(0, 1, 0, 0));
if (m_N != 0) {
LOG_ERROR(<< "Inconsistency - sum of node counts did not equal N");
m_N = 0;
}
}
bool CQDigest::quantile(double q, std::uint32_t& result) const {
result = 0;
if (m_N == 0) {
LOG_ERROR(<< "Can't compute quantiles on empty set");
return false;
}
// Compute the count fraction we need to the left of the value.
auto n = static_cast<std::uint64_t>(q * static_cast<double>(m_N) + 0.5);
result = m_Root->quantile(0, n);
return true;
}
bool CQDigest::quantileSublevelSetSupremum(double f, std::uint32_t& result) const {
result = 0;
if (m_N == 0) {
LOG_ERROR(<< "Can't compute level set for empty set");
return false;
}
if (f <= 0.0) {
m_Root->sublevelSetSupremum(-1, result);
return true;
}
if (f > 1.0) {
m_Root->superlevelSetInfimum(m_Root->max() + 1, result);
return true;
}
auto n = static_cast<std::uint64_t>(f * static_cast<double>(m_N) + 0.5);
m_Root->quantileSublevelSetSupremum(n, 0, result);
return true;
}
double CQDigest::cdfQuantile(double n, double p, double q) {
if (q == 0.5) {
return p;
}
// This accounts for the fact that we have a finite sample size
// when computing the fraction f corresponding to a c.d.f value
// of p.
//
// We use a Bayesian approach. The count of events with fraction
// f is binomially distributed. Therefore, if we assume a non-
// informative beta prior we can get a posterior distribution
// for the true fraction simply by setting:
// alpha = alpha + p * n
// beta = beta + n * (1 - p).
static const double ONE_THIRD = 1.0 / 3.0;
try {
double a = n * p + ONE_THIRD;
double b = n * (1.0 - p) + ONE_THIRD;
boost::math::beta_distribution<> beta(a, b);
return boost::math::quantile(beta, q);
} catch (const std::exception& e) {
LOG_ERROR(<< "Failed to calculate c.d.f. quantile: " << e.what()
<< ", n = " << n << ", p = " << p << ", q = " << q);
}
return p;
}
bool CQDigest::cdf(std::uint32_t x, double confidence, double& lowerBound, double& upperBound) const {
lowerBound = 0.0;
upperBound = 0.0;
if (m_N == 0) {
LOG_ERROR(<< "Can't compute c.d.f. for empty set");
return false;
}
std::uint64_t l = 0;
m_Root->cdfLowerBound(x, l);
lowerBound = static_cast<double>(l) / static_cast<double>(m_N);
if (confidence > 0.0) {
lowerBound = cdfQuantile(static_cast<double>(m_N), lowerBound,
(100.0 - confidence) / 200.0);
}
std::uint64_t u = 0;
m_Root->cdfUpperBound(x, u);
upperBound = static_cast<double>(u) / static_cast<double>(m_N);
if (confidence > 0.0) {
upperBound = cdfQuantile(static_cast<double>(m_N), upperBound,
(100.0 + confidence) / 200.0);
}
return true;
}
void CQDigest::pdf(std::uint32_t x, double confidence, double& lowerBound, double& upperBound) const {
lowerBound = 0.0;
upperBound = 0.0;
if (m_N == 0) {
return;
}
std::uint32_t infimum = 0;
m_Root->superlevelSetInfimum(x, infimum);
std::uint32_t supremum = std::numeric_limits<std::uint32_t>::max();
m_Root->sublevelSetSupremum(static_cast<std::int64_t>(x), supremum);
double infimumLowerBound;
double infimumUpperBound;
this->cdf(infimum, confidence, infimumLowerBound, infimumUpperBound);
double supremumLowerBound;
double supremumUpperBound;
this->cdf(supremum, confidence, supremumLowerBound, supremumUpperBound);
lowerBound = std::max(supremumLowerBound - infimumUpperBound, 0.0) /
std::max(static_cast<double>(supremum - infimum), 1.0);
upperBound = std::max(supremumUpperBound - infimumLowerBound, 0.0) /
std::max(static_cast<double>(supremum - infimum), 1.0);
LOG_TRACE(<< "x = " << x << ", supremum = " << supremum
<< ", infimum = " << infimum << ", cdf(supremum) = ["
<< supremumLowerBound << "," << supremumUpperBound << "]"
<< ", cdf(infimum) = [" << infimumLowerBound << "," << infimumUpperBound << "]"
<< ", pdf = [" << lowerBound << "," << upperBound << "]");
}
void CQDigest::sublevelSetSupremum(std::uint32_t x, std::uint32_t& result) const {
m_Root->sublevelSetSupremum(static_cast<std::int64_t>(x), result);
}
void CQDigest::superlevelSetInfimum(std::uint32_t x, std::uint32_t& result) const {
m_Root->superlevelSetInfimum(x, result);
}
void CQDigest::summary(TUInt32UInt64PrVec& result) const {
result.clear();
if (m_N == 0) {
return;
}
TNodePtrVec nodes;
m_Root->postOrder(nodes);
result.reserve(nodes.size());
std::uint32_t last = nodes[0]->max();
std::uint64_t count = nodes[0]->count();
for (std::size_t i = 1; i < nodes.size(); ++i) {
if (nodes[i]->max() != last) {
result.emplace_back(last, count);
last = nodes[i]->max();
}
count += nodes[i]->count();
}
// Check if any count is aligned with the root max.
if (result.empty() || result.back().second < count) {
result.emplace_back(m_Root->max(), count);
}
if (result.back().second != m_N) {
LOG_ERROR(<< "Got " << result.back().second << " expected " << m_N);
}
}
std::uint64_t CQDigest::n() const {
return m_N;
}
std::uint64_t CQDigest::k() const {
return m_K;
}
std::uint64_t CQDigest::checksum(std::uint64_t seed) const {
seed = CChecksum::calculate(seed, m_K);
seed = CChecksum::calculate(seed, m_N);
seed = CChecksum::calculate(seed, m_DecayRate);
TUInt32UInt64PrVec summary;
this->summary(summary);
return CChecksum::calculate(seed, summary);
}
bool CQDigest::checkInvariants() const {
// These are:
// 1) |Q| <= 3 * k.
// 2) Subtree count at the root = n
// 2) The node invariants are satisfied.
if (m_Root->size() > 3 * m_K) {
LOG_ERROR(<< "|Q| = " << m_Root->size() << " 3k = " << 3 * m_K);
return false;
}
if (m_Root->subtreeCount() != m_N) {
LOG_ERROR(<< "Bad count: " << m_Root->subtreeCount() << ", n = " << m_N);
return false;
}
return m_Root->checkInvariants(m_N / m_K);
}
std::string CQDigest::print() const {
std::ostringstream result;
TNodePtrVec nodes;
m_Root->postOrder(nodes);
result << m_N << " | " << m_K << " | {";
for (const auto& node : nodes) {
result << " \"" << node->print() << ',' << node->count() << ','
<< node->subtreeCount() << '"';
}
result << " }";
return result.str();
}
void CQDigest::compress() {
for (std::size_t i = 0; i < 3 * m_K + 2; ++i) {
TNodePtrVec compress;
m_Root->postOrder(compress);
if (!this->compress(compress)) {
return;
}
}
LOG_ERROR(<< "Failed to compress tree");
}
bool CQDigest::compress(TNodePtrVec& compress) {
bool compressed = false;
std::make_heap(compress.begin(), compress.end(), SLevelLess());
while (!compress.empty()) {
CNode& node = *compress.front();
std::pop_heap(compress.begin(), compress.end(), SLevelLess());
compress.pop_back();
if (CNode* parent = node.compress(m_NodeAllocator, m_N / m_K)) {
compressed = true;
compress.push_back(parent);
std::push_heap(compress.begin(), compress.end(), SLevelLess());
}
}
return compressed;
}
bool CQDigest::SLevelLess::operator()(const CNode* lhs, const CNode* rhs) const {
return lhs->span() > rhs->span() ||
(lhs->span() == rhs->span() && lhs->max() > rhs->max());
}
bool CQDigest::SPostLess::operator()(const CNode* lhs, const CNode* rhs) const {
return lhs->max() < rhs->max() ||
(lhs->max() == rhs->max() && lhs->span() < rhs->span());
}
const std::string CQDigest::CNode::MIN_TAG("a");
const std::string CQDigest::CNode::MAX_TAG("b");
const std::string CQDigest::CNode::COUNT_TAG("c");
CQDigest::CNode::CNode()
: m_Ancestor(nullptr), m_Descendants(), m_Min(0xDEADBEEF),
m_Max(0xDEADBEEF), m_Count(0xDEADBEEF), m_SubtreeCount(0xDEADBEEF) {
}
CQDigest::CNode::CNode(std::uint32_t min, std::uint32_t max, std::uint64_t count, std::uint64_t subtreeCount)
: m_Ancestor(nullptr), m_Descendants(), m_Min(min), m_Max(max),
m_Count(count), m_SubtreeCount(subtreeCount) {
}
std::size_t CQDigest::CNode::size() const {
std::size_t size = 1;
for (const auto& descendant : m_Descendants) {
size += descendant->size();
}
return size;
}
std::uint32_t CQDigest::CNode::quantile(std::uint64_t leftCount, std::uint64_t n) const {
// We need to find the smallest node in post-order where
// the left count is greater than n. At each level we visit
// the smallest, in post order, node in the q-digest for
// which the left count is greater than n. Terminating when
// this node doesn't have any descendants.
for (const auto& descendant : m_Descendants) {
std::uint64_t count = descendant->subtreeCount();
if (leftCount + count >= n) {
return descendant->quantile(leftCount, n);
}
leftCount += count;
}
return m_Max;
}
bool CQDigest::CNode::quantileSublevelSetSupremum(std::uint64_t n,
std::uint64_t leftCount,
std::uint32_t& result) const {
// We are looking for the right end of the rightmost node
// whose count together with those nodes to the left is
// is less than n.
if (leftCount + m_SubtreeCount < n) {
result = std::max(result, m_Max);
return true;
}
leftCount += m_SubtreeCount;
for (auto i = m_Descendants.rbegin(); i != m_Descendants.rend(); ++i) {
leftCount -= (*i)->subtreeCount();
if (leftCount + (*i)->count() < n &&
(*i)->quantileSublevelSetSupremum(n, leftCount, result)) {
break;
}
}
return false;
}
void CQDigest::CNode::cdfLowerBound(std::uint32_t x, std::uint64_t& result) const {
// The lower bound is the sum of the counts at the nodes
// for which the maximum value is less than or equal to x.
if (m_Max <= x) {
result += m_SubtreeCount;
} else {
for (const auto& descendant : m_Descendants) {
descendant->cdfLowerBound(x, result);
}
}
}
void CQDigest::CNode::cdfUpperBound(std::uint32_t x, std::uint64_t& result) const {
// The upper bound is the sum of the counts at the nodes
// for which the minimum value is less than or equal to x.
if (m_Max <= x) {
result += m_SubtreeCount;
} else if (m_Min <= x) {
result += m_Count;
for (const auto& descendant : m_Descendants) {
descendant->cdfUpperBound(x, result);
}
}
}
void CQDigest::CNode::sublevelSetSupremum(const std::int64_t x, std::uint32_t& result) const {
for (auto i = m_Descendants.rbegin(); i != m_Descendants.rend(); ++i) {
if (static_cast<std::int64_t>((*i)->max()) > x) {
result = std::min(result, (*i)->max());
} else {
(*i)->sublevelSetSupremum(x, result);
break;
}
}
if (static_cast<std::int64_t>(m_Max) > x && m_Count > 0) {
result = std::min(result, m_Max);
}
}
void CQDigest::CNode::superlevelSetInfimum(std::uint32_t x, std::uint32_t& result) const {
for (const auto& descendant : m_Descendants) {
if (descendant->max() < x) {
result = std::max(result, descendant->max());
} else {
descendant->superlevelSetInfimum(x, result);
break;
}
}
if (m_Max < x && m_Count > 0) {
result = std::max(result, m_Max);
}
}
void CQDigest::CNode::postOrder(TNodePtrVec& nodes) const {
for (const auto& descendant : m_Descendants) {
descendant->postOrder(nodes);
}
nodes.push_back(const_cast<CNode*>(this));
}
CQDigest::CNode* CQDigest::CNode::expand(CNodeAllocator& allocator,
const std::uint32_t& value) {
if (m_Max >= value) {
// No expansion necessary.
return nullptr;
}
CNode* result = m_Count == 0 ? this : &allocator.create(CNode(m_Min, m_Max, 0, 0));
std::uint32_t levelSpan = result->span();
do {
result->m_Max += levelSpan;
levelSpan <<= 1;
} while (result->m_Max < value);
if (result != this) {
m_Ancestor = result;
result->m_Descendants.push_back(this);
result->m_SubtreeCount += m_SubtreeCount;
}
return result;
}
CQDigest::CNode& CQDigest::CNode::insert(CNodeAllocator& allocator, const CNode& node) {
m_SubtreeCount += node.subtreeCount();
if (*this == node) {
m_Count += node.count();
return *this;
}
auto next = std::lower_bound(m_Descendants.begin(), m_Descendants.end(),
&node, SPostLess());
// If it exists the ancestor will be after the node
// in post order.
for (auto i = next; i != m_Descendants.end(); ++i) {
if ((*i)->isAncestor(node) || **i == node) {
return (*i)->insert(allocator, node);
}
}
// This is the lowest ancestor in the q-digest. Insert
// the node below it in post order and move descendants
// if necessary.
CNode& newNode = allocator.create(node);
newNode.m_Ancestor = this;
m_Descendants.insert(next, &newNode);
if (!newNode.isLeaf()) {
newNode.takeDescendants(*this);
}
return newNode;
}
CQDigest::CNode* CQDigest::CNode::compress(CNodeAllocator& allocator,
std::uint64_t compressionFactor) {
if (m_Ancestor == nullptr) {
// The node is no longer in the q-digest.
return nullptr;
}
// Warning this function zeros m_Ancestor copy up front.
CNode* ancestor = m_Ancestor;
// Get the sibling of this node if it exists.
CNode* sibling = ancestor->sibling(*this);
std::uint64_t count = (ancestor->isParent(*this) ? ancestor->count() : 0) +
this->count() + (sibling != nullptr ? sibling->count() : 0);
// Check if we should compress this node.
if (count >= compressionFactor) {
return nullptr;
}
if (ancestor->isParent(*this)) {
ancestor->m_Count = count;
this->detach(allocator);
if (sibling != nullptr) {
sibling->detach(allocator);
}
return ancestor;
}
// We'll recycle this node for the parent.
m_Count = count;
this->isLeftChild() ? m_Max += this->span() : m_Min -= this->span();
this->takeDescendants(*ancestor);
if (sibling != nullptr) {
sibling->detach(allocator);
}
return this;
}
std::uint64_t CQDigest::CNode::age(double factor) {
m_SubtreeCount = 0;
for (auto& descendant : m_Descendants) {
m_SubtreeCount += descendant->age(factor);
}
if (m_Count > 0) {
m_Count = static_cast<std::uint64_t>(
std::max(static_cast<double>(m_Count) * factor + 0.5, 1.0));
}
m_SubtreeCount += m_Count;
return m_SubtreeCount;
}
std::uint32_t CQDigest::CNode::span() const {
return m_Max - m_Min + 1;
}
std::uint32_t CQDigest::CNode::min() const {
return m_Min;
}
std::uint32_t CQDigest::CNode::max() const {
return m_Max;
}
const std::uint64_t& CQDigest::CNode::count() const {
return m_Count;
}
const std::uint64_t& CQDigest::CNode::subtreeCount() const {
return m_SubtreeCount;
}
void CQDigest::CNode::persistRecursive(const std::string& nodeTag,
core::CStatePersistInserter& inserter) const {
inserter.insertLevel(NODE_TAG, [this](auto& inserter_) {
this->acceptPersistInserter(inserter_);
});
// Note the tree is serialized flat in pre-order.
for (const auto& descendant : m_Descendants) {
descendant->persistRecursive(nodeTag, inserter);
}
}
void CQDigest::CNode::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(MIN_TAG, m_Min);
inserter.insertValue(MAX_TAG, m_Max);
inserter.insertValue(COUNT_TAG, m_Count);
}
bool CQDigest::CNode::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
do {
const std::string& name = traverser.name();
if (name == MIN_TAG) {
if (core::CStringUtils::stringToType(traverser.value(), m_Min) == false) {
LOG_ERROR(<< "Invalid min in " << traverser.value());
return false;
}
}
if (name == MAX_TAG) {
if (core::CStringUtils::stringToType(traverser.value(), m_Max) == false) {
LOG_ERROR(<< "Invalid max in " << traverser.value());
return false;
}
}
if (name == COUNT_TAG) {
if (core::CStringUtils::stringToType(traverser.value(), m_Count) == false) {
LOG_ERROR(<< "Invalid count in " << traverser.value());
return false;
}
m_SubtreeCount = m_Count;
}
} while (traverser.next());
return true;
}
bool CQDigest::CNode::checkInvariants(std::uint64_t compressionFactor) const {
// 1) span is a power of 2
// 2) q-digest connectivity is consistent.
// 3) subtree counts are consistent.
// 4) if node != leaf count(node) <= floor(n/k)
// 5) count(parent) + count(left) + count(right) > floor(n/k)
// Subtracting 1 will flip all the bits to the right of the last 1 in the
// current binary representation. If the span is a power of 2 then it will
// only have 1 bit set, so span minus 1 will have completely different bits
// set to span. Then an inclusive OR and an exclusive OR of span and span
// minus 1 will be identical. If span is not a power of 2 then subtracting
// 1 will leave some set bits set, meaning the OR and XOR give different
// results.
std::uint32_t span(this->span());
std::uint32_t spanMinusOne(span - 1);
if ((span | spanMinusOne) != (span ^ spanMinusOne)) {
LOG_ERROR(<< "Bad span: " << this->print());
return false;
}
SPostLess postLess;
std::uint64_t subtreeCount = m_Count;
for (std::size_t i = 0; i < m_Descendants.size(); ++i) {
if (m_Descendants[i]->m_Ancestor != this) {
LOG_ERROR(<< "Bad connectivity: " << this->print() << " -> "
<< m_Descendants[i]->print() << " <- "
<< m_Descendants[i]->m_Ancestor->print());
}
if (!this->isAncestor(*m_Descendants[i])) {
LOG_ERROR(<< "Bad connectivity: " << this->print() << " -> "
<< m_Descendants[i]->print());
return false;
}
if (i + 1u < m_Descendants.size() &&
!postLess(m_Descendants[i], m_Descendants[i + 1u])) {
LOG_ERROR(<< "Bad order: " << m_Descendants[i]->print()
<< " >= " << m_Descendants[i + 1u]->print());
return false;
}
if (!m_Descendants[i]->checkInvariants(compressionFactor)) {
return false;
}
subtreeCount += m_Descendants[i]->subtreeCount();
}
if (subtreeCount != m_SubtreeCount) {
LOG_ERROR(<< "Bad subtree count: expected " << subtreeCount << " got " << m_SubtreeCount);
return false;
}
if (!this->isLeaf() && !this->isRoot() && m_Count > compressionFactor) {
LOG_ERROR(<< "Bad count: " << m_Count << ", floor(n/k) = " << compressionFactor);
return false;
}
if (!this->isRoot()) {
const CNode* sibling = m_Ancestor->sibling(*this);
std::uint64_t count = m_Count + (sibling != nullptr ? sibling->count() : 0) +
(m_Ancestor->isParent(*this) ? m_Ancestor->count() : 0);
if (count < compressionFactor) {
LOG_ERROR(<< "Bad triple count: " << count << ", floor(n/k) = " << compressionFactor);
return false;
}
}
return true;
}
std::string CQDigest::CNode::print() const {
std::ostringstream result;
result << '[' << m_Min << ',' << m_Max << ']';
return result.str();
}
bool CQDigest::CNode::operator==(const CNode& node) const {
return m_Min == node.m_Min && m_Max == node.m_Max;
}
std::size_t CQDigest::CNode::numberDescendants() const {
return m_Descendants.size();
}
CQDigest::TNodePtrVecCItr CQDigest::CNode::beginDescendants() const {
return m_Descendants.begin();
}
CQDigest::TNodePtrVecCItr CQDigest::CNode::endDescendants() const {
return m_Descendants.end();
}
CQDigest::CNode* CQDigest::CNode::sibling(const CNode& node) const {
std::uint32_t min = node.min();
node.isLeftChild() ? min += node.span() : min -= node.span();
std::uint32_t max = node.max();
node.isLeftChild() ? max += node.span() : max -= node.span();
CNode sibling(min, max, 0, 0);
auto next = std::lower_bound(m_Descendants.begin(), m_Descendants.end(),
&sibling, SPostLess());
if (next != m_Descendants.end() && (*next)->isSibling(node)) {
return *next;
}
return nullptr;
}
bool CQDigest::CNode::isSibling(const CNode& node) const {
// Check if the nodes are on the same level and share a parent.
return this->span() == node.span() &&
(this->isLeftChild() ? m_Max + 1 == node.m_Min : m_Min == node.m_Max + 1);
}
bool CQDigest::CNode::isParent(const CNode& node) const {
// Check is ancestor and is in level above.
return this->isAncestor(node) && this->span() == 2 * node.span();
}
bool CQDigest::CNode::isAncestor(const CNode& node) const {
// Check for inclusion of node range.
return (m_Min < node.m_Min && m_Max >= node.m_Max) ||
(m_Min <= node.m_Min && m_Max > node.m_Max);
}
bool CQDigest::CNode::isRoot() const {
return m_Ancestor == nullptr;
}
bool CQDigest::CNode::isLeaf() const {
return this->span() == 1;
}
bool CQDigest::CNode::isLeftChild() const {
// The left child nodes are always an even multiple of the
// level range from the start of the overall range and the
// right child nodes an odd multiple. To reduce storage we
// pass in the start of the overall range.
return (m_Min / this->span()) % 2 == 0;
}
void CQDigest::CNode::detach(CNodeAllocator& allocator) {
m_Ancestor->removeDescendant(*this);
m_Ancestor->takeDescendants(*this);
m_Ancestor = nullptr;
allocator.release(*this);
}
void CQDigest::CNode::removeDescendant(CNode& node) {
// Remove node from the descendants.
m_Descendants.erase(std::remove(m_Descendants.begin(), m_Descendants.end(), &node),
m_Descendants.end());
}
bool CQDigest::CNode::takeDescendants(CNode& node) {
if (node.numberDescendants() == 0) {
return false;
}
if (!this->isAncestor(node)) {
// Find our descendants among the descendants of node.
TNodePtrVec nodesToTake;
TNodePtrVec nodesToLeave;
for (auto i = node.beginDescendants(); i != node.endDescendants(); ++i) {
if (this->isAncestor(**i)) {
nodesToTake.push_back(*i);
(*i)->m_Ancestor = this;
m_SubtreeCount += (*i)->subtreeCount();
} else {
nodesToLeave.push_back(*i);
}
}
// Merge the descendants.
TNodePtrVec descendants;
descendants.reserve(m_Descendants.size() + nodesToTake.size());
std::merge(m_Descendants.begin(), m_Descendants.end(), nodesToTake.begin(),
nodesToTake.end(), std::back_inserter(descendants), SPostLess());
// Update the node's descendants.
nodesToLeave.swap(node.m_Descendants);
// Write the result back to this node.
descendants.swap(m_Descendants);
return !nodesToTake.empty();
}
for (auto i = node.beginDescendants(); i != node.endDescendants(); ++i) {
(*i)->m_Ancestor = this;
}
// Merge the descendants.
TNodePtrVec descendants;
descendants.reserve(m_Descendants.size() + node.numberDescendants());
std::merge(m_Descendants.begin(), m_Descendants.end(), node.beginDescendants(),
node.endDescendants(), std::back_inserter(descendants), SPostLess());
// Clear out the node's descendants.
TNodePtrVec empty;
empty.swap(node.m_Descendants);
// Write the result back to this node.
descendants.swap(m_Descendants);
return true;
}
CQDigest::CNodeAllocator::CNodeAllocator(std::size_t size) {
m_Nodes.push_back(TNodeVec());
m_Nodes.back().reserve(size);
m_FreeNodes.push_back(TNodePtrVec());
}
CQDigest::CNode& CQDigest::CNodeAllocator::create(const CNode& node) {
if (m_FreeNodes.front().empty()) {
// Add a new collection if necessary. This should
// only happen when merging two q-digests.
std::size_t size = m_Nodes.back().size();
if (size == m_Nodes.back().capacity()) {
m_Nodes.push_back(TNodeVec());
m_Nodes.back().reserve(size);
m_FreeNodes.push_back(TNodePtrVec());
LOG_TRACE(<< "Added new block " << m_Nodes.size());
}
TNodeVec& nodes = m_Nodes.back();
nodes.resize(nodes.size() + 1);
nodes.back() = node;
return nodes.back();
}
CNode* freeNode = m_FreeNodes.front().back();
*freeNode = node;
m_FreeNodes.front().pop_back();
return *freeNode;
}
void CQDigest::CNodeAllocator::release(CNode& node) {
std::size_t block = this->findBlock(node);
if (block >= m_FreeNodes.size()) {
LOG_ABORT(<< "Bad block address = " << block
<< ", max = " << m_FreeNodes.size() - 1);
}
m_FreeNodes[block].push_back(&node);
if (m_Nodes.size() > 1) {
auto nodeItr = m_Nodes.begin();
std::advance(nodeItr, block);
// Remove the block if none of its nodes are in use.
if (m_FreeNodes[block].size() > nodeItr->size()) {
LOG_TRACE(<< "Removing block " << block);
m_FreeNodes.erase(m_FreeNodes.begin() + block);
m_Nodes.erase(nodeItr);
}
}
}
std::size_t CQDigest::CNodeAllocator::findBlock(const CNode& node) const {
std::size_t result = 0;
if (m_Nodes.size() == 1) {
return result;
}
const auto le = std::less_equal<const CNode*>();
for (auto i = m_Nodes.begin(); i != m_Nodes.end(); ++i, ++result) {
auto first = i->begin();
auto last = i->end();
if (first == last) {
continue;
}
--last;
if (le(&(*first), &node) && le(&node, &(*last))) {
break;
}
}
return result;
}
}
}
}