lib/maths/time_series/CTimeSeriesDecompositionDetail.cc (2,576 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #include <maths/time_series/CTimeSeriesDecompositionDetail.h> #include <core/CIEEE754.h> #include <core/CLogger.h> #include <core/CMemoryDef.h> #include <core/CPersistUtils.h> #include <core/CStatePersistInserter.h> #include <core/CStateRestoreTraverser.h> #include <core/CTimeUtils.h> #include <core/CTimezone.h> #include <core/Constants.h> #include <core/RestoreMacros.h> #include <maths/common/CBasicStatistics.h> #include <maths/common/CBasicStatisticsPersist.h> #include <maths/common/CChecksum.h> #include <maths/common/CIntegerTools.h> #include <maths/common/CLeastSquaresOnlineRegressionDetail.h> #include <maths/common/CLinearAlgebra.h> #include <maths/common/CLinearAlgebraPersist.h> #include <maths/common/COrderings.h> #include <maths/common/COrderingsSimultaneousSort.h> #include <maths/common/CSampling.h> #include <maths/common/CSetTools.h> #include <maths/common/CStatisticalTests.h> #include <maths/common/CTools.h> #include <maths/common/Constants.h> #include <maths/time_series/CCalendarComponent.h> #include <maths/time_series/CExpandingWindow.h> #include <maths/time_series/CSeasonalComponentAdaptiveBucketing.h> #include <maths/time_series/CSeasonalTime.h> #include <maths/time_series/CTimeSeriesDecomposition.h> #include <maths/time_series/CTimeSeriesSegmentation.h> #include <maths/time_series/CTimeSeriesTestForChange.h> #include <maths/time_series/CTimeSeriesTestForSeasonality.h> #include <boost/container/flat_map.hpp> #include <boost/container/flat_set.hpp> #include <boost/unordered_map.hpp> #include <algorithm> #include <cmath> #include <cstddef> #include <map> #include <numeric> #include <string> #include <utility> #include <vector> namespace ml { namespace maths { namespace time_series { namespace { using TSeasonalComponentVec = maths_t::TSeasonalComponentVec; using TCalendarComponentVec = maths_t::TCalendarComponentVec; using TBoolVec = std::vector<bool>; using TDoubleVec = std::vector<double>; using TSizeVec = std::vector<std::size_t>; using TSizeVecVec = std::vector<TSizeVec>; using TSizeSizeMap = std::map<std::size_t, std::size_t>; using TStrVec = std::vector<std::string>; using TTimeVec = std::vector<core_t::TTime>; using TTimeTimePr = std::pair<core_t::TTime, core_t::TTime>; using TTimeTimePrVec = std::vector<TTimeTimePr>; using TTimeTimePrDoubleFMap = boost::container::flat_map<TTimeTimePr, double>; using TTimeTimePrSizeFMap = boost::container::flat_map<TTimeTimePr, std::size_t>; using TMeanAccumulator = common::CBasicStatistics::SSampleMean<double>::TAccumulator; using TFloatMeanAccumulator = common::CBasicStatistics::SSampleMean<common::CFloatStorage>::TAccumulator; using TFloatMeanAccumulatorVec = std::vector<TFloatMeanAccumulator>; using TSeasonalComponentPtrVec = std::vector<CSeasonalComponent*>; using TCalendarComponentPtrVec = std::vector<CCalendarComponent*>; const core_t::TTime DAY{core::constants::DAY}; const core_t::TTime WEEK{core::constants::WEEK}; const core_t::TTime MONTH{4 * WEEK}; const std::ptrdiff_t MAXIMUM_COMPONENTS{8}; const TSeasonalComponentVec NO_SEASONAL_COMPONENTS; const TCalendarComponentVec NO_CALENDAR_COMPONENTS; //! We scale the time used for the regression model to improve //! the condition of the design matrix. double scaleTime(core_t::TTime time, core_t::TTime origin) { return static_cast<double>(time - origin) / static_cast<double>(WEEK); } //! Get the aging factor to apply for \p dt elapsed time. double ageFactor(double decayRate, core_t::TTime dt, core_t::TTime scale = DAY) { return std::exp(-decayRate * static_cast<double>(dt) / static_cast<double>(scale)); } //! Compute the mean of \p mean of \p components. template<typename MEAN_FUNCTION> double meanOf(MEAN_FUNCTION mean, const TSeasonalComponentVec& components) { // We can choose to partition the trend model into windows. // In particular, we check for the presence of weekday/end // patterns. In this function we want to compute the sum of // the mean average of the different components: we use an // additive decomposition of the trend. However, if we have // detected a partition we want to average the models for // the different windows. double unwindowed{0.0}; TTimeTimePrDoubleFMap windows; windows.reserve(components.size()); for (const auto& component : components) { if (component.initialized()) { TTimeTimePr window{component.time().window()}; if (window.second - window.first == component.time().windowRepeat()) { unwindowed += (component.*mean)(); } else { windows[window] += (component.*mean)(); } } } TMeanAccumulator windowed; for (const auto& window : windows) { double weight{static_cast<double>(window.first.second - window.first.first)}; windowed.add(window.second, weight); } return unwindowed + common::CBasicStatistics::mean(windowed); } //! Compute the values to add to the trend and each component. //! //! \param[in] trend The long term trend prediction. //! \param[in] seasonal The seasonal components. //! \param[in] calendar The calendar components. //! \param[in] time The time of value to decompose. //! \param[in] deltas The delta offset to apply to the difference //! between each component value and its mean, used to minimize //! slope in the longer periods. //! \param[in,out] decomposition Updated to contain the value to //! add to each by component. //! \param[out] predictions Filled in with the component predictions. //! \param[out] referenceError Filled in with the error w.r.t. the trend. //! \param[out] error Filled in with the prediction error. //! \param[out] scale Filled in with the normalization scaling. void decompose(double trend, const TSeasonalComponentPtrVec& seasonal, const TCalendarComponentPtrVec& calendar, const core_t::TTime time, const TDoubleVec& deltas, double gain, TDoubleVec& decomposition, TDoubleVec& predictions, double& referenceError, double& error, double& scale) { std::size_t m{seasonal.size()}; std::size_t n{calendar.size()}; double x0{trend}; TDoubleVec x(m + n); double xhat{x0}; for (std::size_t i = 0; i < m; ++i) { x[i] = seasonal[i]->value(time, 0.0).mean(); xhat += x[i]; } for (std::size_t i = m; i < m + n; ++i) { x[i] = calendar[i - m]->value(time, 0.0).mean(); xhat += x[i]; } // Note we are adding on the a proportion of the error to the // target value for each component. This constant controls the // proportion of the overall error we add. There is no need // to arrange for the sum error added to all components to be // equal to the actual error to avoid bias: noise will still // average down to zero (since the errors will be both positive // and negative). It will however affect the variance in the // limit the trend has been fit. This can be thought of as a // trade off between the rate at which each component reacts // to errors verses the error variance in the steady state with // smaller values of Z corresponding to greater responsiveness. double Z{std::max(static_cast<double>(m + n + 1) / gain, 1.0)}; error = decomposition[0] - xhat; referenceError = decomposition[0] - x0; decomposition[0] = x0 + (decomposition[0] - xhat) / Z; for (std::size_t i = 0; i < m; ++i) { predictions[i] = x[i] - seasonal[i]->meanValue(); decomposition[i + 1] = x[i] + (decomposition[i + 1] - xhat) / Z + deltas[i]; } for (std::size_t i = m; i < m + n; ++i) { predictions[i] = x[i] - calendar[i - m]->meanValue(); decomposition[i + 1] = x[i] + (decomposition[i + 1] - xhat) / Z; } // Because we add in more than the prediction error across the // different components, i.e. because Z < m + n + 1, we end up // with a bias in our variance estimates. We can mostly correct // the bias by scaling the variance estimate, but need to calculate // the scale. scale = Z / static_cast<double>(m + n + 1); } //! Propagate \p target forwards to account for \p end - \p start elapsed //! time in steps or size \p step. template<typename F> void stepwisePropagateForwards(core_t::TTime start, core_t::TTime end, core_t::TTime step, const F& propagateForwardsByTime) { start = common::CIntegerTools::floor(start, step); end = common::CIntegerTools::floor(end, step); if (end > start) { double time{static_cast<double>(end - start) / static_cast<double>(step)}; propagateForwardsByTime(time); } } //! Add on mean zero \p variance normally distributed noise to \p values. void addMeanZeroNormalNoise(double variance, TFloatMeanAccumulatorVec& values) { if (variance > 0.0) { common::CPRNG::CXorOShiro128Plus rng; for (auto& value : values) { common::CBasicStatistics::moment<0>(value) += common::CSampling::normalSample(rng, 0.0, variance); } } } // Change Detector Test State Machine // States const std::size_t CD_TEST = 0; const std::size_t CD_NOT_TESTING = 1; const std::size_t CD_ERROR = 2; const TStrVec CD_STATES{"TEST", "NOT_TESTING", "ERROR"}; // Alphabet const std::size_t CD_DISABLE = 0; const std::size_t CD_RESET = 1; const TStrVec CD_ALPHABET{"DISABLE", "RESET"}; // Transition Function const TSizeVecVec CD_TRANSITION_FUNCTION{{CD_NOT_TESTING, CD_NOT_TESTING, CD_ERROR}, {CD_TEST, CD_NOT_TESTING, CD_TEST}}; // Seasonality Test State Machine // States const std::size_t PT_INITIAL = 0; const std::size_t PT_TEST = 1; const std::size_t PT_NOT_TESTING = 2; const std::size_t PT_ERROR = 3; const TStrVec PT_STATES{"INITIAL", "TEST", "NOT_TESTING", "ERROR"}; // Alphabet const std::size_t PT_NEW_VALUE = 0; const std::size_t PT_RESET = 1; const TStrVec PT_ALPHABET{"NEW_VALUE", "RESET"}; // Transition Function const TSizeVecVec PT_TRANSITION_FUNCTION{{PT_TEST, PT_TEST, PT_NOT_TESTING, PT_ERROR}, {PT_INITIAL, PT_INITIAL, PT_NOT_TESTING, PT_INITIAL}}; // Calendar Cyclic Test State Machine // States const std::size_t CC_INITIAL = 0; const std::size_t CC_TEST = 1; const std::size_t CC_NOT_TESTING = 2; const std::size_t CC_ERROR = 3; const TStrVec CC_STATES{"INITIAL", "TEST", "NOT_TESTING", "ERROR"}; // Alphabet const std::size_t CC_NEW_VALUE = 0; const std::size_t CC_RESET = 1; const TStrVec CC_ALPHABET{"NEW_VALUE", "RESET"}; // Transition Function const TSizeVecVec CC_TRANSITION_FUNCTION{ TSizeVec{CC_TEST, CC_TEST, CC_NOT_TESTING, CC_ERROR}, TSizeVec{CC_INITIAL, CC_INITIAL, CC_NOT_TESTING, CC_INITIAL}}; // Components State Machine // States const std::size_t SC_NEW_COMPONENTS = 0; const std::size_t SC_NORMAL = 1; const std::size_t SC_DISABLED = 2; const std::size_t SC_ERROR = 3; const TStrVec SC_STATES{"NEW_COMPONENTS", "NORMAL", "DISABLED", "ERROR"}; // Alphabet const std::size_t SC_ADDED_COMPONENTS = 0; const std::size_t SC_INTERPOLATED = 1; const std::size_t SC_RESET = 2; const TStrVec SC_ALPHABET{"ADDED_COMPONENTS", "INTERPOLATED", "RESET"}; // Transition Function const TSizeVecVec SC_TRANSITION_FUNCTION{ TSizeVec{SC_NEW_COMPONENTS, SC_NEW_COMPONENTS, SC_DISABLED, SC_ERROR}, TSizeVec{SC_NORMAL, SC_NORMAL, SC_DISABLED, SC_ERROR}, TSizeVec{SC_NORMAL, SC_NORMAL, SC_NORMAL, SC_NORMAL}}; const std::string VERSION_6_3_TAG("6.3"); const std::string VERSION_6_4_TAG("6.4"); // Change Detector Test Tags // Version 7.11 const core::TPersistenceTag CHANGE_DETECTOR_TEST_MACHINE_7_11_TAG{"a", "change_detector_test_machine"}; const core::TPersistenceTag SLIDING_WINDOW_7_11_TAG{"b", "sliding_window"}; const core::TPersistenceTag MEAN_OFFSET_7_11_TAG{"c", "mean_offset"}; const core::TPersistenceTag RESIDUAL_MOMENTS_7_11_TAG{"d", "residual_moments"}; const core::TPersistenceTag LARGE_ERROR_FRACTION_7_11_TAG{"e", "large_error_fraction"}; const core::TPersistenceTag TOTAL_COUNT_WEIGHT_ADJUSTMENT_7_11_TAG{ "f", "total_count_weight_adjustment"}; const core::TPersistenceTag MINIMUM_TOTAL_COUNT_WEIGHT_ADJUSTMENT_7_11_TAG{ "g", "minimum_total_count_weight_adjustment"}; const core::TPersistenceTag LAST_TEST_TIME_7_11_TAG{"h", "last_test_time"}; const core::TPersistenceTag LAST_CHANGE_POINT_TIME_7_11_TAG{"i", "last_change_point_time"}; const core::TPersistenceTag LAST_CANDIDATE_CHANGE_POINT_TIME_7_11_TAG{ "j", "last_candidate_change_point_time"}; const core::TPersistenceTag LAST_CHANGE_POINT_7_11_TAG{"k", "last_change_point"}; // Version 8.3 const core::TPersistenceTag OUTLIER_WEIGHT_DERATE_8_3_TAG{"l", "winsorization_derate"}; // Seasonality Test Tags // Version 6.3 const core::TPersistenceTag SEASONALITY_TEST_MACHINE_6_3_TAG{"a", "periodicity_test_machine"}; // Version 7.9 const core::TPersistenceTag SHORT_WINDOW_7_9_TAG{"e", "short_window_7_9"}; const core::TPersistenceTag LONG_WINDOW_7_9_TAG{"f", "long_window_7_9"}; // Old versions can't be restored. // Calendar Cyclic Test Tags // Version 6.3 const core::TPersistenceTag CALENDAR_TEST_MACHINE_6_3_TAG{"a", "calendar_test_machine"}; const core::TPersistenceTag LAST_MONTH_6_3_TAG{"b", "last_month"}; const core::TPersistenceTag CALENDAR_TEST_6_3_TAG{"c", "calendar_test"}; // These work for all versions. // Components Tags // Version 6.3 const core::TPersistenceTag COMPONENTS_MACHINE_6_3_TAG{"a", "components_machine"}; const core::TPersistenceTag DECAY_RATE_6_3_TAG{"b", "decay_rate"}; const core::TPersistenceTag TREND_6_3_TAG{"c", "trend"}; const core::TPersistenceTag SEASONAL_6_3_TAG{"d", "seasonal"}; const core::TPersistenceTag CALENDAR_6_3_TAG{"e", "calendar"}; const core::TPersistenceTag COMPONENT_6_3_TAG{"f", "component"}; const core::TPersistenceTag MEAN_VARIANCE_SCALE_6_3_TAG{"h", "mean_variance_scale"}; const core::TPersistenceTag MOMENTS_6_3_TAG{"i", "moments"}; const core::TPersistenceTag MOMENTS_MINUS_TREND_6_3_TAG{"j", "moments_minus_trend"}; const core::TPersistenceTag USING_TREND_FOR_PREDICTION_6_3_TAG{"k", "using_trend_for_prediction"}; const core::TPersistenceTag GAIN_CONTROLLER_6_3_TAG{"l", "gain_controller"}; // Version 6.4 const core::TPersistenceTag COMPONENT_6_4_TAG{"f", "component"}; const core::TPersistenceTag ERRORS_6_4_TAG{"g", "errors"}; const core::TPersistenceTag REGRESSION_ORIGIN_6_4_TAG{"a", "regression_origin"}; const core::TPersistenceTag MEAN_SUM_AMPLITUDES_6_4_TAG{"b", "mean_sum_amplitudes"}; const core::TPersistenceTag MEAN_SUM_AMPLITUDES_TREND_6_4_TAG{"c", "mean_sum_amplitudes_trend"}; // This implements the mapping from restored states to their best // equivalents; specifically: // SC_NEW_COMPONENTS |-> SC_NEW_COMPONENTS // SC_NORMAL |-> SC_NORMAL // SC_FORECASTING |-> SC_NORMAL // SC_DISABLED |-> SC_DISABLED // SC_ERROR |-> SC_ERROR // Note that we don't try and restore the periodicity test state // (see CTimeSeriesDecomposition::acceptRestoreTraverser) and the // calendar test state is unchanged. const TSizeSizeMap SC_STATES_UPGRADING_TO_VERSION_6_3{{0, 0}, {1, 1}, {2, 1}, {3, 2}, {4, 3}}; //////////////////////////////////////////////////////////////////////// } //////// SMessage //////// CTimeSeriesDecompositionDetail::SMessage::SMessage(core_t::TTime time, core_t::TTime lastTime, const TMemoryCircuitBreaker& memoryCircuitBreaker) : s_Time{time}, s_LastTime{lastTime}, s_MemoryCircuitBreaker{memoryCircuitBreaker} { } //////// SAddValue //////// CTimeSeriesDecompositionDetail::SAddValue::SAddValue( core_t::TTime time, core_t::TTime lastTime, core_t::TTime timeShift, double value, const maths_t::TDoubleWeightsAry& weights, double occupancy, core_t::TTime firstValueTime, double trend, double seasonal, double calendar, CTimeSeriesDecomposition& decomposition, const TMakePredictor& makePredictor, const TMakeFilteredPredictor& makeSeasonalityTestPreconditioner, const TMakeTestForSeasonality& makeTestForSeasonality, const TMemoryCircuitBreaker& memoryCircuitBreaker) : SMessage{time, lastTime, memoryCircuitBreaker}, s_TimeShift{timeShift}, s_Value{value}, s_Weights{weights}, s_Occupancy{occupancy}, s_FirstValueTime{firstValueTime}, s_Trend{trend}, s_Seasonal{seasonal}, s_Calendar{calendar}, s_Decomposition{&decomposition}, s_MakePredictor{makePredictor}, s_MakeSeasonalityTestPreconditioner{makeSeasonalityTestPreconditioner}, s_MakeTestForSeasonality{makeTestForSeasonality} { } //////// SDetectedSeasonal //////// CTimeSeriesDecompositionDetail::SDetectedSeasonal::SDetectedSeasonal( core_t::TTime time, core_t::TTime lastTime, CSeasonalDecomposition components, const TMemoryCircuitBreaker& memoryCircuitBreaker) : SMessage{time, lastTime, memoryCircuitBreaker}, s_Components{std::move(components)} { } //////// SDetectedCalendar //////// CTimeSeriesDecompositionDetail::SDetectedCalendar::SDetectedCalendar( core_t::TTime time, core_t::TTime lastTime, CCalendarFeature feature, core_t::TTime timeZoneOffset, const TMemoryCircuitBreaker& memoryCircuitBreaker) : SMessage{time, lastTime, memoryCircuitBreaker}, s_Feature{feature}, s_TimeZoneOffset{timeZoneOffset} { } //////// SDetectedTrend //////// CTimeSeriesDecompositionDetail::SDetectedTrend::SDetectedTrend( const TPredictor& predictor, const TComponentChangeCallback& componentChangeCallback, const TMemoryCircuitBreaker& memoryCircuitBreaker) : SMessage{0, 0, memoryCircuitBreaker}, s_Predictor{predictor}, s_ComponentChangeCallback{componentChangeCallback} { } //////// SDetectedChangePoint //////// CTimeSeriesDecompositionDetail::SDetectedChangePoint::SDetectedChangePoint( core_t::TTime time, core_t::TTime lastTime, TChangePointUPtr change, const TMemoryCircuitBreaker& memoryCircuitBreaker) : SMessage{time, lastTime, memoryCircuitBreaker}, s_Change{std::move(change)} { } //////// CHandler //////// void CTimeSeriesDecompositionDetail::CHandler::handle(const SAddValue&) { } void CTimeSeriesDecompositionDetail::CHandler::handle(const SDetectedSeasonal&) { } void CTimeSeriesDecompositionDetail::CHandler::handle(const SDetectedCalendar&) { } void CTimeSeriesDecompositionDetail::CHandler::handle(const SDetectedTrend&) { } void CTimeSeriesDecompositionDetail::CHandler::handle(const SDetectedChangePoint&) { } void CTimeSeriesDecompositionDetail::CHandler::mediator(CMediator* mediator) { m_Mediator = mediator; } CTimeSeriesDecompositionDetail::CMediator* CTimeSeriesDecompositionDetail::CHandler::mediator() const { return m_Mediator; } //////// CMediator //////// template<typename M> void CTimeSeriesDecompositionDetail::CMediator::forward(const M& message) const { for (CHandler& handler : m_Handlers) { handler.handle(message); } } void CTimeSeriesDecompositionDetail::CMediator::registerHandler(CHandler& handler) { m_Handlers.push_back(std::ref(handler)); handler.mediator(this); } void CTimeSeriesDecompositionDetail::CMediator::debugMemoryUsage( const core::CMemoryUsage::TMemoryUsagePtr& mem) const { mem->setName("CMediator"); core::memory_debug::dynamicSize("m_Handlers", m_Handlers, mem); } std::size_t CTimeSeriesDecompositionDetail::CMediator::memoryUsage() const { return core::memory::dynamicSize(m_Handlers); } //////// CChangePointTest //////// CTimeSeriesDecompositionDetail::CChangePointTest::CChangePointTest(double decayRate, core_t::TTime bucketLength) : m_Machine{core::CStateMachine::create(CD_ALPHABET, CD_STATES, CD_TRANSITION_FUNCTION, CD_TEST)}, m_DecayRate{decayRate}, m_BucketLength{bucketLength}, m_Window(this->windowSize(), TFloatMeanAccumulator{}), m_LastTestTime{std::numeric_limits<core_t::TTime>::min() / 2}, m_LastChangePointTime{std::numeric_limits<core_t::TTime>::min() / 2}, m_LastCandidateChangePointTime{std::numeric_limits<core_t::TTime>::min() / 2} { } CTimeSeriesDecompositionDetail::CChangePointTest::CChangePointTest(const CChangePointTest& other, bool isForForecast) : m_Machine{other.m_Machine}, m_DecayRate{other.m_DecayRate}, m_BucketLength{other.m_BucketLength}, m_Window{other.m_Window}, m_MeanOffset{other.m_MeanOffset}, m_ResidualMoments{other.m_ResidualMoments}, m_LargeErrorFraction{other.m_LargeErrorFraction}, m_TotalCountWeightAdjustment{other.m_TotalCountWeightAdjustment}, m_MinimumTotalCountWeightAdjustment{other.m_MinimumTotalCountWeightAdjustment}, m_LastTestTime{other.m_LastTestTime}, m_LastChangePointTime{other.m_LastChangePointTime}, m_LastCandidateChangePointTime{other.m_LastCandidateChangePointTime}, m_LastChangeOutlierWeightDerate{other.m_LastChangeOutlierWeightDerate} { if (isForForecast) { this->apply(CD_DISABLE); } else if (m_UndoableLastChange != nullptr) { m_UndoableLastChange = other.m_UndoableLastChange->undoable(); } } bool CTimeSeriesDecompositionDetail::CChangePointTest::acceptRestoreTraverser( core::CStateRestoreTraverser& traverser) { do { const std::string& name{traverser.name()}; RESTORE(CHANGE_DETECTOR_TEST_MACHINE_7_11_TAG, traverser.traverseSubLevel([this](core::CStateRestoreTraverser& traverser_) { return m_Machine.acceptRestoreTraverser(traverser_); })) RESTORE(SLIDING_WINDOW_7_11_TAG, core::CPersistUtils::restore(SLIDING_WINDOW_7_11_TAG, m_Window, traverser)) RESTORE(MEAN_OFFSET_7_11_TAG, m_MeanOffset.fromDelimited(traverser.value())) RESTORE(RESIDUAL_MOMENTS_7_11_TAG, m_ResidualMoments.fromDelimited(traverser.value())) RESTORE_BUILT_IN(LARGE_ERROR_FRACTION_7_11_TAG, m_LargeErrorFraction) RESTORE_BUILT_IN(TOTAL_COUNT_WEIGHT_ADJUSTMENT_7_11_TAG, m_TotalCountWeightAdjustment) RESTORE_BUILT_IN(MINIMUM_TOTAL_COUNT_WEIGHT_ADJUSTMENT_7_11_TAG, m_MinimumTotalCountWeightAdjustment) RESTORE_BUILT_IN(LAST_TEST_TIME_7_11_TAG, m_LastTestTime) RESTORE_BUILT_IN(LAST_CHANGE_POINT_TIME_7_11_TAG, m_LastChangePointTime) RESTORE_BUILT_IN(LAST_CANDIDATE_CHANGE_POINT_TIME_7_11_TAG, m_LastCandidateChangePointTime) RESTORE(LAST_CHANGE_POINT_7_11_TAG, traverser.traverseSubLevel([ this, serializer = CUndoableChangePointStateSerializer{} ](auto& traverser_) { return serializer(m_UndoableLastChange, traverser_); })) RESTORE(OUTLIER_WEIGHT_DERATE_8_3_TAG, traverser.traverseSubLevel([this](core::CStateRestoreTraverser& traverser_) { return m_LastChangeOutlierWeightDerate.acceptRestoreTraverser(traverser_); })) } while (traverser.next()); return true; } void CTimeSeriesDecompositionDetail::CChangePointTest::acceptPersistInserter( core::CStatePersistInserter& inserter) const { inserter.insertLevel(CHANGE_DETECTOR_TEST_MACHINE_7_11_TAG, [this](auto& inserter_) { return m_Machine.acceptPersistInserter(inserter_); }); core::CPersistUtils::persist(SLIDING_WINDOW_7_11_TAG, m_Window, inserter); inserter.insertValue(MEAN_OFFSET_7_11_TAG, m_MeanOffset.toDelimited()); inserter.insertValue(RESIDUAL_MOMENTS_7_11_TAG, m_ResidualMoments.toDelimited()); inserter.insertValue(LARGE_ERROR_FRACTION_7_11_TAG, m_LargeErrorFraction, core::CIEEE754::E_DoublePrecision); inserter.insertValue(TOTAL_COUNT_WEIGHT_ADJUSTMENT_7_11_TAG, m_TotalCountWeightAdjustment, core::CIEEE754::E_DoublePrecision); inserter.insertValue(MINIMUM_TOTAL_COUNT_WEIGHT_ADJUSTMENT_7_11_TAG, m_MinimumTotalCountWeightAdjustment, core::CIEEE754::E_DoublePrecision); inserter.insertValue(LAST_TEST_TIME_7_11_TAG, m_LastTestTime); inserter.insertValue(LAST_CHANGE_POINT_TIME_7_11_TAG, m_LastChangePointTime); inserter.insertValue(LAST_CANDIDATE_CHANGE_POINT_TIME_7_11_TAG, m_LastCandidateChangePointTime); if (m_UndoableLastChange != nullptr) { inserter.insertLevel(LAST_CHANGE_POINT_7_11_TAG, [ this, serializer = CUndoableChangePointStateSerializer{} ](auto& inserter_) { serializer(*m_UndoableLastChange, inserter_); }); } inserter.insertLevel(OUTLIER_WEIGHT_DERATE_8_3_TAG, [this](auto& inserter_) { return m_LastChangeOutlierWeightDerate.acceptPersistInserter(inserter_); }); } void CTimeSeriesDecompositionDetail::CChangePointTest::swap(CChangePointTest& other) { std::swap(m_Machine, other.m_Machine); std::swap(m_DecayRate, other.m_DecayRate); std::swap(m_BucketLength, other.m_BucketLength); m_Window.swap(other.m_Window); std::swap(m_MeanOffset, other.m_MeanOffset); std::swap(m_ResidualMoments, other.m_ResidualMoments); std::swap(m_LargeErrorFraction, other.m_LargeErrorFraction); std::swap(m_TotalCountWeightAdjustment, other.m_TotalCountWeightAdjustment); std::swap(m_MinimumTotalCountWeightAdjustment, other.m_MinimumTotalCountWeightAdjustment); std::swap(m_LastTestTime, other.m_LastTestTime); std::swap(m_LastChangePointTime, other.m_LastChangePointTime); std::swap(m_LastCandidateChangePointTime, other.m_LastCandidateChangePointTime); std::swap(m_UndoableLastChange, other.m_UndoableLastChange); std::swap(m_LastChangeOutlierWeightDerate, other.m_LastChangeOutlierWeightDerate); } void CTimeSeriesDecompositionDetail::CChangePointTest::handle(const SAddValue& message) { core_t::TTime lastTime{message.s_LastTime}; core_t::TTime time{message.s_Time}; double value{message.s_Value}; double prediction{message.s_Trend + message.s_Seasonal + message.s_Calendar}; // We have explicit handling of outliers in CTimeSeriesTestForChange. double weight{maths_t::count(message.s_Weights)}; double weightForResidualMoments{maths_t::countForUpdate(message.s_Weights)}; std::size_t steps{ std::min(static_cast<std::size_t>((this->startOfWindowBucket(time) - this->startOfWindowBucket(lastTime)) / this->windowBucketLength()), m_Window.size())}; switch (m_Machine.state()) { case CD_TEST: for (std::size_t i = 0; i < steps; ++i) { m_Window.push_back(TFloatMeanAccumulator{}); } m_Window.back().add(value, weight); m_MeanOffset.add(static_cast<double>(time % m_BucketLength), weight); m_ResidualMoments.add(value - prediction, weightForResidualMoments); this->updateTotalCountWeights(message); this->testForCandidateChange(message); this->testUndoLastChange(message); this->testForChange(message); break; case CD_NOT_TESTING: break; default: LOG_ERROR(<< "Test in a bad state: " << m_Machine.state()); this->apply(CD_RESET); break; } } void CTimeSeriesDecompositionDetail::CChangePointTest::handle(const SDetectedSeasonal& message) { this->reset(message.s_Time); } void CTimeSeriesDecompositionDetail::CChangePointTest::reset(core_t::TTime time) { if (m_Window.empty() == false) { m_Window.assign(m_Window.size(), TFloatMeanAccumulator{}); } m_ResidualMoments = TMeanVarAccumulator{}; m_LargeErrorFraction = 0.0; m_TotalCountWeightAdjustment = 0.0; m_MinimumTotalCountWeightAdjustment = 0.0; m_LastCandidateChangePointTime = time - 4 * this->maximumIntervalToDetectChange(1.0); } double CTimeSeriesDecompositionDetail::CChangePointTest::countWeight(core_t::TTime) const { // We shape the count weight we apply initially using a small weight after // detecting a candidate change before switching to a large weight after // accepting a change or waiting maximumIntervalToDetectChange. We arrange // for the integral of the adjusted weight over time to be one. if (m_TotalCountWeightAdjustment > m_MinimumTotalCountWeightAdjustment && m_LargeErrorFraction > 0.25) { return CHANGE_COUNT_WEIGHT; } return 1.0 + std::min(1.0, -m_TotalCountWeightAdjustment); } double CTimeSeriesDecompositionDetail::CChangePointTest::outlierWeightDerate(core_t::TTime time, double error) const { return std::max(1.0 - static_cast<double>(time - m_LastChangePointTime) / static_cast<double>(3 * DAY), 0.0) * m_LastChangeOutlierWeightDerate.value(error); } void CTimeSeriesDecompositionDetail::CChangePointTest::propagateForwards(core_t::TTime start, core_t::TTime end) { stepwisePropagateForwards(start, end, DAY, [this](double time) { m_ResidualMoments.age(std::exp(-m_DecayRate * time / 8.0)); }); } std::uint64_t CTimeSeriesDecompositionDetail::CChangePointTest::checksum(std::uint64_t seed) const { seed = common::CChecksum::calculate(seed, m_Machine); seed = common::CChecksum::calculate(seed, m_DecayRate); seed = common::CChecksum::calculate(seed, m_BucketLength); seed = common::CChecksum::calculate(seed, m_Window); seed = common::CChecksum::calculate(seed, m_MeanOffset); seed = common::CChecksum::calculate(seed, m_ResidualMoments); seed = common::CChecksum::calculate(seed, m_LargeErrorFraction); seed = common::CChecksum::calculate(seed, m_TotalCountWeightAdjustment); seed = common::CChecksum::calculate(seed, m_MinimumTotalCountWeightAdjustment); seed = common::CChecksum::calculate(seed, m_LastTestTime); seed = common::CChecksum::calculate(seed, m_LastChangePointTime); seed = common::CChecksum::calculate(seed, m_LastCandidateChangePointTime); seed = common::CChecksum::calculate(seed, m_UndoableLastChange); return common::CChecksum::calculate(seed, m_LastChangeOutlierWeightDerate); } void CTimeSeriesDecompositionDetail::CChangePointTest::debugMemoryUsage( const core::CMemoryUsage::TMemoryUsagePtr& mem) const { mem->setName("CChangePointTest"); core::memory_debug::dynamicSize("m_Window", m_Window, mem); core::memory_debug::dynamicSize("m_UndoableLastChange", m_UndoableLastChange, mem); } std::size_t CTimeSeriesDecompositionDetail::CChangePointTest::memoryUsage() const { return core::memory::dynamicSize(m_Window) + core::memory::dynamicSize(m_UndoableLastChange); } void CTimeSeriesDecompositionDetail::CChangePointTest::apply(std::size_t symbol) { std::size_t old{m_Machine.state()}; m_Machine.apply(symbol); std::size_t state{m_Machine.state()}; if (state != old) { LOG_TRACE(<< CD_STATES[old] << "," << CD_ALPHABET[symbol] << " -> " << CD_STATES[state]); switch (state) { case CD_TEST: m_Window = TFloatMeanAccumulatorCBuf(this->windowSize(), TFloatMeanAccumulator{}); m_MeanOffset = TFloatMeanAccumulator{}; m_LargeErrorFraction = 0.0; break; case CD_NOT_TESTING: m_Window = TFloatMeanAccumulatorCBuf{}; m_MeanOffset = TFloatMeanAccumulator{}; m_LargeErrorFraction = 0.0; break; default: LOG_ERROR(<< "Test in a bad state: " << state); this->apply(CD_RESET); break; } } } void CTimeSeriesDecompositionDetail::CChangePointTest::updateTotalCountWeights(const SAddValue& message) { core_t::TTime lastTime{message.s_LastTime}; core_t::TTime time{message.s_Time}; double occupancy{message.s_Occupancy}; m_TotalCountWeightAdjustment += static_cast<double>(time - lastTime) / static_cast<double>(m_BucketLength) * (this->countWeight(time) - 1.0); m_TotalCountWeightAdjustment = std::min(m_TotalCountWeightAdjustment, 0.0); if (m_TotalCountWeightAdjustment == 0.0) { m_MinimumTotalCountWeightAdjustment = (CHANGE_COUNT_WEIGHT - 1.0) * static_cast<double>(this->maximumIntervalToDetectChange(occupancy)) / static_cast<double>(m_BucketLength); } if (m_TotalCountWeightAdjustment < m_MinimumTotalCountWeightAdjustment) { m_MinimumTotalCountWeightAdjustment = 0.0; } } void CTimeSeriesDecompositionDetail::CChangePointTest::testForCandidateChange(const SAddValue& message) { core_t::TTime firstValueTime{message.s_FirstValueTime}; core_t::TTime time{message.s_Time}; // We're prone to detect changes at model startup before, for example, we // detect and model seasonality. Since the most common seasonality in the // data we model is daily, this delays detecting changes until we've had // the chance to see several repeats. if (time < firstValueTime + 3 * DAY) { return; } double occupancy{message.s_Occupancy}; double value{message.s_Value}; double prediction{message.s_Trend + message.s_Seasonal + message.s_Calendar}; double error{std::fabs(value - prediction)}; double beta{static_cast<double>(m_BucketLength) / (4.0 * static_cast<double>(this->windowBucketLength()))}; double alpha{1.0 - beta}; bool mayHaveChangedBefore{this->mayHaveChanged()}; m_LargeErrorFraction = alpha * m_LargeErrorFraction + beta * (error > this->largeError() ? 1.0 : 0.0); if (this->mayHaveChanged() && mayHaveChangedBefore == false && time > m_LastCandidateChangePointTime + 2 * this->maximumIntervalToDetectChange(occupancy)) { m_LastCandidateChangePointTime = time; } LOG_TRACE(<< "large error fraction = " << m_LargeErrorFraction << ", error = " << error << ", large error = " << this->largeError()); } void CTimeSeriesDecompositionDetail::CChangePointTest::testForChange(const SAddValue& message) { core_t::TTime time{message.s_Time}; double occupancy{message.s_Occupancy}; if (this->shouldTest(time, occupancy) == false) { return; } core_t::TTime lastTime{message.s_LastTime}; core_t::TTime timeShift{message.s_TimeShift}; bool seasonal{message.s_Decomposition->seasonalComponents().empty() == false}; const auto& makePredictor = message.s_MakePredictor; CTimeSeriesDecomposition& decomposition{*message.s_Decomposition}; auto begin = std::find_if(m_Window.begin(), m_Window.end(), [](const auto& bucket) { return common::CBasicStatistics::count(bucket) > 0.0; }); std::ptrdiff_t length{std::distance(begin, m_Window.end())}; if (this->windowBucketLength() * length <= this->minimumChangeLength(occupancy)) { return; } LOG_TRACE(<< "Testing for change at " << time); int testFor{seasonal ? CTimeSeriesTestForChange::E_All : CTimeSeriesTestForChange::E_LevelShift}; TPredictor predictor{makePredictor()}; core_t::TTime bucketsStartTime{this->bucketsStartTime(time, length)}; core_t::TTime valuesStartTime{this->valuesStartTime(bucketsStartTime)}; TFloatMeanAccumulatorVec values{begin, m_Window.end()}; LOG_TRACE(<< "buckets start time = " << bucketsStartTime << ", values start time = " << valuesStartTime << ", last candidate time = " << m_LastCandidateChangePointTime); CTimeSeriesTestForChange changeTest( testFor, valuesStartTime - timeShift, bucketsStartTime - timeShift, this->windowBucketLength(), m_BucketLength, predictor, std::move(values), 0.0, CTimeSeriesTestForChange::OUTLIER_FRACTION * occupancy); auto change = changeTest.test(); m_LastTestTime = time; if (change != nullptr && // did we detect a change at all change->largeEnough(this->largeError()) && change->longEnough(time, this->minimumChangeLength(occupancy))) { addMeanZeroNormalNoise(common::CBasicStatistics::variance(m_ResidualMoments), change->residuals()); change->apply(decomposition); m_LargeErrorFraction = 0.0; m_LastChangePointTime = time; m_LastCandidateChangePointTime = std::min(m_LastCandidateChangePointTime, time - this->maximumIntervalToDetectChange(occupancy)); m_UndoableLastChange = change->undoable(); m_LastChangeOutlierWeightDerate = change->outlierWeightDerate(bucketsStartTime, time, predictor); this->mediator()->forward(SDetectedChangePoint{ time, lastTime, std::move(change), message.s_MemoryCircuitBreaker}); } else if (change != nullptr) { m_LastCandidateChangePointTime = change->time(); } LOG_TRACE(<< (change != nullptr ? "maybe " + change->print() : "no change")); } void CTimeSeriesDecompositionDetail::CChangePointTest::testUndoLastChange(const SAddValue& message) { if (m_UndoableLastChange == nullptr) { return; } core_t::TTime time{message.s_Time}; core_t::TTime timeShift{message.s_TimeShift}; core_t::TTime lastTime{message.s_LastTime}; double occupancy{message.s_Occupancy}; double value{message.s_Value}; double weight{maths_t::count(message.s_Weights)}; const auto& makePredictor = message.s_MakePredictor; CTimeSeriesDecomposition& decomposition{*message.s_Decomposition}; m_UndoableLastChange->add(time - timeShift, lastTime - timeShift, value, weight, makePredictor()); if (time - m_LastChangePointTime > this->minimumChangeLength(occupancy) / 10 && m_UndoableLastChange->shouldUndo()) { m_UndoableLastChange->apply(decomposition); this->mediator()->forward(SDetectedChangePoint{ time, lastTime, std::move(m_UndoableLastChange), message.s_MemoryCircuitBreaker}); m_UndoableLastChange.reset(); return; } if (time - m_LastChangePointTime > this->maximumIntervalToDetectChange(occupancy)) { m_UndoableLastChange.reset(); } } bool CTimeSeriesDecompositionDetail::CChangePointTest::mayHaveChanged() const { return m_LargeErrorFraction > 0.5; } double CTimeSeriesDecompositionDetail::CChangePointTest::largeError() const { return 3.0 * std::sqrt(common::CBasicStatistics::variance(m_ResidualMoments)); } bool CTimeSeriesDecompositionDetail::CChangePointTest::shouldTest(core_t::TTime time, double occupancy) const { return (m_UndoableLastChange == nullptr) && ((time > m_LastTestTime + this->minimumChangeLength(occupancy)) || (time > m_LastTestTime + 3 * this->windowBucketLength() && time < m_LastCandidateChangePointTime + this->maximumIntervalToDetectChange(occupancy) && time > m_LastCandidateChangePointTime + this->minimumChangeLength(occupancy))); } core_t::TTime CTimeSeriesDecompositionDetail::CChangePointTest::minimumChangeLength(double occupancy) const { // Transient changes tend to last 1 day. In such cases we do not want to // apply any change and mearly ignore the interval. By waiting 30 hours // we give ourselves a margin to see the revert before we commit to making // a change. Note for sparse data we delay detecting changes because we're // more prone to FP in this case, since we get less information per unit // time. core_t::TTime length{ std::max(30 * core::constants::HOUR, 5 * this->windowBucketLength())}; length = static_cast<core_t::TTime>( std::min(1.0 / occupancy, 2.0) * static_cast<double>(length) + 0.5); return common::CIntegerTools::ceil(length, this->windowBucketLength()); } core_t::TTime CTimeSeriesDecompositionDetail::CChangePointTest::maximumIntervalToDetectChange(double occupancy) const { return 5 * this->minimumChangeLength(occupancy) / 3; } core_t::TTime CTimeSeriesDecompositionDetail::CChangePointTest::bucketsStartTime(core_t::TTime time, core_t::TTime bucketsLength) const { return this->startOfWindowBucket(time) - static_cast<core_t::TTime>(bucketsLength - 1) * this->windowBucketLength(); } core_t::TTime CTimeSeriesDecompositionDetail::CChangePointTest::valuesStartTime(core_t::TTime bucketsStartTime) const { core_t::TTime bucketEndTime{bucketsStartTime + this->windowBucketLength() - 1}; core_t::TTime firstSampleInBucket{ common::CIntegerTools::ceil(bucketsStartTime, m_BucketLength)}; core_t::TTime lastSampleInBucket{common::CIntegerTools::floor(bucketEndTime, m_BucketLength)}; return firstSampleInBucket + (lastSampleInBucket - firstSampleInBucket) / 2 + static_cast<core_t::TTime>(common::CBasicStatistics::mean(m_MeanOffset) + 0.5); } core_t::TTime CTimeSeriesDecompositionDetail::CChangePointTest::startOfWindowBucket(core_t::TTime time) const { return common::CIntegerTools::floor(time, this->windowBucketLength()); } core_t::TTime CTimeSeriesDecompositionDetail::CChangePointTest::windowLength() const { return static_cast<core_t::TTime>(m_Window.size()) * this->windowBucketLength(); } core_t::TTime CTimeSeriesDecompositionDetail::CChangePointTest::windowBucketLength() const { return std::max(MINIMUM_WINDOW_BUCKET_LENGTH, m_BucketLength); } std::size_t CTimeSeriesDecompositionDetail::CChangePointTest::windowSize() const { return std::max(static_cast<std::size_t>((4 * core::constants::DAY) / this->windowBucketLength()), std::size_t{32}); } //////// CSeasonalityTest //////// namespace { using TTimeTimeVecPrVec = std::vector<std::pair<core_t::TTime, TTimeVec>>; //! \brief Manages the choice of the tests' window parameters as a function //! of the job's bucket length. //! //! DESCRIPTION:\n //! The exact choice of window parameters is a tradeoff between the number //! of points used in the test and how quickly it finds periodic components. //! The fewer points the higher the chance of the false positives, but for //! long bucket lengths using many buckets means it takes a long time to //! find significant periodic components. class CSeasonalityTestParameters { public: static bool test(core_t::TTime bucketLength) { return bucketLength <= 604800; } static std::size_t numberBuckets(int window, core_t::TTime bucketLength) { const auto* params = windowParameters(window, bucketLength); return params != nullptr ? params->s_NumberBuckets : 0; } static core_t::TTime maxBucketLength(int window, core_t::TTime bucketLength) { const auto* bucketLengths_ = bucketLengths(window, bucketLength); return bucketLengths_ != nullptr ? bucketLengths_->back() : 0; } static const TTimeVec* bucketLengths(int window, core_t::TTime bucketLength) { const auto* params = windowParameters(window, bucketLength); return params != nullptr ? &params->s_BucketLengths : nullptr; } static const TTimeVec& testSchedule(int window, core_t::TTime bucketLength) { const auto* params = windowParameters(window, bucketLength); return params != nullptr ? params->s_TestSchedule : EMPTY_TEST_SCHEDULE; } static core_t::TTime shortestComponent(int window, core_t::TTime bucketLength) { const auto* params = windowParameters(window, bucketLength); return params != nullptr ? params->s_ShortestComponent : 0; } static std::size_t minimumResolutionToTestModelledComponent(int window, core_t::TTime bucketLength, bool shorterWindowAvailable) { const auto* params = windowParameters(window, bucketLength); return params != nullptr && shorterWindowAvailable ? params->s_MinimumResolution : 2; } private: struct SParameters { SParameters() = default; SParameters(core_t::TTime bucketLength, core_t::TTime shortestComponent, std::size_t numberBuckets, std::size_t minimumResolution, const std::initializer_list<core_t::TTime>& bucketLengths, const std::initializer_list<core_t::TTime>& testSchedule) : s_BucketLength{bucketLength}, s_ShortestComponent{shortestComponent}, s_NumberBuckets{numberBuckets}, s_MinimumResolution{minimumResolution}, s_BucketLengths{bucketLengths}, s_TestSchedule{testSchedule} {} bool operator<(core_t::TTime rhs) const { return s_BucketLength < rhs; } core_t::TTime s_BucketLength{0}; core_t::TTime s_ShortestComponent{0}; std::size_t s_NumberBuckets{0}; std::size_t s_MinimumResolution{0}; TTimeVec s_BucketLengths; TTimeVec s_TestSchedule; }; using TParametersVecVec = std::vector<std::vector<SParameters>>; using TTimeParametersUMap = boost::unordered_map<core_t::TTime, SParameters>; private: static const SParameters* windowParameters(int window, core_t::TTime bucketLength) { auto result = std::lower_bound(WINDOW_PARAMETERS[window].begin(), WINDOW_PARAMETERS[window].end(), bucketLength); return result != WINDOW_PARAMETERS[window].end() ? &(*result) : nullptr; } private: static const TParametersVecVec WINDOW_PARAMETERS; static const TTimeVec EMPTY_TEST_SCHEDULE; }; // These parameterise the windows used to test for periodic components. From // left to right the parameters are: // 1. The job bucket length, // 2. The minimum period seasonal component we'll accept testing on the window, // 3. The number buckets in the window, // 4. The bucket lengths we'll cycle through as we test progressively longer // windows, // 5. The times, in addition to "number buckets" * "window bucket lengths", // when we'll test for seasonal components. const CSeasonalityTestParameters::TParametersVecVec CSeasonalityTestParameters::WINDOW_PARAMETERS{ /* SHORT WINDOW */ {{1, 1, 180, 10, {1, 5, 10, 30, 60, 300, 600}, {}}, {5, 1, 180, 10, {5, 10, 30, 60, 300, 600}, {}}, {10, 1, 180, 10, {10, 30, 60, 300, 600}, {}}, {30, 1, 180, 10, {30, 60, 300, 600}, {}}, {60, 1, 336, 12, {60, 300, 900, 3600, 7200}, {3 * 604800}}, {300, 1, 336, 12, {300, 900, 3600, 7200}, {3 * 604800}}, {600, 1, 336, 12, {600, 3600, 7200}, {3 * 604800}}, {900, 1, 336, 12, {900, 3600, 7200}, {3 * 604800}}, {1200, 1, 336, 12, {1200, 3600, 7200}, {3 * 86400, 3 * 604800}}, {1800, 1, 336, 12, {1800, 3600, 7200}, {3 * 86400, 3 * 604800}}, {3600, 1, 336, 12, {3600, 7200}, {3 * 86400, 604800, 3 * 604800}}, {7200, 1, 336, 12, {7200, 14400}, {3 * 86400, 604800, 3 * 604800}}, {14400, 1, 336, 6, {14400}, {604800, 3 * 604800}}, {21600, 1, 224, 6, {21600}, {604800, 3 * 604800}}, {28800, 1, 168, 6, {28800}, {3 * 604800}}, {43200, 1, 112, 6, {43200}, {4 * 604800}}, {86400, 1, 56, 6, {86400}, {}}}, /* LONG WINDOW */ {{1, 30601, 336, 12, {900, 3600, 7200}, {3 * 604800}}, {5, 30601, 336, 12, {900, 3600, 7200}, {3 * 604800}}, {10, 30601, 336, 12, {900, 3600, 7200}, {3 * 604800}}, {30, 30601, 336, 12, {900, 3600, 7200}, {3 * 604800}}, {60, 648001, 156, 6, {43200, 86400, 604800}, {104 * 604800}}, {300, 648001, 156, 6, {43200, 86400, 604800}, {104 * 604800}}, {600, 648001, 156, 6, {43200, 86400, 604800}, {104 * 604800}}, {900, 648001, 156, 6, {43200, 86400, 604800}, {104 * 604800}}, {1200, 648001, 156, 6, {43200, 86400, 604800}, {104 * 604800}}, {1800, 648001, 156, 6, {43200, 86400, 604800}, {104 * 604800}}, {3600, 648001, 156, 6, {43200, 86400, 604800}, {104 * 604800}}, {7200, 648001, 156, 6, {43200, 86400, 604800}, {104 * 604800}}, {14400, 648001, 156, 6, {43200, 86400, 604800}, {104 * 604800}}, {86400, 648001, 156, 6, {43200, 86400, 604800}, {104 * 604800}}, {604800, 648001, 156, 6, {43200, 86400, 604800}, {104 * 604800}}}}; } const TTimeVec CSeasonalityTestParameters::EMPTY_TEST_SCHEDULE; CTimeSeriesDecompositionDetail::CSeasonalityTest::CSeasonalityTest(double decayRate, core_t::TTime bucketLength) : m_Machine{core::CStateMachine::create( PT_ALPHABET, PT_STATES, PT_TRANSITION_FUNCTION, CSeasonalityTestParameters::test(bucketLength) ? PT_INITIAL : PT_NOT_TESTING)}, m_DecayRate{decayRate}, m_BucketLength{bucketLength} { } CTimeSeriesDecompositionDetail::CSeasonalityTest::CSeasonalityTest(const CSeasonalityTest& other, bool isForForecast) : m_Machine{other.m_Machine}, m_DecayRate{other.m_DecayRate}, m_BucketLength{ other.m_BucketLength} { if (isForForecast == false) { for (auto i : {E_Short, E_Long}) { if (other.m_Windows[i] != nullptr) { m_Windows[i] = std::make_unique<CExpandingWindow>(*other.m_Windows[i]); } } } } bool CTimeSeriesDecompositionDetail::CSeasonalityTest::acceptRestoreTraverser( core::CStateRestoreTraverser& traverser) { do { const std::string& name{traverser.name()}; RESTORE(SEASONALITY_TEST_MACHINE_6_3_TAG, traverser.traverseSubLevel([this](core::CStateRestoreTraverser& traverser_) { return m_Machine.acceptRestoreTraverser(traverser_); })) RESTORE_SETUP_TEARDOWN( SHORT_WINDOW_7_9_TAG, m_Windows[E_Short] = this->newWindow(E_Short), m_Windows[E_Short] && traverser.traverseSubLevel([this](auto& traverser_) { return m_Windows[E_Short]->acceptRestoreTraverser(traverser_); }), /**/) RESTORE_SETUP_TEARDOWN( LONG_WINDOW_7_9_TAG, m_Windows[E_Long] = this->newWindow(E_Long), m_Windows[E_Long] && traverser.traverseSubLevel([this](auto& traverser_) { return m_Windows[E_Long]->acceptRestoreTraverser(traverser_); }), /**/) } while (traverser.next()); return true; } void CTimeSeriesDecompositionDetail::CSeasonalityTest::acceptPersistInserter( core::CStatePersistInserter& inserter) const { inserter.insertLevel(SEASONALITY_TEST_MACHINE_6_3_TAG, [this](auto& inserter_) { m_Machine.acceptPersistInserter(inserter_); }); if (m_Windows[E_Short] != nullptr) { inserter.insertLevel(SHORT_WINDOW_7_9_TAG, [this](auto& inserter_) { m_Windows[E_Short]->acceptPersistInserter(inserter_); }); } if (m_Windows[E_Long] != nullptr) { inserter.insertLevel(LONG_WINDOW_7_9_TAG, [this](auto& inserter_) { m_Windows[E_Long]->acceptPersistInserter(inserter_); }); } } void CTimeSeriesDecompositionDetail::CSeasonalityTest::swap(CSeasonalityTest& other) { std::swap(m_Machine, other.m_Machine); std::swap(m_DecayRate, other.m_DecayRate); std::swap(m_BucketLength, other.m_BucketLength); m_Windows[E_Short].swap(other.m_Windows[E_Short]); m_Windows[E_Long].swap(other.m_Windows[E_Long]); } void CTimeSeriesDecompositionDetail::CSeasonalityTest::handle(const SAddValue& message) { core_t::TTime time{message.s_Time}; double value{message.s_Value}; double prediction{message.s_Seasonal + message.s_Calendar}; // We have explicit handling of outliers and we can't accurately assess // them anyway before we've detected periodicity. double weight{maths_t::count(message.s_Weights)}; this->test(message); switch (m_Machine.state()) { case PT_TEST: // The seasonality test memory can increase as we add new values // so we stop updating it in hard limit. if (message.s_MemoryCircuitBreaker.areAllocationsAllowed() == false) { break; } for (auto& window : m_Windows) { if (window != nullptr) { window->add(time, value, prediction, weight); } } break; case PT_NOT_TESTING: break; case PT_INITIAL: this->apply(PT_NEW_VALUE, message); this->handle(message); break; default: LOG_ERROR(<< "Test in a bad state: " << m_Machine.state()); this->apply(PT_RESET, message); break; } } void CTimeSeriesDecompositionDetail::CSeasonalityTest::handle(const SDetectedTrend& message) { TPredictor predictor{message.s_Predictor}; TComponentChangeCallback componentChangeCallback{message.s_ComponentChangeCallback}; componentChangeCallback(this->residuals(predictor)); } void CTimeSeriesDecompositionDetail::CSeasonalityTest::test(const SAddValue& message) { core_t::TTime time{message.s_Time}; core_t::TTime lastTime{message.s_LastTime}; double occupancy{message.s_Occupancy}; const auto& makeTest = message.s_MakeTestForSeasonality; const auto& makePreconditioner = message.s_MakeSeasonalityTestPreconditioner; switch (m_Machine.state()) { case PT_TEST: for (auto i : {E_Short, E_Long}) { if (this->shouldTest(i, time)) { const auto& window = m_Windows[i]; core_t::TTime minimumPeriod{ CSeasonalityTestParameters::shortestComponent(i, m_BucketLength)}; std::size_t minimumResolutionToTestModelledComponent{ CSeasonalityTestParameters::minimumResolutionToTestModelledComponent( i, m_BucketLength, window->haveShorterWindows())}; auto seasonalityTest = makeTest(*window, minimumPeriod, minimumResolutionToTestModelledComponent, makePreconditioner(), occupancy); auto decomposition = seasonalityTest.decompose(); if (decomposition.componentsChanged()) { this->mediator()->forward( SDetectedSeasonal{time, lastTime, std::move(decomposition), message.s_MemoryCircuitBreaker}); } } } break; case PT_NOT_TESTING: case PT_INITIAL: break; default: LOG_ERROR(<< "Test in a bad state: " << m_Machine.state()); this->apply(PT_RESET, message); break; } } void CTimeSeriesDecompositionDetail::CSeasonalityTest::shiftTime(core_t::TTime time, core_t::TTime shift) { for (auto& window : m_Windows) { if (window != nullptr) { window->shiftTime(time, shift); } } } void CTimeSeriesDecompositionDetail::CSeasonalityTest::propagateForwards(core_t::TTime start, core_t::TTime end) { if (m_Windows[E_Short] != nullptr) { stepwisePropagateForwards(start, end, DAY, [this](double time) { m_Windows[E_Short]->propagateForwardsByTime(time / 8.0); }); } if (m_Windows[E_Long] != nullptr) { stepwisePropagateForwards(start, end, WEEK, [this](double time) { m_Windows[E_Long]->propagateForwardsByTime(time / 8.0); }); } } CTimeSeriesDecompositionDetail::TFloatMeanAccumulatorVec CTimeSeriesDecompositionDetail::CSeasonalityTest::residuals(const TPredictor& predictor) const { TFloatMeanAccumulatorVec result; for (auto i : {E_Short, E_Long}) { if (m_Windows[i] != nullptr) { // Add on any noise we smooth away by averaging over longer buckets. result = m_Windows[i]->valuesMinusPrediction(predictor); addMeanZeroNormalNoise(m_Windows[i]->withinBucketVariance(), result); break; } } return result; } std::uint64_t CTimeSeriesDecompositionDetail::CSeasonalityTest::checksum(std::uint64_t seed) const { seed = common::CChecksum::calculate(seed, m_Machine); seed = common::CChecksum::calculate(seed, m_DecayRate); seed = common::CChecksum::calculate(seed, m_BucketLength); return common::CChecksum::calculate(seed, m_Windows); } void CTimeSeriesDecompositionDetail::CSeasonalityTest::debugMemoryUsage( const core::CMemoryUsage::TMemoryUsagePtr& mem) const { mem->setName("CSeasonalityTest"); core::memory_debug::dynamicSize("m_Windows", m_Windows, mem); } std::size_t CTimeSeriesDecompositionDetail::CSeasonalityTest::memoryUsage() const { std::size_t usage{core::memory::dynamicSize(m_Windows)}; if (m_Machine.state() == PT_INITIAL) { usage += this->extraMemoryOnInitialization(); } return usage; } std::size_t CTimeSeriesDecompositionDetail::CSeasonalityTest::extraMemoryOnInitialization() const { static std::size_t result{0}; if (result == 0) { for (auto i : {E_Short, E_Long}) { auto window = this->newWindow(i, false); // The 0.3 is a rule-of-thumb estimate of the worst case // compression ratio we achieve on the test state. result += static_cast<std::size_t>( 0.3 * static_cast<double>(core::memory::dynamicSize(window))); } } return result; } void CTimeSeriesDecompositionDetail::CSeasonalityTest::apply(std::size_t symbol, const SMessage& message) { core_t::TTime time{message.s_Time}; std::size_t old{m_Machine.state()}; m_Machine.apply(symbol); std::size_t state{m_Machine.state()}; if (state != old) { LOG_TRACE(<< PT_STATES[old] << "," << PT_ALPHABET[symbol] << " -> " << PT_STATES[state]); auto initialize = [time, this]() { for (auto i : {E_Short, E_Long}) { m_Windows[i] = this->newWindow(i); if (m_Windows[i] != nullptr) { m_Windows[i]->initialize(common::CIntegerTools::floor( time, CSeasonalityTestParameters::maxBucketLength(i, m_BucketLength))); } } }; switch (state) { case PT_TEST: if (std::all_of(m_Windows.begin(), m_Windows.end(), [](const auto& window) { return window == nullptr; })) { initialize(); } break; case PT_INITIAL: initialize(); break; case PT_NOT_TESTING: m_Windows[0].reset(); m_Windows[1].reset(); break; default: LOG_ERROR(<< "Test in a bad state: " << state); this->apply(PT_RESET, message); break; } } } CTimeSeriesDecompositionDetail::CSeasonalityTest::TExpandingWindowUPtr CTimeSeriesDecompositionDetail::CSeasonalityTest::newWindow(ETest test, bool deflate) const { using TTimeCRng = CExpandingWindow::TTimeCRng; std::size_t numberBuckets{CSeasonalityTestParameters::numberBuckets(test, m_BucketLength)}; const TTimeVec* bucketLengths{CSeasonalityTestParameters::bucketLengths(test, m_BucketLength)}; if (bucketLengths != nullptr) { return std::make_unique<CExpandingWindow>( m_BucketLength, TTimeCRng{*bucketLengths, 0, bucketLengths->size()}, numberBuckets, m_DecayRate, deflate); } return {}; } bool CTimeSeriesDecompositionDetail::CSeasonalityTest::shouldTest(ETest test, core_t::TTime time) const { // We need to test more frequently than we compress because it // would significantly delay when we first detect short periodic // components for longer bucket lengths otherwise. auto scheduledTest = [&]() { core_t::TTime length{time - m_Windows[test]->beginValuesTime()}; for (auto schedule : CSeasonalityTestParameters::testSchedule(test, m_BucketLength)) { if (length >= schedule && length < schedule + m_BucketLength) { return true; } } return false; }; return m_Windows[test] != nullptr && (m_Windows[test]->needToCompress(time) || scheduledTest()); } //////// CCalendarCyclic //////// CTimeSeriesDecompositionDetail::CCalendarTest::CCalendarTest(double decayRate, core_t::TTime bucketLength) : m_Machine{core::CStateMachine::create(CC_ALPHABET, CC_STATES, CC_TRANSITION_FUNCTION, bucketLength > DAY ? CC_NOT_TESTING : CC_INITIAL)}, m_DecayRate{decayRate}, m_BucketLength{bucketLength}, m_LastMonth{} { } CTimeSeriesDecompositionDetail::CCalendarTest::CCalendarTest(const CCalendarTest& other, bool isForForecast) : m_Machine{other.m_Machine}, m_DecayRate{other.m_DecayRate}, m_BucketLength{other.m_BucketLength}, m_LastMonth{other.m_LastMonth}, m_Test{isForForecast == false && other.m_Test ? std::make_unique<CCalendarCyclicTest>(*other.m_Test) : nullptr} { } bool CTimeSeriesDecompositionDetail::CCalendarTest::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) { do { const std::string& name{traverser.name()}; RESTORE(CALENDAR_TEST_MACHINE_6_3_TAG, traverser.traverseSubLevel([this](core::CStateRestoreTraverser& traverser_) { return m_Machine.acceptRestoreTraverser(traverser_); })) RESTORE_BUILT_IN(LAST_MONTH_6_3_TAG, m_LastMonth) RESTORE_SETUP_TEARDOWN( CALENDAR_TEST_6_3_TAG, m_Test = std::make_unique<CCalendarCyclicTest>(m_BucketLength, m_DecayRate), traverser.traverseSubLevel([this](auto& traverser_) { return m_Test->acceptRestoreTraverser(traverser_); }), /**/) } while (traverser.next()); return true; } void CTimeSeriesDecompositionDetail::CCalendarTest::acceptPersistInserter( core::CStatePersistInserter& inserter) const { inserter.insertLevel(CALENDAR_TEST_MACHINE_6_3_TAG, [this](auto& inserter_) { m_Machine.acceptPersistInserter(inserter_); }); inserter.insertValue(LAST_MONTH_6_3_TAG, m_LastMonth); if (m_Test != nullptr) { inserter.insertLevel(CALENDAR_TEST_6_3_TAG, [this](auto& inserter_) { m_Test->acceptPersistInserter(inserter_); }); } } void CTimeSeriesDecompositionDetail::CCalendarTest::swap(CCalendarTest& other) { std::swap(m_Machine, other.m_Machine); std::swap(m_DecayRate, other.m_DecayRate); std::swap(m_BucketLength, other.m_BucketLength); std::swap(m_LastMonth, other.m_LastMonth); m_Test.swap(other.m_Test); } void CTimeSeriesDecompositionDetail::CCalendarTest::handle(const SAddValue& message) { core_t::TTime time{message.s_Time}; double value{message.s_Value}; double error{message.s_Value - message.s_Trend - message.s_Seasonal - message.s_Calendar}; const maths_t::TDoubleWeightsAry& weights{message.s_Weights}; this->test(message); switch (m_Machine.state()) { case CC_TEST: // The calendar test memory can increase as we add new values // so we stop updating it in hard limit. if (message.s_MemoryCircuitBreaker.areAllocationsAllowed() == false) { break; } m_Test->add(time, value, error, maths_t::countForUpdate(weights)); break; case CC_NOT_TESTING: break; case CC_INITIAL: this->apply(CC_NEW_VALUE, message); this->handle(message); break; default: LOG_ERROR(<< "Test in a bad state: " << m_Machine.state()); this->apply(CC_RESET, message); break; } } void CTimeSeriesDecompositionDetail::CCalendarTest::handle(const SDetectedSeasonal& message) { switch (m_Machine.state()) { case CC_TEST: m_Test->forgetErrorDistribution(); break; case CC_NOT_TESTING: case CC_INITIAL: break; default: LOG_ERROR(<< "Test in a bad state: " << m_Machine.state()); this->apply(CC_RESET, message); break; } } void CTimeSeriesDecompositionDetail::CCalendarTest::test(const SMessage& message) { core_t::TTime time{message.s_Time}; core_t::TTime lastTime{message.s_LastTime}; if (this->shouldTest(time)) { switch (m_Machine.state()) { case CC_TEST: { auto result = m_Test->test(); for (auto component : result) { auto[feature, timeZoneOffset] = component; this->mediator()->forward(SDetectedCalendar( time, lastTime, feature, timeZoneOffset, message.s_MemoryCircuitBreaker)); } break; } case CC_NOT_TESTING: case CC_INITIAL: break; default: LOG_ERROR(<< "Test in a bad state: " << m_Machine.state()); this->apply(CC_RESET, message); break; } } } void CTimeSeriesDecompositionDetail::CCalendarTest::propagateForwards(core_t::TTime start, core_t::TTime end) { if (m_Test != nullptr) { stepwisePropagateForwards(start, end, DAY, [this](double time) { m_Test->propagateForwardsByTime(time / 8.0); }); } } std::uint64_t CTimeSeriesDecompositionDetail::CCalendarTest::checksum(std::uint64_t seed) const { seed = common::CChecksum::calculate(seed, m_Machine); seed = common::CChecksum::calculate(seed, m_DecayRate); seed = common::CChecksum::calculate(seed, m_BucketLength); seed = common::CChecksum::calculate(seed, m_LastMonth); return common::CChecksum::calculate(seed, m_Test); } void CTimeSeriesDecompositionDetail::CCalendarTest::debugMemoryUsage( const core::CMemoryUsage::TMemoryUsagePtr& mem) const { mem->setName("CCalendarTest"); core::memory_debug::dynamicSize("m_Test", m_Test, mem); } std::size_t CTimeSeriesDecompositionDetail::CCalendarTest::memoryUsage() const { std::size_t usage{core::memory::dynamicSize(m_Test)}; if (m_Machine.state() == CC_INITIAL) { usage += this->extraMemoryOnInitialization(); } return usage; } std::size_t CTimeSeriesDecompositionDetail::CCalendarTest::extraMemoryOnInitialization() const { static std::size_t result{0}; if (result == 0) { TCalendarCyclicTestPtr test = std::make_unique<CCalendarCyclicTest>(m_BucketLength, m_DecayRate); result = core::memory::dynamicSize(test); } return result; } void CTimeSeriesDecompositionDetail::CCalendarTest::apply(std::size_t symbol, const SMessage& message) { core_t::TTime time{message.s_Time}; std::size_t old{m_Machine.state()}; m_Machine.apply(symbol); std::size_t state{m_Machine.state()}; if (state != old) { LOG_TRACE(<< CC_STATES[old] << "," << CC_ALPHABET[symbol] << " -> " << CC_STATES[state]); switch (state) { case CC_TEST: if (m_Test == nullptr) { m_Test = std::make_unique<CCalendarCyclicTest>(m_BucketLength, m_DecayRate); m_LastMonth = this->month(time) + 2; } break; case CC_NOT_TESTING: case CC_INITIAL: m_Test.reset(); m_LastMonth = int{}; break; default: LOG_ERROR(<< "Test in a bad state: " << state); this->apply(CC_RESET, message); break; } } } bool CTimeSeriesDecompositionDetail::CCalendarTest::shouldTest(core_t::TTime time) { int month{this->month(time)}; if (month == (m_LastMonth + 1) % 12) { m_LastMonth = month; return true; } return false; } int CTimeSeriesDecompositionDetail::CCalendarTest::month(core_t::TTime time) const { int dummy; int month; core::CTimezone::instance().dateFields(time, dummy, dummy, dummy, month, dummy, dummy); return month; } //////// CComponents //////// CTimeSeriesDecompositionDetail::CComponents::CComponents(double decayRate, core_t::TTime bucketLength, std::size_t seasonalComponentSize) : m_Machine{core::CStateMachine::create(SC_ALPHABET, SC_STATES, SC_TRANSITION_FUNCTION, SC_NORMAL)}, m_DecayRate{decayRate}, m_BucketLength{bucketLength}, m_SeasonalComponentSize{seasonalComponentSize}, m_CalendarComponentSize{seasonalComponentSize / 3}, m_Trend{decayRate}, m_ComponentChangeCallback{[](TFloatMeanAccumulatorVec) {}} { } CTimeSeriesDecompositionDetail::CComponents::CComponents(const CComponents& other) : m_Machine{other.m_Machine}, m_DecayRate{other.m_DecayRate}, m_BucketLength{other.m_BucketLength}, m_GainController{other.m_GainController}, m_SeasonalComponentSize{other.m_SeasonalComponentSize}, m_CalendarComponentSize{other.m_CalendarComponentSize}, m_Trend{other.m_Trend}, m_Seasonal{other.m_Seasonal ? std::make_unique<CSeasonal>(*other.m_Seasonal) : nullptr}, m_Calendar{other.m_Calendar ? std::make_unique<CCalendar>(*other.m_Calendar) : nullptr}, m_MeanVarianceScale{other.m_MeanVarianceScale}, m_PredictionErrorWithoutTrend{other.m_PredictionErrorWithoutTrend}, m_PredictionErrorWithTrend{other.m_PredictionErrorWithTrend}, m_ComponentChangeCallback{[](TFloatMeanAccumulatorVec) {}}, m_UsingTrendForPrediction{other.m_UsingTrendForPrediction} { } bool CTimeSeriesDecompositionDetail::CComponents::acceptRestoreTraverser( const common::SDistributionRestoreParams& params, core::CStateRestoreTraverser& traverser) { if (traverser.name() == VERSION_6_3_TAG) { while (traverser.next()) { const std::string& name{traverser.name()}; RESTORE(COMPONENTS_MACHINE_6_3_TAG, traverser.traverseSubLevel([this](core::CStateRestoreTraverser& traverser_) { return m_Machine.acceptRestoreTraverser(traverser_); })) RESTORE_BUILT_IN(DECAY_RATE_6_3_TAG, m_DecayRate) RESTORE(GAIN_CONTROLLER_6_3_TAG, traverser.traverseSubLevel([this](auto& traverser_) { return m_GainController.acceptRestoreTraverser(traverser_); })) RESTORE(TREND_6_3_TAG, traverser.traverseSubLevel([&](auto& traverser_) { return m_Trend.acceptRestoreTraverser(params, traverser_); })) RESTORE_SETUP_TEARDOWN( SEASONAL_6_3_TAG, m_Seasonal = std::make_unique<CSeasonal>(), traverser.traverseSubLevel([this](auto& traverser_) { return m_Seasonal->acceptRestoreTraverser( m_DecayRate, m_BucketLength, traverser_); }), /**/) RESTORE_SETUP_TEARDOWN( CALENDAR_6_3_TAG, m_Calendar = std::make_unique<CCalendar>(), traverser.traverseSubLevel([this](auto& traverser_) { return m_Calendar->acceptRestoreTraverser( m_DecayRate, m_BucketLength, traverser_); }), /**/) RESTORE(MEAN_VARIANCE_SCALE_6_3_TAG, m_MeanVarianceScale.fromDelimited(traverser.value())) RESTORE(MOMENTS_6_3_TAG, m_PredictionErrorWithoutTrend.fromDelimited(traverser.value())) RESTORE(MOMENTS_MINUS_TREND_6_3_TAG, m_PredictionErrorWithTrend.fromDelimited(traverser.value())) RESTORE_BUILT_IN(USING_TREND_FOR_PREDICTION_6_3_TAG, m_UsingTrendForPrediction) } this->decayRate(m_DecayRate); } else { return false; } return true; } void CTimeSeriesDecompositionDetail::CComponents::acceptPersistInserter( core::CStatePersistInserter& inserter) const { inserter.insertValue(VERSION_6_3_TAG, ""); inserter.insertLevel(COMPONENTS_MACHINE_6_3_TAG, [this](auto& inserter_) { m_Machine.acceptPersistInserter(inserter_); }); inserter.insertValue(DECAY_RATE_6_3_TAG, m_DecayRate, core::CIEEE754::E_SinglePrecision); inserter.insertLevel(GAIN_CONTROLLER_6_3_TAG, [this](auto& inserter_) { m_GainController.acceptPersistInserter(inserter_); }); inserter.insertLevel(TREND_6_3_TAG, [this](auto& inserter_) { m_Trend.acceptPersistInserter(inserter_); }); if (m_Seasonal != nullptr) { inserter.insertLevel(SEASONAL_6_3_TAG, [this](auto& inserter_) { m_Seasonal->acceptPersistInserter(inserter_); }); } if (m_Calendar != nullptr) { inserter.insertLevel(CALENDAR_6_3_TAG, [this](auto& inserter_) { m_Calendar->acceptPersistInserter(inserter_); }); } inserter.insertValue(MEAN_VARIANCE_SCALE_6_3_TAG, m_MeanVarianceScale.toDelimited()); inserter.insertValue(MOMENTS_6_3_TAG, m_PredictionErrorWithoutTrend.toDelimited()); inserter.insertValue(MOMENTS_MINUS_TREND_6_3_TAG, m_PredictionErrorWithTrend.toDelimited()); inserter.insertValue(USING_TREND_FOR_PREDICTION_6_3_TAG, m_UsingTrendForPrediction); } void CTimeSeriesDecompositionDetail::CComponents::swap(CComponents& other) { std::swap(m_Machine, other.m_Machine); std::swap(m_DecayRate, other.m_DecayRate); std::swap(m_BucketLength, other.m_BucketLength); std::swap(m_SeasonalComponentSize, other.m_SeasonalComponentSize); std::swap(m_CalendarComponentSize, other.m_CalendarComponentSize); m_Trend.swap(other.m_Trend); m_Seasonal.swap(other.m_Seasonal); m_Calendar.swap(other.m_Calendar); std::swap(m_GainController, other.m_GainController); std::swap(m_MeanVarianceScale, other.m_MeanVarianceScale); std::swap(m_PredictionErrorWithoutTrend, other.m_PredictionErrorWithoutTrend); std::swap(m_PredictionErrorWithTrend, other.m_PredictionErrorWithTrend); std::swap(m_UsingTrendForPrediction, other.m_UsingTrendForPrediction); } void CTimeSeriesDecompositionDetail::CComponents::handle(const SAddValue& message) { switch (m_Machine.state()) { case SC_NORMAL: case SC_NEW_COMPONENTS: { this->interpolate(message); core_t::TTime time{message.s_Time}; double value{message.s_Value}; double trend{message.s_Trend}; const maths_t::TDoubleWeightsAry& weights{message.s_Weights}; const TMakePredictor& makePredictor{message.s_MakePredictor}; TSeasonalComponentPtrVec seasonalComponents; TCalendarComponentPtrVec calendarComponents; TComponentErrorsPtrVec seasonalErrors; TComponentErrorsPtrVec calendarErrors; TDoubleVec deltas; if (m_Seasonal != nullptr) { m_Seasonal->componentsErrorsAndDeltas(time, seasonalComponents, seasonalErrors, deltas); } if (m_Calendar != nullptr) { m_Calendar->componentsAndErrors(time, calendarComponents, calendarErrors); } double weight{maths_t::countForUpdate(weights)}; double initialWeight{maths_t::count(weights)}; std::size_t m{seasonalComponents.size()}; std::size_t n{calendarComponents.size()}; TDoubleVec values(m + n + 1, value); TDoubleVec predictions(m + n, 0.0); double referenceError; double error; double scale; decompose(trend, seasonalComponents, calendarComponents, time, deltas, m_GainController.gain(), values, predictions, referenceError, error, scale); TDoubleVec variances(m + n + 1, 0.0); if (m_UsingTrendForPrediction) { variances[0] = m_Trend.variance(0.0).mean(); } for (std::size_t i = 1; i <= m; ++i) { variances[i] = seasonalComponents[i - 1]->variance(time, 0.0).mean(); } for (std::size_t i = m + 1; i <= m + n; ++i) { variances[i] = calendarComponents[i - m - 1]->variance(time, 0.0).mean(); } double variance{std::accumulate(variances.begin(), variances.end(), 0.0)}; double expectedVarianceIncrease{1.0 / static_cast<double>(m + n + 1)}; bool testForTrend{(m_UsingTrendForPrediction == false) && (m_Trend.observedInterval() > 6 * m_BucketLength)}; m_Trend.add(time, values[0], weight); m_Trend.dontShiftLevel(time, value); for (std::size_t i = 1; i <= m; ++i) { CSeasonalComponent* component{seasonalComponents[i - 1]}; CComponentErrors* error_{seasonalErrors[i - 1]}; double varianceIncrease{variance == 0.0 ? 1.0 : variances[i] / variance / expectedVarianceIncrease}; component->add(time, values[i], component->initialized() ? weight : initialWeight, 0.5); error_->add(referenceError, error, predictions[i - 1], varianceIncrease, weight); } for (std::size_t i = m + 1; i <= m + n; ++i) { CCalendarComponent* component{calendarComponents[i - m - 1]}; CComponentErrors* error_{calendarErrors[i - m - 1]}; double varianceIncrease{variance == 0.0 ? 1.0 : variances[i] / variance / expectedVarianceIncrease}; component->add(time, values[i], component->initialized() ? weight : initialWeight); error_->add(referenceError, error, predictions[i - 1], varianceIncrease, weight); } m_MeanVarianceScale.add(scale, weight); m_PredictionErrorWithoutTrend.add(error + trend, weight); m_PredictionErrorWithTrend.add(error, weight); m_GainController.add(time, predictions); if (testForTrend && this->shouldUseTrendForPrediction()) { LOG_DEBUG(<< "Detected trend at " << time); this->mediator()->forward(SDetectedTrend{makePredictor(), m_ComponentChangeCallback, message.s_MemoryCircuitBreaker}); m_ModelAnnotationCallback("Detected trend"); } } break; case SC_DISABLED: break; default: LOG_ERROR(<< "Components in a bad state: " << m_Machine.state()); this->apply(SC_RESET, message); break; } } void CTimeSeriesDecompositionDetail::CComponents::handle(const SDetectedSeasonal& message) { if (this->size() + m_SeasonalComponentSize > this->maxSize()) { return; } switch (m_Machine.state()) { case SC_NORMAL: case SC_NEW_COMPONENTS: { if (m_Seasonal == nullptr) { m_Seasonal = std::make_unique<CSeasonal>(); } core_t::TTime time{message.s_Time}; const auto& components = message.s_Components; LOG_DEBUG(<< "Detected change in seasonal components at " << time); this->addSeasonalComponents(components, message.s_MemoryCircuitBreaker); this->apply(SC_ADDED_COMPONENTS, message); break; } case SC_DISABLED: break; default: LOG_ERROR(<< "Components in a bad state: " << m_Machine.state()); this->apply(SC_RESET, message); break; } } void CTimeSeriesDecompositionDetail::CComponents::handle(const SDetectedCalendar& message) { if (this->size() + m_CalendarComponentSize > this->maxSize()) { return; } switch (m_Machine.state()) { case SC_NORMAL: case SC_NEW_COMPONENTS: { if (m_Calendar == nullptr) { m_Calendar = std::make_unique<CCalendar>(); } core_t::TTime time{message.s_Time}; CCalendarFeature feature{message.s_Feature}; core_t::TTime timeZoneOffset{message.s_TimeZoneOffset}; if (m_Calendar->haveComponent(feature)) { break; } LOG_DEBUG(<< "Detected feature '" << feature.print() << "' at " << time); this->addCalendarComponent(feature, message.s_MemoryCircuitBreaker, timeZoneOffset); this->apply(SC_ADDED_COMPONENTS, message); break; } case SC_DISABLED: break; default: LOG_ERROR(<< "Components in a bad state: " << m_Machine.state()); this->apply(SC_RESET, message); break; } } void CTimeSeriesDecompositionDetail::CComponents::handle(const SDetectedChangePoint& message) { core_t::TTime time{message.s_Time}; const auto& change = *message.s_Change; change.apply(m_Trend); if (m_Seasonal != nullptr) { m_Seasonal->apply(change); } if (m_Calendar != nullptr) { m_Calendar->apply(change); } if (m_UsingTrendForPrediction == false) { m_ComponentChangeCallback(change.residuals()); m_UsingTrendForPrediction = true; } LOG_DEBUG(<< "Detected " << change.print() << " at " << time); m_ModelAnnotationCallback("Detected " + change.print()); } void CTimeSeriesDecompositionDetail::CComponents::interpolateForForecast(core_t::TTime time) { if (this->shouldInterpolate(time)) { if (m_Seasonal != nullptr) { m_Seasonal->interpolate(time, false); } if (m_Calendar != nullptr) { m_Calendar->interpolate(time, true); } } } void CTimeSeriesDecompositionDetail::CComponents::dataType(maths_t::EDataType dataType) { m_Trend.dataType(dataType); } void CTimeSeriesDecompositionDetail::CComponents::decayRate(double decayRate) { m_DecayRate = decayRate; m_Trend.decayRate(decayRate); if (m_Seasonal != nullptr) { m_Seasonal->decayRate(decayRate); } if (m_Calendar != nullptr) { m_Calendar->decayRate(decayRate); } } double CTimeSeriesDecompositionDetail::CComponents::decayRate() const { return m_DecayRate; } void CTimeSeriesDecompositionDetail::CComponents::propagateForwards(core_t::TTime start, core_t::TTime end) { m_Trend.propagateForwardsByTime(end - start); if (m_Seasonal != nullptr) { m_Seasonal->propagateForwards(start, end); } if (m_Calendar != nullptr) { m_Calendar->propagateForwards(start, end); } double factor{ageFactor(m_DecayRate, end - start)}; m_MeanVarianceScale.age(factor); m_PredictionErrorWithTrend.age(factor); m_PredictionErrorWithoutTrend.age(factor); m_GainController.age(factor); } bool CTimeSeriesDecompositionDetail::CComponents::initialized() const { return m_UsingTrendForPrediction && m_Trend.initialized() ? true : (m_Seasonal != nullptr && m_Calendar != nullptr ? m_Seasonal->initialized() || m_Calendar->initialized() : (m_Seasonal != nullptr ? m_Seasonal->initialized() : (m_Calendar ? m_Calendar->initialized() : false))); } const CTrendComponent& CTimeSeriesDecompositionDetail::CComponents::trend() const { return m_Trend; } const TSeasonalComponentVec& CTimeSeriesDecompositionDetail::CComponents::seasonal() const { return m_Seasonal != nullptr ? m_Seasonal->components() : NO_SEASONAL_COMPONENTS; } const maths_t::TCalendarComponentVec& CTimeSeriesDecompositionDetail::CComponents::calendar() const { return m_Calendar != nullptr ? m_Calendar->components() : NO_CALENDAR_COMPONENTS; } bool CTimeSeriesDecompositionDetail::CComponents::usingTrendForPrediction() const { return m_UsingTrendForPrediction; } void CTimeSeriesDecompositionDetail::CComponents::useTrendForPrediction() { m_UsingTrendForPrediction = true; } CTimeSeriesDecompositionDetail::TMakeTestForSeasonality CTimeSeriesDecompositionDetail::CComponents::makeTestForSeasonality( const TMakeFilteredPredictor& makePredictor) const { return [makePredictor, this](const CExpandingWindow& window, core_t::TTime minimumPeriod, std::size_t minimumResolutionToTestModelledComponent, const TFilteredPredictor& preconditioner, double occupancy) { core_t::TTime valuesStartTime{window.beginValuesTime()}; core_t::TTime windowBucketStartTime{window.bucketStartTime()}; core_t::TTime windowBucketLength{window.bucketLength()}; auto values = window.values(); TBoolVec testableMask; for (const auto& component : this->seasonal()) { testableMask.push_back(CTimeSeriesTestForSeasonality::canTestModelledComponent( values, windowBucketStartTime, windowBucketLength, minimumPeriod, minimumResolutionToTestModelledComponent, component.time())); } values = window.valuesMinusPrediction(std::move(values), [&](core_t::TTime time) { return preconditioner(time, testableMask); }); CTimeSeriesTestForSeasonality test(valuesStartTime, windowBucketStartTime, windowBucketLength, m_BucketLength, std::move(values), occupancy); test.minimumPeriod(minimumPeriod) .minimumModelSize(2 * m_SeasonalComponentSize / 3) .maximumModelSize(2 * m_SeasonalComponentSize) .sampleVariance(window.withinBucketVariance()) .modelledSeasonalityPredictor(makePredictor()); std::ptrdiff_t maximumNumberComponents{MAXIMUM_COMPONENTS}; for (const auto& component : this->seasonal()) { test.addModelledSeasonality(component.time(), minimumResolutionToTestModelledComponent, component.size()); --maximumNumberComponents; } test.maximumNumberOfComponents(maximumNumberComponents); test.prepareWindowForDecompose(); return test; }; } double CTimeSeriesDecompositionDetail::CComponents::meanValue(core_t::TTime time) const { return this->initialized() ? ((m_UsingTrendForPrediction ? m_Trend.value(time, 0.0).mean() : 0.0) + meanOf(&CSeasonalComponent::meanValue, this->seasonal())) : 0.0; } double CTimeSeriesDecompositionDetail::CComponents::meanVariance() const { return this->initialized() ? ((m_UsingTrendForPrediction ? this->trend().variance(0.0).mean() : 0.0) + meanOf(&CSeasonalComponent::meanVariance, this->seasonal())) : 0.0; } double CTimeSeriesDecompositionDetail::CComponents::meanVarianceScale() const { return common::CBasicStatistics::mean(m_MeanVarianceScale); } std::uint64_t CTimeSeriesDecompositionDetail::CComponents::checksum(std::uint64_t seed) const { seed = common::CChecksum::calculate(seed, m_Machine); seed = common::CChecksum::calculate(seed, m_DecayRate); seed = common::CChecksum::calculate(seed, m_BucketLength); seed = common::CChecksum::calculate(seed, m_SeasonalComponentSize); seed = common::CChecksum::calculate(seed, m_CalendarComponentSize); seed = common::CChecksum::calculate(seed, m_Trend); seed = common::CChecksum::calculate(seed, m_Seasonal); seed = common::CChecksum::calculate(seed, m_Calendar); seed = common::CChecksum::calculate(seed, m_MeanVarianceScale); seed = common::CChecksum::calculate(seed, m_PredictionErrorWithoutTrend); seed = common::CChecksum::calculate(seed, m_PredictionErrorWithTrend); seed = common::CChecksum::calculate(seed, m_GainController); return common::CChecksum::calculate(seed, m_UsingTrendForPrediction); } void CTimeSeriesDecompositionDetail::CComponents::debugMemoryUsage( const core::CMemoryUsage::TMemoryUsagePtr& mem) const { mem->setName("CComponents"); core::memory_debug::dynamicSize("m_Trend", m_Trend, mem); core::memory_debug::dynamicSize("m_Seasonal", m_Seasonal, mem); core::memory_debug::dynamicSize("m_Calendar", m_Calendar, mem); } std::size_t CTimeSeriesDecompositionDetail::CComponents::memoryUsage() const { return core::memory::dynamicSize(m_Trend) + core::memory::dynamicSize(m_Seasonal) + core::memory::dynamicSize(m_Calendar); } std::size_t CTimeSeriesDecompositionDetail::CComponents::size() const { return (m_Seasonal ? m_Seasonal->size() : 0) + (m_Calendar ? m_Calendar->size() : 0); } std::size_t CTimeSeriesDecompositionDetail::CComponents::maxSize() const { return MAXIMUM_COMPONENTS * m_SeasonalComponentSize; } void CTimeSeriesDecompositionDetail::CComponents::addSeasonalComponents( const CSeasonalDecomposition& components, const TMemoryCircuitBreaker& memoryCircuitBreaker) { LOG_TRACE(<< "remove mask = " << components.seasonalToRemoveMask()); LOG_TRACE(<< "Estimate size change = " << m_Seasonal->estimateSizeChange(components, m_DecayRate, m_BucketLength)); if (memoryCircuitBreaker.areAllocationsAllowed() == false && m_Seasonal->estimateSizeChange(components, m_DecayRate, m_BucketLength) > 0) { // In the hard_limit state, we do not change the state of components if // adding new components will consume more memory than removing old ones. LOG_TRACE(<< "Not adding new seasonal components because we are in the hard limit state"); return; } if (m_Seasonal->remove(components.seasonalToRemoveMask()) == false) { // We don't know how to apply the changes so just bail. return; } if (components.seasonal().empty()) { LOG_DEBUG(<< "removed all seasonality"); m_ModelAnnotationCallback("removed all seasonality"); } for (const auto& component : components.seasonal()) { m_Seasonal->add(component.createSeasonalComponent( m_DecayRate, static_cast<double>(m_BucketLength))); m_ModelAnnotationCallback(component.annotationText()); } m_Seasonal->refreshForNewComponents(); this->clearComponentErrors(); core_t::TTime startTime{components.trend()->initialValuesStartTime()}; core_t::TTime endTime{components.trend()->initialValuesEndTime()}; core_t::TTime dt{components.trend()->bucketLength()}; auto initialValues = components.trend()->initialValues(); // Reinitialize the gain controller. TDoubleVec predictions; m_GainController.clear(); for (core_t::TTime time = startTime; time < endTime; time += m_BucketLength) { predictions.clear(); if (m_Seasonal != nullptr) { m_Seasonal->appendPredictions(time, predictions); } if (m_Calendar != nullptr) { m_Calendar->appendPredictions(time, predictions); } m_GainController.seed(predictions); m_GainController.age(ageFactor(m_DecayRate, m_BucketLength)); } // Fit a trend model. CTrendComponent newTrend{m_Trend.defaultDecayRate()}; this->fitTrend(startTime, dt, initialValues, newTrend); m_Trend.swap(newTrend); m_UsingTrendForPrediction = true; // Pass the residuals to the component changed callback. core_t::TTime time{startTime}; for (std::size_t i = 0; i < initialValues.size(); ++i, time += dt) { if (common::CBasicStatistics::count(initialValues[i]) > 0.0) { common::CBasicStatistics::moment<0>(initialValues[i]) -= m_Trend.value(time, 0.0).mean(); } } // We typically underestimate the values variance if the window bucket length // is longer than the job bucket length. This adds noise to the values we use // to reinitialize the residual model to compensate. addMeanZeroNormalNoise(components.withinBucketVariance(), initialValues); m_ComponentChangeCallback(std::move(initialValues)); } void CTimeSeriesDecompositionDetail::CComponents::addCalendarComponent( const CCalendarFeature& feature, const TMemoryCircuitBreaker& allocator, core_t::TTime timeZoneOffset) { if (allocator.areAllocationsAllowed() == false) { // In the hard_limit state we are not adding any new components to the model. LOG_TRACE(<< "Not adding new calendar component because we are in the hard limit state"); return; } double bucketLength{static_cast<double>(m_BucketLength)}; m_Calendar->add(feature, timeZoneOffset, m_CalendarComponentSize, m_DecayRate, bucketLength); m_ModelAnnotationCallback("Detected calendar feature: " + feature.print()); } void CTimeSeriesDecompositionDetail::CComponents::fitTrend(core_t::TTime startTime, core_t::TTime dt, const TFloatMeanAccumulatorVec& values, CTrendComponent& trend) const { core_t::TTime time{startTime}; for (const auto& value : values) { if (common::CBasicStatistics::count(value) > 0.0) { trend.add(time, common::CBasicStatistics::mean(value), common::CBasicStatistics::count(value)); trend.propagateForwardsByTime(dt); } time += dt; } } void CTimeSeriesDecompositionDetail::CComponents::clearComponentErrors() { if (m_Seasonal != nullptr) { m_Seasonal->clearPredictionErrors(); } if (m_Calendar != nullptr) { m_Calendar->clearPredictionErrors(); } } void CTimeSeriesDecompositionDetail::CComponents::apply(std::size_t symbol, const SMessage& message) { if (symbol == SC_RESET) { m_Trend.clear(); m_Seasonal.reset(); m_Calendar.reset(); } std::size_t old{m_Machine.state()}; m_Machine.apply(symbol); std::size_t state{m_Machine.state()}; if (state != old) { LOG_TRACE(<< SC_STATES[old] << "," << SC_ALPHABET[symbol] << " -> " << SC_STATES[state]); switch (state) { case SC_NORMAL: case SC_NEW_COMPONENTS: this->interpolate(message); break; case SC_DISABLED: m_Trend.clear(); m_Seasonal.reset(); m_Calendar.reset(); break; default: LOG_ERROR(<< "Components in a bad state: " << m_Machine.state()); this->apply(SC_RESET, message); break; } } } bool CTimeSeriesDecompositionDetail::CComponents::shouldUseTrendForPrediction() { double v0{common::CBasicStatistics::variance(m_PredictionErrorWithoutTrend)}; double v1{common::CBasicStatistics::variance(m_PredictionErrorWithTrend)}; double df0{common::CBasicStatistics::count(m_PredictionErrorWithoutTrend) - 1.0}; double df1{common::CBasicStatistics::count(m_PredictionErrorWithTrend) - m_Trend.parameters()}; if (df0 > 0.0 && df1 > 0.0 && v0 > 0.0) { double relativeLogSignificance{ common::CTools::fastLog(common::CStatisticalTests::leftTailFTest(v1, v0, df1, df0)) / common::CTools::fastLog(0.001)}; double vt{0.6 * v0}; double p{common::CTools::logisticFunction(relativeLogSignificance, 0.1, 1.0) * (vt > v1 ? common::CTools::logisticFunction(vt / v1, 1.0, 1.0, +1.0) : common::CTools::logisticFunction(v1 / vt, 0.1, 1.0, -1.0))}; m_UsingTrendForPrediction = (p >= 0.25); } return m_UsingTrendForPrediction; } bool CTimeSeriesDecompositionDetail::CComponents::shouldInterpolate(core_t::TTime time) { return m_Machine.state() == SC_NEW_COMPONENTS || (m_Seasonal && m_Seasonal->shouldInterpolate(time)) || (m_Calendar && m_Calendar->shouldInterpolate(time)); } void CTimeSeriesDecompositionDetail::CComponents::interpolate(const SMessage& message) { core_t::TTime time{message.s_Time}; std::size_t state{m_Machine.state()}; switch (state) { case SC_NORMAL: case SC_NEW_COMPONENTS: this->canonicalize(time); if (this->shouldInterpolate(time)) { LOG_TRACE(<< "Interpolating values at " << time); // As well as interpolating we also remove components that contain // invalid (not finite) values, along with the associated prediction // errors and signal that the set of components has been modified. if (m_Seasonal != nullptr) { if (m_Seasonal->removeComponentsWithBadValues(time)) { m_ComponentChangeCallback({}); } m_Seasonal->interpolate(time, true); } if (m_Calendar != nullptr) { if (m_Calendar->removeComponentsWithBadValues(time)) { m_ComponentChangeCallback({}); } m_Calendar->interpolate(time, true); } this->apply(SC_INTERPOLATED, message); } break; case SC_DISABLED: break; default: LOG_ERROR(<< "Components in a bad state: " << state); this->apply(SC_RESET, message); break; } } void CTimeSeriesDecompositionDetail::CComponents::shiftOrigin(core_t::TTime time) { time -= static_cast<core_t::TTime>(static_cast<double>(DAY) / m_DecayRate / 2.0); m_Trend.shiftOrigin(time); if (m_Seasonal != nullptr) { m_Seasonal->shiftOrigin(time); } m_GainController.shiftOrigin(time); } void CTimeSeriesDecompositionDetail::CComponents::canonicalize(core_t::TTime time) { // There is redundancy in the specification of the additive decomposition. For // any collection of models {m_i} then for any set of |{m_i}| constants {c_j} // satisfying sum_j c_j = 0 all models of the form m_i' = s_i + c_{j(i)} for // any permutation j(.) give the same predictions. Here we choose a canonical // form which minimises the values of the components to avoid issues with // cancellation errors. using TMinMaxAccumulator = common::CBasicStatistics::CMinMax<double>; this->shiftOrigin(time); if (m_Seasonal != nullptr && m_Seasonal->prune(time, m_BucketLength)) { m_Seasonal.reset(); } if (m_Calendar != nullptr && m_Calendar->prune(time, m_BucketLength)) { m_Calendar.reset(); } if (m_Seasonal != nullptr) { // Compute the sum level and slope for each separate window if the // components are time windowed. TTimeTimePrDoubleFMap levels; TTimeTimePrDoubleFMap slopes; TTimeTimePrDoubleFMap numberLevels; TTimeTimePrDoubleFMap numberSlopes; for (auto& component : m_Seasonal->components()) { auto window = component.time().windowed() ? component.time().window() : TTimeTimePr{0, 0}; levels[window] += component.meanValue(); numberLevels[window] += 1.0; if (component.slopeAccurate(time)) { slopes[window] += component.slope(); numberSlopes[window] += 1.0; } } TMinMaxAccumulator commonLevel; for (const auto& level : levels) { commonLevel.add(level.second); } if (commonLevel.signMargin() != 0.0) { for (auto& component : m_Seasonal->components()) { auto window = component.time().windowed() ? component.time().window() : TTimeTimePr{0, 0}; component.shiftLevel((levels[window] - commonLevel.signMargin()) / numberLevels[window] - component.meanValue()); } m_Trend.shiftLevel(commonLevel.signMargin()); } TMinMaxAccumulator commonSlope; for (const auto& slope : slopes) { commonSlope.add(slope.second); } if (commonSlope.signMargin() != 0.0) { for (auto& component : m_Seasonal->components()) { if (component.slopeAccurate(time)) { auto window = component.time().windowed() ? component.time().window() : TTimeTimePr{0, 0}; component.shiftSlope(time, (slopes[window] - commonSlope.signMargin()) / numberSlopes[window] - component.slope()); } } m_Trend.shiftSlope(time, commonSlope.signMargin()); } } } CTimeSeriesDecompositionDetail::CComponents::CScopeAttachComponentChangeCallback::CScopeAttachComponentChangeCallback( CComponents& components, TComponentChangeCallback componentChangeCallback, maths_t::TModelAnnotationCallback modelAnnotationCallback) : m_Components{components} { components.m_ComponentChangeCallback = std::move(componentChangeCallback); components.m_ModelAnnotationCallback = std::move(modelAnnotationCallback); } CTimeSeriesDecompositionDetail::CComponents::CScopeAttachComponentChangeCallback::~CScopeAttachComponentChangeCallback() { m_Components.m_ComponentChangeCallback = [](TFloatMeanAccumulatorVec) {}; m_Components.m_ModelAnnotationCallback = {}; } bool CTimeSeriesDecompositionDetail::CComponents::CGainController::acceptRestoreTraverser( core::CStateRestoreTraverser& traverser) { do { const std::string& name{traverser.name()}; RESTORE_BUILT_IN(REGRESSION_ORIGIN_6_4_TAG, m_RegressionOrigin) RESTORE(MEAN_SUM_AMPLITUDES_6_4_TAG, m_MeanSumAmplitudes.fromDelimited(traverser.value())) RESTORE(MEAN_SUM_AMPLITUDES_TREND_6_4_TAG, traverser.traverseSubLevel([this](auto& traverser_) { return m_MeanSumAmplitudesTrend.acceptRestoreTraverser(traverser_); })) } while (traverser.next()); return true; } void CTimeSeriesDecompositionDetail::CComponents::CGainController::acceptPersistInserter( core::CStatePersistInserter& inserter) const { inserter.insertValue(REGRESSION_ORIGIN_6_4_TAG, m_RegressionOrigin); inserter.insertValue(MEAN_SUM_AMPLITUDES_6_4_TAG, m_MeanSumAmplitudes.toDelimited()); inserter.insertLevel(MEAN_SUM_AMPLITUDES_TREND_6_4_TAG, [this](auto& inserter_) { m_MeanSumAmplitudesTrend.acceptPersistInserter(inserter_); }); } void CTimeSeriesDecompositionDetail::CComponents::CGainController::clear() { m_RegressionOrigin = 0; m_MeanSumAmplitudes = TFloatMeanAccumulator{}; m_MeanSumAmplitudesTrend = TRegression{}; } double CTimeSeriesDecompositionDetail::CComponents::CGainController::gain() const { if (m_MeanSumAmplitudesTrend.count() > 0.0) { TRegression::TArray params; m_MeanSumAmplitudesTrend.parameters(params); if (params[1] > 0.01 * common::CBasicStatistics::mean(m_MeanSumAmplitudes)) { // Anything less than one is sufficient to ensure that the basic update // dynamics are stable (poles of the Z-transform inside the unit circle). // There are however other factors at play which are hard to quantify // such as the sample weight and the fact that there's a lag detecting // instability. This gives us a margin for error. return 0.8; } } return 3.0; } void CTimeSeriesDecompositionDetail::CComponents::CGainController::seed(const TDoubleVec& predictions) { m_MeanSumAmplitudes.add(std::accumulate( predictions.begin(), predictions.end(), 0.0, [](double sum, double prediction) { return sum + std::fabs(prediction); })); } void CTimeSeriesDecompositionDetail::CComponents::CGainController::add(core_t::TTime time, const TDoubleVec& predictions) { if (predictions.empty() == false) { m_MeanSumAmplitudes.add(std::accumulate( predictions.begin(), predictions.end(), 0.0, [](double sum, double prediction) { return sum + std::fabs(prediction); })); m_MeanSumAmplitudesTrend.add(scaleTime(time, m_RegressionOrigin), common::CBasicStatistics::mean(m_MeanSumAmplitudes), common::CBasicStatistics::count(m_MeanSumAmplitudes)); } } void CTimeSeriesDecompositionDetail::CComponents::CGainController::age(double factor) { m_MeanSumAmplitudes.age(factor); m_MeanSumAmplitudesTrend.age(factor); } void CTimeSeriesDecompositionDetail::CComponents::CGainController::shiftOrigin(core_t::TTime time) { time = common::CIntegerTools::floor(time, WEEK); if (time > m_RegressionOrigin) { m_MeanSumAmplitudesTrend.shiftAbscissa(-scaleTime(time, m_RegressionOrigin)); m_RegressionOrigin = time; } } std::uint64_t CTimeSeriesDecompositionDetail::CComponents::CGainController::checksum(std::uint64_t seed) const { seed = common::CChecksum::calculate(seed, m_RegressionOrigin); seed = common::CChecksum::calculate(seed, m_MeanSumAmplitudes); return common::CChecksum::calculate(seed, m_MeanSumAmplitudesTrend); } bool CTimeSeriesDecompositionDetail::CComponents::CComponentErrors::fromDelimited(const std::string& str_) { std::string str{str_}; std::size_t n{str.find(common::CBasicStatistics::EXTERNAL_DELIMITER)}; if (m_MeanErrors.fromDelimited(str.substr(0, n)) == false) { LOG_ERROR(<< "Failed to parse '" << str << "'"); return false; } str = str.substr(n + 1); if (m_MaxVarianceIncrease.fromDelimited(str) == false) { LOG_ERROR(<< "Failed to parse '" << str << "'"); return false; } return true; } std::string CTimeSeriesDecompositionDetail::CComponents::CComponentErrors::toDelimited() const { return m_MeanErrors.toDelimited() + common::CBasicStatistics::EXTERNAL_DELIMITER + m_MaxVarianceIncrease.toDelimited(); } void CTimeSeriesDecompositionDetail::CComponents::CComponentErrors::add(double referenceError, double error, double prediction, double varianceIncrease, double weight) { TVector errors; errors(0) = common::CTools::pow2(referenceError); errors(1) = common::CTools::pow2(error); errors(2) = common::CTools::pow2(error + prediction); m_MeanErrors.add(this->winsorise(errors), weight); m_MaxVarianceIncrease.add(varianceIncrease); } void CTimeSeriesDecompositionDetail::CComponents::CComponentErrors::clear() { m_MeanErrors = TVectorMeanAccumulator{}; m_MaxVarianceIncrease = TMaxAccumulator{}; } bool CTimeSeriesDecompositionDetail::CComponents::CComponentErrors::remove(core_t::TTime bucketLength, core_t::TTime period) const { double history{common::CBasicStatistics::count(m_MeanErrors) * static_cast<double>(bucketLength)}; double errorWithNoComponents{common::CBasicStatistics::mean(m_MeanErrors)(0)}; double errorWithComponent{common::CBasicStatistics::mean(m_MeanErrors)(1)}; double errorWithoutComponent{common::CBasicStatistics::mean(m_MeanErrors)(2)}; return (history > static_cast<double>(WEEK) && errorWithComponent > errorWithNoComponents) || (history > 5.0 * static_cast<double>(period) && m_MaxVarianceIncrease[0] < 1.2 && errorWithoutComponent <= errorWithComponent); } void CTimeSeriesDecompositionDetail::CComponents::CComponentErrors::age(double factor) { m_MeanErrors.age(factor); m_MaxVarianceIncrease.age(factor); } std::uint64_t CTimeSeriesDecompositionDetail::CComponents::CComponentErrors::checksum(std::uint64_t seed) const { seed = common::CChecksum::calculate(seed, m_MeanErrors); return common::CChecksum::calculate(seed, m_MaxVarianceIncrease); } CTimeSeriesDecompositionDetail::CComponents::CComponentErrors::TVector CTimeSeriesDecompositionDetail::CComponents::CComponentErrors::winsorise(const TVector& squareError) const { return common::CBasicStatistics::count(m_MeanErrors) > 10.0 ? min(squareError, common::CFloatStorage{36} * common::CBasicStatistics::mean(m_MeanErrors)) : squareError; } bool CTimeSeriesDecompositionDetail::CComponents::CSeasonal::acceptRestoreTraverser( double decayRate, core_t::TTime bucketLength_, core::CStateRestoreTraverser& traverser) { double bucketLength{static_cast<double>(bucketLength_)}; if (traverser.name() == VERSION_6_4_TAG) { while (traverser.next()) { const std::string& name{traverser.name()}; RESTORE_NO_ERROR(COMPONENT_6_4_TAG, m_Components.emplace_back(decayRate, bucketLength, traverser)) RESTORE(ERRORS_6_4_TAG, core::CPersistUtils::restore(ERRORS_6_4_TAG, m_PredictionErrors, traverser)) } } else { LOG_ERROR(<< "Input error: unsupported state serialization version '" << traverser.name() << "'. Currently supported minimum version: " << VERSION_6_4_TAG); return false; } return true; } void CTimeSeriesDecompositionDetail::CComponents::CSeasonal::acceptPersistInserter( core::CStatePersistInserter& inserter) const { inserter.insertValue(VERSION_6_4_TAG, ""); for (const auto& component : m_Components) { inserter.insertLevel(COMPONENT_6_4_TAG, [&component](auto& inserter_) { component.acceptPersistInserter(inserter_); }); } core::CPersistUtils::persist(ERRORS_6_4_TAG, m_PredictionErrors, inserter); } void CTimeSeriesDecompositionDetail::CComponents::CSeasonal::decayRate(double decayRate) { for (auto& component : m_Components) { component.decayRate(decayRate); } } bool CTimeSeriesDecompositionDetail::CComponents::CSeasonal::removeComponentsWithBadValues(core_t::TTime time) { TBoolVec remove(m_Components.size(), false); bool anyBadComponentsFound{false}; for (std::size_t i = 0; i < m_Components.size(); ++i) { const CSeasonalTime& time_{m_Components[i].time()}; if (m_Components[i].isBad()) { LOG_DEBUG(<< "Removing seasonal component" << " with period '" << core::CTimeUtils::durationToString(time_.period()) << "' at " << time << ". Invalid values detected."); remove[i] = true; anyBadComponentsFound |= true; } } if (anyBadComponentsFound) { common::CSetTools::simultaneousRemoveIf( [](bool remove_) { return remove_; }, remove, m_Components, m_PredictionErrors); } return anyBadComponentsFound; } void CTimeSeriesDecompositionDetail::CComponents::CSeasonal::propagateForwards(core_t::TTime start, core_t::TTime end) { for (std::size_t i = 0; i < m_Components.size(); ++i) { core_t::TTime period{m_Components[i].time().period()}; stepwisePropagateForwards(start, end, period, [&](double time) { m_Components[i].propagateForwardsByTime(time / 6.0, 0.25); m_PredictionErrors[i].age(std::exp(-m_Components[i].decayRate() * time)); }); } } void CTimeSeriesDecompositionDetail::CComponents::CSeasonal::clearPredictionErrors() { for (auto& errors : m_PredictionErrors) { errors.clear(); } } std::size_t CTimeSeriesDecompositionDetail::CComponents::CSeasonal::size() const { std::size_t result{0}; for (const auto& component : m_Components) { result += component.size(); } return result; } std::ptrdiff_t CTimeSeriesDecompositionDetail::CComponents::CSeasonal::estimateSizeChange( const CSeasonalDecomposition& components, double decayRate, double bucketLength) const { // loop over components that will be removed and compute total size const CSeasonalDecomposition::TBoolVec& removeComponentsMask = components.seasonalToRemoveMask(); if (removeComponentsMask.size() != m_Components.size()) { LOG_ERROR(<< "Unexpected seasonal components to remove " << removeComponentsMask << ". Have " << m_Components.size() << " components."); return 0; //The size change is 0 because the attempt to remove seasonal components will fail. } std::ptrdiff_t removeComponentsSize{0}; for (std::size_t i = 0; i < removeComponentsMask.size(); ++i) { if (removeComponentsMask[i] == true) { removeComponentsSize += core::memory::dynamicSize(m_Components[i]); } } // loop over components that will be added and compute total size std::ptrdiff_t addComponentsSize{0}; for (const auto& component : components.seasonal()) { addComponentsSize += core::memory::dynamicSize( component.createSeasonalComponent(decayRate, bucketLength)); } LOG_TRACE(<< "Add components size: " << addComponentsSize << ", remove components size: " << removeComponentsSize << ", difference: " << addComponentsSize - removeComponentsSize << "."); // compute difference between components to be added and removed return addComponentsSize - removeComponentsSize; } const maths_t::TSeasonalComponentVec& CTimeSeriesDecompositionDetail::CComponents::CSeasonal::components() const { return m_Components; } maths_t::TSeasonalComponentVec& CTimeSeriesDecompositionDetail::CComponents::CSeasonal::components() { return m_Components; } void CTimeSeriesDecompositionDetail::CComponents::CSeasonal::componentsErrorsAndDeltas( core_t::TTime time, TSeasonalComponentPtrVec& components, TComponentErrorsPtrVec& errors, TDoubleVec& deltas) { std::size_t n{m_Components.size()}; components.reserve(n); errors.reserve(n); for (std::size_t i = 0; i < n; ++i) { if (m_Components[i].time().inWindow(time)) { components.push_back(&m_Components[i]); errors.push_back(&m_PredictionErrors[i]); } } deltas.resize(components.size(), 0.0); for (std::size_t i = 1; i < components.size(); ++i) { core_t::TTime period{components[i]->time().period()}; for (int j{static_cast<int>(i - 1)}; j > -1; --j) { core_t::TTime period_{components[j]->time().period()}; if (period % period_ == 0) { double value{components[j]->value(time, 0.0).mean() - components[j]->meanValue()}; double delta{0.1 * components[i]->delta(time, period_, value)}; deltas[j] += delta; deltas[i] -= delta; break; } } } } void CTimeSeriesDecompositionDetail::CComponents::CSeasonal::appendPredictions( core_t::TTime time, TDoubleVec& predictions) const { predictions.reserve(predictions.size() + m_Components.size()); for (const auto& component : m_Components) { if (component.time().inWindow(time)) { predictions.push_back(component.value(time, 0.0).mean() - component.meanValue()); } } } bool CTimeSeriesDecompositionDetail::CComponents::CSeasonal::shouldInterpolate(core_t::TTime time) const { for (const auto& component : m_Components) { if (component.shouldInterpolate(time)) { return true; } } return false; } void CTimeSeriesDecompositionDetail::CComponents::CSeasonal::interpolate(core_t::TTime time, bool refine) { for (auto& component : m_Components) { if (component.shouldInterpolate(time)) { component.interpolate(time, refine); } } } bool CTimeSeriesDecompositionDetail::CComponents::CSeasonal::initialized() const { for (const auto& component : m_Components) { if (component.initialized()) { return true; } } return false; } void CTimeSeriesDecompositionDetail::CComponents::CSeasonal::add(CSeasonalComponent&& component) { m_Components.push_back(std::move(component)); m_PredictionErrors.emplace_back(); } void CTimeSeriesDecompositionDetail::CComponents::CSeasonal::apply(const CChangePoint& change) { for (std::size_t i = 0; i < m_Components.size(); ++i) { if (change.apply(m_Components[i])) { m_PredictionErrors[i].clear(); } } } void CTimeSeriesDecompositionDetail::CComponents::CSeasonal::refreshForNewComponents() { common::COrderings::simultaneousSortWith( [](const CSeasonalComponent& lhs, const CSeasonalComponent& rhs) { return lhs.time() < rhs.time(); }, m_Components, m_PredictionErrors); } bool CTimeSeriesDecompositionDetail::CComponents::CSeasonal::remove(const TBoolVec& removeComponentsMask) { if (removeComponentsMask.size() != m_Components.size()) { LOG_ERROR(<< "Unexpected seasonal components to remove " << removeComponentsMask << ". Have " << m_Components.size() << " components."); return false; } std::size_t end{0}; for (std::size_t i = 0; i < removeComponentsMask.size(); end += removeComponentsMask[i++] ? 0 : 1) { if (i != end) { std::swap(m_Components[end], m_Components[i]); std::swap(m_PredictionErrors[end], m_PredictionErrors[i]); } } m_Components.erase(m_Components.begin() + end, m_Components.end()); m_PredictionErrors.erase(m_PredictionErrors.begin() + end, m_PredictionErrors.end()); return true; } bool CTimeSeriesDecompositionDetail::CComponents::CSeasonal::prune(core_t::TTime time, core_t::TTime bucketLength) { std::size_t n{m_Components.size()}; if (n > 1) { TTimeTimePrSizeFMap windowed; windowed.reserve(n); for (const auto& component : m_Components) { const CSeasonalTime& time_{component.time()}; if (time_.windowed()) { ++windowed[time_.window()]; } } TBoolVec remove(n, false); TTimeTimePrDoubleFMap shifts; shifts.reserve(n); for (std::size_t i = 0; i < n; ++i) { const CSeasonalTime& time_{m_Components[i].time()}; auto j = windowed.find(time_.window()); if (j == windowed.end() || j->second > 1) { if (m_PredictionErrors[i].remove(bucketLength, time_.period())) { LOG_DEBUG(<< "Removing seasonal component" << " with period '" << core::CTimeUtils::durationToString(time_.period()) << "' at " << time); remove[i] = true; shifts[time_.window()] += m_Components[i].meanValue(); --j->second; } } } common::CSetTools::simultaneousRemoveIf( [](bool remove_) { return remove_; }, remove, m_Components, m_PredictionErrors); for (auto& shift : shifts) { if (windowed.count(shift.first) > 0) { for (auto& component : m_Components) { if (shift.first == component.time().window()) { component.shiftLevel(shift.second); break; } } } else { bool fallback = true; for (auto& component : m_Components) { if (component.time().windowed() == false) { component.shiftLevel(shift.second); fallback = false; break; } } if (fallback) { TTimeTimePrVec shifted; shifted.reserve(m_Components.size()); for (auto& component : m_Components) { const CSeasonalTime& time_ = component.time(); auto containsWindow = [&time_](const TTimeTimePr& window) { return (time_.windowEnd() <= window.first || time_.windowStart() >= window.second) == false; }; if (std::find_if(shifted.begin(), shifted.end(), containsWindow) == shifted.end()) { component.shiftLevel(shift.second); } } } } } } return m_Components.empty(); } void CTimeSeriesDecompositionDetail::CComponents::CSeasonal::shiftOrigin(core_t::TTime time) { for (auto& component : m_Components) { component.shiftOrigin(time); } } std::uint64_t CTimeSeriesDecompositionDetail::CComponents::CSeasonal::checksum(std::uint64_t seed) const { seed = common::CChecksum::calculate(seed, m_Components); return common::CChecksum::calculate(seed, m_PredictionErrors); } void CTimeSeriesDecompositionDetail::CComponents::CSeasonal::debugMemoryUsage( const core::CMemoryUsage::TMemoryUsagePtr& mem) const { mem->setName("CSeasonal"); core::memory_debug::dynamicSize("m_Components", m_Components, mem); core::memory_debug::dynamicSize("m_PredictionErrors", m_PredictionErrors, mem); } std::size_t CTimeSeriesDecompositionDetail::CComponents::CSeasonal::memoryUsage() const { return core::memory::dynamicSize(m_Components) + core::memory::dynamicSize(m_PredictionErrors); } bool CTimeSeriesDecompositionDetail::CComponents::CCalendar::acceptRestoreTraverser( double decayRate, core_t::TTime bucketLength_, core::CStateRestoreTraverser& traverser) { double bucketLength{static_cast<double>(bucketLength_)}; if (traverser.name() == VERSION_6_4_TAG) { while (traverser.next()) { const std::string& name{traverser.name()}; RESTORE_NO_ERROR(COMPONENT_6_4_TAG, m_Components.emplace_back(decayRate, bucketLength, traverser)) RESTORE(ERRORS_6_4_TAG, core::CPersistUtils::restore(ERRORS_6_4_TAG, m_PredictionErrors, traverser)) } } else { LOG_ERROR(<< "Input error: unsupported state serialization version '" << traverser.name() << "'. Currently supported minimum version: " << VERSION_6_4_TAG); return false; } return true; } void CTimeSeriesDecompositionDetail::CComponents::CCalendar::acceptPersistInserter( core::CStatePersistInserter& inserter) const { inserter.insertValue(VERSION_6_4_TAG, ""); for (const auto& component : m_Components) { inserter.insertLevel(COMPONENT_6_4_TAG, [&component](auto& inserter_) { return component.acceptPersistInserter(inserter_); }); } core::CPersistUtils::persist(ERRORS_6_4_TAG, m_PredictionErrors, inserter); } void CTimeSeriesDecompositionDetail::CComponents::CCalendar::decayRate(double decayRate) { for (auto& component : m_Components) { component.decayRate(decayRate); } } void CTimeSeriesDecompositionDetail::CComponents::CCalendar::propagateForwards(core_t::TTime start, core_t::TTime end) { for (std::size_t i = 0; i < m_Components.size(); ++i) { stepwisePropagateForwards(start, end, MONTH, [&](double time) { m_Components[i].propagateForwardsByTime(time / 6.0); m_PredictionErrors[i].age(std::exp(-m_Components[i].decayRate() * time)); }); } } void CTimeSeriesDecompositionDetail::CComponents::CCalendar::clearPredictionErrors() { for (auto& errors : m_PredictionErrors) { errors.clear(); } } std::size_t CTimeSeriesDecompositionDetail::CComponents::CCalendar::size() const { std::size_t result{0}; for (const auto& component : m_Components) { result += component.size(); } return result; } const maths_t::TCalendarComponentVec& CTimeSeriesDecompositionDetail::CComponents::CCalendar::components() const { return m_Components; } bool CTimeSeriesDecompositionDetail::CComponents::CCalendar::haveComponent(CCalendarFeature feature) const { for (const auto& component : m_Components) { if (component.feature() == feature) { return true; } } return false; } void CTimeSeriesDecompositionDetail::CComponents::CCalendar::componentsAndErrors( core_t::TTime time, TCalendarComponentPtrVec& components, TComponentErrorsPtrVec& errors) { std::size_t n = m_Components.size(); components.reserve(n); errors.reserve(n); for (std::size_t i = 0; i < n; ++i) { if (m_Components[i].feature().inWindow(time)) { components.push_back(&m_Components[i]); errors.push_back(&m_PredictionErrors[i]); } } } void CTimeSeriesDecompositionDetail::CComponents::CCalendar::appendPredictions( core_t::TTime time, TDoubleVec& predictions) const { predictions.reserve(predictions.size() + m_Components.size()); for (const auto& component : m_Components) { if (component.feature().inWindow(time)) { predictions.push_back(component.value(time, 0.0).mean() - component.meanValue()); } } } bool CTimeSeriesDecompositionDetail::CComponents::CCalendar::shouldInterpolate(core_t::TTime time) const { return std::any_of(m_Components.begin(), m_Components.end(), [&](const auto& component) { return component.shouldInterpolate(time); }); } void CTimeSeriesDecompositionDetail::CComponents::CCalendar::interpolate(core_t::TTime time, bool refine) { for (auto& component : m_Components) { if (component.shouldInterpolate(time)) { component.interpolate(time, refine); } } } bool CTimeSeriesDecompositionDetail::CComponents::CCalendar::initialized() const { return std::any_of(m_Components.begin(), m_Components.end(), [&](const auto& component) { return component.initialized(); }); } void CTimeSeriesDecompositionDetail::CComponents::CCalendar::add(const CCalendarFeature& feature, core_t::TTime timeZoneOffset, std::size_t size, double decayRate, double bucketLength) { m_Components.emplace_back(feature, timeZoneOffset, size, decayRate, bucketLength, common::CSplineTypes::E_Natural); m_Components.back().initialize(); m_PredictionErrors.resize(m_Components.size()); } void CTimeSeriesDecompositionDetail::CComponents::CCalendar::apply(const CChangePoint& change) { for (auto& component : m_Components) { change.apply(component); } } bool CTimeSeriesDecompositionDetail::CComponents::CCalendar::prune(core_t::TTime time, core_t::TTime bucketLength) { TBoolVec remove(m_Components.size(), false); for (std::size_t i = 0; i < m_Components.size(); ++i) { if (m_PredictionErrors[i].remove(bucketLength, m_Components[i].feature().window())) { LOG_DEBUG(<< "Removing calendar component" << " '" << m_Components[i].feature().print() << "' at " << time); remove[i] = true; } } common::CSetTools::simultaneousRemoveIf([](bool remove_) { return remove_; }, remove, m_Components, m_PredictionErrors); return m_Components.empty(); } bool CTimeSeriesDecompositionDetail::CComponents::CCalendar::removeComponentsWithBadValues(core_t::TTime time) { TBoolVec remove(m_Components.size(), false); bool anyBadComponentsFound{false}; for (std::size_t i = 0; i < m_Components.size(); ++i) { if (m_Components[i].isBad()) { LOG_DEBUG(<< "Removing calendar component" << " '" << m_Components[i].feature().print() << "' at " << time << ". Invalid value detected."); remove[i] = true; anyBadComponentsFound |= true; } } if (anyBadComponentsFound) { common::CSetTools::simultaneousRemoveIf( [](bool remove_) { return remove_; }, remove, m_Components, m_PredictionErrors); return true; } return false; } std::uint64_t CTimeSeriesDecompositionDetail::CComponents::CCalendar::checksum(std::uint64_t seed) const { seed = common::CChecksum::calculate(seed, m_Components); return common::CChecksum::calculate(seed, m_PredictionErrors); } void CTimeSeriesDecompositionDetail::CComponents::CCalendar::debugMemoryUsage( const core::CMemoryUsage::TMemoryUsagePtr& mem) const { mem->setName("CCalendar"); core::memory_debug::dynamicSize("m_Components", m_Components, mem); core::memory_debug::dynamicSize("m_PredictionErrors", m_PredictionErrors, mem); } std::size_t CTimeSeriesDecompositionDetail::CComponents::CCalendar::memoryUsage() const { return core::memory::dynamicSize(m_Components) + core::memory::dynamicSize(m_PredictionErrors); } } } }