/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <maths/time_series/CAdaptiveBucketing.h>
#include <core/CIEEE754.h>
#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CPersistUtils.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/CStringUtils.h>
#include <core/RestoreMacros.h>
#include <maths/common/CBasicStatistics.h>
#include <maths/common/CBasicStatisticsPersist.h>
#include <maths/common/CChecksum.h>
#include <maths/common/CIntegerTools.h>
#include <maths/common/CSpline.h>
#include <maths/common/CTools.h>
#include <maths/common/CToolsDetail.h>
#include <boost/math/distributions/binomial.hpp>
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <numeric>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
namespace ml {
namespace maths {
namespace time_series {
namespace {
using TSpline = common::CSpline<>;
using TSizeVec = std::vector<std::size_t>;
using TFloatUInt32Pr = std::pair<common::CFloatStorage, std::uint32_t>;
//! \brief Used to keep track of contiguous points when spreading.
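//!
//! A cluster of m merged points is treated as occupying a radius of
//! (m - 1) * separation / 2 about its mean, and location(j, separation)
//! reads back its j'th point on a grid with the required separation.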
class MATHS_TIME_SERIES_EXPORT CContiguousPoints {
public:
using TMeanAccumulator = common::CBasicStatistics::SSampleMean<double>::TAccumulator;
public:
explicit CContiguousPoints(double point) { m_Centre.add(point); }
std::size_t size() const {
return static_cast<std::size_t>(common::CBasicStatistics::count(m_Centre));
}
double centre() const { return common::CBasicStatistics::mean(m_Centre); }
double location(std::size_t j, double separation) const {
return common::CBasicStatistics::mean(m_Centre) +
static_cast<double>(j) * separation - this->radius(separation);
}
bool overlap(const CContiguousPoints& other, double separation) const {
double left{common::CBasicStatistics::mean(m_Centre)};
double right{common::CBasicStatistics::mean(other.m_Centre)};
return std::fabs(right - left) <
this->radius(separation) + other.radius(separation) + separation;
}
void merge(const CContiguousPoints& other, double a, double b, double separation) {
m_Centre += other.m_Centre;
common::CBasicStatistics::moment<0>(m_Centre) = this->truncate(
common::CBasicStatistics::mean(m_Centre), a, b, separation);
}
private:
double truncate(double point, double a, double b, double separation) const {
return common::CTools::truncate(point, a + this->radius(separation),
b - this->radius(separation));
}
double radius(double separation) const {
return (common::CBasicStatistics::count(m_Centre) - 1.0) * separation / 2.0;
}
private:
TMeanAccumulator m_Centre;
};
//! Convert to a delimited string.
std::string pValueToDelimited(const TFloatUInt32Pr& value) {
return value.first.toString() + common::CBasicStatistics::EXTERNAL_DELIMITER +
core::CStringUtils::typeToString(value.second);
}
//! Initialize from a delimited string.
bool pValueFromDelimited(const std::string& delimited, TFloatUInt32Pr& value) {
std::size_t pos{delimited.find(common::CBasicStatistics::EXTERNAL_DELIMITER)};
if (pos == std::string::npos) {
LOG_ERROR(<< "Failed to delimiter in '" << delimited << "'");
return false;
}
unsigned int count{};
if (value.first.fromString(delimited.substr(0, pos)) == false ||
core::CStringUtils::stringToType(delimited.substr(pos + 1), count) == false) {
LOG_ERROR(<< "Failed to extract value from '" << delimited << "'");
return false;
}
value.second = count;
return true;
}
//! Clear a vector and recover its memory.
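//! (Unlike clear(), which keeps the capacity, swapping with an empty
//! vector also releases it.)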
template<typename T>
void clearAndShrink(std::vector<T>& vector) {
std::vector<T> empty;
empty.swap(vector);
}
const core::TPersistenceTag DECAY_RATE_TAG{"a", "decay_rate"};
const core::TPersistenceTag ENDPOINT_TAG{"b", "endpoint"};
const core::TPersistenceTag CENTRES_TAG{"c", "centres"};
const core::TPersistenceTag MEAN_DESIRED_DISPLACEMENT_TAG{"d", "mean_desired_displacement"};
const core::TPersistenceTag MEAN_ABS_DESIRED_DISPLACEMENT_TAG{"e", "mean_abs_desired_displacement"};
const core::TPersistenceTag LARGE_ERROR_COUNTS_TAG{"f", "large_error_counts"};
const core::TPersistenceTag TARGET_SIZE_TAG{"g", "target size"};
const core::TPersistenceTag LAST_LARGE_ERROR_BUCKET_TAG{"h", "last_large_error_bucket"};
const core::TPersistenceTag LAST_LARGE_ERROR_PERIOD_TAG{"i", "last_large_error_period"};
const core::TPersistenceTag LARGE_ERROR_COUNT_P_VALUES_TAG{"j", "large_error_counts_p_values"};
const core::TPersistenceTag MEAN_WEIGHT_TAG{"k", "mean weight"};
const std::string EMPTY_STRING;
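// A smoothing kernel which sums to one, so convolving with it preserves
// the total of the values it smooths; WIDTH is the kernel's half width.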
const double SMOOTHING_FUNCTION[]{0.25, 0.5, 0.25};
const std::size_t WIDTH{std::size(SMOOTHING_FUNCTION) / 2};
const double ALPHA{0.35};
const double EPS{std::numeric_limits<double>::epsilon()};
const double WEIGHTS[]{1.0, 1.0, 1.0, 0.75, 0.5};
const double MINIMUM_DECAY_RATE{0.001};
const double MINIMUM_LARGE_ERROR_COUNT_TO_SPLIT{10.0};
const double MODERATE_SIGNIFICANCE{1e-2};
const double HIGH_SIGNIFICANCE{1e-3};
const double LOG_MODERATE_SIGNIFICANCE{std::log(MODERATE_SIGNIFICANCE)};
const double LOG_HIGH_SIGNIFICANCE{std::log(HIGH_SIGNIFICANCE)};
}
CAdaptiveBucketing::CAdaptiveBucketing(double decayRate, double minimumBucketLength)
: m_DecayRate{std::max(decayRate, MINIMUM_DECAY_RATE)}, m_MinimumBucketLength{minimumBucketLength} {
}
CAdaptiveBucketing::TRestoreFunc CAdaptiveBucketing::getAcceptRestoreTraverser() {
return [this](core::CStateRestoreTraverser& traverser) {
return this->acceptRestoreTraverser(traverser);
};
}
CAdaptiveBucketing::TPersistFunc CAdaptiveBucketing::getAcceptPersistInserter() const {
return [this](core::CStatePersistInserter& inserter) {
this->acceptPersistInserter(inserter);
};
}
bool CAdaptiveBucketing::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
do {
const std::string& name{traverser.name()};
RESTORE_BUILT_IN(DECAY_RATE_TAG, m_DecayRate)
RESTORE_BUILT_IN(TARGET_SIZE_TAG, m_TargetSize)
RESTORE_BUILT_IN(LAST_LARGE_ERROR_BUCKET_TAG, m_LastLargeErrorBucket)
RESTORE_BUILT_IN(LAST_LARGE_ERROR_PERIOD_TAG, m_LastLargeErrorPeriod)
RESTORE(LARGE_ERROR_COUNT_P_VALUES_TAG,
m_LargeErrorCountPValues.fromDelimited(traverser.value(), pValueFromDelimited))
RESTORE(MEAN_WEIGHT_TAG, m_MeanWeight.fromDelimited(traverser.value()))
RESTORE(ENDPOINT_TAG, core::CPersistUtils::fromString(traverser.value(), m_Endpoints))
RESTORE(CENTRES_TAG, core::CPersistUtils::fromString(traverser.value(), m_Centres))
RESTORE(LARGE_ERROR_COUNTS_TAG,
core::CPersistUtils::fromString(traverser.value(), m_LargeErrorCounts))
RESTORE(MEAN_DESIRED_DISPLACEMENT_TAG,
m_MeanDesiredDisplacement.fromDelimited(traverser.value()))
RESTORE(MEAN_ABS_DESIRED_DISPLACEMENT_TAG,
m_MeanAbsDesiredDisplacement.fromDelimited(traverser.value()))
} while (traverser.next());
if (m_TargetSize == 0) {
m_TargetSize = this->size();
}
if (m_LargeErrorCounts.empty()) {
m_LargeErrorCounts.resize(m_Centres.size(), 0.0);
}
this->checkRestoredInvariants();
return true;
}
void CAdaptiveBucketing::checkRestoredInvariants() const {
// Fail if the invariant "m_Endpoints.size() == m_Centres.size() + 1 or both empty"
// does not hold true.
if ((m_Endpoints.size() != m_Centres.size() + 1) &&
(m_Endpoints.empty() == false || m_Centres.empty() == false)) {
LOG_ABORT(<< "Invariance check failed: m_Endpoints.size() != m_Centres.size() + 1."
<< " [" << m_Endpoints.size() << " != " << m_Centres.size() + 1 << "]"
<< " && "
<< "(m_Endpoints.empty() == false || m_Centres.empty() == false)"
<< " [" << std::boolalpha << m_Endpoints.empty() << " || "
<< m_Centres.empty() << "]");
}
VIOLATES_INVARIANT(m_Centres.size(), !=, m_LargeErrorCounts.size());
}
void CAdaptiveBucketing::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(DECAY_RATE_TAG, m_DecayRate, core::CIEEE754::E_SinglePrecision);
inserter.insertValue(TARGET_SIZE_TAG, m_TargetSize);
inserter.insertValue(LAST_LARGE_ERROR_BUCKET_TAG, m_LastLargeErrorBucket);
inserter.insertValue(LAST_LARGE_ERROR_PERIOD_TAG, m_LastLargeErrorPeriod);
inserter.insertValue(LARGE_ERROR_COUNT_P_VALUES_TAG,
m_LargeErrorCountPValues.toDelimited(pValueToDelimited));
inserter.insertValue(MEAN_WEIGHT_TAG, m_MeanWeight.toDelimited());
inserter.insertValue(ENDPOINT_TAG, core::CPersistUtils::toString(m_Endpoints));
inserter.insertValue(CENTRES_TAG, core::CPersistUtils::toString(m_Centres));
inserter.insertValue(LARGE_ERROR_COUNTS_TAG,
core::CPersistUtils::toString(m_LargeErrorCounts));
inserter.insertValue(MEAN_DESIRED_DISPLACEMENT_TAG,
m_MeanDesiredDisplacement.toDelimited());
inserter.insertValue(MEAN_ABS_DESIRED_DISPLACEMENT_TAG,
m_MeanAbsDesiredDisplacement.toDelimited());
}
void CAdaptiveBucketing::swap(CAdaptiveBucketing& other) noexcept {
std::swap(m_DecayRate, other.m_DecayRate);
std::swap(m_MinimumBucketLength, other.m_MinimumBucketLength);
std::swap(m_TargetSize, other.m_TargetSize);
std::swap(m_LastLargeErrorBucket, other.m_LastLargeErrorBucket);
std::swap(m_LastLargeErrorPeriod, other.m_LastLargeErrorPeriod);
std::swap(m_LargeErrorCountPValues, other.m_LargeErrorCountPValues);
std::swap(m_MeanWeight, other.m_MeanWeight);
m_Endpoints.swap(other.m_Endpoints);
m_Centres.swap(other.m_Centres);
m_LargeErrorCounts.swap(other.m_LargeErrorCounts);
std::swap(m_MeanDesiredDisplacement, other.m_MeanDesiredDisplacement);
std::swap(m_MeanAbsDesiredDisplacement, other.m_MeanAbsDesiredDisplacement);
}
bool CAdaptiveBucketing::initialized() const {
return m_Endpoints.size() > 0;
}
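// Illustrative example (not from the original source): for a daily period
// bucketed 24 ways, initialize(0.0, 86400.0, 24) lays out the 25 uniform
// endpoints {0, 3600, 7200, ..., 86400} with 24 zeroed centres and large
// error counts; refine subsequently moves the endpoints to equalize the
// per bucket averaging error. The hooks this class calls, such as predict,
// variance and bucketCount, are supplied by the concrete bucketing
// implementations.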
bool CAdaptiveBucketing::initialize(double a, double b, std::size_t n) {
if (n == 0) {
LOG_ERROR(<< "Must have at least one bucket");
return false;
}
if (m_MinimumBucketLength > 0.0) {
// Handle the case that the minimum bucket length is
// longer than the period.
m_MinimumBucketLength = std::min(m_MinimumBucketLength, b - a);
n = std::min(n, static_cast<std::size_t>((b - a) / m_MinimumBucketLength));
}
m_TargetSize = n;
m_Endpoints.clear();
m_Endpoints.reserve(n + 1);
double width{(b - a) / static_cast<double>(n)};
for (std::size_t i = 0; i < n + 1; ++i) {
m_Endpoints.push_back(a + static_cast<double>(i) * width);
}
m_Centres.clear();
m_Centres.resize(n);
m_LargeErrorCounts.clear();
m_LargeErrorCounts.resize(n, 0.0);
return true;
}
void CAdaptiveBucketing::initialValues(core_t::TTime start,
core_t::TTime end,
const TFloatMeanAccumulatorVec& values) {
if (this->initialized() == false) {
return;
}
std::size_t n{values.size()};
core_t::TTime bucketLength{(end - start) / static_cast<core_t::TTime>(n)};
core_t::TTime dt{static_cast<core_t::TTime>(common::CTools::truncate(
m_MinimumBucketLength, 1.0, static_cast<double>(bucketLength)))};
core_t::TTime repeat{static_cast<core_t::TTime>(
m_Endpoints[m_Endpoints.size() - 1] - m_Endpoints[0])};
// Initialize splines for seed values:
// - For missing bucket values we use linear interpolation.
// - For intrabucket interpolation we use cubic spline interpolation.
// - For value weights we use linear interpolation.
TDoubleVec knots;
TDoubleVec knotValues;
TDoubleVec knotWeights;
knots.reserve(values.size());
knotValues.reserve(values.size());
for (std::size_t i = 0; i < values.size(); ++i) {
if (common::CBasicStatistics::count(values[i]) > 0.0) {
knots.push_back(static_cast<double>(start + bucketLength * i));
knotValues.push_back(common::CBasicStatistics::mean(values[i]));
}
}
TSpline interpolateMissingValues{TSpline::E_Linear};
if (knots.size() >= 2) {
interpolateMissingValues.interpolate(knots, knotValues, TSpline::E_Natural);
}
knots.resize(values.size());
knotValues.resize(values.size());
knotWeights.resize(values.size());
for (std::size_t i = 0; i < values.size(); ++i) {
knots[i] = static_cast<double>(start + bucketLength * i);
knotValues[i] =
interpolateMissingValues.initialized()
? interpolateMissingValues.value(knots[i])
: static_cast<double>(common::CBasicStatistics::mean(values[i]));
knotWeights[i] = common::CBasicStatistics::count(values[i]);
}
TSpline seedValues{TSpline::E_Cubic};
TSpline seedWeights{TSpline::E_Linear};
seedValues.interpolate(knots, knotValues, TSpline::E_Natural);
seedWeights.interpolate(knots, knotWeights, TSpline::E_Natural);
double scale{std::pow(static_cast<double>(dt) / static_cast<double>(bucketLength), 2.0)};
core_t::TTime observedSinceLastRefine{0};
for (core_t::TTime time = start; time < end; time += dt) {
if (this->inWindow(time)) {
double value{seedValues.value(static_cast<double>(time))};
double weight{scale * seedWeights.value(static_cast<double>(time))};
std::size_t bucket;
if (this->bucket(time, bucket) && weight > 0.0) {
this->addInitialValue(bucket, time, value, weight);
}
observedSinceLastRefine += dt;
if (observedSinceLastRefine >= repeat) {
this->refine(time);
observedSinceLastRefine = 0;
}
}
}
}
std::size_t CAdaptiveBucketing::size() const {
return m_Centres.size();
}
void CAdaptiveBucketing::clear() {
clearAndShrink(m_Endpoints);
clearAndShrink(m_Centres);
clearAndShrink(m_LargeErrorCounts);
}
void CAdaptiveBucketing::add(std::size_t bucket, core_t::TTime time, double weight) {
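// The centre update below is an incremental weighted mean: the existing
// centre, weighted by the current bucket count, is pulled towards
// offset(time) by the fraction weight / (bucketCount + weight).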
auto centre = common::CBasicStatistics::momentsAccumulator(
this->bucketCount(bucket), static_cast<double>(m_Centres[bucket]));
centre.add(this->offset(time), weight);
m_Centres[bucket] = common::CBasicStatistics::mean(centre);
m_MeanWeight.add(weight);
}
void CAdaptiveBucketing::addLargeError(std::size_t bucket, core_t::TTime time) {
core_t::TTime period{static_cast<core_t::TTime>(
m_Endpoints[m_Endpoints.size() - 1] - m_Endpoints[0])};
time = common::CIntegerTools::floor(time, period);
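// Only count a large error if it isn't a repeat of the last one, i.e. in
// the same bucket and the same repeat of the period; this stops a burst
// of errors at one point in time dominating the split statistics.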
if (bucket != m_LastLargeErrorBucket || time != m_LastLargeErrorPeriod) {
m_LargeErrorCounts[bucket] += 1.0;
}
m_LastLargeErrorBucket = bucket;
m_LastLargeErrorPeriod = time;
}
void CAdaptiveBucketing::decayRate(double value) {
m_DecayRate = std::max(value, MINIMUM_DECAY_RATE);
}
double CAdaptiveBucketing::decayRate() const {
return m_DecayRate;
}
void CAdaptiveBucketing::age(double factor) {
for (auto& count : m_LargeErrorCounts) {
count *= factor;
}
m_MeanDesiredDisplacement.age(factor);
m_MeanAbsDesiredDisplacement.age(factor);
m_MeanWeight.age(factor);
}
double CAdaptiveBucketing::minimumBucketLength() const {
return m_MinimumBucketLength;
}
void CAdaptiveBucketing::refine(core_t::TTime time) {
using TDoubleDoublePr = std::pair<double, double>;
using TDoubleDoublePrVec = std::vector<TDoubleDoublePr>;
using TDoubleSizePr = std::pair<double, std::size_t>;
using TMinMaxAccumulator = common::CBasicStatistics::CMinMax<TDoubleSizePr>;
LOG_TRACE(<< "refining at " << time);
if (m_Endpoints.size() < 2) {
return;
}
// Check if any buckets should be split based on the large error counts.
this->maybeSplitBucket();
std::size_t n{m_Endpoints.size() - 1};
double a{m_Endpoints[0]};
double b{m_Endpoints[n]};
// Extract the bucket means.
TDoubleDoublePrVec values;
values.reserve(n);
for (std::size_t i = 0; i < n; ++i) {
values.emplace_back(this->bucketCount(i), this->predict(i, time, m_Centres[i]));
}
LOG_TRACE(<< "values = " << values);
// Compute the function range in each bucket, imposing periodic
// boundary conditions at the start and end of the interval.
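// Each range is down-weighted by WEIGHTS indexed by how far apart, in
// buckets, the positions of the minimum and maximum are in the five point
// stencil: extrema three or four buckets apart contribute 0.75 or 0.5 of
// their range, respectively, since variation spread across the whole
// stencil says less about the centre bucket.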
TDoubleVec ranges;
ranges.reserve(n);
for (std::size_t i = 0; i < n; ++i) {
TDoubleDoublePr v[]{values[(n + i - 2) % n], values[(n + i - 1) % n],
values[(n + i + 0) % n], // centre
values[(n + i + 1) % n], values[(n + i + 2) % n]};
TMinMaxAccumulator minmax;
for (std::size_t j = 0; j < std::size(v); ++j) {
if (v[j].first > 0.0) {
minmax.add({v[j].second, j});
}
}
if (minmax.initialized()) {
ranges.push_back(WEIGHTS[minmax.max().second > minmax.min().second
? minmax.max().second - minmax.min().second
: minmax.min().second - minmax.max().second] *
std::pow(minmax.max().first - minmax.min().first, 0.75));
} else {
ranges.push_back(0.0);
}
}
// Smooth the ranges by convolving with a smoothing function.
// We do this in the "time" domain because the smoothing
// function is narrow. Estimate the averaging error in each
// bucket by multiplying the smoothed range by the bucket width.
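// For example (illustrative numbers), ranges {..., 1.0, 4.0, 1.0, ...}
// smooth to 0.25 * 1.0 + 0.5 * 4.0 + 0.25 * 1.0 = 2.5 for the middle
// bucket, which is then scaled by the bucket's share of the period.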
TDoubleVec averagingErrors;
averagingErrors.reserve(n);
for (std::size_t i = 0; i < n; ++i) {
double ai{m_Endpoints[i]};
double bi{m_Endpoints[i + 1]};
double error{0.0};
for (std::size_t j = 0; j < std::size(SMOOTHING_FUNCTION); ++j) {
error += SMOOTHING_FUNCTION[j] * ranges[(n + i + j - WIDTH) % n];
}
double h{bi - ai};
error *= h / (b - a);
averagingErrors.push_back(error);
}
double maxAveragingError{
*std::max_element(averagingErrors.begin(), averagingErrors.end())};
for (const auto& pValue : m_LargeErrorCountPValues) {
if (pValue.first < MODERATE_SIGNIFICANCE) {
averagingErrors[pValue.second] = maxAveragingError;
}
}
double totalAveragingError{
std::accumulate(averagingErrors.begin(), averagingErrors.end(), 0.0)};
LOG_TRACE(<< "averagingErrors = " << averagingErrors);
LOG_TRACE(<< "totalAveragingError = " << totalAveragingError);
double n_{static_cast<double>(n)};
double step{(1.0 - n_ * EPS) * totalAveragingError / n_};
TFloatVec endpoints{m_Endpoints};
LOG_TRACE(<< "step = " << step);
// If all the function values are identical then the end points
// should be equidistant. We check step in case of underflow.
if (step == 0.0) {
m_Endpoints[0] = a;
for (std::size_t i = 0; i < n; ++i) {
m_Endpoints[i] = a + (b - a) * static_cast<double>(i) / n_;
}
m_Endpoints[n] = b;
} else {
// Noise in the bucket mean values creates a high frequency, zero mean
// driving force on the desired positions of the buckets' end points.
// Once the end points have stabilized at their desired locations for
// the trend, we can detect this by comparing the time averaged desired
// displacement with the time averaged absolute desired displacement:
// the lower their ratio, the more smoothing we apply. Note we want to
// damp the noise out because the process of adjusting the bucket end
// points loses a small amount of information; see the comments at the
// start of refresh for more details.
double alpha{
ALPHA *
(common::CBasicStatistics::mean(m_MeanAbsDesiredDisplacement) == 0.0
? 1.0
: std::fabs(common::CBasicStatistics::mean(m_MeanDesiredDisplacement)) /
common::CBasicStatistics::mean(m_MeanAbsDesiredDisplacement))};
LOG_TRACE(<< "alpha = " << alpha);
double displacement{0.0};
// Linearly interpolate between the current end points and points
// separated by equal total averaging error. Interpolating is
// equivalent to adding drag to the end point dynamics and damps
// any oscillations.
double unassignedAveragingError{0.0};
for (std::size_t i = 0, j = 1; i < n && j < n + 1; ++i) {
double ai{endpoints[i]};
double bi{endpoints[i + 1]};
double hi{bi - ai};
double ei{averagingErrors[i]};
unassignedAveragingError += ei;
for (double ej = step - (unassignedAveragingError - ei);
unassignedAveragingError >= step;
ej += step, unassignedAveragingError -= step, ++j) {
double xj{hi * ej / ei};
m_Endpoints[j] = std::max(
m_Endpoints[j - 1] + 1e-8 * std::fabs(m_Endpoints[j - 1]),
endpoints[j] + alpha * (ai + xj - endpoints[j]));
displacement += (ai + xj) - endpoints[j];
LOG_TRACE(<< "interval = [" << ai << "," << bi << "]"
<< " averaging error / unit length = " << ei / hi << ", desired translation "
<< endpoints[j] << " -> " << ai + xj);
}
}
// Add a margin to guarantee we'll meet the separation constraint.
spread(a, b, (1.0 + 1e-8) * m_MinimumBucketLength, m_Endpoints);
// By construction, the first and last end points should be close to
// "a" and "b", respectively, but we snap them exactly to "a" and "b"
// so that the total interval is unchanged.
m_Endpoints[0] = a;
m_Endpoints[n] = b;
LOG_TRACE(<< "refinedEndpoints = " << m_Endpoints);
m_MeanDesiredDisplacement.add(displacement);
m_MeanAbsDesiredDisplacement.add(std::fabs(displacement));
}
this->refresh(endpoints);
}
bool CAdaptiveBucketing::knots(core_t::TTime time,
common::CSplineTypes::EBoundaryCondition boundary,
TDoubleVec& knots,
TDoubleVec& values,
TDoubleVec& variances) const {
knots.clear();
values.clear();
variances.clear();
std::size_t n{m_Centres.size()};
for (std::size_t i = 0; i < n; ++i) {
if (this->bucketCount(i) > 0.0) {
double wide{3.0 * (m_Endpoints[n] - m_Endpoints[0]) / static_cast<double>(n)};
LOG_TRACE(<< "period " << m_Endpoints[n] - m_Endpoints[0]
<< ", # buckets = " << n << ", wide = " << wide);
// We get two knot points for each wide bucket, but at most
// one third of the buckets can be wide, so in the worst case
// we have 2 * (n / 3) + 2 * n / 3 = 4 * n / 3 knot points.
knots.reserve(4 * n / 3);
values.reserve(4 * n / 3);
variances.reserve(4 * n / 3);
double c{m_Centres[i]};
double c0{c - m_Endpoints[0]};
knots.push_back(m_Endpoints[0]);
values.push_back(this->predict(i, time, c));
variances.push_back(this->variance(i));
for (/**/; i < n; ++i) {
if (this->bucketCount(i) > 0.0) {
double a{m_Endpoints[i]};
double b{m_Endpoints[i + 1]};
double min{(9.0 * a + b) / 10.0};
double max{(a + 9.0 * b) / 10.0};
c = m_Centres[i];
double m{this->predict(i, time, c)};
double v{this->variance(i)};
if (b - a > wide) {
knots.push_back(std::max(c - (b - a) / 4.0, min));
values.push_back(m);
variances.push_back(v);
knots.push_back(std::min(c + (b - a) / 4.0, max));
values.push_back(m);
variances.push_back(v);
} else {
knots.push_back(std::min(std::max(c, min), max));
values.push_back(m);
variances.push_back(v);
}
}
}
switch (boundary) {
case common::CSplineTypes::E_Natural:
case common::CSplineTypes::E_ParabolicRunout:
knots.push_back(m_Endpoints[n]);
values.push_back(values.back());
variances.push_back(variances.back());
break;
case common::CSplineTypes::E_Periodic:
// We search for the value in the last and next period which
// are adjacent. Note that values need not be the same at the
// start and end of the period because the gradient can vary,
// but we expect them to be continuous.
for (std::size_t j = n - 1; j > 0; --j) {
if (this->bucketCount(j) > 0.0) {
double alpha{m_Endpoints[n] - m_Centres[j]};
double beta{c0};
double Z{alpha + beta};
if (Z == 0.0) {
alpha = beta = 0.5;
} else {
alpha /= Z;
beta /= Z;
}
double lastPeriodValue{
this->predict(j, time, m_Centres[j] - m_Endpoints[n])};
double lastPeriodVariance{this->variance(j)};
knots[0] = m_Endpoints[0];
values[0] = alpha * values[0] + beta * lastPeriodValue;
variances[0] = alpha * variances[0] + beta * lastPeriodVariance;
break;
}
}
for (std::size_t j = 0; j < n; ++j) {
if (this->bucketCount(j) > 0.0) {
double alpha{m_Centres[j]};
double beta{m_Endpoints[n] - knots.back()};
double Z{alpha + beta};
if (Z == 0.0) {
alpha = beta = 0.5;
} else {
alpha /= Z;
beta /= Z;
}
double nextPeriodValue{
this->predict(j, time, m_Endpoints[n] + m_Centres[j])};
double nextPeriodVariance{this->variance(j)};
values.push_back(alpha * values.back() + beta * nextPeriodValue);
variances.push_back(alpha * variances.back() + beta * nextPeriodVariance);
knots.push_back(m_Endpoints[n]);
break;
}
}
break;
}
}
}
if (knots.size() > 2) {
// If the distance between knot points becomes too small the
// spline can become poorly conditioned. We can safely discard
// knot points which are very close to one another.
TSizeVec indices(knots.size());
std::iota(indices.begin(), indices.end(), 0);
indices.erase(std::remove_if(indices.begin() + 1, indices.end() - 1,
[&knots](std::size_t i) {
return knots[i] - knots[i - 1] < 1.0 ||
knots[i + 1] - knots[i] < 1.0;
}),
indices.end() - 1);
if (indices.size() < knots.size()) {
for (std::size_t i = 0; i < indices.size(); ++i) {
knots[i] = knots[indices[i]];
values[i] = values[indices[i]];
variances[i] = variances[indices[i]];
}
knots.resize(indices.size());
values.resize(indices.size());
variances.resize(indices.size());
}
}
return knots.size() >= 2;
}
void CAdaptiveBucketing::spread(double a, double b, double separation, TFloatVec& points) {
if (separation <= 0.0 || points.size() < 2) {
return;
}
if (b <= a) {
LOG_ERROR(<< "Bad interval [" << a << "," << b << "]");
return;
}
// Check if we just need to space the points uniformly.
std::size_t n{points.size() - 1};
if (b - a <= separation * static_cast<double>(n + 1)) {
for (std::size_t i = 0; i <= n; ++i) {
points[i] = a + (b - a) * static_cast<double>(i) / static_cast<double>(n);
}
return;
}
// Check if there's nothing to do.
double minSeparation{points[1] - points[0]};
for (std::size_t i = 2; i < points.size(); ++i) {
minSeparation = std::min(minSeparation, points[i] - points[i - 1]);
}
if (minSeparation > separation) {
return;
}
// We can do this in n * log(n) complexity with at most log(n)
// passes through the points. Provided the minimum separation is
// at most "interval" / "# centres" the problem is feasible.
//
// We want to find the solution which minimizes the sum of the
// distances the points move. This is possible by repeatedly
// merging clusters of contiguous points and then placing them
// at the mean of the points they contain. The process repeats
// until no clusters merge.
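// For example (illustrative numbers), spreading the points
// {0.0, 3.0, 3.2, 7.0} on [0, 10] with separation 1.0 merges 3.0 and 3.2
// into a single cluster with centre 3.1, which is laid back out as
// {2.6, 3.6}; the isolated points 0.0 and 7.0 don't move.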
std::sort(points.begin(), points.end());
std::vector<CContiguousPoints> contiguousPoints;
contiguousPoints.reserve(points.size());
for (const auto& point : points) {
contiguousPoints.emplace_back(point);
}
for (std::size_t previousSize = 0; contiguousPoints.size() != previousSize;
/**/) {
previousSize = contiguousPoints.size();
std::size_t last{0};
for (std::size_t i = 1; i < contiguousPoints.size(); ++i) {
if (contiguousPoints[last].overlap(contiguousPoints[i], separation)) {
contiguousPoints[last].merge(contiguousPoints[i], a, b, separation);
} else {
std::swap(contiguousPoints[++last], contiguousPoints[i]);
}
}
contiguousPoints.erase(contiguousPoints.begin() + last + 1,
contiguousPoints.end());
}
double last{-std::numeric_limits<double>::max()};
for (std::size_t i = 0, j = 0; i < contiguousPoints.size(); ++i) {
for (std::size_t k = 0; k < contiguousPoints[i].size(); ++j, ++k) {
points[j] = std::max(last + separation,
contiguousPoints[i].location(k, separation));
last = points[j];
}
}
}
const CAdaptiveBucketing::TFloatVec& CAdaptiveBucketing::endpoints() const {
return m_Endpoints;
}
const CAdaptiveBucketing::TFloatVec& CAdaptiveBucketing::centres() const {
return m_Centres;
}
CAdaptiveBucketing::TFloatVec& CAdaptiveBucketing::centres() {
return m_Centres;
}
const CAdaptiveBucketing::TFloatVec& CAdaptiveBucketing::largeErrorCounts() const {
return m_LargeErrorCounts;
}
CAdaptiveBucketing::TFloatVec& CAdaptiveBucketing::largeErrorCounts() {
return m_LargeErrorCounts;
}
double CAdaptiveBucketing::adjustedWeight(std::size_t bucket, double weight) const {
for (const auto& pValue : m_LargeErrorCountPValues) {
if (bucket == pValue.second) {
double largeWeight{2.0 * common::CBasicStatistics::mean(m_MeanWeight)};
double logPValue{common::CTools::fastLog(pValue.first)};
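// Interpolate between twice the mean weight at HIGH_SIGNIFICANCE and the
// unadjusted weight at MODERATE_SIGNIFICANCE on the log p-value scale.
// For example, a p-value of 10^-2.5, halfway between the two thresholds,
// yields the midpoint of weight and 2.0 * mean weight.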
return common::CTools::linearlyInterpolate(LOG_HIGH_SIGNIFICANCE,
LOG_MODERATE_SIGNIFICANCE,
largeWeight, weight, logPValue);
}
}
return weight;
}
double CAdaptiveBucketing::count() const {
double result{0.0};
for (std::size_t i = 0; i < m_Centres.size(); ++i) {
result += this->bucketCount(i);
}
return result;
}
CAdaptiveBucketing::TDoubleVec CAdaptiveBucketing::values(core_t::TTime time) const {
TDoubleVec result;
result.reserve(m_Centres.size());
for (std::size_t i = 0; i < m_Centres.size(); ++i) {
result.push_back(this->predict(i, time, m_Centres[i]));
}
return result;
}
CAdaptiveBucketing::TDoubleVec CAdaptiveBucketing::variances() const {
TDoubleVec result;
result.reserve(m_Centres.size());
for (std::size_t i = 0; i < m_Centres.size(); ++i) {
result.push_back(this->variance(i));
}
return result;
}
bool CAdaptiveBucketing::bucket(core_t::TTime time, std::size_t& result) const {
double t{this->offset(time)};
std::size_t i(std::upper_bound(m_Endpoints.begin(), m_Endpoints.end(), t) -
m_Endpoints.begin());
std::size_t n{m_Endpoints.size()};
if (t < m_Endpoints[0] || i == n) {
LOG_ERROR(<< "t = " << t << " out of range [" << m_Endpoints[0] << ","
<< m_Endpoints[n - 1] << ")");
return false;
}
result = i - 1;
return true;
}
std::uint64_t CAdaptiveBucketing::checksum(std::uint64_t seed) const {
seed = common::CChecksum::calculate(seed, m_DecayRate);
seed = common::CChecksum::calculate(seed, m_MinimumBucketLength);
seed = common::CChecksum::calculate(seed, m_TargetSize);
seed = common::CChecksum::calculate(seed, m_LastLargeErrorBucket);
seed = common::CChecksum::calculate(seed, m_LastLargeErrorPeriod);
seed = common::CChecksum::calculate(
seed, m_LargeErrorCountPValues.toDelimited(pValueToDelimited));
seed = common::CChecksum::calculate(seed, m_MeanWeight);
seed = common::CChecksum::calculate(seed, m_Endpoints);
seed = common::CChecksum::calculate(seed, m_Centres);
seed = common::CChecksum::calculate(seed, m_LargeErrorCounts);
seed = common::CChecksum::calculate(seed, m_MeanDesiredDisplacement);
return common::CChecksum::calculate(seed, m_MeanAbsDesiredDisplacement);
}
std::size_t CAdaptiveBucketing::memoryUsage() const {
std::size_t mem{core::memory::dynamicSize(m_Endpoints)};
mem += core::memory::dynamicSize(m_Centres);
mem += core::memory::dynamicSize(m_LargeErrorCounts);
return mem;
}
double CAdaptiveBucketing::bucketLargeErrorCountPValue(double totalLargeErrorCount,
std::size_t bucket) const {
// We compute the right tail p-value of the count of large errors
// in a bucket for the null hypothesis that they are uniformly
// distributed on the total bucketed period, i.e. that there is
// not some poorly modelled periodic pattern.
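// For example (illustrative numbers), with 20 large errors in total and a
// bucket covering a tenth of the period, the null hypothesis is
// binomial(20, 0.1), i.e. around 2 errors expected in the bucket;
// observing, say, 8 yields a very small right tail p-value and makes the
// bucket a strong split candidate.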
if (m_LargeErrorCounts[bucket] > 2.0) { // Are there repeats?
double interval{m_Endpoints[bucket + 1] - m_Endpoints[bucket]};
double period{m_Endpoints[m_Endpoints.size() - 1] - m_Endpoints[0]};
try {
boost::math::binomial binomial{totalLargeErrorCount, interval / period};
return common::CTools::safeCdfComplement(binomial, m_LargeErrorCounts[bucket]);
} catch (const std::exception& e) {
LOG_ERROR(<< "Failed to calculate splitting significance: '" << e.what()
<< "' interval = " << interval << " period = " << period
<< " buckets = " << m_Endpoints << " type = " << this->name());
}
}
return 1.0;
}
void CAdaptiveBucketing::maybeSplitBucket() {
double totalLargeErrorCount{std::accumulate(m_LargeErrorCounts.begin(),
m_LargeErrorCounts.end(), 0.0)};
// We do this independently of whether we'll split because we also use the
// significances to adjust the bucket update weight.
m_LargeErrorCountPValues = TFloatUInt32PrMinAccumulator{};
for (std::size_t i = 0; i + 1 < m_Endpoints.size(); ++i) {
m_LargeErrorCountPValues.add({this->bucketLargeErrorCountPValue(totalLargeErrorCount, i),
static_cast<std::uint32_t>(i)});
}
// We're choosing the minimum p-value of "number of buckets" independent
// statistics, so the corrected significance is one minus the chance that
// all of them exceed the observed value.
for (auto& candidateSplit : m_LargeErrorCountPValues) {
candidateSplit.first = common::CTools::oneMinusPowOneMinusX(
candidateSplit.first, static_cast<double>(this->size()));
LOG_TRACE(<< "bucket [" << m_Endpoints[candidateSplit.second] << ","
<< m_Endpoints[candidateSplit.second + 1]
<< ") split p-value = " << candidateSplit.first);
}
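// For example, a raw per bucket p-value of 0.001 across 24 buckets
// corrects to roughly 1 - (1 - 0.001)^24, which is about 0.024, i.e. only
// moderately significant.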
m_LargeErrorCountPValues.sort();
double period{m_Endpoints[m_Endpoints.size() - 1] - m_Endpoints[0]};
double pValue;
std::size_t bucketToSplit;
std::tie(pValue, bucketToSplit) = m_LargeErrorCountPValues[0];
// Split if we have received enough data, can divide the interval further
// and the significance is high.
if (totalLargeErrorCount >= MINIMUM_LARGE_ERROR_COUNT_TO_SPLIT &&
m_LargeErrorCountPValues.count() > 0 &&
static_cast<double>(this->size() + 1) * m_MinimumBucketLength < period &&
2 * this->size() < 3 * m_TargetSize && pValue < HIGH_SIGNIFICANCE) {
this->splitBucket(bucketToSplit);
}
}
void CAdaptiveBucketing::splitBucket(std::size_t bucket) {
double leftEnd{m_Endpoints[bucket]};
double rightEnd{m_Endpoints[bucket + 1]};
LOG_TRACE(<< "splitting [" << leftEnd << "," << rightEnd << ")");
double midpoint{(leftEnd + rightEnd) / 2.0};
double centre{m_Centres[bucket]};
double offset{std::min(centre - leftEnd, rightEnd - centre) / 2.0};
m_Endpoints.insert(m_Endpoints.begin() + bucket + 1, midpoint);
m_Centres[bucket] = std::max(centre + offset, midpoint);
m_Centres.insert(m_Centres.begin() + bucket, std::min(centre - offset, midpoint));
m_LargeErrorCounts[bucket] /= 1.75;
m_LargeErrorCounts.insert(m_LargeErrorCounts.begin() + bucket,
m_LargeErrorCounts[bucket]);
this->split(bucket);
}
const double CAdaptiveBucketing::LARGE_ERROR_STANDARD_DEVIATIONS{3.0};
}
}
}