lib/maths/time_series/CTimeSeriesTestForChange.cc (730 lines of code) (raw):

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <maths/time_series/CTimeSeriesTestForChange.h>

#include <core/CContainerPrinter.h>
#include <core/CIEEE754.h>
#include <core/CLogger.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/Constants.h>
#include <core/RestoreMacros.h>

#include <maths/common/CBasicStatistics.h>
#include <maths/common/CChecksum.h>
#include <maths/common/CInformationCriteria.h>
#include <maths/common/CLeastSquaresOnlineRegression.h>
#include <maths/common/CLeastSquaresOnlineRegressionDetail.h>
#include <maths/common/CLinearAlgebra.h>
#include <maths/common/CMathsFuncs.h>
#include <maths/common/CStatisticalTests.h>
#include <maths/common/CTools.h>

#include <maths/time_series/CCalendarComponent.h>
#include <maths/time_series/CSeasonalComponent.h>
#include <maths/time_series/CTimeSeriesDecomposition.h>
#include <maths/time_series/CTimeSeriesSegmentation.h>
#include <maths/time_series/CTrendComponent.h>

#include <algorithm>
#include <limits>
#include <memory>
#include <vector>

namespace ml {
namespace maths {
namespace time_series {
namespace {
using TDoubleVec = std::vector<double>;
using TSegmentation = CTimeSeriesSegmentation;
using TMeanAccumulator = common::CBasicStatistics::SSampleMean<double>::TAccumulator;

constexpr double EPS{0.1};
constexpr core_t::TTime HALF_HOUR{core::constants::HOUR / 2};
constexpr core_t::TTime HOUR{core::constants::HOUR};
constexpr core_t::TTime WEEK{core::constants::WEEK};
// The time shift test is a lot less powerful if we don't see much variation
// in values since shifting a flat segment of a time series leaves it largely
// unchanged. However, the test can still have high significance. We use a
// simple criterion to mitigate: we require that we see a significant fraction
// of the predictions' total variance before we trust the result.
constexpr double MIN_VARIANCE_FOR_SIGNIFICANCE{0.25};
// Rate at which the change point MSE statistics are exponentially aged.
const double CHANGE_POINT_DECAY_CONSTANT{3.0 * std::log(0.95) / static_cast<double>(HOUR)};

// Get the index of the largest step change between adjacent level shifts.
std::size_t largestShift(const TDoubleVec& shifts) {
    std::size_t result{0};
    double largest{0.0};
    for (std::size_t i = 1; i < shifts.size(); ++i) {
        double shift{std::fabs(shifts[i] - shifts[i - 1])};
        // We prefer the earliest shift which is within 10% of the maximum.
        if (shift > (1.0 + EPS) * largest) {
            largest = shift;
            result = i;
        }
    }
    return result;
}

// Get the index of the largest step change between adjacent scales.
std::size_t largestScale(const TDoubleVec& scales) {
    std::size_t result{0};
    double largest{0.0};
    for (std::size_t i = 1; i < scales.size(); ++i) {
        double scale{std::fabs(scales[i] - scales[i - 1])};
        // We prefer the earliest scale which is within 10% of the maximum.
        if (scale > (1.0 + EPS) * largest) {
            largest = scale;
            result = i;
        }
    }
    return result;
}

// Tags used to persist and restore change point state.
const core::TPersistenceTag TYPE_TAG{"a", "change_type"};
const core::TPersistenceTag TIME_TAG{"b", "change_time"};
const core::TPersistenceTag VALUE_TAG{"c", "change_value"};
const core::TPersistenceTag SIGNIFICANT_P_VALUE_TAG{"d", "significant_p_value"};
const core::TPersistenceTag MAGNITUDE_TAG{"e", "magnitude"};
const core::TPersistenceTag PREDICTION_VARIANCE_TAG{"f", "prediction_variance"};
}

COutlierWeightDerate::COutlierWeightDerate(double magnitude)
    : m_Magnitude{std::fabs(magnitude)} {
}

double COutlierWeightDerate::value(double error) const {
    // We do not want to model every change for a signal which flip-flops back
    // and forward between similar values generating anomalies every time it
    // changes. Therefore, we derate the weight we apply to outlying values if
    // they are similar in magnitude to any change we just detected. However,
    // extreme outliers after a change can significantly pollute the model if
    // we apply a blanket derate. Any choice here which forces the error to
    // grow is sufficient to prevent flip-flopping.
    return std::max(1.0 - 0.5 * std::fabs(error) / m_Magnitude, 0.0);
}

bool COutlierWeightDerate::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
    do {
        const std::string& name{traverser.name()};
        RESTORE_BUILT_IN(MAGNITUDE_TAG, m_Magnitude)
    } while (traverser.next());
    return true;
}

void COutlierWeightDerate::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
    inserter.insertValue(MAGNITUDE_TAG, m_Magnitude, core::CIEEE754::E_DoublePrecision);
}

std::uint64_t COutlierWeightDerate::checksum(std::uint64_t seed) const {
    return common::CChecksum::calculate(seed, m_Magnitude);
}

CChangePoint::CChangePoint(core_t::TTime time, TFloatMeanAccumulatorVec residuals, double significantPValue)
    : m_Time{time}, m_SignificantPValue{significantPValue}, m_Residuals{std::move(residuals)} {
}

CChangePoint::~CChangePoint() = default;

std::uint64_t CChangePoint::checksum(std::uint64_t seed) const {
    seed = common::CChecksum::calculate(seed, this->type());
    seed = common::CChecksum::calculate(seed, m_Time);
    seed = common::CChecksum::calculate(seed, m_SignificantPValue);
    seed = common::CChecksum::calculate(seed, m_Residuals);
    seed = common::CChecksum::calculate(seed, m_Mse);
    return common::CChecksum::calculate(seed, m_UndoneMse);
}

// Update the exponentially aged mean square errors of predictions with the
// change applied and with the change undone using the latest value.
void CChangePoint::add(core_t::TTime time,
                       core_t::TTime lastTime,
                       double value,
                       double weight,
                       const TPredictor& predictor) {
    double factor{this->decayFactor(time, lastTime)};
    m_Mse.add(common::CTools::pow2(value - predictor(time)), weight);
    m_Mse.age(factor);
    m_UndoneMse.add(common::CTools::pow2(value - this->undonePredict(predictor, time)), weight);
    m_UndoneMse.age(factor);
}

double CChangePoint::predictionVariance() const {
    return 0.0;
}

// Undo the change if an F-test says undoing it significantly reduces the
// prediction mean square error.
bool CChangePoint::shouldUndo() const {
    return common::CStatisticalTests::rightTailFTest(
               common::CBasicStatistics::mean(m_Mse),
               common::CBasicStatistics::mean(m_UndoneMse),
               common::CBasicStatistics::count(m_Mse),
               common::CBasicStatistics::count(m_UndoneMse)) < m_SignificantPValue;
}

double CChangePoint::decayFactor(core_t::TTime time, core_t::TTime lastTime) const {
    return std::exp(CHANGE_POINT_DECAY_CONSTANT * static_cast<double>(time - lastTime));
}

CLevelShift::CLevelShift(core_t::TTime time,
                         double shift,
                         core_t::TTime valuesStartTime,
                         core_t::TTime bucketLength,
                         TFloatMeanAccumulatorVec values,
                         TSizeVec segments,
                         TDoubleVec shifts,
                         TFloatMeanAccumulatorVec residuals,
                         double significantPValue)
    : CChangePoint{time, std::move(residuals), significantPValue}, m_Shift{shift},
      m_ValuesStartTime{valuesStartTime}, m_BucketLength{bucketLength},
      m_Values{std::move(values)}, m_Segments{std::move(segments)}, m_Shifts{std::move(shifts)} {
}

// Level shifts are not undoable: we return a null change point.
CLevelShift::TChangePointUPtr CLevelShift::undoable() const {
    return {};
}

bool CLevelShift::largeEnough(double threshold) const {
    return std::fabs(m_Shift) > threshold;
}

bool CLevelShift::longEnough(core_t::TTime time, core_t::TTime minimumDuration) const {
    return time >= this->time() + minimumDuration;
}

bool CLevelShift::apply(CTrendComponent& component) const {
    component.shiftLevel(m_Shift, m_ValuesStartTime, m_BucketLength, m_Values,
                         m_Segments, m_Shifts);
    return true;
}

const std::string& CLevelShift::type() const {
    return TYPE;
}

std::string CLevelShift::print() const {
    return "level shift by " + core::CStringUtils::typeToString(m_Shift);
}

std::uint64_t CLevelShift::checksum(std::uint64_t seed) const {
    seed = this->CChangePoint::checksum(seed);
    seed = common::CChecksum::calculate(seed, m_Shift);
    seed = common::CChecksum::calculate(seed, m_ValuesStartTime);
    seed = common::CChecksum::calculate(seed, m_BucketLength);
    seed = common::CChecksum::calculate(seed, m_Values);
    seed = common::CChecksum::calculate(seed, m_Segments);
    return common::CChecksum::calculate(seed, m_Shifts);
}

const std::string CLevelShift::TYPE{"level shift"};

CScale::CScale(core_t::TTime time,
               double scale,
               double magnitude,
               double minimumDurationScale,
               TFloatMeanAccumulatorVec residuals,
               double significantPValue)
    : CChangePoint{time, std::move(residuals), significantPValue}, m_Scale{scale},
      m_Magnitude{magnitude}, m_MinimumDurationScale{minimumDurationScale} {
}

// Scales are not undoable: we return a null change point.
CScale::TChangePointUPtr CScale::undoable() const {
    return {};
}

bool CScale::largeEnough(double threshold) const {
    return m_Magnitude > threshold;
}

bool CScale::longEnough(core_t::TTime time, core_t::TTime minimumDuration) const {
    // Wait proportionally longer to accept the change the smaller the values
    // after the change (see m_MinimumDurationScale construction).
    minimumDuration = static_cast<core_t::TTime>(
        static_cast<double>(minimumDuration) / m_MinimumDurationScale + 0.5);
    return time >= this->time() + minimumDuration;
}

bool CScale::apply(CTrendComponent& component) const {
    component.linearScale(this->time(), m_Scale);
    return true;
}

bool CScale::apply(CSeasonalComponent& component) const {
    component.linearScale(this->time(), m_Scale);
    return true;
}

bool CScale::apply(CCalendarComponent& component) const {
    component.linearScale(this->time(), m_Scale);
    return true;
}

const std::string& CScale::type() const {
    return TYPE;
}

std::string CScale::print() const {
    return "linear scale by " + core::CStringUtils::typeToString(m_Scale);
}

std::uint64_t CScale::checksum(std::uint64_t seed) const {
    seed = this->CChangePoint::checksum(seed);
    seed = common::CChecksum::calculate(seed, m_Scale);
    return common::CChecksum::calculate(seed, m_Magnitude);
}

const std::string CScale::TYPE{"scale"};

CTimeShift::CTimeShift(core_t::TTime time,
                       core_t::TTime shift,
                       TFloatMeanAccumulatorVec residuals,
                       double significantPValue,
                       double predictionVariance)
    : CChangePoint{time, std::move(residuals), significantPValue}, m_Shift{shift},
      m_PredictionVariance{predictionVariance} {
}

CTimeShift::CTimeShift(core_t::TTime time,
                       core_t::TTime shift,
                       double significantPValue,
                       double predictionVariance)
    : CChangePoint{time, {}, significantPValue}, m_Shift{shift},
      m_PredictionVariance{predictionVariance} {
}

// A time shift is undone by applying the opposite shift.
CTimeShift::TChangePointUPtr CTimeShift::undoable() const {
    return std::make_unique<CTimeShift>(
        this->time(), -m_Shift, this->significantPValue(), m_PredictionVariance);
}

// Derate outlier weights by the average absolute difference between the
// shifted and unshifted predictions sampled over [startTime, endTime].
COutlierWeightDerate CTimeShift::outlierWeightDerate(core_t::TTime startTime,
                                                     core_t::TTime endTime,
                                                     const TPredictor& predictor) const {
    TMeanAccumulator result;
    for (core_t::TTime t = startTime, dt = (endTime - startTime) / 20; t <= endTime; t += dt) {
        result.add(std::fabs(predictor(t) - predictor(t + m_Shift)));
    }
    return COutlierWeightDerate{common::CBasicStatistics::mean(result)};
}

bool CTimeShift::longEnough(core_t::TTime time, core_t::TTime minimumDuration) const {
    return time >= this->time() + minimumDuration;
}

bool CTimeShift::apply(CTimeSeriesDecomposition& decomposition) const {
    decomposition.shiftTime(this->time(), m_Shift);
    return true;
}

const std::string& CTimeShift::type() const {
    return TYPE;
}

std::string CTimeShift::print() const {
    return "time shift by " + core::CStringUtils::typeToString(m_Shift) + "s";
}

std::uint64_t CTimeShift::checksum(std::uint64_t seed) const {
    seed = this->CChangePoint::checksum(seed);
    seed = common::CChecksum::calculate(seed, m_Shift);
    seed = common::CChecksum::calculate(seed, m_PredictionVariance);
    return common::CChecksum::calculate(seed, m_ValueMoments);
}

double CTimeShift::predictionVariance() const {
    return m_PredictionVariance;
}

bool CTimeShift::shouldUndo() const {
    // See MIN_VARIANCE_FOR_SIGNIFICANCE for more info on this condition.
    return common::CBasicStatistics::variance(m_ValueMoments) >=
               MIN_VARIANCE_FOR_SIGNIFICANCE * m_PredictionVariance &&
           this->CChangePoint::shouldUndo();
}

// In addition to the base class MSE statistics maintain the exponentially
// aged moments of the values themselves (used by shouldUndo above).
void CTimeShift::add(core_t::TTime time,
                     core_t::TTime lastTime,
                     double value,
                     double weight,
                     const TPredictor& predictor) {
    double factor{this->decayFactor(time, lastTime)};
    m_ValueMoments.add(value, weight);
    m_ValueMoments.age(factor);
    this->CChangePoint::add(time, lastTime, value, weight, predictor);
}

double CTimeShift::undonePredict(const TPredictor& predictor, core_t::TTime time) const {
    return predictor(time + m_Shift);
}

const std::string CTimeShift::TYPE{"time shift"};

// Restore an undoable change point from persisted state. Only time shifts
// are restored: level shifts and scales are rejected as unexpected.
bool CUndoableChangePointStateSerializer::
operator()(TChangePointUPtr& result, core::CStateRestoreTraverser& traverser) const {
    std::string type;
    core_t::TTime time{std::numeric_limits<core_t::TTime>::min()};
    double value{std::numeric_limits<double>::quiet_NaN()};
    double significantPValue{std::numeric_limits<double>::quiet_NaN()};
    double predictionVariance{0.0};

    do {
        const std::string& name{traverser.name()};
        RESTORE_NO_ERROR(TYPE_TAG, type = traverser.value())
        RESTORE_BUILT_IN(TIME_TAG, time)
        RESTORE_BUILT_IN(VALUE_TAG, value)
        RESTORE_BUILT_IN(SIGNIFICANT_P_VALUE_TAG, significantPValue)
        RESTORE_BUILT_IN(PREDICTION_VARIANCE_TAG, predictionVariance)
    } while (traverser.next());

    // Sentinel initializers above double as "missing field" markers.
    if (time == std::numeric_limits<core_t::TTime>::min()) {
        LOG_ERROR(<< "Missing '" << TIME_TAG << "'");
        return false;
    }
    if (common::CMathsFuncs::isFinite(value) == false) {
        LOG_ERROR(<< "Missing '" << VALUE_TAG << "'");
        return false;
    }
    if (common::CMathsFuncs::isFinite(significantPValue) == false) {
        LOG_ERROR(<< "Missing '" << SIGNIFICANT_P_VALUE_TAG << "'");
        return false;
    }
    if (type == CLevelShift::TYPE || type == CScale::TYPE) {
        LOG_ERROR(<< "Unexpected type '" << type << "'");
        return false;
    }
    if (type == CTimeShift::TYPE) {
        result = std::make_unique<CTimeShift>(time, static_cast<core_t::TTime>(value),
                                              significantPValue, predictionVariance);
        return true;
    }

    LOG_ERROR(<< "Missing '" << TYPE_TAG << "'");
    return false;
}

void CUndoableChangePointStateSerializer::
operator()(const CChangePoint& changePoint, core::CStatePersistInserter& inserter) const {
    inserter.insertValue(TYPE_TAG, changePoint.type());
    inserter.insertValue(TIME_TAG, changePoint.time());
    inserter.insertValue(VALUE_TAG, changePoint.value(), core::CIEEE754::E_DoublePrecision);
    inserter.insertValue(SIGNIFICANT_P_VALUE_TAG, changePoint.significantPValue(),
                         core::CIEEE754::E_DoublePrecision);
    inserter.insertValue(PREDICTION_VARIANCE_TAG, changePoint.predictionVariance(),
                         core::CIEEE754::E_DoublePrecision);
}

CTimeSeriesTestForChange::CTimeSeriesTestForChange(int testFor,
                                                   core_t::TTime valuesStartTime,
                                                   core_t::TTime bucketsStartTime,
                                                   core_t::TTime bucketLength,
                                                   core_t::TTime sampleInterval,
                                                   TPredictor predictor,
                                                   TFloatMeanAccumulatorVec values,
                                                   double sampleVariance,
                                                   double outlierFraction)
    : m_TestFor{testFor}, m_ValuesStartTime{valuesStartTime},
      m_BucketsStartTime{bucketsStartTime}, m_BucketLength{bucketLength},
      m_SampleInterval{sampleInterval}, m_SampleVariance{sampleVariance},
      m_OutlierFraction{outlierFraction}, m_Predictor{std::move(predictor)},
      m_Values{std::move(values)},
      m_Outliers{static_cast<std::size_t>(std::max(
          outlierFraction * static_cast<double>(CSignal::countNotMissing(m_Values)) + 0.5,
          1.0))} {

    TMeanVarAccumulator moments{this->truncatedMoments(m_OutlierFraction, m_Values)};
    TMeanVarAccumulator meanAbs{this->truncatedMoments(
        m_OutlierFraction, m_Values, [](const TFloatMeanAccumulator& value) {
            return std::fabs(common::CBasicStatistics::mean(value));
        })};

    // Note we don't bother modelling changes whose size is too small compared
    // to the absolute values. We won't raise anomalies for differences from our
    // predictions which are smaller than this anyway.
    m_EpsVariance = std::max(
        common::CTools::pow2(1000.0 * std::numeric_limits<double>::epsilon()) *
            common::CBasicStatistics::maximumLikelihoodVariance(moments),
        common::CTools::pow2(common::MINIMUM_COEFFICIENT_OF_VARIATION *
                             common::CBasicStatistics::mean(meanAbs)));
    LOG_TRACE(<< "eps variance = " << m_EpsVariance);

    // Sample the prediction variance over one week (used to decide whether a
    // time shift test result is trustworthy, see MIN_VARIANCE_FOR_SIGNIFICANCE).
    TMeanVarAccumulator predictionMoments;
    auto bucketPredictor = this->bucketPredictor();
    for (core_t::TTime time = 0; time < WEEK; time += m_BucketLength) {
        predictionMoments.add(bucketPredictor(time));
    }
    m_PredictionVariance = common::CBasicStatistics::variance(predictionMoments);
    LOG_TRACE(<< "prediction variance = " << m_PredictionVariance);
}

// Run the enabled change tests and select the best explanation of the values,
// or return null if no significant change is found.
CTimeSeriesTestForChange::TChangePointUPtr CTimeSeriesTestForChange::test() const {

    using TChangePointVec = std::vector<SChangePoint>;

    // The null hypothesis for every test is a quadratic trend.
    double variance;
    double truncatedVariance;
    double parameters;
    std::tie(variance, truncatedVariance, parameters) = this->quadraticTrend();

    TChangePointVec changes;
    changes.reserve(3);
    if ((m_TestFor & E_LevelShift) != 0) {
        changes.push_back(this->levelShift(variance, truncatedVariance, parameters));
    }
    if ((m_TestFor & E_LinearScale) != 0) {
        changes.push_back(this->scale(variance, truncatedVariance, parameters));
    }
    if ((m_TestFor & E_TimeShift) != 0) {
        changes.push_back(this->timeShift(variance, truncatedVariance, parameters));
    }

    changes.erase(std::remove_if(changes.begin(), changes.end(),
                                 [](const auto& change) {
                                     return change.s_ChangePoint == nullptr;
                                 }),
                  changes.end());
    LOG_TRACE(<< "# changes = " << changes.size());

    if (changes.empty() == false) {

        // Simpler hypotheses (fewer parameters) first.
        std::stable_sort(changes.begin(), changes.end(), [](const auto& lhs, const auto& rhs) {
            return lhs.s_NumberParameters < rhs.s_NumberParameters;
        });

        // If the simpler hypothesis is strongly selected by the raw variance then
        // prefer it. Otherwise, if there is strong evidence for a more complex
        // explanation select that otherwise fallback to AIC.
        double selectedEvidence{aic(changes[0])};
        std::size_t selected{0};
        LOG_TRACE(<< changes[0].s_ChangePoint->print() << " evidence = " << selectedEvidence);

        double n{static_cast<double>(CSignal::countNotMissing(m_Values))};
        for (std::size_t candidate = 1; candidate < changes.size(); ++candidate) {
            double pValue{this->pValue(changes[candidate].s_ResidualVariance,
                                       changes[candidate].s_NumberParameters,
                                       changes[selected].s_ResidualVariance,
                                       changes[selected].s_NumberParameters, n)};
            if (pValue < m_SignificantPValue) {
                continue;
            }
            pValue = this->pValue(changes[selected].s_ResidualVariance,
                                  changes[selected].s_TruncatedResidualVariance,
                                  changes[selected].s_NumberParameters,
                                  changes[candidate].s_ResidualVariance,
                                  changes[candidate].s_TruncatedResidualVariance,
                                  changes[candidate].s_NumberParameters, n);
            double evidence{aic(changes[candidate])};
            LOG_TRACE(<< changes[candidate].s_ChangePoint->print() << " p-value = " << pValue
                      << ", evidence = " << evidence);
            if (pValue < m_SignificantPValue || evidence < selectedEvidence) {
                std::tie(selectedEvidence, selected) =
                    std::make_pair(evidence, candidate);
            }
        }
        return std::move(changes[selected].s_ChangePoint);
    }

    return {};
}

// Fit a quadratic trend (with outlier reweighting) to the values minus the
// base predictions and return (variance, truncated variance, #parameters).
CTimeSeriesTestForChange::TDoubleDoubleDoubleTr CTimeSeriesTestForChange::quadraticTrend() const {

    using TRegression = common::CLeastSquaresOnlineRegression<2, double>;

    m_ValuesMinusPredictions = removePredictions(this->bucketIndexPredictor(), m_Values);

    TRegression trend;
    TRegression::TArray parameters;
    parameters.fill(0.0);
    auto predictor = [&](std::size_t i) {
        return TRegression::predict(parameters, static_cast<double>(i));
    };
    // Two passes: refit after downweighting outliers w.r.t. the first fit.
    for (std::size_t i = 0; i < 2; ++i) {
        CSignal::reweightOutliers(predictor, m_OutlierFraction, m_ValuesMinusPredictions);
        for (std::size_t j = 0; j < m_ValuesMinusPredictions.size(); ++j) {
            trend.add(static_cast<double>(j),
                      common::CBasicStatistics::mean(m_ValuesMinusPredictions[j]),
                      common::CBasicStatistics::count(m_ValuesMinusPredictions[j]));
        }
        trend.parameters(parameters);
    }
    m_ValuesMinusPredictions =
        removePredictions(predictor, std::move(m_ValuesMinusPredictions));

    double variance;
    double truncatedVariance;
    std::tie(variance, truncatedVariance) = this->variances(m_ValuesMinusPredictions);
    LOG_TRACE(<< "variance = " << variance << ", truncated variance = " << truncatedVariance);

    return {variance, truncatedVariance, 3.0};
}

CTimeSeriesTestForChange::SChangePoint
CTimeSeriesTestForChange::levelShift(double varianceH0,
                                     double truncatedVarianceH0,
                                     double parametersH0) const {

    // Test for piecewise linear shift. We use a hypothesis test against a null
    // hypothesis that there is a quadratic trend.

    m_ValuesMinusPredictions = removePredictions(this->bucketIndexPredictor(), m_Values);

    TSizeVec segments{TSegmentation::piecewiseLinear(
        m_ValuesMinusPredictions, m_SignificantPValue, m_OutlierFraction, 3)};

    if (segments.size() > 2) {
        TDoubleVec shifts;
        auto residuals = TSegmentation::removePiecewiseLinear(
            m_ValuesMinusPredictions, segments, m_OutlierFraction, shifts);
        double varianceH1;
        double truncatedVarianceH1;
        std::tie(varianceH1, truncatedVarianceH1) = this->variances(residuals);
        std::size_t shiftIndex{largestShift(shifts)};
        std::size_t changeIndex{segments[shiftIndex]};
        std::size_t lastChangeIndex{segments[segments.size() - 2]};
        LOG_TRACE(<< "trend segments = " << segments);
        LOG_TRACE(<< "shifts = " << shifts << ", shift index = " << shiftIndex);
        LOG_TRACE(<< "variance = " << varianceH1 << ", truncated variance = " << truncatedVarianceH1
                  << ", sample variance = " << m_SampleVariance);
        LOG_TRACE(<< "change index = " << changeIndex);

        double n{static_cast<double>(CSignal::countNotMissing(m_ValuesMinusPredictions))};
        // Each linear segment contributes two parameters (slope and intercept).
        double parametersH1{2.0 * static_cast<double>(segments.size() - 1)};
        double pValue{this->pValue(varianceH0, truncatedVarianceH0, parametersH0,
                                   varianceH1, truncatedVarianceH1, parametersH1, n)};
        LOG_TRACE(<< "shift p-value = " << pValue);

        if (pValue < m_AcceptedFalsePostiveRate) {
            // Estimate the shift as an exponentially downweighted mean of the
            // values after the last change.
            TMeanAccumulator shift;
            double weight{1.0};
            for (std::size_t i = residuals.size(); i > lastChangeIndex; --i, weight *= 0.85) {
                shift.add(common::CBasicStatistics::mean(m_ValuesMinusPredictions[i - 1]),
                          weight * common::CBasicStatistics::count(residuals[i - 1]));
            }
            auto changePoint = std::make_unique<CLevelShift>(
                this->changeTime(changeIndex), common::CBasicStatistics::mean(shift),
                m_ValuesStartTime, m_BucketLength, m_Values, std::move(segments),
                std::move(shifts), std::move(residuals), m_SignificantPValue);
            return {varianceH1, truncatedVarianceH1, parametersH1, std::move(changePoint)};
        }
    }

    return {};
}

CTimeSeriesTestForChange::SChangePoint
CTimeSeriesTestForChange::scale(double varianceH0,
                                double truncatedVarianceH0,
                                double parametersH0) const {

    // Test for linear scales of the base predictor. We use a hypothesis test
    // against a null hypothesis that there is a quadratic trend.

    auto predictor = this->bucketIndexPredictor();

    TSizeVec segments{TSegmentation::piecewiseLinearScaledSeasonal(
        m_Values, predictor, m_SignificantPValue, 3)};

    if (segments.size() > 2) {
        TDoubleVec scales;
        auto residuals = TSegmentation::removePiecewiseLinearScaledSeasonal(
            m_Values, predictor, segments, m_OutlierFraction, scales);
        double varianceH1;
        double truncatedVarianceH1;
        std::tie(varianceH1, truncatedVarianceH1) = this->variances(residuals);
        std::size_t scaleIndex{largestScale(scales)};
        std::size_t changeIndex{segments[scaleIndex]};
        std::size_t lastChangeIndex{segments[segments.size() - 2]};
        LOG_TRACE(<< "scale segments = " << segments);
        LOG_TRACE(<< "scales = " << scales << ", scale index = " << scaleIndex);
        LOG_TRACE(<< "variance = " << varianceH1 << ", truncated variance = " << truncatedVarianceH1
                  << ", sample variance = " << m_SampleVariance);
        LOG_TRACE(<< "change index = " << changeIndex);

        double n{static_cast<double>(CSignal::countNotMissing(m_ValuesMinusPredictions))};
        double parametersH1{static_cast<double>(segments.size() - 1)};
        double pValue{this->pValue(varianceH0, truncatedVarianceH0, parametersH0, varianceH1,
                                   truncatedVarianceH1, parametersH1, n)};
        LOG_TRACE(<< "scale p-value = " << pValue);

        if (pValue < m_AcceptedFalsePostiveRate) {
            // Estimate the scale by (weighted) least squares projection of the
            // values onto the predictions after the last change.
            TMeanAccumulator projection;
            TMeanAccumulator Z;
            double weight{1.0};
            for (std::size_t i = residuals.size(); i > lastChangeIndex; --i, weight *= 0.85) {
                double x{common::CBasicStatistics::mean(m_Values[i - 1])};
                double p{predictor(i - 1)};
                double w{weight * common::CBasicStatistics::count(residuals[i - 1]) *
                         std::fabs(p)};
                if (w > 0.0) {
                    projection.add(x * p, w);
                    Z.add(p * p, w);
                }
            }
            double scale{common::CBasicStatistics::mean(Z) == 0.0
                             ? 1.0
                             : std::max(common::CBasicStatistics::mean(projection) /
                                            common::CBasicStatistics::mean(Z),
                                        0.0)};
            LOG_TRACE(<< "scale = " << scale);

            // The impact of applying a scale is less clear for small values.
            // We therefore wait to see more data if the predicted absolute
            // values we've observed to change are relatively small.
            TMeanAccumulator averagePredictionBeforeChange;
            TMeanAccumulator averagePredictionAfterChange;
            for (std::size_t i = 0; i < changeIndex; ++i) {
                averagePredictionBeforeChange.add(std::fabs(predictor(i)));
            }
            for (std::size_t i = changeIndex; i < m_Values.size(); ++i) {
                averagePredictionAfterChange.add(std::fabs(predictor(i)));
            }
            double minimumDurationScale{std::min(
                common::CBasicStatistics::mean(averagePredictionAfterChange) /
                    common::CBasicStatistics::mean(averagePredictionBeforeChange),
                1.0)};
            LOG_TRACE(<< "minimum duration scale = " << minimumDurationScale);

            auto changePoint = std::make_unique<CScale>(
                this->changeTime(changeIndex), scale,
                std::fabs(scale - 1.0) * std::sqrt(common::CBasicStatistics::mean(Z)),
                minimumDurationScale, std::move(residuals), m_SignificantPValue);
            return {varianceH1, truncatedVarianceH1, parametersH1, std::move(changePoint)};
        }
    }

    return {};
}

CTimeSeriesTestForChange::SChangePoint
CTimeSeriesTestForChange::timeShift(double varianceH0,
                                    double truncatedVarianceH0,
                                    double parametersH0) const {

    // Test for time shifts of the base predictor. We use a hypothesis test
    // against a null hypothesis that there is a quadratic trend.

    auto predictor = this->bucketPredictor();

    TMeanVarAccumulator valueMoments;
    for (std::size_t i = 0; i < m_Values.size(); ++i) {
        if (common::CBasicStatistics::count(m_Values[i]) > 0.0) {
            valueMoments.add(common::CBasicStatistics::mean(m_Values[i]));
        }
    }

    // See MIN_VARIANCE_FOR_SIGNIFICANCE for more info on this condition.
    if (common::CBasicStatistics::variance(valueMoments) <
        MIN_VARIANCE_FOR_SIGNIFICANCE * m_PredictionVariance) {
        return {};
    }

    // Candidate shifts are every half hour in [-6h, 6h] excluding zero.
    TSegmentation::TTimeVec candidateShifts;
    for (core_t::TTime shift = -6 * HOUR; shift < 0; shift += HALF_HOUR) {
        candidateShifts.push_back(shift);
    }
    for (core_t::TTime shift = HALF_HOUR; shift <= 6 * HOUR; shift += HALF_HOUR) {
        candidateShifts.push_back(shift);
    }
    TSegmentation::TTimeVec shifts;
    TSizeVec segments{TSegmentation::piecewiseTimeShifted(
        m_Values, m_BucketLength, candidateShifts, predictor, m_SignificantPValue, 3, &shifts)};

    core_t::TTime shift{shifts.back()};
    TFloatMeanAccumulatorVec residuals;
    std::size_t changeIndex{0};
    double n{static_cast<double>(CSignal::countNotMissing(m_ValuesMinusPredictions))};
    double parametersH1{static_cast<double>(segments.size() - 1)};
    double varianceH1{std::numeric_limits<double>::max()};
    double truncatedVarianceH1{std::numeric_limits<double>::max()};
    if (segments.size() > 2) {
        auto shiftedPredictor = [&](std::size_t i) {
            return predictor(m_BucketLength * static_cast<core_t::TTime>(i) +
                             TSegmentation::shiftAt(i, segments, shifts));
        };
        changeIndex = segments[segments.size() - 2];
        residuals = removePredictions(shiftedPredictor, m_Values);
        std::tie(varianceH1, truncatedVarianceH1) = this->variances(residuals);
    } else if (shift != 0) {
        // The entire window is shifted. This can happen if the time series
        // is flat in parts.
        auto shiftedPredictor = [&, bucketPredictor = this->bucketPredictor() ](std::size_t i) {
            return bucketPredictor(shift + m_BucketLength * static_cast<core_t::TTime>(i));
        };
        residuals = removePredictions(shiftedPredictor, m_Values);
        std::tie(varianceH1, truncatedVarianceH1) = this->variances(residuals);
    }
    double pValue{this->pValue(varianceH0, truncatedVarianceH0, parametersH0, varianceH1,
                               truncatedVarianceH1, parametersH1, n)};
    LOG_TRACE(<< "shift segments = " << segments);
    LOG_TRACE(<< "shifts = " << shifts);
    LOG_TRACE(<< "variance = " << varianceH1 << ", truncated variance = " << truncatedVarianceH1
              << ", sample variance = " << m_SampleVariance);
    LOG_TRACE(<< "change index = " << changeIndex);
    LOG_TRACE(<< "time shift p-value = " << pValue);

    if (pValue < m_AcceptedFalsePostiveRate) {
        auto changePoint = std::make_unique<CTimeShift>(
            this->changeTime(changeIndex), shift, std::move(residuals),
            m_SignificantPValue, m_PredictionVariance);
        return {varianceH1, truncatedVarianceH1, parametersH1, std::move(changePoint)};
    }

    return {};
}

// Adapt the bucket time predictor to take a bucket index instead of a time.
CTimeSeriesTestForChange::TBucketIndexPredictor
CTimeSeriesTestForChange::bucketIndexPredictor() const {
    auto bucketPredictor = this->bucketPredictor();
    return [ predictor = std::move(bucketPredictor), this ](std::size_t i) {
        return predictor(m_BucketLength * static_cast<core_t::TTime>(i));
    };
}

CTimeSeriesTestForChange::TPredictor CTimeSeriesTestForChange::bucketPredictor() const {
    return CSignal::bucketPredictor(m_Predictor, m_BucketsStartTime, m_BucketLength,
                                    m_ValuesStartTime - m_BucketsStartTime, m_SampleInterval);
}

// Compute the moments of the (transformed) residuals after truncating the
// outlierFraction largest absolute values at the truncation cutoff.
CTimeSeriesTestForChange::TMeanVarAccumulator
CTimeSeriesTestForChange::truncatedMoments(double outlierFraction,
                                           const TFloatMeanAccumulatorVec& residuals,
                                           const TTransform& transform) const {
    double cutoff{std::numeric_limits<double>::max()};
    std::size_t count{CSignal::countNotMissing(residuals)};
    std::size_t numberOutliers{
        static_cast<std::size_t>(outlierFraction * static_cast<double>(count) + 0.5)};
    if (numberOutliers > 0) {
        m_Outliers.clear();
        m_Outliers.resize(numberOutliers);
        for (const auto& value : residuals) {
            if (common::CBasicStatistics::count(value) > 0.0) {
                m_Outliers.add(std::fabs(transform(value)));
            }
        }
        cutoff = m_Outliers.biggest();
        count -= m_Outliers.count();
    }
    LOG_TRACE(<< "cutoff = " << cutoff << ", count = " << count);

    TMeanVarAccumulator moments;
    for (const auto& value : residuals) {
        if (common::CBasicStatistics::count(value) > 0.0 &&
            std::fabs(transform(value)) < cutoff) {
            moments.add(transform(value));
        }
    }
    if (numberOutliers > 0) {
        // Winsorise: values tied at the cutoff contribute at the cutoff.
        moments.add(cutoff, static_cast<double>(count) -
                                common::CBasicStatistics::count(moments));
    }
    // Floor the variance at the modelled noise level.
    common::CBasicStatistics::moment<1>(moments) += m_EpsVariance;

    return moments;
}

core_t::TTime CTimeSeriesTestForChange::changeTime(std::size_t changeIndex) const {
    return m_ValuesStartTime + m_BucketLength * static_cast<core_t::TTime>(changeIndex);
}

// Return (variance, outlier truncated variance) of the residuals.
CTimeSeriesTestForChange::TDoubleDoublePr
CTimeSeriesTestForChange::variances(const TFloatMeanAccumulatorVec& residuals) const {
    return {common::CBasicStatistics::maximumLikelihoodVariance(
                this->truncatedMoments(0.0 /*all residuals*/, residuals)),
            common::CBasicStatistics::maximumLikelihoodVariance(
                this->truncatedMoments(m_OutlierFraction, residuals))};
}

// The combined significance is the smaller of the raw and truncated variance
// F-test p-values.
double CTimeSeriesTestForChange::pValue(double varianceH0,
                                        double truncatedVarianceH0,
                                        double parametersH0,
                                        double varianceH1,
                                        double truncatedVarianceH1,
                                        double parametersH1,
                                        double n) const {
    return std::min(this->pValue(varianceH0, parametersH0,          // H0
                                 varianceH1, parametersH1,          // H1
                                 n),                                // # values
                    this->pValue(truncatedVarianceH0, parametersH0, // H0
                                 truncatedVarianceH1, parametersH1, // H1
                                 (1.0 - m_OutlierFraction) * n));   // # values
}

double CTimeSeriesTestForChange::pValue(double varianceH0,
                                        double parametersH0,
                                        double varianceH1,
                                        double parametersH1,
                                        double n) const {
    return common::CStatisticalTests::rightTailFTest(
        varianceH0 + m_SampleVariance, varianceH1 + m_SampleVariance,
        n - parametersH0, n - parametersH1);
}

// Akaike information criterion (corrected) for the change hypothesis computed
// on both the raw and outlier truncated residual moments.
double CTimeSeriesTestForChange::aic(const SChangePoint& change) const {
    using TVector1x1 = common::CVectorNx1<double, 1>;

    auto akaike = [&](const TMeanVarAccumulator& moments) {
        common::CSphericalGaussianInfoCriterion<TVector1x1, common::E_AICc> result;
        result.add(common::CBasicStatistics::momentsAccumulator(
            common::CBasicStatistics::count(moments),
            TVector1x1{common::CBasicStatistics::mean(moments)},
            TVector1x1{common::CBasicStatistics::maximumLikelihoodVariance(moments)}));
        return result.calculate(change.s_NumberParameters);
    };

    TMeanVarAccumulator moments{
        this->truncatedMoments(0.0, change.s_ChangePoint->residuals())};
    TMeanVarAccumulator truncatedMoments{this->truncatedMoments(
        m_OutlierFraction, change.s_ChangePoint->residuals())};

    return akaike(moments) + akaike(truncatedMoments);
}

// Subtract the predictions from the values in place (missing values, i.e.
// those with zero count, are left untouched).
CTimeSeriesTestForChange::TFloatMeanAccumulatorVec
CTimeSeriesTestForChange::removePredictions(const TBucketIndexPredictor& predictor,
                                            TFloatMeanAccumulatorVec values) {
    for (std::size_t i = 0; i < values.size(); ++i) {
        if (common::CBasicStatistics::count(values[i]) > 0.0) {
            common::CBasicStatistics::moment<0>(values[i]) -= predictor(i);
        }
    }
    return values;
}

// Number of buckets needed to cover interval, rounding to nearest.
std::size_t CTimeSeriesTestForChange::buckets(core_t::TTime bucketLength, core_t::TTime interval) {
    return static_cast<std::size_t>((interval + bucketLength / 2) / bucketLength);
}
}
}
}