// lib/maths/time_series/CTimeSeriesModel.cc
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <maths/time_series/CTimeSeriesModel.h>
#include <core/CAllocationStrategy.h>
#include <core/CFunctional.h>
#include <core/CMemoryDef.h>
#include <core/CPersistUtils.h>
#include <core/RestoreMacros.h>
#include <maths/common/CBasicStatistics.h>
#include <maths/common/CBasicStatisticsPersist.h>
#include <maths/common/CModel.h>
#include <maths/common/CModelDetail.h>
#include <maths/common/CMultivariateNormalConjugate.h>
#include <maths/common/CMultivariatePrior.h>
#include <maths/common/COrderings.h>
#include <maths/common/COrderingsSimultaneousSort.h>
#include <maths/common/CPrior.h>
#include <maths/common/CPriorStateSerialiser.h>
#include <maths/common/CTools.h>
#include <maths/common/Constants.h>
#include <maths/common/MathsTypes.h>
#include <maths/time_series/CDecayRateController.h>
#include <maths/time_series/CTimeSeriesDecomposition.h>
#include <maths/time_series/CTimeSeriesDecompositionStateSerialiser.h>
#include <maths/time_series/CTimeSeriesMultibucketFeatureSerialiser.h>
#include <maths/time_series/CTimeSeriesMultibucketFeatures.h>
#include <cmath>
#include <cstddef>
#include <limits>
#include <numeric>
#include <tuple>
namespace ml {
namespace maths {
namespace time_series {
namespace {
using TDoubleVec = std::vector<double>;
using TSizeDoublePr = std::pair<std::size_t, double>;
using TSizeVec = std::vector<std::size_t>;
using TDouble4Vec = core::CSmallVector<double, 4>;
using TDouble10Vec = core::CSmallVector<double, 10>;
using TDouble10VecVec = std::vector<TDouble10Vec>;
using TDouble10Vec1Vec = core::CSmallVector<TDouble10Vec, 1>;
using TDouble10Vec2Vec = core::CSmallVector<TDouble10Vec, 2>;
using TSize10Vec = core::CSmallVector<std::size_t, 10>;
using TDoubleDoublePr = std::pair<double, double>;
using TSizeDoublePr10Vec = core::CSmallVector<TSizeDoublePr, 10>;
using TCalculation2Vec = core::CSmallVector<maths_t::EProbabilityCalculation, 2>;
using TTail10Vec = core::CSmallVector<maths_t::ETail, 10>;
using TMeanAccumulator = common::CBasicStatistics::SSampleMean<double>::TAccumulator;
using TUnivariatePriorPtr = std::shared_ptr<common::CPrior>;
using TMultivariatePriorCPtrSizePr1Vec = CTimeSeriesCorrelations::TMultivariatePriorCPtrSizePr1Vec;
//! The decay rate controllers we maintain.
enum EDecayRateController {
E_TrendControl = 0,
E_ResidualControl,
E_NumberControls
};
const std::size_t MAXIMUM_CORRELATIONS{5000};
const double MINIMUM_CORRELATE_PRIOR_SAMPLE_COUNT{24.0};
const TSize10Vec NOTHING_TO_MARGINALIZE;
const TSizeDoublePr10Vec NOTHING_TO_CONDITION;
//! Expand \p calculation for computing multibucket anomalies.
TCalculation2Vec expand(maths_t::EProbabilityCalculation calculation) {
switch (calculation) {
case maths_t::E_TwoSided:
return {maths_t::E_OneSidedBelow, maths_t::E_OneSidedAbove};
case maths_t::E_OneSidedBelow:
case maths_t::E_OneSidedAbove:
return {calculation};
}
return {};
}
//! Aggregate one or more feature probabilities.
double aggregateFeatureProbabilities(const TDouble4Vec& probabilities, double correlation) {
if (probabilities.size() > 1) {
common::CJointProbabilityOfLessLikelySamples pJoint{correlation};
for (auto p : probabilities) {
pJoint.add(p);
}
double result;
pJoint.calculate(result);
return result;
}
return probabilities[0];
}
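//! Get the univariate prior for coordinate \p dimension of \p prior
//! conditioned on the remaining coordinates taking the values in \p value.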
std::shared_ptr<common::CPrior> conditional(const common::CMultivariatePrior& prior,
std::size_t dimension,
const core::CSmallVector<double, 10>& value) {
std::size_t dimensions{prior.dimension()};
TSizeDoublePr10Vec condition(dimensions - 1);
for (std::size_t i = 0, j = 0; i < dimensions; ++i) {
if (i != dimension) {
condition[j++] = std::make_pair(i, value[i]);
}
}
return prior.univariate(NOTHING_TO_MARGINALIZE, condition).first;
}
const std::string VERSION_7_3_TAG("7.3");
const std::string VERSION_7_11_TAG("7.11");
// Models
// Version >= 6.3
const std::string ID_6_3_TAG{"a"};
const std::string IS_NON_NEGATIVE_6_3_TAG{"b"};
const std::string IS_FORECASTABLE_6_3_TAG{"c"};
//const std::string RNG_6_3_TAG{"d"}; Removed in 6.5
const std::string CONTROLLER_6_3_TAG{"e"};
const core::TPersistenceTag TREND_MODEL_6_3_TAG{"f", "trend_model"};
const core::TPersistenceTag RESIDUAL_MODEL_6_3_TAG{"g", "residual_model"};
const std::string ANOMALY_MODEL_6_3_TAG{"h"};
const std::string MULTIBUCKET_FEATURE_6_3_TAG{"m"};
const std::string MULTIBUCKET_FEATURE_MODEL_6_3_TAG{"n"};
// Anomaly model
// Version >= 7.3
const std::string LAST_ANOMALOUS_BUCKET_TIME_7_3_TAG{"d"};
// Version >= 6.5
const std::string ANOMALY_6_5_TAG{"e"};
const std::string ANOMALY_FEATURE_MODEL_6_5_TAG{"f"};
// Version < 6.5
// Discarded on state upgrade because features have changed.
// Anomaly only restored for 6.5 state.
const std::string FIRST_ANOMALOUS_BUCKET_TIME_6_5_TAG{"a"};
const std::string SUM_PREDICTION_ERROR_6_5_TAG{"b"};
const std::string MEAN_ABS_PREDICTION_ERROR_6_5_TAG{"c"};
// Correlations
const std::string K_MOST_CORRELATED_TAG{"a"};
const std::string CORRELATED_LOOKUP_TAG{"b"};
const std::string CORRELATION_MODELS_TAG{"c"};
// Correlations nested
const std::string FIRST_CORRELATE_ID_TAG{"a"};
const std::string SECOND_CORRELATE_ID_TAG{"b"};
const std::string CORRELATION_MODEL_TAG{"c"};
const std::string CORRELATION_TAG{"d"};
namespace forecast {
const std::string INFO_INSUFFICIENT_HISTORY{"Insufficient history to forecast"};
const std::string INFO_COULD_NOT_FORECAST_FOR_FULL_DURATION{
"Unable to accurately forecast for the full requested time interval"};
const std::string ERROR_MULTIVARIATE{"Forecast not supported for multivariate features"};
}
namespace outliers {
constexpr double MINIMUM_WEIGHT{0.01};
const double MAXIMUM_P_VALUE{1e-3};
const double MINIMUM_P_VALUE{1e-5};
const double LOG_MAXIMUM_P_VALUE{std::log(MAXIMUM_P_VALUE)};
const double LOG_MINIMUM_P_VALUE{std::log(MINIMUM_P_VALUE)};
const double MINUS_LOG_TOLERANCE{
-std::log(1.0 - 100.0 * std::numeric_limits<double>::epsilon())};
//! Derate the minimum outlier weight.
double deratedMinimumWeight(double derate) {
derate = common::CTools::truncate(derate, 0.0, 1.0);
return MINIMUM_WEIGHT + (0.5 - MINIMUM_WEIGHT) * derate;
}
//! Compute the one tail p-value of \p value.
double computePValue(const common::CPrior& prior,
const maths_t::TDoubleWeightsAry& weights,
double value) {
double lowerBound;
double upperBound;
if (prior.minusLogJointCdf({value}, {weights}, lowerBound, upperBound) == false) {
return 1.0;
}
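    // The bounds are on -log(F(value)). When -log(F) >= MINUS_LOG_TOLERANCE,
    // F is far enough below one that min(F, 1 - F) can be computed accurately
    // from the (geometric) midpoint of the bounds; otherwise 1 - F would
    // cancel catastrophically, so we fall back to the complement CDF below.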
if (upperBound >= MINUS_LOG_TOLERANCE) {
double f{std::exp(-(lowerBound + upperBound) / 2.0)};
return std::min(f, 1.0 - f);
}
if (prior.minusLogJointCdfComplement({value}, {weights}, lowerBound, upperBound) == false) {
return 1.0;
}
return std::exp(-(lowerBound + upperBound) / 2.0);
}
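//! Compute the weight to apply to \p value based on how outlying it is:
//! 1.0 for p-values at or above MAXIMUM_P_VALUE, the derated minimum weight
//! at or below MINIMUM_P_VALUE, and a smooth log-log interpolation in between.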
double weight(const common::CPrior& prior,
const maths_t::TDoubleWeightsAry& weights,
double derate,
double value) {
double pValue{computePValue(prior, weights, value)};
if (pValue >= MAXIMUM_P_VALUE) {
return 1.0;
}
double minimumWeight{deratedMinimumWeight(derate)};
if (pValue <= MINIMUM_P_VALUE) {
return minimumWeight;
}
// We logarithmically interpolate between 1.0 and the minimum weight
// on the interval [MAXIMUM_P_VALUE, MINIMUM_P_VALUE].
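    // A sketch of the algebra, matching the expression computed below: we want
    // w(p) = exp(-m * log(p) * (log(p) - LOG_MAXIMUM_P_VALUE)) with
    // w(MAXIMUM_P_VALUE) = 1 and w(MINIMUM_P_VALUE) = minimumWeight, which
    // gives m = -log(minimumWeight) / LOG_MINIMUM_P_VALUE /
    //           (LOG_MINIMUM_P_VALUE - LOG_MAXIMUM_P_VALUE).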
double maximumExponent{-std::log(minimumWeight) / LOG_MINIMUM_P_VALUE /
(LOG_MINIMUM_P_VALUE - LOG_MAXIMUM_P_VALUE)};
double logPValue{std::log(pValue)};
double weight{std::exp(-maximumExponent * logPValue * (logPValue - LOG_MAXIMUM_P_VALUE))};
LOG_TRACE(<< "sample = " << value << " p-value = " << pValue << ", weight = " << weight);
return common::CMathsFuncs::isNan(weight) ? 1.0 : weight;
}
}
}
//! \brief A model of anomalous sections of a time series.
class CTimeSeriesAnomalyModel {
public:
CTimeSeriesAnomalyModel();
CTimeSeriesAnomalyModel(core_t::TTime bucketLength, double decayRate);
    //! Update the anomaly model with the prediction error and probabilities.
    //!
    //! Extends the current anomaly if \p overallProbability is small;
    //! otherwise, it closes it. If the time series is currently anomalous,
    //! update the model with the anomaly feature vector.
void sample(const common::CModelProbabilityParams& params,
core_t::TTime time,
double error,
double bucketProbability,
double overallProbability);
//! Reset the mean error norm.
void reset();
//! If the time series is currently anomalous, compute the anomalousness
//! of the anomaly feature vector.
TDoubleDoublePr probability(double bucketProbability, double overallProbability) const;
    //! Age the model to account for the elapsed \p time.
void propagateForwardsByTime(double time);
//! Compute a checksum for this object.
std::uint64_t checksum(std::uint64_t seed) const;
//! Debug the memory used by this object.
void debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const;
//! Get the memory used by this object.
std::size_t memoryUsage() const;
//! Initialize reading state from \p traverser.
bool acceptRestoreTraverser(const common::SModelRestoreParams& params,
core::CStateRestoreTraverser& traverser);
//! Persist by passing information to \p inserter.
void acceptPersistInserter(core::CStatePersistInserter& inserter) const;
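    //! Get the sum of the prediction errors of the current anomaly,
    //! or zero if there is no ongoing anomaly.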
double sumPredictionError() const {
if (m_Anomaly == std::nullopt) {
return 0.0;
}
return m_Anomaly->sumPredictionError();
}
    //! Get the length of the anomaly up to \p time, in buckets.
    //! If this is the first bucket of the anomaly, the length is 1.
std::size_t length(core_t::TTime time) const {
if (!m_Anomaly) {
return static_cast<std::size_t>(0);
}
return m_Anomaly->length(time / m_BucketLength) + 1;
}
private:
using TMultivariateNormalConjugate = common::CMultivariateNormalConjugate<2>;
using TMultivariateNormalConjugateVec = std::vector<TMultivariateNormalConjugate>;
//! \brief Extracts features related to anomalous time periods.
class CAnomaly {
public:
//! See core::CMemory.
static constexpr bool dynamicSizeAlwaysZero() { return true; }
public:
CAnomaly() = default;
explicit CAnomaly(core_t::TTime time)
: m_FirstAnomalousBucketTime(time), m_LastAnomalousBucketTime(time) {}
//! Add a result to the anomaly.
void update(core_t::TTime time, double predictionError) {
m_LastAnomalousBucketTime = time;
m_SumPredictionError += predictionError;
m_MeanAbsPredictionError.add(std::fabs(predictionError));
}
//! Get the weight to apply to this anomaly on update.
double weight() const {
core_t::TTime length{m_LastAnomalousBucketTime - m_FirstAnomalousBucketTime};
return 1.0 / (1.0 + std::max(static_cast<double>(length), 0.0));
}
//! Check if this anomaly is positive or negative.
bool positive() const { return m_SumPredictionError > 0.0; }
//! Get the feature vector for this anomaly.
TDouble10Vec features() const {
return {static_cast<double>(m_LastAnomalousBucketTime - m_FirstAnomalousBucketTime),
common::CBasicStatistics::mean(m_MeanAbsPredictionError)};
}
//! Compute a checksum for this object.
std::uint64_t checksum(std::uint64_t seed) const {
seed = common::CChecksum::calculate(seed, m_FirstAnomalousBucketTime);
seed = common::CChecksum::calculate(seed, m_LastAnomalousBucketTime);
seed = common::CChecksum::calculate(seed, m_SumPredictionError);
return common::CChecksum::calculate(seed, m_MeanAbsPredictionError);
}
        //! Return the number of buckets from the first anomalous bucket until
        //! \p time (times here are already scaled to bucket units).
std::size_t length(core_t::TTime time) const {
return static_cast<std::size_t>(time - m_FirstAnomalousBucketTime);
}
//! Initialize reading state from \p traverser.
bool acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
do {
const std::string& name{traverser.name()};
RESTORE_BUILT_IN(FIRST_ANOMALOUS_BUCKET_TIME_6_5_TAG, m_FirstAnomalousBucketTime)
RESTORE_BUILT_IN(LAST_ANOMALOUS_BUCKET_TIME_7_3_TAG, m_LastAnomalousBucketTime)
RESTORE_BUILT_IN(SUM_PREDICTION_ERROR_6_5_TAG, m_SumPredictionError)
RESTORE(MEAN_ABS_PREDICTION_ERROR_6_5_TAG,
m_MeanAbsPredictionError.fromDelimited(traverser.value()))
} while (traverser.next());
return true;
}
//! Persist by passing information to \p inserter.
void acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(FIRST_ANOMALOUS_BUCKET_TIME_6_5_TAG, m_FirstAnomalousBucketTime);
inserter.insertValue(LAST_ANOMALOUS_BUCKET_TIME_7_3_TAG, m_LastAnomalousBucketTime);
inserter.insertValue(SUM_PREDICTION_ERROR_6_5_TAG, m_SumPredictionError,
core::CIEEE754::E_SinglePrecision);
inserter.insertValue(MEAN_ABS_PREDICTION_ERROR_6_5_TAG,
m_MeanAbsPredictionError.toDelimited());
}
double sumPredictionError() const { return m_SumPredictionError; }
private:
//! The time at which the first anomalous bucket was detected.
core_t::TTime m_FirstAnomalousBucketTime{0};
//! The time at which the last anomalous bucket was detected.
core_t::TTime m_LastAnomalousBucketTime{0};
//! The sum of the errors in our base model predictions for the
//! anomaly.
double m_SumPredictionError{0.0};
        //! The mean of the absolute errors in our base model predictions
        //! for the anomaly.
TMeanAccumulator m_MeanAbsPredictionError;
};
using TOptionalAnomaly = std::optional<CAnomaly>;
private:
//! A unit weight.
static const maths_t::TDouble10VecWeightsAry1Vec UNIT;
private:
//! Update the anomaly model with a sample of the current feature vector.
void sample(const common::CModelProbabilityParams& params, double weight);
//! Compute the probability of the anomaly feature vector.
bool anomalyProbability(double& result) const;
//! Get the largest probability the model counts as anomalous.
double largestAnomalyProbability() const {
return 2.0 * common::LARGEST_SIGNIFICANT_PROBABILITY;
}
//! Get the scaled time.
core_t::TTime scale(core_t::TTime time) const {
return time / m_BucketLength;
}
private:
//! The data bucketing interval.
core_t::TTime m_BucketLength{0};
//! The current anomaly (if there is one).
TOptionalAnomaly m_Anomaly;
//! The model describing features of anomalous time periods.
TMultivariateNormalConjugateVec m_AnomalyFeatureModels;
};
CTimeSeriesAnomalyModel::CTimeSeriesAnomalyModel() {
m_AnomalyFeatureModels.reserve(2);
m_AnomalyFeatureModels.push_back(
TMultivariateNormalConjugate::nonInformativePrior(maths_t::E_ContinuousData));
m_AnomalyFeatureModels.push_back(
TMultivariateNormalConjugate::nonInformativePrior(maths_t::E_ContinuousData));
}
CTimeSeriesAnomalyModel::CTimeSeriesAnomalyModel(core_t::TTime bucketLength, double decayRate)
: m_BucketLength(bucketLength) {
m_AnomalyFeatureModels.reserve(2);
m_AnomalyFeatureModels.push_back(TMultivariateNormalConjugate::nonInformativePrior(
maths_t::E_ContinuousData, this->largestAnomalyProbability() * decayRate / 2.0));
m_AnomalyFeatureModels.push_back(TMultivariateNormalConjugate::nonInformativePrior(
maths_t::E_ContinuousData, this->largestAnomalyProbability() * decayRate / 2.0));
}
void CTimeSeriesAnomalyModel::sample(const common::CModelProbabilityParams& params,
core_t::TTime time,
double predictionError,
double bucketProbability,
double overallProbability) {
if (overallProbability < this->largestAnomalyProbability()) {
if (m_Anomaly == std::nullopt) {
m_Anomaly.emplace(CAnomaly{this->scale(time)});
}
if (bucketProbability < this->largestAnomalyProbability()) {
m_Anomaly->update(this->scale(time), predictionError);
this->sample(params, m_Anomaly->weight());
}
} else if (m_Anomaly != std::nullopt) {
this->sample(params, 1.0 - m_Anomaly->weight());
m_Anomaly.reset();
}
}
void CTimeSeriesAnomalyModel::sample(const common::CModelProbabilityParams& params,
double weight) {
    // If a rule triggered skipping the model update, this is the part we
    // want to skip. The rest of sample is still necessary as it creates
    // the feature vector related to the current anomaly.
double initialCountWeight{params.initialCountWeight()};
if (initialCountWeight > 0.0) {
auto& model = m_AnomalyFeatureModels[m_Anomaly->positive() ? 0 : 1];
model.addSamples({m_Anomaly->features()},
{maths_t::countWeight(weight * initialCountWeight, 2)});
}
}
void CTimeSeriesAnomalyModel::reset() {
for (auto& model : m_AnomalyFeatureModels) {
model = TMultivariateNormalConjugate::nonInformativePrior(
maths_t::E_ContinuousData, model.decayRate());
}
}
TDoubleDoublePr CTimeSeriesAnomalyModel::probability(double bucketProbability,
double overallProbability) const {
double anomalyProbability{1.0};
if (overallProbability < this->largestAnomalyProbability() &&
this->anomalyProbability(anomalyProbability)) {
// We logarithmically interpolate the anomaly probability and the
// probability we've determined for the bucket. This determines
// the weight assigned to the anomaly probability. We arrange for
// the following properties for the weight (alpha) as a function
// of the bucket and anomaly probabilities:
// 1) The weight function is continuous,
// 2) For small bucket probabilities we take the geometric mean
// (which corresponds to a weight equal to 0.5),
// 3) For fixed anomaly probability the derivative of the weight
// w.r.t. minus log the bucket probability is negative and
// approaches 0.0 at the largest anomaly probability, and
// 4) For fixed bucket probability the derivative of the weight
// w.r.t. minus log the anomaly probability is positive.
// Note that condition 1) means we won't fall into the case that
// a small perturbation in input data can lead to a large change in
// results, condition 2) means that we will always correct anomalous
// bucket probabilities based on how unusual they are in the context
// of anomalous buckets we've seen before, condition 3) means that
// the correction is continuous at the decision boundary for whether
// to correct for the anomaly probability and is also important to
// avoid the case that small perturbations lead to significant result
// changes, finally condition 4) means that if the anomaly features
// are highly unusual we can still assign the bucket a low probability
// even if we don't think the bucket value is particularly unusual.
// We relax the anomaly probability back to 1.0 by a factor lambda
// based on how normal the individual bucket is.
double a{-common::CTools::fastLog(this->largestAnomalyProbability())};
double b{-common::CTools::fastLog(common::SMALL_PROBABILITY)};
double lambda{bucketProbability == 0.0
? 1.0
: std::min(this->largestAnomalyProbability() / bucketProbability, 1.0)};
double logOverallProbability{common::CTools::fastLog(overallProbability)};
double logAnomalyProbability{common::CTools::fastLog(anomalyProbability)};
double x{std::max((b + logOverallProbability) / (b - a), 0.0)};
double y{(1.0 - b / (b - logAnomalyProbability))};
double alpha{0.5 * (1.0 - x + x * y)};
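        // As a concrete check of the properties above: at
        // overallProbability == SMALL_PROBABILITY, x = 0 so alpha = 0.5,
        // i.e. we take the geometric mean (property 2); at
        // overallProbability == largestAnomalyProbability(), x = 1 so
        // alpha = 0.5 * y, which tends to zero as the anomaly probability
        // approaches one (property 3).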
overallProbability = std::exp((1.0 - alpha) * logOverallProbability +
alpha * lambda * logAnomalyProbability);
LOG_TRACE(<< "alpha = " << alpha << ", p(combined) = " << overallProbability);
}
return {overallProbability, anomalyProbability};
}
bool CTimeSeriesAnomalyModel::anomalyProbability(double& result) const {
    // Check the optional anomaly before dereferencing it to select a model.
    if (m_Anomaly == std::nullopt) {
        return false;
    }
    const auto& model = m_AnomalyFeatureModels[m_Anomaly->positive() ? 0 : 1];
    if (model.isNonInformative()) {
        return false;
    }
double pl;
double pu;
TTail10Vec tail;
if (model.probabilityOfLessLikelySamples(
maths_t::E_OneSidedAbove, {m_Anomaly->features()}, UNIT, pl, pu, tail) == false) {
return false;
}
result = (pl + pu) / 2.0;
LOG_TRACE(<< "features = " << m_Anomaly->features() << " p(anomaly) = " << result);
return true;
}
void CTimeSeriesAnomalyModel::propagateForwardsByTime(double time) {
m_AnomalyFeatureModels[0].propagateForwardsByTime(time);
m_AnomalyFeatureModels[1].propagateForwardsByTime(time);
}
std::uint64_t CTimeSeriesAnomalyModel::checksum(std::uint64_t seed) const {
seed = common::CChecksum::calculate(seed, m_BucketLength);
seed = common::CChecksum::calculate(seed, m_Anomaly);
seed = common::CChecksum::calculate(seed, m_AnomalyFeatureModels[0]);
return common::CChecksum::calculate(seed, m_AnomalyFeatureModels[1]);
}
void CTimeSeriesAnomalyModel::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CTimeSeriesAnomalyModel");
core::memory_debug::dynamicSize("m_Anomalies", m_Anomaly, mem);
core::memory_debug::dynamicSize("m_AnomalyFeatureModels", m_AnomalyFeatureModels, mem);
}
std::size_t CTimeSeriesAnomalyModel::memoryUsage() const {
return core::memory::dynamicSize(m_Anomaly) +
core::memory::dynamicSize(m_AnomalyFeatureModels);
}
bool CTimeSeriesAnomalyModel::acceptRestoreTraverser(const common::SModelRestoreParams& params,
core::CStateRestoreTraverser& traverser) {
m_BucketLength = core::unwrap_ref(params.s_Params).bucketLength();
if (traverser.name() == VERSION_7_3_TAG) {
std::size_t index{0};
while (traverser.next()) {
const std::string& name{traverser.name()};
RESTORE_SETUP_TEARDOWN(ANOMALY_6_5_TAG, CAnomaly restored,
traverser.traverseSubLevel([&](auto& traverser_) {
return restored.acceptRestoreTraverser(traverser_);
}),
m_Anomaly.emplace(std::move(restored)))
RESTORE(ANOMALY_FEATURE_MODEL_6_5_TAG, traverser.traverseSubLevel([&](auto& traverser_) {
return m_AnomalyFeatureModels[index++].acceptRestoreTraverser(traverser_);
}))
}
} else {
LOG_ERROR(<< "Input error: unsupported state serialization version '"
<< traverser.name()
<< "'. Currently supported minimum version: " << VERSION_7_3_TAG);
return false;
}
return true;
}
void CTimeSeriesAnomalyModel::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(VERSION_7_3_TAG, "");
if (m_Anomaly) {
inserter.insertLevel(ANOMALY_6_5_TAG, [this](auto& inserter_) {
m_Anomaly->acceptPersistInserter(inserter_);
});
}
inserter.insertLevel(ANOMALY_FEATURE_MODEL_6_5_TAG, [this](auto& inserter_) {
m_AnomalyFeatureModels[0].acceptPersistInserter(inserter_);
});
inserter.insertLevel(ANOMALY_FEATURE_MODEL_6_5_TAG, [this](auto& inserter_) {
m_AnomalyFeatureModels[1].acceptPersistInserter(inserter_);
});
}
const maths_t::TDouble10VecWeightsAry1Vec CTimeSeriesAnomalyModel::UNIT{
maths_t::CUnitWeights::unit<TDouble10Vec>(2)};
CUnivariateTimeSeriesModel::CUnivariateTimeSeriesModel(
const common::CModelParams& params,
std::size_t id,
const CTimeSeriesDecompositionInterface& trendModel,
const common::CPrior& residualModel,
const TDecayRateController2Ary* controllers,
const TMultibucketFeature* multibucketFeature,
bool modelAnomalies)
: common::CModel(params), m_Id(id), m_TrendModel(trendModel.clone()),
m_ResidualModel(residualModel.clone()),
m_MultibucketFeature(multibucketFeature != nullptr ? multibucketFeature->clone()
: nullptr),
m_MultibucketFeatureModel(multibucketFeature != nullptr ? residualModel.clone() : nullptr),
m_AnomalyModel(modelAnomalies ? std::make_unique<CTimeSeriesAnomalyModel>(
params.bucketLength(),
params.decayRate())
: nullptr) {
if (controllers != nullptr) {
m_Controllers = std::make_unique<TDecayRateController2Ary>(*controllers);
}
}
CUnivariateTimeSeriesModel::CUnivariateTimeSeriesModel(const common::SModelRestoreParams& params,
core::CStateRestoreTraverser& traverser)
: common::CModel(params.s_Params) {
if (traverser.traverseSubLevel([&](auto& traverser_) {
return this->acceptRestoreTraverser(params, traverser_);
}) == false) {
traverser.setBadState();
}
}
CUnivariateTimeSeriesModel::~CUnivariateTimeSeriesModel() {
if (m_Correlations != nullptr) {
m_Correlations->removeTimeSeries(m_Id);
}
}
std::size_t CUnivariateTimeSeriesModel::identifier() const {
return m_Id;
}
CUnivariateTimeSeriesModel* CUnivariateTimeSeriesModel::clone(std::size_t id) const {
CUnivariateTimeSeriesModel* result{new CUnivariateTimeSeriesModel{*this, id}};
if (m_Correlations != nullptr) {
result->modelCorrelations(*m_Correlations);
}
return result;
}
CUnivariateTimeSeriesModel* CUnivariateTimeSeriesModel::cloneForPersistence() const {
return new CUnivariateTimeSeriesModel{*this, m_Id};
}
CUnivariateTimeSeriesModel* CUnivariateTimeSeriesModel::cloneForForecast() const {
return new CUnivariateTimeSeriesModel{*this, m_Id, true};
}
bool CUnivariateTimeSeriesModel::isForecastPossible() const {
return m_IsForecastable && !m_ResidualModel->isNonInformative();
}
void CUnivariateTimeSeriesModel::modelCorrelations(CTimeSeriesCorrelations& model) {
m_Correlations = &model;
m_Correlations->addTimeSeries(m_Id, *this);
}
CUnivariateTimeSeriesModel::TSize2Vec1Vec CUnivariateTimeSeriesModel::correlates() const {
TSize2Vec1Vec result;
TSize1Vec correlated;
TSize2Vec1Vec variables;
TMultivariatePriorCPtrSizePr1Vec correlationModels;
TModelCPtr1Vec correlatedTimeSeriesModels;
this->correlationModels(correlated, variables, correlationModels, correlatedTimeSeriesModels);
result.resize(correlated.size(), TSize2Vec(2));
for (std::size_t i = 0; i < correlated.size(); ++i) {
result[i][variables[i][0]] = m_Id;
result[i][variables[i][1]] = correlated[i];
}
return result;
}
void CUnivariateTimeSeriesModel::addBucketValue(const TTimeDouble2VecSizeTrVec& values) {
for (const auto& value : values) {
m_ResidualModel->adjustOffset(
{m_TrendModel->detrend(value.first, value.second[0], 0.0, m_IsNonNegative)},
maths_t::CUnitWeights::SINGLE_UNIT);
}
}
CUnivariateTimeSeriesModel::EUpdateResult
CUnivariateTimeSeriesModel::addSamples(const common::CModelAddSamplesParams& params,
TTimeDouble2VecSizeTrVec samples) {
if (samples.empty()) {
return E_Success;
}
// Update the data characteristics.
m_IsNonNegative = params.isNonNegative();
maths_t::EDataType type{params.type()};
m_ResidualModel->dataType(type);
if (m_MultibucketFeatureModel != nullptr) {
m_MultibucketFeatureModel->dataType(type);
}
m_TrendModel->dataType(type);
EUpdateResult result{this->updateTrend(params, samples)};
auto[residuals, decayRateMultiplier] =
this->updateResidualModels(params, std::move(samples));
    // Age the anomaly model. Note that updating it requires the probability
    // to be calculated, which is expensive, so unlike our other model
    // components the update is performed in the probability calculation.
if (m_AnomalyModel != nullptr) {
m_AnomalyModel->propagateForwardsByTime(params.propagationInterval());
}
// Add the samples to the correlation models if there are any.
if (m_Correlations != nullptr) {
m_Correlations->addSamples(m_Id, params, residuals, decayRateMultiplier);
}
return result;
}
void CUnivariateTimeSeriesModel::skipTime(core_t::TTime gap) {
m_TrendModel->skipTime(gap);
}
CUnivariateTimeSeriesModel::TDouble2Vec
CUnivariateTimeSeriesModel::mode(core_t::TTime time, const TDouble2VecWeightsAry& weights) const {
return {m_ResidualModel->marginalLikelihoodMode(unpack(weights)) +
m_TrendModel->value(time, 0.0, m_IsNonNegative).mean()};
}
CUnivariateTimeSeriesModel::TDouble2Vec1Vec
CUnivariateTimeSeriesModel::correlateModes(core_t::TTime time,
const TDouble2VecWeightsAry1Vec& weights) const {
TDouble2Vec1Vec result;
TSize1Vec correlated;
TSize2Vec1Vec variables;
TMultivariatePriorCPtrSizePr1Vec correlationModels;
TModelCPtr1Vec correlatedTimeSeriesModels;
if (this->correlationModels(correlated, variables, correlationModels,
correlatedTimeSeriesModels)) {
        result.resize(correlated.size(), TDouble2Vec(2));
double baseline[2];
baseline[0] = m_TrendModel->value(time, 0.0, m_IsNonNegative).mean();
for (std::size_t i = 0; i < correlated.size(); ++i) {
baseline[1] = correlatedTimeSeriesModels[i]
->m_TrendModel
->value(time, 0.0, correlatedTimeSeriesModels[i]->m_IsNonNegative)
.mean();
TDouble10Vec mode(correlationModels[i].first->marginalLikelihoodMode(
CMultivariateTimeSeriesModel::unpack(weights[i])));
std::size_t v0{variables[i][0]};
std::size_t v1{variables[i][1]};
result[i][v0] = baseline[0] + mode[v0];
result[i][v1] = baseline[1] + mode[v1];
}
}
return result;
}
CUnivariateTimeSeriesModel::TDouble2Vec1Vec
CUnivariateTimeSeriesModel::residualModes(const TDouble2VecWeightsAry& weights) const {
TDouble2Vec1Vec result;
TDouble1Vec modes(m_ResidualModel->marginalLikelihoodModes(unpack(weights)));
result.reserve(modes.size());
for (auto mode : modes) {
result.push_back({mode});
}
return result;
}
void CUnivariateTimeSeriesModel::detrend(const TTime2Vec1Vec& time,
double confidenceInterval,
TDouble2Vec1Vec& value) const {
if (value.empty()) {
return;
}
if (value[0].size() == 1) {
value[0][0] = m_TrendModel->detrend(time[0][0], value[0][0],
confidenceInterval, m_IsNonNegative);
return;
}
TSize1Vec correlated;
TSize2Vec1Vec variables;
TMultivariatePriorCPtrSizePr1Vec correlationModels;
TModelCPtr1Vec correlatedTimeSeriesModels;
if (this->correlationModels(correlated, variables, correlationModels,
correlatedTimeSeriesModels)) {
for (std::size_t i = 0; i < variables.size(); ++i) {
if (value[i].empty() == false) {
std::size_t v0{variables[i][0]};
std::size_t v1{variables[i][1]};
value[i][v0] = m_TrendModel->detrend(
time[i][v0], value[i][v0], confidenceInterval, m_IsNonNegative);
value[i][v1] = correlatedTimeSeriesModels[i]->m_TrendModel->detrend(
time[i][v1], value[i][v1], confidenceInterval,
correlatedTimeSeriesModels[i]->m_IsNonNegative);
}
}
}
}
CUnivariateTimeSeriesModel::TDouble2Vec
CUnivariateTimeSeriesModel::predict(core_t::TTime time,
const TSizeDoublePr1Vec& correlatedValue,
TDouble2Vec hint) const {
double correlateCorrection{0.0};
if (correlatedValue.empty() == false) {
TSize1Vec correlated{correlatedValue[0].first};
TSize2Vec1Vec variables;
TMultivariatePriorCPtrSizePr1Vec correlationModel;
TModelCPtr1Vec correlatedTimeSeriesModels;
if (m_Correlations->correlationModels(m_Id, correlated, variables, correlationModel,
correlatedTimeSeriesModels)) {
double sample{correlatedTimeSeriesModels[0]->m_TrendModel->detrend(
time, correlatedValue[0].second, 0.0,
correlatedTimeSeriesModels[0]->m_IsNonNegative)};
TSize10Vec marginalize{variables[0][1]};
TSizeDoublePr10Vec condition{{variables[0][1], sample}};
const common::CMultivariatePrior* joint{correlationModel[0].first};
TPriorPtr margin{
joint->univariate(marginalize, NOTHING_TO_CONDITION).first};
TPriorPtr conditional{
joint->univariate(NOTHING_TO_MARGINALIZE, condition).first};
correlateCorrection = conditional->marginalLikelihoodMean() -
margin->marginalLikelihoodMean();
}
}
double trend{0.0};
if (m_TrendModel->initialized()) {
trend = m_TrendModel->value(time, 0.0, m_IsNonNegative).mean();
}
if (hint.size() == 1) {
hint[0] = m_TrendModel->detrend(time, hint[0], 0.0, m_IsNonNegative);
}
double median{
m_ResidualModel->isNonInformative()
? m_ResidualModel->marginalLikelihoodMean()
: (hint.empty()
? common::CBasicStatistics::mean(
m_ResidualModel->marginalLikelihoodConfidenceInterval(0.0))
: m_ResidualModel->nearestMarginalLikelihoodMean(hint[0]))};
double result{trend + median + correlateCorrection};
return {m_IsNonNegative ? std::max(result, 0.0) : result};
}
CUnivariateTimeSeriesModel::TDouble2Vec3Vec
CUnivariateTimeSeriesModel::confidenceInterval(core_t::TTime time,
double confidenceInterval,
const TDouble2VecWeightsAry& weights_) const {
if (m_ResidualModel->isNonInformative()) {
return {};
}
double trend{m_TrendModel->initialized()
? m_TrendModel->value(time, 0.0, m_IsNonNegative).mean()
: 0.0};
TDoubleWeightsAry weights(unpack(weights_));
double median{common::CBasicStatistics::mean(
m_ResidualModel->marginalLikelihoodConfidenceInterval(0.0, weights))};
TDoubleDoublePr interval{m_ResidualModel->marginalLikelihoodConfidenceInterval(
confidenceInterval, weights)};
double result[]{trend + interval.first, trend + median, trend + interval.second};
return {{m_IsNonNegative ? std::max(result[0], 0.0) : result[0]},
{m_IsNonNegative ? std::max(result[1], 0.0) : result[1]},
{m_IsNonNegative ? std::max(result[2], 0.0) : result[2]}};
}
bool CUnivariateTimeSeriesModel::forecast(core_t::TTime firstDataTime,
core_t::TTime lastDataTime,
core_t::TTime startTime,
core_t::TTime endTime,
double confidenceInterval,
const TDouble2Vec& minimum_,
const TDouble2Vec& maximum_,
const common::TForecastPushDatapointFunc& forecastPushDataPointFunc,
std::string& messageOut) {
core_t::TTime horizon{std::min(lastDataTime + (lastDataTime - firstDataTime),
lastDataTime + m_TrendModel->maximumForecastInterval())};
if (m_ResidualModel->isNonInformative() || startTime >= horizon) {
messageOut = forecast::INFO_INSUFFICIENT_HISTORY;
return true;
}
if (endTime > horizon) {
// Truncate to the forecast horizon
endTime = horizon;
messageOut = forecast::INFO_COULD_NOT_FORECAST_FOR_FULL_DURATION;
}
using TDouble3Vec = core::CSmallVector<double, 3>;
core_t::TTime bucketLength{this->params().bucketLength()};
double minimum{m_IsNonNegative ? std::max(minimum_[0], 0.0) : minimum_[0]};
double maximum{m_IsNonNegative ? std::max(maximum_[0], 0.0) : maximum_[0]};
auto writer = [&](core_t::TTime time, const TDouble3Vec& prediction) {
common::SErrorBar errorBar{
time, bucketLength,
common::CTools::truncate(prediction[0], minimum,
maximum + prediction[0] - prediction[1]),
common::CTools::truncate(prediction[1], minimum, maximum),
common::CTools::truncate(
prediction[2], minimum + prediction[2] - prediction[1], maximum)};
forecastPushDataPointFunc(errorBar);
};
m_TrendModel->forecast(startTime, endTime, bucketLength, confidenceInterval,
this->params().minimumSeasonalVarianceScale(),
m_IsNonNegative, writer);
return true;
}
bool CUnivariateTimeSeriesModel::probability(const common::CModelProbabilityParams& params,
const TTime2Vec1Vec& time,
const TDouble2Vec1Vec& value,
common::SModelProbabilityResult& result) const {
result = common::SModelProbabilityResult{};
if (value.empty()) {
return true;
}
return value[0].size() == 1
? this->uncorrelatedProbability(params, time, value, result)
: this->correlatedProbability(params, time, value, result);
}
bool CUnivariateTimeSeriesModel::uncorrelatedProbability(
const common::CModelProbabilityParams& params,
const TTime2Vec1Vec& time_,
const TDouble2Vec1Vec& value,
common::SModelProbabilityResult& result) const {
maths_t::EProbabilityCalculation calculation{params.calculation(0)};
maths_t::TDoubleWeightsAry1Vec weights{unpack(params.weights()[0])};
TDouble4Vec probabilities;
common::SModelProbabilityResult::TFeatureProbability4Vec featureProbabilities;
double pl;
double pu;
maths_t::ETail tail;
core_t::TTime time{time_[0][0]};
TDouble1Vec sample{m_TrendModel->detrend(
time, value[0][0], params.seasonalConfidenceInterval(), m_IsNonNegative)};
if (m_ResidualModel->probabilityOfLessLikelySamples(calculation, sample,
weights, pl, pu, tail)) {
LOG_TRACE(<< "P(" << sample << " | weight = " << weights
<< ", time = " << time << ") = " << (pl + pu) / 2.0);
double pSingleBucket{(pl + pu) / 2.0};
probabilities.push_back(pSingleBucket);
featureProbabilities.emplace_back(
common::SModelProbabilityResult::E_SingleBucketProbability, pSingleBucket);
if (pSingleBucket < common::LARGEST_SIGNIFICANT_PROBABILITY) {
result.s_AnomalyScoreExplanation.s_SingleBucketImpact =
static_cast<int>(std::round(-std::log10(pSingleBucket)));
}
if (maths_t::seasonalVarianceScale(weights[0]) > 1.0) {
result.s_AnomalyScoreExplanation.s_HighVariancePenalty = true;
}
if (maths_t::countVarianceScale(weights[0]) > 1.0) {
result.s_AnomalyScoreExplanation.s_IncompleteBucketPenalty = true;
}
} else {
LOG_ERROR(<< "Failed to compute P(" << sample
<< " | weight = " << weights << ", time = " << time << ")");
return false;
}
double correlation{0.0};
if (m_MultibucketFeatureModel != nullptr && params.useMultibucketFeatures()) {
double pMultiBucket{1.0};
TDouble1Vec feature;
std::tie(feature, std::ignore) = m_MultibucketFeature->value();
if (feature.empty() == false) {
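            // For a two-sided calculation, expand produces both one-sided
            // alternatives and we keep the smaller of their probabilities.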
for (auto calculation_ : expand(calculation)) {
maths_t::ETail dummy;
if (m_MultibucketFeatureModel->probabilityOfLessLikelySamples(
calculation_, feature,
maths_t::CUnitWeights::SINGLE_UNIT, pl, pu, dummy)) {
LOG_TRACE(<< "P(" << feature << ") = " << (pl + pu) / 2.0);
} else {
LOG_ERROR(<< "Failed to compute P(" << feature << ")");
return false;
}
pMultiBucket = std::min(pMultiBucket, (pl + pu) / 2.0);
}
correlation = m_MultibucketFeature->correlationWithBucketValue();
}
if (pMultiBucket < common::LARGEST_SIGNIFICANT_PROBABILITY) {
result.s_AnomalyScoreExplanation.s_MultiBucketImpact =
static_cast<int>(std::round(-std::log10(pMultiBucket)));
}
probabilities.push_back(pMultiBucket);
featureProbabilities.emplace_back(
common::SModelProbabilityResult::E_MultiBucketProbability, pMultiBucket);
}
double pOverall{aggregateFeatureProbabilities(probabilities, correlation)};
if (m_AnomalyModel != nullptr && params.useAnomalyModel()) {
TDouble2Vec seasonalWeight;
this->seasonalWeight(0.0, time, seasonalWeight);
double residual{
(sample[0] - m_ResidualModel->nearestMarginalLikelihoodMean(sample[0])) /
std::max(std::sqrt(seasonalWeight[0]), 1.0)};
double pSingleBucket{probabilities[0]};
m_AnomalyModel->sample(params, time, residual, pSingleBucket, pOverall);
double pAnomaly;
std::tie(pOverall, pAnomaly) = m_AnomalyModel->probability(pSingleBucket, pOverall);
if (pAnomaly < common::LARGEST_SIGNIFICANT_PROBABILITY) {
result.s_AnomalyScoreExplanation.s_AnomalyType =
(m_AnomalyModel->sumPredictionError() > (0.0))
? common::SAnomalyScoreExplanation::E_SPIKE
: common::SAnomalyScoreExplanation::E_DIP;
result.s_AnomalyScoreExplanation.s_AnomalyLength = m_AnomalyModel->length(time);
result.s_AnomalyScoreExplanation.s_AnomalyCharacteristicsImpact =
static_cast<int>(std::round(-std::log10(pAnomaly)));
}
probabilities.push_back(pAnomaly);
featureProbabilities.emplace_back(
common::SModelProbabilityResult::E_AnomalyModelProbability, pAnomaly);
}
if (pOverall < common::LARGEST_SIGNIFICANT_PROBABILITY) {
LOG_TRACE(<< "Computing confidence bounds");
TDouble2Vec3Vec interval(this->confidenceInterval(
time, CModel::DEFAULT_BOUNDS_PERCENTILE, params.weights()[0]));
result.s_AnomalyScoreExplanation.s_LowerConfidenceBound = interval[0][0];
result.s_AnomalyScoreExplanation.s_TypicalValue = interval[1][0];
result.s_AnomalyScoreExplanation.s_UpperConfidenceBound = interval[2][0];
}
result.s_AnomalyScoreExplanation.s_MultimodalDistribution =
m_ResidualModel->isSelectedModelMultimodal();
LOG_TRACE(<< "Multimodel distribution: "
<< result.s_AnomalyScoreExplanation.s_MultimodalDistribution);
result.s_Probability = pOverall;
result.s_FeatureProbabilities = std::move(featureProbabilities);
result.s_Tail = {tail};
return true;
}
bool CUnivariateTimeSeriesModel::correlatedProbability(
const common::CModelProbabilityParams& params,
const TTime2Vec1Vec& time,
const TDouble2Vec1Vec& value,
common::SModelProbabilityResult& result) const {
TSize1Vec correlated;
TSize2Vec1Vec variables;
TMultivariatePriorCPtrSizePr1Vec correlationModels;
TModelCPtr1Vec correlatedTimeSeriesModels;
if (this->correlationModels(correlated, variables, correlationModels,
correlatedTimeSeriesModels) == false) {
return false;
}
double neff{effectiveCount(variables.size())};
common::CProbabilityOfExtremeSample aggregator;
common::CBasicStatistics::COrderStatisticsStack<double, 1> minProbability;
// Declared outside the loop to minimize the number of times they are created.
maths_t::EProbabilityCalculation calculation{params.calculation(0)};
TDouble10Vec1Vec sample{TDouble10Vec(2)};
maths_t::TDouble10VecWeightsAry1Vec weights{
maths_t::CUnitWeights::unit<TDouble10Vec>(2)};
TDouble10Vec2Vec pli;
TDouble10Vec2Vec pui;
TTail10Vec ti;
core_t::TTime mostAnomalousTime{0};
double mostAnomalousSample{0.0};
TPriorPtr mostAnomalousCorrelationModel;
TTail2Vec tail;
bool conditional{false};
TSize1Vec mostAnomalousCorrelate;
TSize1Vec correlateIndices;
if (params.mostAnomalousCorrelate() != std::nullopt) {
if (*params.mostAnomalousCorrelate() >= variables.size()) {
LOG_ERROR(<< "Unexpected correlate " << *params.mostAnomalousCorrelate());
return false;
}
correlateIndices.push_back(*params.mostAnomalousCorrelate());
} else {
correlateIndices.resize(variables.size());
std::iota(correlateIndices.begin(), correlateIndices.end(), 0);
}
for (std::size_t i = 0; i < correlateIndices.size(); ++i) {
if (value[i].empty()) {
aggregator.add(1.0, neff);
} else {
std::size_t correlateIndex{correlateIndices[i]};
std::size_t v0{variables[correlateIndex][0]};
std::size_t v1{variables[correlateIndex][1]};
TDecompositionPtr trendModels[2];
trendModels[v0] = m_TrendModel;
trendModels[v1] = correlatedTimeSeriesModels[correlateIndex]->m_TrendModel;
const auto& correlationModel = correlationModels[correlateIndex].first;
bool isNonNegative[2];
isNonNegative[v0] = m_IsNonNegative;
isNonNegative[v1] = correlatedTimeSeriesModels[correlateIndex]->m_IsNonNegative;
sample[0][0] = trendModels[0]->detrend(time[i][0], value[i][0],
params.seasonalConfidenceInterval(),
isNonNegative[0]);
sample[0][1] = trendModels[1]->detrend(time[i][1], value[i][1],
params.seasonalConfidenceInterval(),
isNonNegative[1]);
weights[0] = CMultivariateTimeSeriesModel::unpack(params.weights()[i]);
if (correlationModel->probabilityOfLessLikelySamples(
calculation, sample, weights, {v0}, pli, pui, ti)) {
LOG_TRACE(<< "Marginal P(" << sample
<< " | weight = " << weights << ", coordinate = " << v0
<< ") = " << (pli[0][0] + pui[0][0]) / 2.0);
LOG_TRACE(<< "Conditional P(" << sample
<< " | weight = " << weights << ", coordinate = " << v0
<< ") = " << (pli[1][0] + pui[1][0]) / 2.0);
} else {
LOG_ERROR(<< "Failed to compute P(" << sample << " | weight = " << weights
<< ", coordinate = " << v0 << ")");
continue;
}
double pl{std::sqrt(pli[0][0] * pli[1][0])};
double pu{std::sqrt(pui[0][0] * pui[1][0])};
double pi{(pl + pu) / 2.0};
aggregator.add(pi, neff);
if (minProbability.add(pi)) {
tail.assign(1, ti[0]);
conditional = ((pli[1][0] + pui[1][0]) < (pli[0][0] + pui[0][0]));
mostAnomalousCorrelate.assign(1, correlateIndex);
mostAnomalousTime = time[i][v0];
mostAnomalousSample = sample[0][v0];
mostAnomalousCorrelationModel =
conditional ? correlationModel
->univariate(NOTHING_TO_MARGINALIZE,
{{v1, sample[0][v1]}})
.first
: correlationModel
->univariate({v1}, NOTHING_TO_CONDITION)
.first;
}
}
}
double pSingleBucket;
aggregator.calculate(pSingleBucket);
TDouble4Vec probabilities{pSingleBucket};
common::SModelProbabilityResult::TFeatureProbability4Vec featureProbabilities;
featureProbabilities.emplace_back(
common::SModelProbabilityResult::E_SingleBucketProbability, pSingleBucket);
double pOverall{pSingleBucket};
if (m_AnomalyModel != nullptr && params.useAnomalyModel()) {
TDouble2Vec seasonalWeight;
this->seasonalWeight(0.0, mostAnomalousTime, seasonalWeight);
double residual{(mostAnomalousSample - mostAnomalousCorrelationModel->nearestMarginalLikelihoodMean(
mostAnomalousSample)) /
std::max(std::sqrt(seasonalWeight[0]), 1.0)};
m_AnomalyModel->sample(params, mostAnomalousTime, residual, pSingleBucket, pOverall);
double pAnomaly;
std::tie(pOverall, pAnomaly) = m_AnomalyModel->probability(pSingleBucket, pOverall);
probabilities.push_back(pAnomaly);
featureProbabilities.emplace_back(
common::SModelProbabilityResult::E_AnomalyModelProbability, pAnomaly);
}
result.s_Probability = pOverall;
result.s_Conditional = conditional;
result.s_FeatureProbabilities = std::move(featureProbabilities);
result.s_Tail = std::move(tail);
result.s_MostAnomalousCorrelate = std::move(mostAnomalousCorrelate);
return true;
}
void CUnivariateTimeSeriesModel::countWeights(core_t::TTime time,
const TDouble2Vec& value,
double trendCountWeight,
double residualCountWeight,
double outlierWeightDerate,
double countVarianceScale,
TDouble2VecWeightsAry& trendWeights,
TDouble2VecWeightsAry& residuaWeights) const {
if (m_TrendModel->seasonalComponents().empty() == false) {
countVarianceScale = 1.0;
}
TDouble2Vec seasonalWeight;
this->seasonalWeight(0.0, time, seasonalWeight);
double sample{m_TrendModel->detrend(time, value[0], 0.0, m_IsNonNegative)};
auto weights = maths_t::CUnitWeights::UNIT;
maths_t::setCount(std::min(residualCountWeight / trendCountWeight, 1.0), weights);
maths_t::setSeasonalVarianceScale(seasonalWeight[0], weights);
double outlierWeight{outliers::weight(
*m_ResidualModel, weights,
std::max(outlierWeightDerate, m_TrendModel->outlierWeightDerate(time, sample)),
sample)};
double changeWeight{m_TrendModel->countWeight(time)};
trendCountWeight /= countVarianceScale;
residualCountWeight *= changeWeight;
TDouble2Vec trendOutlierWeight{outlierWeight * changeWeight};
TDouble2Vec residualOutlierWeight{outlierWeight};
maths_t::setCount(TDouble2Vec{trendCountWeight}, trendWeights);
    maths_t::setCount(TDouble2Vec{residualCountWeight}, residualWeights);
    maths_t::setOutlierWeight(trendOutlierWeight, trendWeights);
    maths_t::setOutlierWeight(residualOutlierWeight, residualWeights);
    maths_t::setCountVarianceScale(TDouble2Vec{countVarianceScale}, trendWeights);
    maths_t::setCountVarianceScale(TDouble2Vec{countVarianceScale}, residualWeights);
}
void CUnivariateTimeSeriesModel::addCountWeights(core_t::TTime time,
double trendCountWeight,
double residualCountWeight,
double countVarianceScale,
TDouble2VecWeightsAry& trendWeights,
TDouble2VecWeightsAry& residuaWeights) const {
if (m_TrendModel->seasonalComponents().empty()) {
countVarianceScale = 1.0;
}
residualCountWeight *= m_TrendModel->countWeight(time);
trendCountWeight /= countVarianceScale;
maths_t::addCount(TDouble2Vec{trendCountWeight}, trendWeights);
    maths_t::addCount(TDouble2Vec{residualCountWeight}, residualWeights);
}
void CUnivariateTimeSeriesModel::seasonalWeight(double confidence,
core_t::TTime time,
TDouble2Vec& weight) const {
double scale{m_TrendModel->varianceScaleWeight(
time, m_ResidualModel->marginalLikelihoodVariance(), confidence)(1)};
weight.assign(1, std::max(scale, this->params().minimumSeasonalVarianceScale()));
}
std::uint64_t CUnivariateTimeSeriesModel::checksum(std::uint64_t seed) const {
seed = common::CChecksum::calculate(seed, m_IsNonNegative);
seed = common::CChecksum::calculate(seed, m_Controllers);
seed = common::CChecksum::calculate(seed, m_TrendModel);
seed = common::CChecksum::calculate(seed, m_ResidualModel);
seed = common::CChecksum::calculate(seed, m_MultibucketFeature);
seed = common::CChecksum::calculate(seed, m_MultibucketFeatureModel);
seed = common::CChecksum::calculate(seed, m_AnomalyModel);
return common::CChecksum::calculate(seed, m_Correlations != nullptr);
}
void CUnivariateTimeSeriesModel::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CUnivariateTimeSeriesModel");
core::memory_debug::dynamicSize("m_Controllers", m_Controllers, mem);
core::memory_debug::dynamicSize("m_TrendModel", m_TrendModel, mem);
core::memory_debug::dynamicSize("m_ResidualModel", m_ResidualModel, mem);
core::memory_debug::dynamicSize("m_MultibucketFeature", m_MultibucketFeature, mem);
core::memory_debug::dynamicSize("m_MultibucketFeatureModel",
m_MultibucketFeatureModel, mem);
core::memory_debug::dynamicSize("m_AnomalyModel", m_AnomalyModel, mem);
}
std::size_t CUnivariateTimeSeriesModel::memoryUsage() const {
return core::memory::dynamicSize(m_Controllers) +
core::memory::dynamicSize(m_TrendModel) +
core::memory::dynamicSize(m_ResidualModel) +
core::memory::dynamicSize(m_MultibucketFeature) +
core::memory::dynamicSize(m_MultibucketFeatureModel) +
core::memory::dynamicSize(m_AnomalyModel);
}
bool CUnivariateTimeSeriesModel::acceptRestoreTraverser(const common::SModelRestoreParams& params,
core::CStateRestoreTraverser& traverser) {
if (traverser.name() == VERSION_7_11_TAG) {
while (traverser.next()) {
const std::string& name{traverser.name()};
RESTORE_BUILT_IN(ID_6_3_TAG, m_Id)
RESTORE_BOOL(IS_NON_NEGATIVE_6_3_TAG, m_IsNonNegative)
RESTORE_BOOL(IS_FORECASTABLE_6_3_TAG, m_IsForecastable)
RESTORE_SETUP_TEARDOWN(
CONTROLLER_6_3_TAG,
m_Controllers = std::make_unique<TDecayRateController2Ary>(),
core::CPersistUtils::restore(CONTROLLER_6_3_TAG, *m_Controllers, traverser),
/**/)
RESTORE(TREND_MODEL_6_3_TAG,
traverser.traverseSubLevel(
[&, serialiser = CTimeSeriesDecompositionStateSerialiser{}](auto& traverser_) {
return serialiser(params.s_DecompositionParams, m_TrendModel, traverser_);
}))
RESTORE(RESIDUAL_MODEL_6_3_TAG,
traverser.traverseSubLevel(
[&, serialiser = common::CPriorStateSerialiser{} ](auto& traverser_) {
return serialiser(params.s_DistributionParams,
m_ResidualModel, traverser_);
}))
RESTORE(MULTIBUCKET_FEATURE_6_3_TAG, traverser.traverseSubLevel([
this, serialiser = CTimeSeriesMultibucketFeatureSerialiser{}
](auto& traverser_) {
return serialiser(m_MultibucketFeature, traverser_);
}))
RESTORE(MULTIBUCKET_FEATURE_MODEL_6_3_TAG,
traverser.traverseSubLevel(
[&, serialiser = common::CPriorStateSerialiser{} ](auto& traverser_) {
return serialiser(params.s_DistributionParams,
m_MultibucketFeatureModel, traverser_);
}))
RESTORE_SETUP_TEARDOWN(
ANOMALY_MODEL_6_3_TAG,
m_AnomalyModel = std::make_unique<CTimeSeriesAnomalyModel>(),
traverser.traverseSubLevel([&](auto& traverser_) {
return m_AnomalyModel->acceptRestoreTraverser(params, traverser_);
}),
/**/)
}
} else {
LOG_ERROR(<< "Unsupported version '" << traverser.name() << "'");
return false;
}
this->checkRestoredInvariants();
return true;
}
void CUnivariateTimeSeriesModel::checkRestoredInvariants() const {
VIOLATES_INVARIANT_NO_EVALUATION(m_TrendModel, ==, nullptr);
VIOLATES_INVARIANT_NO_EVALUATION(m_ResidualModel, ==, nullptr);
}
void CUnivariateTimeSeriesModel::persistModelsState(core::CStatePersistInserter& inserter) const {
if (m_TrendModel != nullptr) {
inserter.insertLevel(TREND_MODEL_6_3_TAG, [
this, serialiser = CTimeSeriesDecompositionStateSerialiser{}
](auto& inserter_) { serialiser(*m_TrendModel, inserter_); });
}
if (m_ResidualModel != nullptr) {
inserter.insertLevel(RESIDUAL_MODEL_6_3_TAG, [
this, serialiser = common::CPriorStateSerialiser{}
](auto& inserter_) { serialiser(*m_ResidualModel, inserter_); });
}
}
void CUnivariateTimeSeriesModel::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
// Note that we don't persist this->params() or the correlations
// because that state is reinitialized.
inserter.insertValue(VERSION_7_11_TAG, "");
inserter.insertValue(ID_6_3_TAG, m_Id);
inserter.insertValue(IS_NON_NEGATIVE_6_3_TAG, static_cast<int>(m_IsNonNegative));
inserter.insertValue(IS_FORECASTABLE_6_3_TAG, static_cast<int>(m_IsForecastable));
if (m_Controllers != nullptr) {
core::CPersistUtils::persist(CONTROLLER_6_3_TAG, *m_Controllers, inserter);
}
if (m_TrendModel != nullptr) {
inserter.insertLevel(TREND_MODEL_6_3_TAG, [
this, serialiser = CTimeSeriesDecompositionStateSerialiser{}
](auto& inserter_) { serialiser(*m_TrendModel, inserter_); });
}
if (m_ResidualModel != nullptr) {
inserter.insertLevel(RESIDUAL_MODEL_6_3_TAG, [
this, serialiser = common::CPriorStateSerialiser{}
](auto& inserter_) { serialiser(*m_ResidualModel, inserter_); });
}
if (m_MultibucketFeature != nullptr) {
inserter.insertLevel(MULTIBUCKET_FEATURE_6_3_TAG, [
this, serialiser = CTimeSeriesMultibucketFeatureSerialiser{}
](auto& inserter_) { serialiser(m_MultibucketFeature, inserter_); });
}
if (m_MultibucketFeatureModel != nullptr) {
inserter.insertLevel(MULTIBUCKET_FEATURE_MODEL_6_3_TAG, [
this, serialiser = common::CPriorStateSerialiser{}
](auto& inserter_) { serialiser(*m_MultibucketFeatureModel, inserter_); });
}
if (m_AnomalyModel != nullptr) {
inserter.insertLevel(ANOMALY_MODEL_6_3_TAG, [this](auto& inserter_) {
m_AnomalyModel->acceptPersistInserter(inserter_);
});
}
}
maths_t::EDataType CUnivariateTimeSeriesModel::dataType() const {
return m_ResidualModel->dataType();
}
CUnivariateTimeSeriesModel::TDoubleWeightsAry
CUnivariateTimeSeriesModel::unpack(const TDouble2VecWeightsAry& weights) {
TDoubleWeightsAry result{maths_t::CUnitWeights::UNIT};
for (std::size_t i = 0; i < weights.size(); ++i) {
result[i] = weights[i][0];
}
return result;
}
const CTimeSeriesDecompositionInterface& CUnivariateTimeSeriesModel::trendModel() const {
return *m_TrendModel;
}
const common::CPrior& CUnivariateTimeSeriesModel::residualModel() const {
return *m_ResidualModel;
}
const CUnivariateTimeSeriesModel::TDecayRateController2Ary*
CUnivariateTimeSeriesModel::decayRateControllers() const {
return m_Controllers.get();
}
CUnivariateTimeSeriesModel::CUnivariateTimeSeriesModel(const CUnivariateTimeSeriesModel& other,
std::size_t id,
bool isForForecast)
: common::CModel(other.params()), m_Id(id),
m_IsNonNegative(other.m_IsNonNegative), m_IsForecastable(other.m_IsForecastable),
m_TrendModel(other.m_TrendModel->clone()),
m_ResidualModel(other.m_ResidualModel->clone()),
m_MultibucketFeature(!isForForecast && other.m_MultibucketFeature
? other.m_MultibucketFeature->clone()
: nullptr),
m_MultibucketFeatureModel(!isForForecast && other.m_MultibucketFeatureModel != nullptr
? other.m_MultibucketFeatureModel->clone()
: nullptr),
m_AnomalyModel(!isForForecast && other.m_AnomalyModel != nullptr
? std::make_unique<CTimeSeriesAnomalyModel>(*other.m_AnomalyModel)
: nullptr) {
if (!isForForecast && other.m_Controllers != nullptr) {
m_Controllers = std::make_unique<TDecayRateController2Ary>(*other.m_Controllers);
}
}
CUnivariateTimeSeriesModel::EUpdateResult
CUnivariateTimeSeriesModel::updateTrend(const common::CModelAddSamplesParams& params,
const TTimeDouble2VecSizeTrVec& samples) {
for (const auto& sample : samples) {
if (sample.second.size() != 1) {
LOG_ERROR(<< "Dimension mismatch: '" << sample.second.size() << " != 1'");
return E_Failure;
}
}
    // Ordering by time alone is not a total order: for example, if the data
    // are polled the times of all samples will be the same. So break ties
    // using the sample value.
TSize1Vec timeorder(samples.size());
std::iota(timeorder.begin(), timeorder.end(), 0);
std::stable_sort(timeorder.begin(), timeorder.end(),
[&samples](std::size_t lhs, std::size_t rhs) {
return common::COrderings::lexicographicalCompare(
samples[lhs].first, samples[lhs].second,
samples[rhs].first, samples[rhs].second);
});
// Do the update.
TFloatMeanAccumulatorVec window;
EUpdateResult result{E_Success};
auto componentChangeCallback = [&window, &result](TFloatMeanAccumulatorVec window_) {
if (window_.empty() == false) {
window = std::move(window_);
}
result = E_Reset;
};
const auto& weights = params.trendWeights();
const auto& modelAnnotationCallback = params.annotationCallback();
double occupancy{params.bucketOccupancy()};
core_t::TTime firstValueTime{params.firstValueTime()};
for (auto i : timeorder) {
core_t::TTime time{samples[i].first};
double value{samples[i].second[0]};
TDoubleWeightsAry weight{unpack(weights[i])};
m_TrendModel->addPoint(time, value, params.memoryCircuitBreaker(), weight,
componentChangeCallback, modelAnnotationCallback,
occupancy, firstValueTime);
}
if (result == E_Reset) {
this->reinitializeStateGivenNewComponent(params, std::move(window));
}
return result;
}
CUnivariateTimeSeriesModel::TTimeDouble2VecSizeTrVecDoublePr
CUnivariateTimeSeriesModel::updateResidualModels(const common::CModelAddSamplesParams& params,
TTimeDouble2VecSizeTrVec samples) {
for (auto& residual : samples) {
residual.second[0] = m_TrendModel->detrend(
residual.first, residual.second[0], 0.0, m_IsNonNegative);
}
// We add the samples in value order since it makes clustering more stable.
TSize1Vec valueorder(samples.size());
std::iota(valueorder.begin(), valueorder.end(), 0);
std::stable_sort(valueorder.begin(), valueorder.end(),
[&](std::size_t lhs, std::size_t rhs) {
return samples[lhs].second < samples[rhs].second;
});
TDouble1Vec residuals;
maths_t::TDoubleWeightsAry1Vec weights;
TMeanAccumulator averageTimeAccumulator;
weights.reserve(samples.size());
residuals.reserve(samples.size());
for (auto i : valueorder) {
core_t::TTime time{samples[i].first};
auto weight = unpack(params.priorWeights()[i]);
residuals.push_back(samples[i].second[0]);
weights.push_back(weight);
averageTimeAccumulator.add(static_cast<double>(time),
maths_t::countForUpdate(weight));
}
core_t::TTime averageTime{static_cast<core_t::TTime>(
common::CBasicStatistics::mean(averageTimeAccumulator))};
m_ResidualModel->addSamples(residuals, weights);
m_ResidualModel->propagateForwardsByTime(params.propagationInterval());
if (m_MultibucketFeatureModel != nullptr) {
TDouble2Vec seasonalWeight;
for (std::size_t i = 0; i < valueorder.size(); ++i) {
core_t::TTime time{samples[i].first};
this->seasonalWeight(0.0, time, seasonalWeight);
maths_t::setSeasonalVarianceScale(seasonalWeight[0], weights[i]);
}
m_MultibucketFeature->add(averageTime, this->params().bucketLength(),
residuals, weights);
const auto & [ feature, featureWeight ] = m_MultibucketFeature->value();
if (feature.empty() == false) {
m_MultibucketFeatureModel->addSamples(feature, featureWeight);
m_MultibucketFeatureModel->propagateForwardsByTime(params.propagationInterval());
}
}
double decayRateMultiplier{this->updateDecayRates(params, averageTime, residuals)};
return {std::move(samples), decayRateMultiplier};
}
double CUnivariateTimeSeriesModel::updateDecayRates(const common::CModelAddSamplesParams& params,
core_t::TTime time,
const TDouble1Vec& samples) {
double multiplier{1.0};
if (m_Controllers == nullptr) {
return multiplier;
}
TDouble1VecVec errors[2];
errors[0].reserve(samples.size());
errors[1].reserve(samples.size());
for (auto sample : samples) {
this->appendPredictionErrors(params.propagationInterval(), sample, errors);
}
{
CDecayRateController& controller{(*m_Controllers)[E_TrendControl]};
TDouble1Vec trendMean{m_TrendModel->meanValue(time)};
multiplier = controller.multiplier(
trendMean, errors[E_TrendControl], this->params().bucketLength(),
this->params().learnRate(), this->params().decayRate());
if (multiplier != 1.0) {
m_TrendModel->decayRate(multiplier * m_TrendModel->decayRate());
LOG_TRACE(<< "trend decay rate = " << m_TrendModel->decayRate());
}
}
{
CDecayRateController& controller{(*m_Controllers)[E_ResidualControl]};
TDouble1Vec residualMean{m_ResidualModel->marginalLikelihoodMean()};
multiplier = controller.multiplier(
residualMean, errors[E_ResidualControl], this->params().bucketLength(),
this->params().learnRate(), this->params().decayRate());
if (multiplier != 1.0) {
double decayRate{multiplier * m_ResidualModel->decayRate()};
m_ResidualModel->decayRate(decayRate);
if (m_MultibucketFeatureModel != nullptr) {
m_MultibucketFeatureModel->decayRate(decayRate);
}
LOG_TRACE(<< "prior decay rate = " << decayRate);
}
}
return multiplier;
}
void CUnivariateTimeSeriesModel::appendPredictionErrors(double interval,
double sample_,
TDouble1VecVec (&result)[2]) {
using TDecompositionPtr1Vec = core::CSmallVector<TDecompositionPtr, 1>;
TDouble1Vec sample{sample_};
TDecompositionPtr1Vec trend{m_TrendModel};
if (auto error = predictionError(interval, m_ResidualModel, sample)) {
result[E_ResidualControl].push_back(*error);
}
if (auto error = predictionError(trend, sample)) {
result[E_TrendControl].push_back(*error);
}
}
void CUnivariateTimeSeriesModel::reinitializeStateGivenNewComponent(
const common::CModelAddSamplesParams& params,
TFloatMeanAccumulatorVec residuals) {
if (m_Controllers != nullptr) {
m_ResidualModel->decayRate(m_ResidualModel->decayRate() /
(*m_Controllers)[E_ResidualControl].multiplier());
m_TrendModel->decayRate(m_TrendModel->decayRate() /
(*m_Controllers)[E_TrendControl].multiplier());
for (auto& controller : *m_Controllers) {
controller.reset();
}
}
// We can't properly handle periodicity in the variance of the rate when
// using a Poisson process, so remove it from model selection if we
// detect seasonality.
m_ResidualModel->removeModels(
common::CPrior::CModelFilter().remove(common::CPrior::E_Poisson));
m_ResidualModel->setToNonInformative(0.0, m_ResidualModel->decayRate());
// Reinitialize the residual model with any values we've been given. Note
// that if we have sparse data we reduce the sample weights so we smoothly
// transition to modelling only non-empty values. This must be reflected
// in the sample weights when reinitialising the residual model.
if (residuals.empty() == false) {
double emptyBucketWeight{CModel::emptyBucketWeight(params.bucketOccupancy())};
maths_t::TDoubleWeightsAry1Vec weights(1);
double buckets{std::accumulate(residuals.begin(), residuals.end(), 0.0,
[](auto partialBuckets, const auto& residual) {
return partialBuckets +
common::CBasicStatistics::count(residual);
}) /
this->params().learnRate()};
double time{emptyBucketWeight * buckets / static_cast<double>(residuals.size())};
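// Each non-empty residual therefore ages the model by the average
// number of (learn rate adjusted) buckets it represents, so adding
// the whole window ages the model by roughly its total bucket count.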
for (const auto& residual : residuals) {
double weight{emptyBucketWeight * common::CBasicStatistics::count(residual)};
if (weight > 0.0) {
weights[0] = maths_t::countWeight(weight);
m_ResidualModel->addSamples({common::CBasicStatistics::mean(residual)}, weights);
m_ResidualModel->propagateForwardsByTime(time);
}
}
}
// Reset the multi-bucket residual model. We can't reinitialize this from
// the initial values because they are not typically at the granularity of
// the job's bucket length.
if (m_MultibucketFeature != nullptr) {
m_MultibucketFeature->clear();
}
if (m_MultibucketFeatureModel != nullptr) {
m_MultibucketFeatureModel->removeModels(
common::CPrior::CModelFilter().remove(common::CPrior::E_Poisson));
m_MultibucketFeatureModel->setToNonInformative(0.0, m_ResidualModel->decayRate());
}
if (m_Correlations != nullptr) {
m_Correlations->clearCorrelationModels(m_Id);
}
if (m_AnomalyModel != nullptr) {
m_AnomalyModel->reset();
}
}
bool CUnivariateTimeSeriesModel::correlationModels(TSize1Vec& correlated,
TSize2Vec1Vec& variables,
TMultivariatePriorCPtrSizePr1Vec& correlationModels,
TModelCPtr1Vec& correlatedTimeSeriesModels) const {
if (m_Correlations != nullptr) {
correlated = m_Correlations->correlated(m_Id);
m_Correlations->correlationModels(m_Id, correlated, variables, correlationModels,
correlatedTimeSeriesModels);
}
return correlated.empty() == false;
}
CTimeSeriesCorrelations::CTimeSeriesCorrelations(double minimumSignificantCorrelation,
double decayRate)
: m_MinimumSignificantCorrelation(minimumSignificantCorrelation),
m_Correlations(MAXIMUM_CORRELATIONS, decayRate) {
}
CTimeSeriesCorrelations::CTimeSeriesCorrelations(const CTimeSeriesCorrelations& other,
bool isForPersistence)
: m_MinimumSignificantCorrelation(other.m_MinimumSignificantCorrelation),
m_SampleData(other.m_SampleData), m_Correlations(other.m_Correlations),
m_CorrelatedLookup(other.m_CorrelatedLookup),
m_TimeSeriesModels(isForPersistence ? TModelCPtrVec() : other.m_TimeSeriesModels) {
for (const auto& model : other.m_CorrelationDistributionModels) {
m_CorrelationDistributionModels.emplace(
model.first,
std::make_pair(TMultivariatePriorPtr(model.second.first->clone()),
model.second.second));
}
}
CTimeSeriesCorrelations::~CTimeSeriesCorrelations() = default;
CTimeSeriesCorrelations* CTimeSeriesCorrelations::clone() const {
return new CTimeSeriesCorrelations(*this);
}
CTimeSeriesCorrelations* CTimeSeriesCorrelations::cloneForPersistence() const {
return new CTimeSeriesCorrelations(*this, true);
}
void CTimeSeriesCorrelations::processSamples() {
using TSizeSizePrMultivariatePriorPtrDoublePrUMapCItrVec =
std::vector<TSizeSizePrMultivariatePriorPtrDoublePrUMap::const_iterator>;
// The priors use a shared pseudo random number generator which
// generates a fixed sequence of random numbers. Since the order
// of the correlated priors map can change across persist and
// restore we can effectively get a different sequence of random
// numbers depending on whether we persist and restore or not.
// We'd like results to be as independent as possible from the
// action of persisting and restoring so sort the collection to
// preserve the random number sequence.
TSizeSizePrMultivariatePriorPtrDoublePrUMapCItrVec iterators;
iterators.reserve(m_CorrelationDistributionModels.size());
for (auto i = m_CorrelationDistributionModels.begin();
i != m_CorrelationDistributionModels.end(); ++i) {
iterators.push_back(i);
}
std::sort(iterators.begin(), iterators.end(),
core::CFunctional::SDereference<common::COrderings::SFirstLess>());
TDouble10Vec1Vec multivariateSamples;
maths_t::TDouble10VecWeightsAry1Vec multivariateWeights;
for (auto i : iterators) {
std::size_t pid1{i->first.first};
std::size_t pid2{i->first.second};
auto i1 = m_SampleData.find(pid1);
auto i2 = m_SampleData.find(pid2);
if (i1 == m_SampleData.end() || i2 == m_SampleData.end()) {
continue;
}
const TMultivariatePriorPtr& prior{i->second.first};
SSampleData* samples1{&i1->second};
SSampleData* samples2{&i2->second};
std::size_t n1{samples1->s_Times.size()};
std::size_t n2{samples2->s_Times.size()};
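// Pair each sample of the series with more samples with the closest
// matching sample of the other series. The indices array tracks which
// coordinate of the bivariate sample each series occupies if we swap.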
std::size_t indices[]{0, 1};
if (n1 < n2) {
std::swap(samples1, samples2);
std::swap(n1, n2);
std::swap(indices[0], indices[1]);
}
multivariateSamples.assign(n1, TDouble10Vec(2));
multivariateWeights.assign(n1, maths_t::CUnitWeights::unit<TDouble10Vec>(2));
TSize1Vec& tags2{samples2->s_Tags};
TTime1Vec& times2{samples2->s_Times};
common::COrderings::simultaneousSort(tags2, times2, samples2->s_Samples,
samples2->s_Weights);
for (auto j = tags2.begin(); j != tags2.end(); /**/) {
auto k = std::upper_bound(j, tags2.end(), *j);
std::size_t a = j - tags2.begin();
std::size_t b = k - tags2.begin();
common::COrderings::simultaneousSort(
core::make_range(times2, a, b),
core::make_range(samples2->s_Samples, a, b),
core::make_range(samples2->s_Weights, a, b));
j = k;
}
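// For every sample of the longer series find the sample of the shorter
// series with the same tag whose time is nearest. Truncating b to
// [1, n2 - 1] guarantees both candidates, times2[b - 1] and times2[b],
// are valid positions.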
for (std::size_t j1 = 0; j1 < n1; ++j1) {
std::size_t j2{0};
if (n2 > 1) {
std::size_t tag{samples1->s_Tags[j1]};
core_t::TTime time{samples1->s_Times[j1]};
std::size_t a_ = std::lower_bound(tags2.begin(), tags2.end(), tag) -
tags2.begin();
std::size_t b_ = std::upper_bound(tags2.begin(), tags2.end(), tag) -
tags2.begin();
std::size_t b{common::CTools::truncate(
static_cast<std::size_t>(
std::lower_bound(times2.begin() + a_, times2.begin() + b_, time) -
times2.begin()),
std::size_t(1), n2 - 1)};
std::size_t a{b - 1};
j2 = std::abs(times2[a] - time) < std::abs(times2[b] - time) ? a : b;
}
multivariateSamples[j1][indices[0]] = samples1->s_Samples[j1];
multivariateSamples[j1][indices[1]] = samples2->s_Samples[j2];
for (std::size_t w = 0; w < maths_t::NUMBER_WEIGHT_STYLES; ++w) {
multivariateWeights[j1][w][indices[0]] = samples1->s_Weights[j1][w];
multivariateWeights[j1][w][indices[1]] = samples2->s_Weights[j2][w];
}
}
LOG_TRACE(<< "correlate samples = " << multivariateSamples
<< ", correlate weights = " << multivariateWeights);
prior->dataType(samples1->s_Type == maths_t::E_IntegerData ||
samples2->s_Type == maths_t::E_IntegerData
? maths_t::E_IntegerData
: maths_t::E_ContinuousData);
prior->addSamples(multivariateSamples, multivariateWeights);
prior->propagateForwardsByTime(std::min(samples1->s_Interval, samples2->s_Interval));
prior->decayRate(std::sqrt(samples1->s_Multiplier * samples2->s_Multiplier) *
prior->decayRate());
LOG_TRACE(<< "correlation prior:" << core_t::LINE_ENDING << prior->print());
LOG_TRACE(<< "decayRate = " << prior->decayRate());
}
m_Correlations.capture();
m_SampleData.clear();
}
void CTimeSeriesCorrelations::refresh(const CTimeSeriesCorrelateModelAllocator& allocator) {
using TSizeSizePrVec = std::vector<TSizeSizePr>;
if (m_Correlations.changed()) {
TSizeSizePrVec correlated;
TDoubleVec correlationCoeffs;
m_Correlations.mostCorrelated(
static_cast<std::size_t>(
1.2 * static_cast<double>(allocator.maxNumberCorrelations())),
correlated, &correlationCoeffs);
LOG_TRACE(<< "correlated = " << correlated);
LOG_TRACE(<< "correlationCoeffs = " << correlationCoeffs);
std::ptrdiff_t cutoff{
std::upper_bound(correlationCoeffs.begin(), correlationCoeffs.end(),
0.5 * m_MinimumSignificantCorrelation,
[](double lhs, double rhs) {
return std::fabs(lhs) > std::fabs(rhs);
}) -
correlationCoeffs.begin()};
LOG_TRACE(<< "cutoff = " << cutoff);
correlated.erase(correlated.begin() + cutoff, correlated.end());
if (correlated.empty()) {
m_CorrelationDistributionModels.clear();
this->refreshLookup();
return;
}
// Extract the correlated pairs which are and aren't already
// being modeled.
TSizeSizePrVec present;
TSizeVec presentRank;
TSizeSizePrVec missing;
TSizeVec missingRank;
std::size_t np{static_cast<std::size_t>(
std::max(0.9 * static_cast<double>(correlated.size()), 1.0))};
std::size_t nm{static_cast<std::size_t>(
std::max(0.1 * static_cast<double>(correlated.size()), 1.0))};
present.reserve(np);
presentRank.reserve(np);
missing.reserve(nm);
missingRank.reserve(nm);
for (std::size_t j = 0; j < correlated.size(); ++j) {
bool isPresent{m_CorrelationDistributionModels.count(correlated[j]) > 0};
(isPresent ? present : missing).push_back(correlated[j]);
(isPresent ? presentRank : missingRank).push_back(j);
}
// Remove any weakly correlated models.
std::size_t initial{m_CorrelationDistributionModels.size()};
common::COrderings::simultaneousSort(present, presentRank);
for (auto i = m_CorrelationDistributionModels.begin();
i != m_CorrelationDistributionModels.end();
/**/) {
std::size_t j = std::lower_bound(present.begin(), present.end(), i->first) -
present.begin();
if (j == present.size() || i->first != present[j]) {
i = m_CorrelationDistributionModels.erase(i);
} else {
i->second.second = correlationCoeffs[presentRank[j]];
++i;
}
}
// Remove the remaining most weakly correlated models subject
// to the capacity constraint.
common::COrderings::simultaneousSortWith(std::greater<>(), presentRank, present);
for (std::size_t i = 0; m_CorrelationDistributionModels.size() >
allocator.maxNumberCorrelations();
++i) {
m_CorrelationDistributionModels.erase(present[i]);
}
if (allocator.areAllocationsAllowed()) {
for (std::size_t i = 0, nextChunk =
std::min(allocator.maxNumberCorrelations(),
initial + allocator.chunkSize());
m_CorrelationDistributionModels.size() < allocator.maxNumberCorrelations() &&
i < missing.size() &&
(m_CorrelationDistributionModels.size() <= initial ||
!allocator.exceedsLimit(m_CorrelationDistributionModels.size()));
nextChunk = std::min(allocator.maxNumberCorrelations(),
nextChunk + allocator.chunkSize())) {
for (/**/; i < missing.size() &&
m_CorrelationDistributionModels.size() < nextChunk;
++i) {
m_CorrelationDistributionModels.emplace(
missing[i], TMultivariatePriorPtrDoublePr{
allocator.newPrior(),
correlationCoeffs[missingRank[i]]});
}
}
}
this->refreshLookup();
}
}
const CTimeSeriesCorrelations::TSizeSizePrMultivariatePriorPtrDoublePrUMap&
CTimeSeriesCorrelations::correlationModels() const {
return m_CorrelationDistributionModels;
}
void CTimeSeriesCorrelations::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CTimeSeriesCorrelations");
core::memory_debug::dynamicSize("m_SampleData", m_SampleData, mem);
core::memory_debug::dynamicSize("m_Correlations", m_Correlations, mem);
core::memory_debug::dynamicSize("m_CorrelatedLookup", m_CorrelatedLookup, mem);
core::memory_debug::dynamicSize("m_CorrelationDistributionModels",
m_CorrelationDistributionModels, mem);
}
std::size_t CTimeSeriesCorrelations::memoryUsage() const {
return core::memory::dynamicSize(m_SampleData) +
core::memory::dynamicSize(m_Correlations) +
core::memory::dynamicSize(m_CorrelatedLookup) +
core::memory::dynamicSize(m_CorrelationDistributionModels);
}
bool CTimeSeriesCorrelations::acceptRestoreTraverser(const common::SDistributionRestoreParams& params,
core::CStateRestoreTraverser& traverser) {
do {
const std::string& name{traverser.name()};
RESTORE(K_MOST_CORRELATED_TAG, traverser.traverseSubLevel([this](auto& traverser_) {
return m_Correlations.acceptRestoreTraverser(traverser_);
}))
RESTORE_WITH_UTILS(CORRELATED_LOOKUP_TAG, m_CorrelatedLookup)
RESTORE(CORRELATION_MODELS_TAG, traverser.traverseSubLevel([&](auto& traverser_) {
return this->restoreCorrelationModels(params, traverser_);
}))
} while (traverser.next());
return true;
}
void CTimeSeriesCorrelations::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
// Note we don't persist the minimum significant correlation or the
// models because that state is reinitialized. The sample data are only
// maintained transiently during an update at the end of a bucket and
// so are always empty at the point persistence occurs.
inserter.insertLevel(K_MOST_CORRELATED_TAG, [this](auto& inserter_) {
m_Correlations.acceptPersistInserter(inserter_);
});
core::CPersistUtils::persist(CORRELATED_LOOKUP_TAG, m_CorrelatedLookup, inserter);
inserter.insertLevel(CORRELATION_MODELS_TAG, [this](auto& inserter_) {
this->persistCorrelationModels(inserter_);
});
}
bool CTimeSeriesCorrelations::restoreCorrelationModels(const common::SDistributionRestoreParams& params,
core::CStateRestoreTraverser& traverser) {
do {
const std::string& name{traverser.name()};
RESTORE_SETUP_TEARDOWN(CORRELATION_MODEL_TAG,
TSizeSizePrMultivariatePriorPtrDoublePrPr prior,
traverser.traverseSubLevel([&](auto& traverser_) {
return restore(params, prior, traverser_);
}),
m_CorrelationDistributionModels.insert(std::move(prior)))
} while (traverser.next());
return true;
}
void CTimeSeriesCorrelations::persistCorrelationModels(core::CStatePersistInserter& inserter) const {
using TSizeSizePrMultivariatePriorPtrDoublePrUMapCItrVec =
std::vector<TSizeSizePrMultivariatePriorPtrDoublePrUMap::const_iterator>;
TSizeSizePrMultivariatePriorPtrDoublePrUMapCItrVec ordered;
ordered.reserve(m_CorrelationDistributionModels.size());
for (auto prior = m_CorrelationDistributionModels.begin();
prior != m_CorrelationDistributionModels.end(); ++prior) {
ordered.push_back(prior);
}
std::sort(ordered.begin(), ordered.end(),
core::CFunctional::SDereference<common::COrderings::SFirstLess>());
for (auto prior : ordered) {
inserter.insertLevel(CORRELATION_MODEL_TAG, [&](auto& inserter_) {
return persist(*prior, inserter_);
});
}
}
bool CTimeSeriesCorrelations::restore(const common::SDistributionRestoreParams& params,
TSizeSizePrMultivariatePriorPtrDoublePrPr& model,
core::CStateRestoreTraverser& traverser) {
do {
const std::string& name{traverser.name()};
RESTORE_BUILT_IN(FIRST_CORRELATE_ID_TAG, model.first.first)
RESTORE_BUILT_IN(SECOND_CORRELATE_ID_TAG, model.first.second)
RESTORE(CORRELATION_MODEL_TAG,
traverser.traverseSubLevel(
[&, serialiser = common::CPriorStateSerialiser{} ](auto& traverser_) {
return serialiser(params, model.second.first, traverser_);
}))
RESTORE_BUILT_IN(CORRELATION_TAG, model.second.second)
} while (traverser.next());
return true;
}
void CTimeSeriesCorrelations::persist(const TConstSizeSizePrMultivariatePriorPtrDoublePrPr& model,
core::CStatePersistInserter& inserter) {
inserter.insertValue(FIRST_CORRELATE_ID_TAG, model.first.first);
inserter.insertValue(SECOND_CORRELATE_ID_TAG, model.first.second);
inserter.insertLevel(CORRELATION_MODEL_TAG, [
&model, serialiser = common::CPriorStateSerialiser{}
](auto& inserter_) { serialiser(*model.second.first, inserter_); });
inserter.insertValue(CORRELATION_TAG, model.second.second, core::CIEEE754::E_SinglePrecision);
}
void CTimeSeriesCorrelations::addTimeSeries(std::size_t id,
const CUnivariateTimeSeriesModel& model) {
m_Correlations.addVariables(id + 1);
core::CAllocationStrategy::resize(m_TimeSeriesModels,
std::max(id + 1, m_TimeSeriesModels.size()));
m_TimeSeriesModels[id] = &model;
}
void CTimeSeriesCorrelations::removeTimeSeries(std::size_t id) {
this->clearCorrelationModels(id);
m_TimeSeriesModels[id] = nullptr;
}
void CTimeSeriesCorrelations::clearCorrelationModels(std::size_t id) {
auto correlated_ = m_CorrelatedLookup.find(id);
if (correlated_ != m_CorrelatedLookup.end()) {
TSize1Vec& correlated{correlated_->second};
for (const auto& correlate : correlated) {
m_CorrelationDistributionModels.erase({id, correlate});
m_CorrelationDistributionModels.erase({correlate, id});
}
this->refreshLookup();
}
m_Correlations.removeVariables({id});
}
void CTimeSeriesCorrelations::addSamples(std::size_t id,
const common::CModelAddSamplesParams& params,
const TTimeDouble2VecSizeTrVec& samples,
double multiplier) {
SSampleData& data{m_SampleData[id]};
data.s_Type = params.type();
data.s_Times.reserve(samples.size());
data.s_Samples.reserve(samples.size());
data.s_Tags.reserve(samples.size());
for (std::size_t i = 0; i < samples.size(); ++i) {
data.s_Times.push_back(samples[i].first);
data.s_Samples.push_back(samples[i].second[0]);
data.s_Tags.push_back(samples[i].third);
data.s_Weights.push_back(
CUnivariateTimeSeriesModel::unpack(params.priorWeights()[i]));
}
data.s_Interval = params.propagationInterval();
data.s_Multiplier = multiplier;
m_Correlations.add(id, common::CBasicStatistics::median(data.s_Samples));
}
CTimeSeriesCorrelations::TSize1Vec CTimeSeriesCorrelations::correlated(std::size_t id) const {
auto correlated = m_CorrelatedLookup.find(id);
return correlated != m_CorrelatedLookup.end() ? correlated->second : TSize1Vec();
}
bool CTimeSeriesCorrelations::correlationModels(std::size_t id,
TSize1Vec& correlated,
TSize2Vec1Vec& variables,
TMultivariatePriorCPtrSizePr1Vec& correlationModels,
TModelCPtr1Vec& correlatedTimeSeriesModels) const {
variables.clear();
correlationModels.clear();
correlatedTimeSeriesModels.clear();
if (correlated.empty()) {
return false;
}
variables.reserve(correlated.size());
correlationModels.reserve(correlated.size());
correlatedTimeSeriesModels.reserve(correlated.size());
std::size_t end{0};
for (auto correlate : correlated) {
auto i = m_CorrelationDistributionModels.find({id, correlate});
TSize2Vec variable{0, 1};
if (i == m_CorrelationDistributionModels.end()) {
i = m_CorrelationDistributionModels.find({correlate, id});
std::swap(variable[0], variable[1]);
}
if (i == m_CorrelationDistributionModels.end()) {
LOG_ERROR(<< "Unexpectedly missing prior for correlation (" << id
<< "," << correlate << ")");
continue;
}
if (std::fabs(i->second.second) < m_MinimumSignificantCorrelation) {
LOG_TRACE(<< "Correlation " << i->second.second << " is too small to model");
continue;
}
if (i->second.first->numberSamples() < MINIMUM_CORRELATE_PRIOR_SAMPLE_COUNT) {
LOG_TRACE(<< "Too few samples in correlate model");
continue;
}
correlated[end] = correlate;
correlationModels.emplace_back(i->second.first.get(), variable[0]);
variables.push_back(std::move(variable));
++end;
}
correlated.resize(variables.size());
for (auto correlate : correlated) {
correlatedTimeSeriesModels.push_back(m_TimeSeriesModels[correlate]);
}
return correlationModels.empty() == false;
}
void CTimeSeriesCorrelations::refreshLookup() {
m_CorrelatedLookup.clear();
for (const auto& prior : m_CorrelationDistributionModels) {
std::size_t x0{prior.first.first};
std::size_t x1{prior.first.second};
m_CorrelatedLookup[x0].push_back(x1);
m_CorrelatedLookup[x1].push_back(x0);
}
for (auto& prior : m_CorrelatedLookup) {
std::sort(prior.second.begin(), prior.second.end());
}
}
CMultivariateTimeSeriesModel::CMultivariateTimeSeriesModel(
const common::CModelParams& params,
const CTimeSeriesDecompositionInterface& trend,
const common::CMultivariatePrior& residualModel,
const TDecayRateController2Ary* controllers,
const TMultibucketFeature* multibucketFeature,
bool modelAnomalies)
: common::CModel(params), m_ResidualModel(residualModel.clone()),
m_MultibucketFeature(multibucketFeature != nullptr ? multibucketFeature->clone()
: nullptr),
m_MultibucketFeatureModel(multibucketFeature != nullptr ? residualModel.clone() : nullptr),
m_AnomalyModel(modelAnomalies ? std::make_unique<CTimeSeriesAnomalyModel>(
params.bucketLength(),
params.decayRate())
: nullptr) {
if (controllers != nullptr) {
m_Controllers = std::make_unique<TDecayRateController2Ary>(*controllers);
}
for (std::size_t d = 0; d < this->dimension(); ++d) {
m_TrendModel.emplace_back(trend.clone());
}
}
CMultivariateTimeSeriesModel::CMultivariateTimeSeriesModel(const CMultivariateTimeSeriesModel& other)
: common::CModel(other.params()), m_IsNonNegative(other.m_IsNonNegative),
m_ResidualModel(other.m_ResidualModel->clone()),
m_MultibucketFeature(other.m_MultibucketFeature != nullptr
? other.m_MultibucketFeature->clone()
: nullptr),
m_MultibucketFeatureModel(other.m_MultibucketFeatureModel != nullptr
? other.m_MultibucketFeatureModel->clone()
: nullptr),
m_AnomalyModel(other.m_AnomalyModel != nullptr
? std::make_unique<CTimeSeriesAnomalyModel>(*other.m_AnomalyModel)
: nullptr) {
if (other.m_Controllers != nullptr) {
m_Controllers = std::make_unique<TDecayRateController2Ary>(*other.m_Controllers);
}
m_TrendModel.reserve(other.m_TrendModel.size());
for (const auto& trend : other.m_TrendModel) {
m_TrendModel.emplace_back(trend->clone());
}
}
CMultivariateTimeSeriesModel::CMultivariateTimeSeriesModel(const common::SModelRestoreParams& params,
core::CStateRestoreTraverser& traverser)
: common::CModel(params.s_Params) {
if (traverser.traverseSubLevel([&](auto& traverser_) {
return this->acceptRestoreTraverser(params, traverser_);
}) == false) {
traverser.setBadState();
}
}
CMultivariateTimeSeriesModel::~CMultivariateTimeSeriesModel() = default;
std::size_t CMultivariateTimeSeriesModel::identifier() const {
return 0;
}
CMultivariateTimeSeriesModel* CMultivariateTimeSeriesModel::clone(std::size_t /*id*/) const {
return new CMultivariateTimeSeriesModel{*this};
}
CMultivariateTimeSeriesModel* CMultivariateTimeSeriesModel::cloneForPersistence() const {
return new CMultivariateTimeSeriesModel{*this};
}
CMultivariateTimeSeriesModel* CMultivariateTimeSeriesModel::cloneForForecast() const {
// Note: placeholder since there is currently no forecast support for multivariate time series
return new CMultivariateTimeSeriesModel{*this};
}
bool CMultivariateTimeSeriesModel::isForecastPossible() const {
return false;
}
void CMultivariateTimeSeriesModel::modelCorrelations(CTimeSeriesCorrelations& /*model*/) {
// no-op
}
CMultivariateTimeSeriesModel::TSize2Vec1Vec CMultivariateTimeSeriesModel::correlates() const {
return {};
}
void CMultivariateTimeSeriesModel::addBucketValue(const TTimeDouble2VecSizeTrVec& /*value*/) {
// no-op
}
CMultivariateTimeSeriesModel::EUpdateResult
CMultivariateTimeSeriesModel::addSamples(const common::CModelAddSamplesParams& params,
TTimeDouble2VecSizeTrVec samples) {
if (samples.empty()) {
return E_Success;
}
std::size_t dimension{this->dimension()};
samples.erase(std::remove_if(samples.begin(), samples.end(),
[&](const auto& sample) {
return sample.second.size() != dimension;
}),
samples.end());
// Update the data characteristics.
m_IsNonNegative = params.isNonNegative();
maths_t::EDataType type{params.type()};
m_ResidualModel->dataType(type);
if (m_MultibucketFeatureModel != nullptr) {
m_MultibucketFeatureModel->dataType(type);
}
for (auto& trendModel : m_TrendModel) {
trendModel->dataType(type);
}
EUpdateResult result{this->updateTrend(params, samples)};
this->updateResidualModels(params, std::move(samples));
// Age the anomaly model. Note that updating it requires the probability
// to be calculated, which is expensive to compute, so unlike our other
// model components it is updated in the probability calculation.
if (m_AnomalyModel != nullptr) {
m_AnomalyModel->propagateForwardsByTime(params.propagationInterval());
}
return result;
}
void CMultivariateTimeSeriesModel::skipTime(core_t::TTime gap) {
for (const auto& trend : m_TrendModel) {
trend->skipTime(gap);
}
}
CMultivariateTimeSeriesModel::TDouble2Vec
CMultivariateTimeSeriesModel::mode(core_t::TTime time,
const TDouble2VecWeightsAry& weights) const {
std::size_t dimension{this->dimension()};
TDouble2Vec result(dimension);
TDouble10Vec mode(m_ResidualModel->marginalLikelihoodMode(unpack(weights)));
for (std::size_t d = 0; d < dimension; ++d) {
result[d] = mode[d] + m_TrendModel[d]->value(time, 0.0, m_IsNonNegative).mean();
}
return result;
}
CMultivariateTimeSeriesModel::TDouble2Vec1Vec
CMultivariateTimeSeriesModel::correlateModes(core_t::TTime /*time*/,
const TDouble2VecWeightsAry1Vec& /*weights*/) const {
return {};
}
CMultivariateTimeSeriesModel::TDouble2Vec1Vec
CMultivariateTimeSeriesModel::residualModes(const TDouble2VecWeightsAry& weights) const {
TDouble10Vec1Vec modes(m_ResidualModel->marginalLikelihoodModes(unpack(weights)));
TDouble2Vec1Vec result;
result.reserve(modes.size());
for (const auto& mode : modes) {
result.push_back(mode);
}
return result;
}
void CMultivariateTimeSeriesModel::detrend(const TTime2Vec1Vec& time_,
double confidenceInterval,
TDouble2Vec1Vec& value) const {
std::size_t dimension{this->dimension()};
core_t::TTime time{time_[0][0]};
for (std::size_t d = 0; d < dimension; ++d) {
value[0][d] = m_TrendModel[d]->detrend(time, value[0][d],
confidenceInterval, m_IsNonNegative);
}
}
CMultivariateTimeSeriesModel::TDouble2Vec
CMultivariateTimeSeriesModel::predict(core_t::TTime time,
const TSizeDoublePr1Vec& /*correlated*/,
TDouble2Vec hint) const {
std::size_t dimension{this->dimension()};
if (hint.size() == dimension) {
for (std::size_t d = 0; d < dimension; ++d) {
hint[d] = m_TrendModel[d]->detrend(time, hint[d], 0.0, m_IsNonNegative);
}
}
TSize10Vec marginalize(dimension - 1);
std::iota(marginalize.begin(), marginalize.end(), 1);
TDouble2Vec result(dimension);
TDouble10Vec mean(m_ResidualModel->marginalLikelihoodMean());
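// marginalize always holds every coordinate except d: the loop's
// --marginalize[std::min(d, dimension - 2)] update slides the gap, e.g.
// for dimension 3 it steps {1, 2} -> {0, 2} -> {0, 1} as d goes 0 -> 2.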
for (std::size_t d = 0; d < dimension; --marginalize[std::min(d, dimension - 2)], ++d) {
double trend{0.0};
if (m_TrendModel[d]->initialized()) {
trend = m_TrendModel[d]->value(time, 0.0, m_IsNonNegative).mean();
}
double median{mean[d]};
if (m_ResidualModel->isNonInformative() == false) {
TUnivariatePriorPtr marginal{
m_ResidualModel->univariate(marginalize, NOTHING_TO_CONDITION).first};
median = hint.empty()
? common::CBasicStatistics::mean(
marginal->marginalLikelihoodConfidenceInterval(0.0))
: marginal->nearestMarginalLikelihoodMean(hint[d]);
}
result[d] = trend + median;
if (m_IsNonNegative) {
result[d] = std::max(result[d], 0.0);
}
}
return result;
}
CMultivariateTimeSeriesModel::TDouble2Vec3Vec
CMultivariateTimeSeriesModel::confidenceInterval(core_t::TTime time,
double confidenceInterval,
const TDouble2VecWeightsAry& weights_) const {
if (m_ResidualModel->isNonInformative()) {
return {};
}
std::size_t dimension{this->dimension()};
TSize10Vec marginalize(dimension - 1);
std::iota(marginalize.begin(), marginalize.end(), 1);
TDouble2Vec3Vec result(3, TDouble2Vec(dimension));
maths_t::TDoubleWeightsAry weights{maths_t::CUnitWeights::UNIT};
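// As in predict(), marginalize always lists every coordinate except
// the one currently being processed.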
for (std::size_t d = 0; d < dimension; --marginalize[std::min(d, dimension - 2)], ++d) {
double trend{m_TrendModel[d]->initialized()
? m_TrendModel[d]->value(time, 0.0, m_IsNonNegative).mean()
: 0.0};
for (std::size_t i = 0; i < maths_t::NUMBER_WEIGHT_STYLES; ++i) {
weights[i] = weights_[i][d];
}
TUnivariatePriorPtr marginal{
m_ResidualModel->univariate(marginalize, NOTHING_TO_CONDITION).first};
double median{common::CBasicStatistics::mean(
marginal->marginalLikelihoodConfidenceInterval(0.0, weights))};
TDoubleDoublePr interval{marginal->marginalLikelihoodConfidenceInterval(
confidenceInterval, weights)};
result[0][d] = trend + interval.first;
result[1][d] = trend + median;
result[2][d] = trend + interval.second;
if (m_IsNonNegative) {
result[0][d] = std::max(result[0][d], 0.0);
result[1][d] = std::max(result[1][d], 0.0);
result[2][d] = std::max(result[2][d], 0.0);
}
}
return result;
}
bool CMultivariateTimeSeriesModel::forecast(core_t::TTime /*firstDataTime*/,
core_t::TTime /*lastDataTime*/,
core_t::TTime /*startTime*/,
core_t::TTime /*endTime*/,
double /*confidenceInterval*/,
const TDouble2Vec& /*minimum*/,
const TDouble2Vec& /*maximum*/,
const common::TForecastPushDatapointFunc& /*forecastPushDataPointFunc*/,
std::string& messageOut) {
LOG_DEBUG(<< forecast::ERROR_MULTIVARIATE);
messageOut = forecast::ERROR_MULTIVARIATE;
return false;
}
bool CMultivariateTimeSeriesModel::probability(const common::CModelProbabilityParams& params,
const TTime2Vec1Vec& time_,
const TDouble2Vec1Vec& value,
common::SModelProbabilityResult& result) const {
TSize2Vec coordinates(params.coordinates());
if (coordinates.empty()) {
coordinates.resize(this->dimension());
std::iota(coordinates.begin(), coordinates.end(), 0);
}
TTail2Vec tail(coordinates.size(), maths_t::E_UndeterminedTail);
result = common::SModelProbabilityResult{
1.0, false, {{common::SModelProbabilityResult::E_SingleBucketProbability, 1.0}}, tail, {}, {}};
std::size_t dimension{this->dimension()};
core_t::TTime time{time_[0][0]};
TDouble10Vec1Vec sample{TDouble10Vec(dimension)};
for (std::size_t d = 0; d < dimension; ++d) {
sample[0][d] = m_TrendModel[d]->detrend(
time, value[0][d], params.seasonalConfidenceInterval(), m_IsNonNegative);
}
maths_t::TDouble10VecWeightsAry1Vec weights{unpack(params.weights()[0])};
struct SJointProbability {
common::CJointProbabilityOfLessLikelySamples s_MarginalLower;
common::CJointProbabilityOfLessLikelySamples s_MarginalUpper;
common::CJointProbabilityOfLessLikelySamples s_ConditionalLower;
common::CJointProbabilityOfLessLikelySamples s_ConditionalUpper;
};
using TJointProbability2Vec = core::CSmallVector<SJointProbability, 2>;
auto updateJointProbabilities = [&](const TDouble10Vec2Vec& pl,
const TDouble10Vec2Vec& pu,
SJointProbability& joint) {
joint.s_MarginalLower.add(pl[0][0]);
joint.s_MarginalUpper.add(pu[0][0]);
joint.s_ConditionalLower.add(pl[1][0]);
joint.s_ConditionalUpper.add(pu[1][0]);
};
TJointProbability2Vec jointProbabilities(
m_MultibucketFeatureModel != nullptr && params.useMultibucketFeatures() ? 2 : 1);
double correlation{0.0};
for (std::size_t i = 0; i < coordinates.size(); ++i) {
maths_t::EProbabilityCalculation calculation{params.calculation(i)};
TSize10Vec coordinate{coordinates[i]};
TDouble10Vec2Vec pSingleBucket[2];
TTail10Vec tail_;
if (m_ResidualModel->probabilityOfLessLikelySamples(
calculation, sample, weights, coordinate, pSingleBucket[0],
pSingleBucket[1], tail_) == false) {
LOG_ERROR(<< "Failed to compute P(" << sample << " | weight = " << weights << ")");
return false;
}
updateJointProbabilities(pSingleBucket[0], pSingleBucket[1],
jointProbabilities[0]);
tail[i] = tail_[0];
if (m_MultibucketFeatureModel != nullptr && params.useMultibucketFeatures()) {
TDouble10Vec1Vec feature;
std::tie(feature, std::ignore) = m_MultibucketFeature->value();
if (feature.empty() == false) {
TDouble10Vec2Vec pMultiBucket[2]{{{1.0}, {1.0}}, {{1.0}, {1.0}}};
for (auto calculation_ : expand(calculation)) {
TDouble10Vec2Vec pl;
TDouble10Vec2Vec pu;
TTail10Vec dummy;
if (m_MultibucketFeatureModel->probabilityOfLessLikelySamples(
calculation_, feature,
maths_t::CUnitWeights::singleUnit<TDouble10Vec>(dimension),
coordinate, pl, pu, dummy) == false) {
LOG_ERROR(<< "Failed to compute P(" << feature << ")");
return false;
}
pMultiBucket[0][0][0] = std::min(pMultiBucket[0][0][0], pl[0][0]);
pMultiBucket[1][0][0] = std::min(pMultiBucket[1][0][0], pu[0][0]);
pMultiBucket[0][1][0] = std::min(pMultiBucket[0][1][0], pl[1][0]);
pMultiBucket[1][1][0] = std::min(pMultiBucket[1][1][0], pu[1][0]);
}
updateJointProbabilities(pMultiBucket[0], pMultiBucket[1],
jointProbabilities[1]);
correlation = m_MultibucketFeature->correlationWithBucketValue();
}
}
}
TDouble4Vec probabilities;
for (const auto& probability : jointProbabilities) {
double marginalLower;
double marginalUpper;
double conditionalLower;
double conditionalUpper;
if (probability.s_MarginalLower.calculate(marginalLower) == false ||
probability.s_MarginalUpper.calculate(marginalUpper) == false ||
probability.s_ConditionalLower.calculate(conditionalLower) == false ||
probability.s_ConditionalUpper.calculate(conditionalUpper) == false) {
return false;
}
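// Combine the bounds by taking the geometric mean of the marginal and
// conditional bounds on each side and averaging the two sides.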
probabilities.push_back((std::sqrt(marginalLower * conditionalLower) +
std::sqrt(marginalUpper * conditionalUpper)) /
2.0);
}
common::SModelProbabilityResult::EFeatureProbabilityLabel labels[]{
common::SModelProbabilityResult::E_SingleBucketProbability,
common::SModelProbabilityResult::E_MultiBucketProbability};
common::SModelProbabilityResult::TFeatureProbability4Vec featureProbabilities;
for (std::size_t i = 0; i < probabilities.size(); ++i) {
featureProbabilities.emplace_back(labels[i], probabilities[i]);
}
double pOverall{aggregateFeatureProbabilities(probabilities, correlation)};
if (m_AnomalyModel != nullptr && params.useAnomalyModel()) {
double residual{0.0};
TDouble10Vec nearest(m_ResidualModel->nearestMarginalLikelihoodMean(sample[0]));
TDouble2Vec seasonalWeight;
this->seasonalWeight(0.0, time, seasonalWeight);
for (std::size_t i = 0; i < dimension; ++i) {
residual += (sample[0][i] - nearest[i]) /
std::max(std::sqrt(seasonalWeight[i]), 1.0);
}
double pSingleBucket{probabilities[0]};
m_AnomalyModel->sample(params, time, residual, pSingleBucket, pOverall);
double pAnomaly;
std::tie(pOverall, pAnomaly) = m_AnomalyModel->probability(pSingleBucket, pOverall);
probabilities.push_back(pAnomaly);
featureProbabilities.emplace_back(
common::SModelProbabilityResult::E_AnomalyModelProbability, pAnomaly);
}
result.s_Probability = pOverall;
result.s_FeatureProbabilities = std::move(featureProbabilities);
result.s_Tail = tail;
return true;
}
void CMultivariateTimeSeriesModel::countWeights(core_t::TTime time,
const TDouble2Vec& value,
double trendCountWeight,
double residualCountWeight,
double outlierWeightDerate,
double countVarianceScale,
TDouble2VecWeightsAry& trendWeights,
TDouble2VecWeightsAry& residualWeights) const {
std::size_t dimension{this->dimension()};
TDouble2Vec seasonalWeight;
this->seasonalWeight(0.0, time, seasonalWeight);
TDouble2Vec trendCountWeights(dimension, trendCountWeight);
TDouble2Vec residualCountWeights(dimension, residualCountWeight);
TDouble2Vec trendOutlierWeight(dimension);
TDouble2Vec residualOutlierWeight(dimension);
TDouble2Vec countVarianceScales(dimension, 1.0);
TDouble10Vec sample(dimension);
for (std::size_t d = 0; d < dimension; ++d) {
sample[d] = m_TrendModel[d]->detrend(time, value[d], 0.0, m_IsNonNegative);
if (m_TrendModel[d]->seasonalComponents().empty()) {
trendCountWeights[d] /= countVarianceScale;
countVarianceScales[d] = countVarianceScale;
}
}
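// Compute a per-coordinate outlier weight from the residual model
// conditioned on the remaining coordinates. The derate is at least the
// trend model's own derate at this time, and the trend model's count
// weight scales both the residual count weight and the trend-side
// outlier weight.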
for (std::size_t d = 0; d < dimension; ++d) {
double changeWeight{m_TrendModel[d]->countWeight(time)};
auto weights = maths_t::CUnitWeights::UNIT;
maths_t::setCount(std::min(residualCountWeight / trendCountWeight, 1.0), weights);
maths_t::setSeasonalVarianceScale(seasonalWeight[d], weights);
double outlierWeight{outliers::weight(
*conditional(*m_ResidualModel, d, sample), weights,
std::max(outlierWeightDerate,
m_TrendModel[d]->outlierWeightDerate(time, sample[d])),
sample[d])};
residualCountWeights[d] *= changeWeight;
trendOutlierWeight[d] = outlierWeight * changeWeight;
residualOutlierWeight[d] = outlierWeight;
}
maths_t::setCount(trendCountWeights, trendWeights);
maths_t::setCount(residualCountWeights, residualWeights);
maths_t::setOutlierWeight(trendOutlierWeight, trendWeights);
maths_t::setOutlierWeight(residualOutlierWeight, residualWeights);
maths_t::setCountVarianceScale(countVarianceScales, trendWeights);
maths_t::setCountVarianceScale(countVarianceScales, residualWeights);
}
void CMultivariateTimeSeriesModel::addCountWeights(core_t::TTime time,
double trendCountWeight,
double residualCountWeight,
double countVarianceScale,
TDouble2VecWeightsAry& trendWeights,
TDouble2VecWeightsAry& residualWeights) const {
std::size_t dimension{this->dimension()};
TDouble2Vec trendCountWeights(dimension, trendCountWeight);
TDouble2Vec residualCountWeights(dimension, residualCountWeight);
for (std::size_t d = 0; d < dimension; ++d) {
if (m_TrendModel[d]->seasonalComponents().empty()) {
trendCountWeights[d] /= countVarianceScale;
}
residualCountWeights[d] *= m_TrendModel[d]->countWeight(time);
}
maths_t::addCount(trendCountWeights, trendWeights);
maths_t::addCount(residualCountWeights, residualWeights);
}
void CMultivariateTimeSeriesModel::seasonalWeight(double confidence,
core_t::TTime time,
TDouble2Vec& weight) const {
std::size_t dimension{this->dimension()};
weight.resize(dimension);
TDouble10Vec variances(m_ResidualModel->marginalLikelihoodVariances());
for (std::size_t d = 0; d < dimension; ++d) {
double scale{m_TrendModel[d]->varianceScaleWeight(time, variances[d],
confidence)(1)};
weight[d] = std::max(scale, this->params().minimumSeasonalVarianceScale());
}
}
std::uint64_t CMultivariateTimeSeriesModel::checksum(std::uint64_t seed) const {
seed = common::CChecksum::calculate(seed, m_IsNonNegative);
seed = common::CChecksum::calculate(seed, m_Controllers);
seed = common::CChecksum::calculate(seed, m_TrendModel);
seed = common::CChecksum::calculate(seed, m_ResidualModel);
seed = common::CChecksum::calculate(seed, m_MultibucketFeature);
seed = common::CChecksum::calculate(seed, m_MultibucketFeatureModel);
return common::CChecksum::calculate(seed, m_AnomalyModel);
}
void CMultivariateTimeSeriesModel::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CMultivariateTimeSeriesModel");
core::memory_debug::dynamicSize("m_Controllers", m_Controllers, mem);
core::memory_debug::dynamicSize("m_TrendModel", m_TrendModel, mem);
core::memory_debug::dynamicSize("m_ResidualModel", m_ResidualModel, mem);
core::memory_debug::dynamicSize("m_MultibucketFeature", m_MultibucketFeature, mem);
core::memory_debug::dynamicSize("m_MultibucketFeatureModel",
m_MultibucketFeatureModel, mem);
core::memory_debug::dynamicSize("m_AnomalyModel", m_AnomalyModel, mem);
}
std::size_t CMultivariateTimeSeriesModel::memoryUsage() const {
return core::memory::dynamicSize(m_Controllers) +
core::memory::dynamicSize(m_TrendModel) +
core::memory::dynamicSize(m_ResidualModel) +
core::memory::dynamicSize(m_MultibucketFeature) +
core::memory::dynamicSize(m_MultibucketFeatureModel) +
core::memory::dynamicSize(m_AnomalyModel);
}
bool CMultivariateTimeSeriesModel::acceptRestoreTraverser(const common::SModelRestoreParams& params,
core::CStateRestoreTraverser& traverser) {
if (traverser.name() == VERSION_7_11_TAG) {
while (traverser.next()) {
const std::string& name{traverser.name()};
RESTORE_BOOL(IS_NON_NEGATIVE_6_3_TAG, m_IsNonNegative)
RESTORE_SETUP_TEARDOWN(
CONTROLLER_6_3_TAG,
m_Controllers = std::make_unique<TDecayRateController2Ary>(),
core::CPersistUtils::restore(CONTROLLER_6_3_TAG, *m_Controllers, traverser),
/**/)
RESTORE_SETUP_TEARDOWN(
TREND_MODEL_6_3_TAG, m_TrendModel.emplace_back(),
traverser.traverseSubLevel([
&, serialiser = CTimeSeriesDecompositionStateSerialiser{}
](auto& traverser_) {
return serialiser(params.s_DecompositionParams, m_TrendModel.back(), traverser_);
}),
/**/)
RESTORE(RESIDUAL_MODEL_6_3_TAG,
traverser.traverseSubLevel(
[&, serialiser = common::CPriorStateSerialiser{} ](auto& traverser_) {
return serialiser(params.s_DistributionParams,
m_ResidualModel, traverser_);
}))
RESTORE(MULTIBUCKET_FEATURE_6_3_TAG, traverser.traverseSubLevel([
this, serialiser = CTimeSeriesMultibucketFeatureSerialiser{}
](auto& traverser_) {
return serialiser(m_MultibucketFeature, traverser_);
}))
RESTORE(MULTIBUCKET_FEATURE_MODEL_6_3_TAG,
traverser.traverseSubLevel(
[&, serialiser = common::CPriorStateSerialiser{} ](auto& traverser_) {
return serialiser(params.s_DistributionParams,
m_MultibucketFeatureModel, traverser_);
}))
RESTORE_SETUP_TEARDOWN(
ANOMALY_MODEL_6_3_TAG,
m_AnomalyModel = std::make_unique<CTimeSeriesAnomalyModel>(),
traverser.traverseSubLevel([&](auto& traverser_) {
return m_AnomalyModel->acceptRestoreTraverser(params, traverser_);
}),
/**/)
}
} else {
LOG_ERROR(<< "Unsupported version '" << traverser.name() << "'");
return false;
}
this->checkRestoredInvariants();
return true;
}
void CMultivariateTimeSeriesModel::checkRestoredInvariants() const {
for (const auto& trendModel : m_TrendModel) {
VIOLATES_INVARIANT_NO_EVALUATION(trendModel, ==, nullptr);
}
VIOLATES_INVARIANT_NO_EVALUATION(m_ResidualModel, ==, nullptr);
VIOLATES_INVARIANT(m_TrendModel.size(), !=, this->dimension());
// The decay rate controllers are optional so only check their
// dimensions when they are present.
if (m_Controllers != nullptr) {
VIOLATES_INVARIANT((*m_Controllers)[E_TrendControl].dimension(), !=, this->dimension());
VIOLATES_INVARIANT((*m_Controllers)[E_ResidualControl].dimension(), !=,
this->dimension());
}
}
void CMultivariateTimeSeriesModel::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
// Note that we don't persist this->params() because that state
// is reinitialized.
inserter.insertValue(VERSION_7_11_TAG, "");
inserter.insertValue(IS_NON_NEGATIVE_6_3_TAG, static_cast<int>(m_IsNonNegative));
if (m_Controllers) {
core::CPersistUtils::persist(CONTROLLER_6_3_TAG, *m_Controllers, inserter);
}
for (const auto& trend : m_TrendModel) {
inserter.insertLevel(TREND_MODEL_6_3_TAG, [
&trend, serialiser = CTimeSeriesDecompositionStateSerialiser{}
](auto& inserter_) { serialiser(*trend, inserter_); });
}
if (m_ResidualModel != nullptr) {
inserter.insertLevel(RESIDUAL_MODEL_6_3_TAG, [
this, serialiser = common::CPriorStateSerialiser{}
](auto& inserter_) { serialiser(*m_ResidualModel, inserter_); });
}
if (m_MultibucketFeature != nullptr) {
inserter.insertLevel(MULTIBUCKET_FEATURE_6_3_TAG, [
this, serialiser = CTimeSeriesMultibucketFeatureSerialiser{}
](auto& inserter_) { serialiser(m_MultibucketFeature, inserter_); });
}
if (m_MultibucketFeatureModel != nullptr) {
inserter.insertLevel(MULTIBUCKET_FEATURE_MODEL_6_3_TAG, [
this, serialiser = common::CPriorStateSerialiser{}
](auto& inserter_) { serialiser(*m_MultibucketFeatureModel, inserter_); });
}
if (m_AnomalyModel != nullptr) {
inserter.insertLevel(ANOMALY_MODEL_6_3_TAG, [this](auto& inserter_) {
m_AnomalyModel->acceptPersistInserter(inserter_);
});
}
}
void CMultivariateTimeSeriesModel::persistModelsState(core::CStatePersistInserter& /* inserter*/) const {
// NO-OP
}
maths_t::EDataType CMultivariateTimeSeriesModel::dataType() const {
return m_ResidualModel->dataType();
}
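// Widen the per-style weights from the 2-vector to the 10-vector
// representation the multivariate priors expect.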
CMultivariateTimeSeriesModel::TDouble10VecWeightsAry
CMultivariateTimeSeriesModel::unpack(const TDouble2VecWeightsAry& weights) {
TDouble10VecWeightsAry result{maths_t::CUnitWeights::unit<TDouble10Vec>(weights[0])};
for (std::size_t i = 0; i < weights.size(); ++i) {
result[i] = weights[i];
}
return result;
}
const CMultivariateTimeSeriesModel::TDecompositionPtr10Vec&
CMultivariateTimeSeriesModel::trendModel() const {
return m_TrendModel;
}
const common::CMultivariatePrior& CMultivariateTimeSeriesModel::residualModel() const {
return *m_ResidualModel;
}
const CMultivariateTimeSeriesModel::TDecayRateController2Ary*
CMultivariateTimeSeriesModel::decayRateControllers() const {
return m_Controllers.get();
}
CMultivariateTimeSeriesModel::EUpdateResult
CMultivariateTimeSeriesModel::updateTrend(const common::CModelAddSamplesParams& params,
const TTimeDouble2VecSizeTrVec& samples) {
std::size_t dimension{this->dimension()};
for (const auto& sample : samples) {
if (sample.second.size() != dimension) {
LOG_ERROR(<< "Dimension mismatch: '" << sample.second.size()
<< " != " << m_TrendModel.size() << "'");
return E_Failure;
}
}
// Time order is not a total order, for example if the data are polled
// the times of all samples will be the same. So break ties using the
// sample value.
TSize1Vec timeorder(samples.size());
std::iota(timeorder.begin(), timeorder.end(), 0);
std::stable_sort(timeorder.begin(), timeorder.end(),
[&samples](std::size_t lhs, std::size_t rhs) {
return common::COrderings::lexicographicalCompare(
samples[lhs].first, samples[lhs].second,
samples[rhs].first, samples[rhs].second);
});
// Do the update.
EUpdateResult result{E_Success};
auto componentChangeCallback = [&result](TFloatMeanAccumulatorVec) {
result = E_Reset;
};
const auto& weights = params.trendWeights();
const auto& modelAnnotationCallback = params.annotationCallback();
double occupancy{params.bucketOccupancy()};
core_t::TTime firstValueTime{params.firstValueTime()};
maths_t::TDoubleWeightsAry weight;
for (auto i : timeorder) {
core_t::TTime time{samples[i].first};
TDouble10Vec value(samples[i].second);
for (std::size_t d = 0; d < dimension; ++d) {
for (std::size_t j = 0; j < maths_t::NUMBER_WEIGHT_STYLES; ++j) {
weight[j] = weights[i][j][d];
}
m_TrendModel[d]->addPoint(time, value[d], params.memoryCircuitBreaker(),
weight, componentChangeCallback,
modelAnnotationCallback, occupancy, firstValueTime);
}
}
if (result == E_Reset) {
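// Rebuild the reinitialization window from each dimension's trend
// residuals; the component change callback above deliberately ignores
// its window argument because every dimension supplies its own.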
TFloatMeanAccumulatorVec10Vec window(dimension);
for (std::size_t d = 0; d < dimension; ++d) {
window[d] = m_TrendModel[d]->residuals(m_IsNonNegative);
}
this->reinitializeStateGivenNewComponent(params, std::move(window));
}
return result;
}
void CMultivariateTimeSeriesModel::updateResidualModels(const common::CModelAddSamplesParams& params,
TTimeDouble2VecSizeTrVec samples) {
std::size_t dimension{this->dimension()};
for (auto& residual : samples) {
core_t::TTime time{residual.first};
for (std::size_t d = 0; d < dimension; ++d) {
residual.second[d] = m_TrendModel[d]->detrend(time, residual.second[d],
0.0, m_IsNonNegative);
}
}
// We add the samples in value order since it makes clustering more stable.
TSize1Vec valueorder(samples.size());
std::iota(valueorder.begin(), valueorder.end(), 0);
std::stable_sort(valueorder.begin(), valueorder.end(),
[&](std::size_t lhs, std::size_t rhs) {
return samples[lhs].second < samples[rhs].second;
});
TDouble10Vec1Vec residuals;
maths_t::TDouble10VecWeightsAry1Vec weights;
residuals.reserve(samples.size());
weights.reserve(samples.size());
TMeanAccumulator averageTimeAccumulator;
for (auto i : valueorder) {
core_t::TTime time{samples[i].first};
auto weight = unpack(params.priorWeights()[i]);
residuals.push_back(samples[i].second);
weights.push_back(weight);
averageTimeAccumulator.add(static_cast<double>(time));
}
core_t::TTime averageTime{static_cast<core_t::TTime>(
common::CBasicStatistics::mean(averageTimeAccumulator))};
m_ResidualModel->addSamples(residuals, weights);
m_ResidualModel->propagateForwardsByTime(params.propagationInterval());
if (m_MultibucketFeatureModel != nullptr) {
TDouble2Vec seasonalWeight;
for (std::size_t i = 0; i < valueorder.size(); ++i) {
core_t::TTime time{samples[valueorder[i]].first};
this->seasonalWeight(0.0, time, seasonalWeight);
maths_t::setSeasonalVarianceScale(TDouble10Vec(seasonalWeight), weights[i]);
}
m_MultibucketFeature->add(averageTime, this->params().bucketLength(),
residuals, weights);
const auto & [ feature, featureWeight ] = m_MultibucketFeature->value();
if (feature.empty() == false) {
m_MultibucketFeatureModel->addSamples(feature, featureWeight);
m_MultibucketFeatureModel->propagateForwardsByTime(params.propagationInterval());
}
}
this->updateDecayRates(params, averageTime, residuals);
}
void CMultivariateTimeSeriesModel::updateDecayRates(const common::CModelAddSamplesParams& params,
core_t::TTime time,
const TDouble10Vec1Vec& samples) {
if (m_Controllers != nullptr) {
TDouble1VecVec errors[2];
errors[0].reserve(samples.size());
errors[1].reserve(samples.size());
for (const auto& sample : samples) {
this->appendPredictionErrors(params.propagationInterval(), sample, errors);
}
{
CDecayRateController& controller{(*m_Controllers)[E_TrendControl]};
TDouble1Vec trendMean(this->dimension());
for (std::size_t d = 0; d < trendMean.size(); ++d) {
trendMean[d] = m_TrendModel[d]->meanValue(time);
}
double multiplier{controller.multiplier(
trendMean, errors[E_TrendControl], this->params().bucketLength(),
this->params().learnRate(), this->params().decayRate())};
if (multiplier != 1.0) {
for (const auto& trend : m_TrendModel) {
trend->decayRate(multiplier * trend->decayRate());
}
LOG_TRACE(<< "trend decay rate = " << m_TrendModel[0]->decayRate());
}
}
{
CDecayRateController& controller{(*m_Controllers)[E_ResidualControl]};
TDouble1Vec residualMean(m_ResidualModel->marginalLikelihoodMean());
double multiplier{controller.multiplier(
residualMean, errors[E_ResidualControl], this->params().bucketLength(),
this->params().learnRate(), this->params().decayRate())};
if (multiplier != 1.0) {
m_ResidualModel->decayRate(multiplier * m_ResidualModel->decayRate());
LOG_TRACE(<< "prior decay rate = " << m_ResidualModel->decayRate());
}
}
}
}
void CMultivariateTimeSeriesModel::appendPredictionErrors(double interval,
const TDouble10Vec& sample,
TDouble1VecVec (&result)[2]) {
if (auto error = predictionError(interval, m_ResidualModel, sample)) {
result[E_ResidualControl].push_back(*error);
}
if (auto error = predictionError(m_TrendModel, sample)) {
result[E_TrendControl].push_back(*error);
}
}
void CMultivariateTimeSeriesModel::reinitializeStateGivenNewComponent(
const common::CModelAddSamplesParams& params,
TFloatMeanAccumulatorVec10Vec residuals) {
if (m_Controllers != nullptr) {
m_ResidualModel->decayRate(m_ResidualModel->decayRate() /
(*m_Controllers)[E_ResidualControl].multiplier());
for (auto& trend : m_TrendModel) {
trend->decayRate(trend->decayRate() /
(*m_Controllers)[E_TrendControl].multiplier());
}
for (auto& controller : *m_Controllers) {
controller.reset();
}
}
// Reinitialize the residual model with any values we've been given. Note
// that if we have sparse data we reduce the sample weights so we smoothly
// transition to modelling only non-empty values. This must be reflected
// in the sample weights when reinitialising the residual model.
m_ResidualModel->setToNonInformative(0.0, m_ResidualModel->decayRate());
if (residuals.empty() == false) {
std::size_t dimension{this->dimension()};
double emptyBucketWeight{CModel::emptyBucketWeight(params.bucketOccupancy())};
TDouble10VecVec samples;
TDoubleVec weights;
double time{0.0};
for (std::size_t d = 0; d < dimension; ++d) {
samples.resize(residuals[d].size(), TDouble10Vec(dimension));
weights.resize(residuals[d].size(), std::numeric_limits<double>::max());
double buckets{
std::accumulate(residuals[d].begin(), residuals[d].end(), 0.0,
[](auto partialBuckets, const auto& residual) {
return partialBuckets +
common::CBasicStatistics::count(residual);
}) /
this->params().learnRate()};
time += buckets / static_cast<double>(residuals[d].size());
for (std::size_t i = 0; i < residuals[d].size(); ++i) {
samples[i][d] = common::CBasicStatistics::mean(residuals[d][i]);
weights[i] = std::min(
weights[i], emptyBucketWeight * static_cast<double>(common::CBasicStatistics::count(
residuals[d][i])));
}
}
time *= emptyBucketWeight / static_cast<double>(dimension);
maths_t::TDouble10VecWeightsAry1Vec weight(1);
for (std::size_t i = 0; i < samples.size(); ++i) {
if (weights[i] > 0.0) {
weight[0] = maths_t::countWeight(weights[i], dimension);
m_ResidualModel->addSamples({samples[i]}, weight);
m_ResidualModel->propagateForwardsByTime(time);
}
}
}
// Reset the multi-bucket residual model. We can't reinitialize this from
// the initial values because they are not typically at the granularity of
// the job's bucket length.
if (m_MultibucketFeature != nullptr) {
m_MultibucketFeature->clear();
}
if (m_MultibucketFeatureModel != nullptr) {
m_MultibucketFeatureModel->setToNonInformative(0.0, m_ResidualModel->decayRate());
}
if (m_AnomalyModel != nullptr) {
m_AnomalyModel->reset();
}
}
std::size_t CMultivariateTimeSeriesModel::dimension() const {
return m_ResidualModel->dimension();
}
void CMultivariateTimeSeriesModel::shiftTime(core_t::TTime time, core_t::TTime shift) {
for (auto& trend : m_TrendModel) {
trend->shiftTime(time, shift);
static_cast<CTimeSeriesDecomposition*>(trend.get())->resetChangePointTest(time);
}
}
void CUnivariateTimeSeriesModel::shiftTime(core_t::TTime time, core_t::TTime shift) {
m_TrendModel->shiftTime(time, shift);
static_cast<CTimeSeriesDecomposition*>(m_TrendModel.get())->resetChangePointTest(time);
}
}
}
}