/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <model/CModelTools.h>
#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <maths/common/CBasicStatistics.h>
#include <maths/common/CIntegerTools.h>
#include <maths/common/CModel.h>
#include <maths/common/CMultinomialConjugate.h>
#include <maths/common/CSampling.h>
#include <maths/common/CTools.h>
#include <algorithm>
#include <functional>
#include <numeric>
namespace ml {
namespace model {
namespace {
using TMinAccumulator = maths::common::CBasicStatistics::COrderStatisticsStack<double, 1>;
//! \brief Visitor to add a probability to variant of possible
//! aggregation styles.
struct SAddProbability {
void operator()(double probability,
double weight,
maths::common::CJointProbabilityOfLessLikelySamples& aggregator) const {
aggregator.add(probability, weight);
}
void operator()(double probability,
double /*weight*/,
maths::common::CProbabilityOfExtremeSample& aggregator) const {
aggregator.add(probability);
}
};
//! \brief Visitor to read aggregate probability from a variant
//! of possible aggregation styles.
struct SReadProbability {
template<typename T>
bool operator()(double weight, double& result, const T& aggregator) const {
double probability;
if (!aggregator.calculate(probability)) {
LOG_ERROR(<< "Failed to compute probability");
return false;
}
result *= weight < 1.0 ? std::pow(probability, std::max(weight, 0.0)) : probability;
return true;
}
template<typename T>
bool operator()(TMinAccumulator& result, const T& aggregator) const {
double probability;
if (!aggregator.calculate(probability)) {
LOG_ERROR(<< "Failed to compute probability");
return false;
}
result.add(probability);
return true;
}
};
}
void CModelTools::CFuzzyDeduplicate::add(TDouble2Vec value) {
// We need a very fast way to compute approximate percentiles
// for a large collection of samples. It is good enough to simply
// take a small random sample and compute percentiles on this. We
// would ideally like to sample each value with equal probability.
// However, since we can't visit the values in random order we
// need an approximately uniform online sampling scheme. In fact,
// we can achieve very nearly uniform sampling by using a modest
// size sample set, adding each value with probability equal to
// the sample set size divided by the number of values seen so far
// and, if a value is selected, evicting an existing sample
// uniformly at random. To see this, consider
// a stream of N values and choose the sample set size to be 100.
// Note that the chance of the k'th value being in the final sample
// is given by
//
// P = P(k is sampled) * P(k is not evicted)
//
// Also,
//
// P(k is sampled) = 100 / k
// P(k is not evicted) ~ (1 - 1/100)^E[# evicted in k+1 to N]
//
// It is easy to show by considering a sum of the indicator random
// variables that the expected number of samples evicted in the
// range k+1 to N is
//
// 100 / (k+1) + 100 / (k+2) + ... + 100 / N
//
// This is proportional to the difference of the harmonic numbers
// H(N) - H(k+1). To leading order this is log(N / (k+1)). Setting
// k = f N for f in [0,1] we get for moderate N
//
// P ~ 100 / (f N) x (1 - 1/100) ^ (100 x log(1 / f))
// = 100 / (f N) x exp(log(1 - 1/100) x 100 x log(1 / f))
// = 100 / (f N) x exp(-(1/100 + O(1/100^2)) x 100 x log(1 / f))
// = 100 / (f N) x f x f^O(1/100)
// = 100 / N x f^O(1/100)
//
// For even moderate f this very quickly converges to the required
// constant 100 / N. For example, for f >= 0.05 we have that
//
// 98.5 / N < P(f N) <= 100 / N
++m_Count;
if (m_RandomSample.size() < 100) {
m_RandomSample.push_back(std::move(value));
} else if (maths::common::CSampling::uniformSample(m_Rng, 0.0, 1.0) <
100.0 / static_cast<double>(m_Count)) {
std::size_t evict{maths::common::CSampling::uniformSample(
m_Rng, 0, m_RandomSample.size())};
m_RandomSample[evict].swap(value);
}
}
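// Choose the quantization intervals from the random sample: the time
// interval is one sixtieth of the bucket length (and at least 1) and each
// value interval is the inter-decile range of the sampled values divided
// by the desired number of distinct samples.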
void CModelTools::CFuzzyDeduplicate::computeEpsilons(core_t::TTime bucketLength,
std::size_t desiredNumberSamples) {
if (m_Count > 0) {
m_QuantizedValues.reserve(std::min(m_Count, desiredNumberSamples));
m_TimeEps = std::max(bucketLength / 60, core_t::TTime(1));
m_ValueEps.assign(m_RandomSample[0].size(), 0.0);
if (m_RandomSample.size() > 1) {
TDoubleVec values(m_RandomSample.size());
for (std::size_t i = 0; i < m_ValueEps.size(); ++i) {
for (std::size_t j = 0; j < m_RandomSample.size(); ++j) {
values[j] = m_RandomSample[j][i];
}
std::size_t p10{values.size() / 10};
std::size_t p90{(9 * values.size()) / 10};
std::nth_element(values.begin(), values.begin() + p10, values.end());
std::nth_element(values.begin() + p10 + 1, values.begin() + p90,
values.end());
m_ValueEps[i] = (values[p90] - values[p10]) /
static_cast<double>(desiredNumberSamples);
}
}
}
}
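// Map the quantized (time, value) pair to a stable identifier: the first
// time a quantized point is seen it is assigned the next free identifier
// and subsequent values which quantize to the same point receive the same
// identifier, i.e. they are treated as duplicates.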
std::size_t CModelTools::CFuzzyDeduplicate::duplicate(core_t::TTime time, TDouble2Vec value) {
return m_QuantizedValues
.emplace(boost::unordered::piecewise_construct,
std::forward_as_tuple(this->quantize(time), this->quantize(value)),
std::forward_as_tuple(m_QuantizedValues.size()))
.first->second;
}
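// Quantize each component of the value by snapping it down to a multiple
// of the corresponding epsilon. For example, with an epsilon of 0.5 the
// values 1.1, 1.3 and 1.4 all quantize to 1.0 and so deduplicate to a
// single entry.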
CModelTools::TDouble2Vec CModelTools::CFuzzyDeduplicate::quantize(TDouble2Vec value) const {
for (std::size_t i = 0; i < m_ValueEps.size(); ++i) {
value[i] = m_ValueEps[i] > 0.0
? m_ValueEps[i] * std::floor(value[i] / m_ValueEps[i])
: value[i];
}
return value;
}
core_t::TTime CModelTools::CFuzzyDeduplicate::quantize(core_t::TTime time) const {
return m_TimeEps > 0 ? maths::common::CIntegerTools::floor(time, m_TimeEps) : time;
}
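// Hash a quantized (time, values) pair by folding each value into a
// running hash seeded with the time.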
std::size_t CModelTools::CFuzzyDeduplicate::SDuplicateValueHash::
operator()(const TTimeDouble2VecPr& value) const {
return static_cast<std::size_t>(std::accumulate(
value.second.begin(), value.second.end(),
static_cast<std::uint64_t>(value.first), [](std::uint64_t seed, double v) {
return core::CHashing::hashCombine(seed, static_cast<std::uint64_t>(v));
}));
}
CModelTools::CProbabilityAggregator::CProbabilityAggregator(EStyle style)
: m_Style(style), m_TotalWeight(0.0) {
}
bool CModelTools::CProbabilityAggregator::empty() const {
return m_TotalWeight == 0.0;
}
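// Register an aggregation scheme. For the E_Sum style only aggregators
// with positive weight contribute; for the E_Min style every aggregator
// is given unit weight since only the minimum probability matters.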
void CModelTools::CProbabilityAggregator::add(const TAggregator& aggregator, double weight) {
switch (m_Style) {
case E_Sum:
if (weight > 0.0) {
m_Aggregators.emplace_back(aggregator, weight);
}
break;
case E_Min:
m_Aggregators.emplace_back(aggregator, 1.0);
break;
}
}
void CModelTools::CProbabilityAggregator::add(double probability, double weight) {
m_TotalWeight += weight;
for (auto& aggregator : m_Aggregators) {
std::visit(std::bind<void>(SAddProbability(), probability, weight,
std::placeholders::_1),
aggregator.first);
}
}
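// Compute the aggregate probability. For the E_Sum style this is the
// weighted geometric mean of the per-aggregator probabilities, with the
// weights normalized to sum to one; for the E_Min style it is the minimum
// probability over all aggregators. The result is truncated to the
// interval [CTools::smallestProbability(), 1].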
bool CModelTools::CProbabilityAggregator::calculate(double& result) const {
result = 1.0;
if (m_TotalWeight == 0.0) {
LOG_TRACE(<< "No samples");
return true;
}
if (m_Aggregators.empty()) {
LOG_ERROR(<< "No probability aggregators specified");
return false;
}
double p{1.0};
switch (m_Style) {
case E_Sum: {
double n{0.0};
for (const auto& aggregator : m_Aggregators) {
n += aggregator.second;
}
for (const auto& aggregator : m_Aggregators) {
if (!std::visit(std::bind<bool>(SReadProbability(), aggregator.second / n,
std::ref(p), std::placeholders::_1),
aggregator.first)) {
return false;
}
}
break;
}
case E_Min: {
TMinAccumulator p_;
for (const auto& aggregator : m_Aggregators) {
if (!std::visit(std::bind<bool>(SReadProbability(), std::ref(p_), std::placeholders::_1),
aggregator.first)) {
return false;
}
}
if (p_.count() > 0) {
p = p_[0];
}
break;
}
}
if (p < 0.0 || p > 1.001) {
LOG_ERROR(<< "Unexpected probability = " << p);
}
result = maths::common::CTools::truncate(
p, maths::common::CTools::smallestProbability(), 1.0);
return true;
}
CModelTools::CCategoryProbabilityCache::CCategoryProbabilityCache()
: m_Prior(nullptr), m_SmallestProbability(1.0) {
}
CModelTools::CCategoryProbabilityCache::CCategoryProbabilityCache(const maths::common::CMultinomialConjugate& prior)
: m_Prior(&prior), m_SmallestProbability(1.0) {
}
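// Look up the probability of seeing a category as, or less, likely than
// the supplied attribute. The cache is filled lazily with the mid-point
// of the lower and upper bounds computed by the multinomial prior; if the
// attribute is unknown the smallest cached probability is returned.
// Returns false if there is no informative prior.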
bool CModelTools::CCategoryProbabilityCache::lookup(std::size_t attribute, double& result) const {
result = 1.0;
if (!m_Prior || m_Prior->isNonInformative()) {
return false;
}
if (m_Cache.empty()) {
TDoubleVec lb;
TDoubleVec ub;
m_Prior->probabilitiesOfLessLikelyCategories(maths_t::E_TwoSided, lb, ub);
LOG_TRACE(<< "P({c}) >= " << lb);
LOG_TRACE(<< "P({c}) <= " << ub);
m_Cache.swap(lb);
m_SmallestProbability = 1.0;
for (std::size_t i = 0; i < ub.size(); ++i) {
m_Cache[i] = (m_Cache[i] + ub[i]) / 2.0;
m_SmallestProbability = std::min(m_SmallestProbability, m_Cache[i]);
}
}
std::size_t index;
result = (!m_Prior->index(static_cast<double>(attribute), index) ||
index >= m_Cache.size())
? m_SmallestProbability
: m_Cache[index];
return true;
}
CModelTools::TOptionalDouble CModelTools::CCategoryProbabilityCache::medianConcentration() const {
if (m_MedianConcentration) {
return m_MedianConcentration;
}
if (!m_Prior || m_Prior->isNonInformative() || m_Cache.empty()) {
return {};
}
m_MedianConcentration =
maths::common::CBasicStatistics::median(m_Prior->concentrations());
return m_MedianConcentration;
}
void CModelTools::CCategoryProbabilityCache::debugMemoryUsage(
const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CTools::CLessLikelyProbability");
core::memory_debug::dynamicSize("m_Cache", m_Cache, mem->addChild());
if (m_Prior) {
m_Prior->debugMemoryUsage(mem->addChild());
}
}
std::size_t CModelTools::CCategoryProbabilityCache::memoryUsage() const {
std::size_t mem{core::memory::dynamicSize(m_Cache)};
if (m_Prior) {
mem += m_Prior->memoryUsage();
}
return mem;
}
CModelTools::CProbabilityCache::CProbabilityCache(double maximumError)
: m_MaximumError(maximumError) {
}
void CModelTools::CProbabilityCache::clear() {
m_Caches.clear();
}
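// Cache the sorted modes of the residual model for univariate features.
// These are used by lookup to check that no mode lies between two cached
// points, which is needed for interpolation to be safe.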
void CModelTools::CProbabilityCache::addModes(model_t::EFeature feature,
std::size_t id,
const maths::common::CModel& model) {
if (model_t::dimension(feature) == 1) {
TDouble1Vec& modes{m_Caches[{feature, id}].s_Modes};
if (modes.empty()) {
TDouble2Vec1Vec modes_(
model.residualModes(maths_t::CUnitWeights::unit<TDouble2Vec>(1)));
for (const auto& mode : modes_) {
modes.push_back(mode[0]);
}
std::sort(modes.begin(), modes.end());
}
}
}
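// Cache the probability result keyed by value. Only univariate values are
// cached and caching is disabled altogether if the maximum permitted
// interpolation error is zero.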
void CModelTools::CProbabilityCache::addProbability(model_t::EFeature feature,
std::size_t id,
const TDouble2Vec1Vec& value,
const maths::common::SModelProbabilityResult& result) {
if (m_MaximumError > 0.0 && value.size() == 1 && value[0].size() == 1) {
m_Caches[{feature, id}].s_Probabilities.emplace(value[0][0], result);
}
}
bool CModelTools::CProbabilityCache::lookup(model_t::EFeature feature,
std::size_t id,
const TDouble2Vec1Vec& value,
maths::common::SModelProbabilityResult& result) const {
// The idea of this cache is to:
// 1. Check that the requested value x is in a region where the
// probability as a function of value is monotonic
// 2. Check that the difference in the probability at the end
// points of the interval [a, b] containing x is less than the
// required tolerance, so that, by 1, using the interpolated
// value won't introduce an error greater than the tolerance.
//
// To achieve 1 we note that the function is monotonic on an interval
// [a, b] if we can verify it doesn't contain more than one stationary
// point and the gradients satisfy P'(a) * P'(b) > 0.
result = maths::common::SModelProbabilityResult{};
if (m_MaximumError > 0.0 && value.size() == 1 && value[0].size() == 1) {
auto pos = m_Caches.find({feature, id});
if (pos != m_Caches.end()) {
double x{value[0][0]};
const TDouble1Vec& modes{pos->second.s_Modes};
const TDoubleProbabilityFMap& cache{pos->second.s_Probabilities};
auto right = cache.lower_bound(x);
if (right != cache.end() && right->first == x) {
result = right->second;
return true;
} else if (cache.size() >= 4 && right < cache.end() - 2 &&
right > cache.begin() + 1) {
auto left = right - 1;
if (this->canInterpolate(modes, left, right)) {
double probabilities[]{(left - 1)->second.s_Probability,
(left)->second.s_Probability,
(right)->second.s_Probability,
(right + 1)->second.s_Probability};
double tolerance{m_MaximumError *
std::min(probabilities[1], probabilities[2])};
LOG_TRACE(<< "p = " << probabilities);
if ((std::is_sorted(std::begin(probabilities), std::end(probabilities)) ||
std::is_sorted(std::begin(probabilities), std::end(probabilities),
std::greater<double>())) &&
std::fabs(probabilities[2] - probabilities[1]) <= tolerance) {
double a{left->first};
double b{right->first};
auto interpolate = [a, b, x](double pa, double pb) {
return (pa * (b - x) + pb * (x - a)) / (b - a);
};
auto nearest = x - a < b - x ? left : right;
result.s_Probability = interpolate(
left->second.s_Probability, right->second.s_Probability);
result.s_FeatureProbabilities.emplace_back(
left->second.s_FeatureProbabilities[0].s_Label,
interpolate(left->second.s_FeatureProbabilities[0].s_Probability,
right->second.s_FeatureProbabilities[0].s_Probability));
result.s_Tail = nearest->second.s_Tail;
return true;
}
}
}
}
}
return false;
}
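// Interpolation between two adjacent cached points is only permitted if
// they lie in the same tail, every point in the surrounding window of
// four has exactly one feature probability and none of them are
// conditional, and no residual model mode falls between the window end
// points.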
bool CModelTools::CProbabilityCache::canInterpolate(const TDouble1Vec& modes,
TDoubleProbabilityFMapCItr left,
TDoubleProbabilityFMapCItr right) const {
return left->second.s_Tail == right->second.s_Tail &&
std::all_of(left - 1, right + 2,
[](const std::pair<double, maths::common::SModelProbabilityResult>& cached) {
return cached.second.s_FeatureProbabilities.size() == 1;
}) &&
std::none_of(left - 1, right + 2,
[](const std::pair<double, maths::common::SModelProbabilityResult>& cached) {
return cached.second.s_Conditional;
}) &&
(std::lower_bound(modes.begin(), modes.end(), (left - 1)->first) ==
std::lower_bound(modes.begin(), modes.end(), (right + 1)->first));
}
}
}