lib/maths/time_series/CSignal.cc (980 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #include <maths/time_series/CSignal.h> #include <core/CLogger.h> #include <core/CVectorRange.h> #include <maths/common/CBasicStatisticsPersist.h> #include <maths/common/CIntegerTools.h> #include <maths/common/CStatisticalTests.h> #include <maths/common/CTools.h> #include <boost/circular_buffer.hpp> #include <boost/math/constants/constants.hpp> #include <algorithm> #include <cmath> #include <cstdint> #include <initializer_list> #include <limits> #include <numeric> #include <tuple> namespace ml { namespace maths { namespace time_series { namespace { using TSizeVec = std::vector<std::size_t>; using TComplex = std::complex<double>; using TComplexVec = std::vector<TComplex>; using TMeanAccumulator = common::CBasicStatistics::SSampleMean<double>::TAccumulator; using TMeanVarAccumulator = common::CBasicStatistics::SSampleMeanVar<double>::TAccumulator; constexpr double LOG_SMALL_OUTLIER_SCALE{0.6931471805599453}; constexpr double LOG_LARGE_OUTLIER_SCALE{2.772588722239781}; //! Scale \p f by \p scale. void scale(double scale, TComplexVec& f) { for (std::size_t i = 0; i < f.size(); ++i) { f[i] *= scale; } } //! Compute the radix 2 FFT of \p f in-place. void radix2fft(TComplexVec& f) { // Perform the appropriate permutation of f(x) by swapping each i in [0, N] // with its bit reversal. std::uint64_t bits = common::CIntegerTools::nextPow2(f.size()) - 1; for (std::uint64_t i = 0; i < f.size(); ++i) { std::uint64_t j{common::CIntegerTools::reverseBits(i) >> (64 - bits)}; if (j > i) { LOG_TRACE(<< j << " -> " << i); std::swap(f[i], f[j]); } } // Apply the twiddle factors. for (std::size_t stride = 1; stride < f.size(); stride <<= 1) { for (std::size_t k = 0; k < stride; ++k) { double t{boost::math::double_constants::pi * static_cast<double>(k) / static_cast<double>(stride)}; TComplex w(std::cos(t), std::sin(t)); for (std::size_t start = k; start + stride < f.size(); start += 2 * stride) { TComplex fs{f[start]}; TComplex tw{w * f[start + stride]}; f[start] = fs + tw; f[start + stride] = fs - tw; } } } std::reverse(f.begin() + 1, f.end()); } } void CSignal::conj(TComplexVec& f) { for (std::size_t i = 0; i < f.size(); ++i) { f[i] = std::conj(f[i]); } } void CSignal::hadamard(const TComplexVec& fx, TComplexVec& fy) { for (std::size_t i = 0; i < fx.size(); ++i) { fy[i] *= fx[i]; } } void CSignal::fft(TComplexVec& f) { std::size_t n{f.size()}; std::size_t p{common::CIntegerTools::nextPow2(n)}; std::size_t m{std::size_t{1} << p}; if ((m >> 1) == n) { radix2fft(f); } else { // We use Bluestein's trick to reformulate as a convolution which can be // computed by padding to a power of 2. LOG_TRACE(<< "Using Bluestein's trick"); m = 2 * n - 1; p = common::CIntegerTools::nextPow2(m); m = std::size_t{1} << p; TComplexVec chirp; chirp.reserve(n); TComplexVec a(m, TComplex{0.0, 0.0}); TComplexVec b(m, TComplex{0.0, 0.0}); chirp.emplace_back(1.0, 0.0); a[0] = f[0] * chirp[0]; b[0] = chirp[0]; for (std::size_t i = 1; i < n; ++i) { double t = boost::math::double_constants::pi * static_cast<double>(i * i) / static_cast<double>(n); chirp.emplace_back(std::cos(t), std::sin(t)); a[i] = f[i] * std::conj(chirp[i]); b[i] = b[m - i] = chirp[i]; } fft(a); fft(b); hadamard(a, b); ifft(b); for (std::size_t i = 0; i < n; ++i) { f[i] = std::conj(chirp[i]) * b[i]; } } } void CSignal::ifft(TComplexVec& f) { conj(f); fft(f); conj(f); scale(1.0 / static_cast<double>(f.size()), f); } double CSignal::cyclicAutocorrelation(const SSeasonalComponentSummary& period, const TFloatMeanAccumulatorVec& values, const TMomentTransformFunc& tranform, const TMomentWeightFunc& weight, double eps) { return cyclicAutocorrelation(period, TFloatMeanAccumulatorCRng(values, 0, values.size()), tranform, weight, eps); } double CSignal::cyclicAutocorrelation(const SSeasonalComponentSummary& period, const TFloatMeanAccumulatorCRng& values, const TMomentTransformFunc& transform, const TMomentWeightFunc& weight, double eps) { TMeanVarAccumulator moments; for (std::size_t i = 0; i < values.size(); ++i) { if (period.contains(i) && common::CBasicStatistics::count(values[i]) > 0.0) { moments.add(transform(values[i]), weight(values[i])); } } double mean{common::CBasicStatistics::mean(moments)}; TMeanAccumulator autocorrelation; for (std::size_t i = 0; i < values.size(); ++i) { if (period.contains(i)) { std::size_t j{period.nextRepeat(i) % values.size()}; if (common::CBasicStatistics::count(values[i]) > 0.0 && common::CBasicStatistics::count(values[j]) > 0.0) { double avgWeight{std::sqrt(weight(values[i]) * weight(values[j]))}; autocorrelation.add( (transform(values[i]) - mean) * (transform(values[j]) - mean), avgWeight); } } } double a{common::CBasicStatistics::mean(autocorrelation)}; double v{common::CBasicStatistics::maximumLikelihoodVariance(moments) + eps}; return a == v ? 1.0 : a / v; } void CSignal::autocorrelations(const TFloatMeanAccumulatorVec& values, TComplexVec& f, TDoubleVec& result) { result.clear(); if (values.empty()) { return; } std::size_t n{values.size()}; TMeanVarAccumulator moments; for (const auto& value : values) { if (common::CBasicStatistics::count(value) > 0.0) { moments.add(common::CBasicStatistics::mean(value)); } } double mean{common::CBasicStatistics::mean(moments)}; double variance{common::CBasicStatistics::maximumLikelihoodVariance(moments)}; if (variance == 0.0) { // The autocorrelation of a constant is zero. result.resize(n, 0.0); return; } f.assign(n, TComplex{0.0, 0.0}); for (std::size_t i = 0; i < n; ++i) { std::size_t j{i}; while (j < n && common::CBasicStatistics::count(values[j]) == 0.0) { ++j; } if (i < j) { // Infer missing values by linearly interpolating. if (j == n) { break; } else if (i > 0) { for (std::size_t k = i; k < j; ++k) { double alpha{static_cast<double>(k - i + 1) / static_cast<double>(j - i + 1)}; double real{common::CBasicStatistics::mean(values[j]) - mean}; f[k] = (1.0 - alpha) * f[i - 1] + alpha * TComplex{real, 0.0}; } } i = j; } f[i] = TComplex{common::CBasicStatistics::mean(values[i]) - mean, 0.0}; } fft(f); TComplexVec fConj(f); conj(fConj); hadamard(fConj, f); ifft(f); result.reserve(n); for (std::size_t i = 1; i < n; ++i) { result.push_back(f[i].real() / variance / static_cast<double>(n)); } } CSignal::SSeasonalComponentSummary CSignal::seasonalComponentSummary(std::size_t period) { return {period, 0, period, TSizeSizePr{0, period}}; } void CSignal::appendSeasonalComponentSummary(std::size_t period, TSeasonalComponentVec& periods) { periods.emplace_back(period, 0, period, TSizeSizePr{0, period}); } CSignal::TSeasonalComponentVec CSignal::seasonalDecomposition(const TFloatMeanAccumulatorVec& values, double outlierFraction, const TSizeSizeSizeTr& diurnal, TOptionalSize startOfWeekOverride, double significantPValue, std::size_t maxComponents) { if (CSignal::countNotMissing(values) < 10) { return {}; } TSeasonalComponentVec result; std::size_t day, week, year; std::tie(day, week, year) = diurnal; std::size_t n{values.size()}; std::size_t missingAtEnd( std::find_if(values.rbegin(), values.rend(), [](const auto& value) { return common::CBasicStatistics::count(value) > 0.0; }) - values.rbegin()); n -= missingAtEnd; std::size_t pad{n / 4}; TSizeVec periods; periods.reserve(pad + 2); periods.resize(pad - 1); std::iota(periods.begin(), periods.end(), 2); for (auto period : {day, week, year}) { if (periods.back() < period && 100 * n > 190 * period) { periods.push_back(period); } } TDoubleVec correlations; TComplexVec placeholder; TFloatMeanAccumulatorVec valuesToTest{values.begin(), values.begin() + n}; TSeasonalComponentVec decomposition; TMeanAccumulatorVecVec components; TSizeVec candidatePeriods; TSizeVec selectedPeriods; std::size_t sizeWithoutComponent{0}; double varianceWithComponent{0.0}; double varianceWithoutComponent{0.0}; double pValue{1.0}; double eps{common::CTools::pow2( 1000.0 * static_cast<double>(std::numeric_limits<float>::epsilon()))}; result.push_back(seasonalComponentSummary(1)); fitSeasonalComponentsRobust(result, outlierFraction, valuesToTest, components); auto H0 = residualVarianceStats(valuesToTest, result, components); result.clear(); do { // Centre the data. double mean{common::CBasicStatistics::mean(std::accumulate( valuesToTest.begin(), valuesToTest.end(), TMeanAccumulator{}, [](auto partialMean, const auto& value) { partialMean.add(common::CBasicStatistics::mean(value)); return partialMean; }))}; for (auto& value : valuesToTest) { common::CBasicStatistics::moment<0>(value) -= mean; } // Compute the serial autocorrelations padding to the maximum offset to // avoid windowing effects. valuesToTest.resize(n + 3 * pad); autocorrelations(valuesToTest, placeholder, correlations); valuesToTest.resize(n); correlations.resize(std::max(3 * pad, periods.back())); // In order to handle with smooth varying functions whose autocorrelation // is high for small offsets we perform an average the serial correlations // of each component over offets P, 2P, ..., mP for mP < n. Note that we // need to correct the correlation for longer offsets for the zero pad we // append. for (auto period : periods) { TMeanAccumulator meanCorrelation; for (std::size_t offset = period; offset < 3 * pad; offset += period) { meanCorrelation.add(static_cast<double>(n) / static_cast<double>(n - offset) * correlations[offset - 1]); } correlations[period - 1] = common::CBasicStatistics::mean(meanCorrelation); LOG_TRACE(<< "correlation(" << period << ") = " << correlations[period - 1]); } auto correlationLess = [&](std::size_t lhs, std::size_t rhs) { return correlations[lhs - 1] < correlations[rhs - 1]; }; std::size_t maxCorrelationPeriod{ *std::max_element(periods.begin(), periods.end(), correlationLess)}; double maxCorrelation{correlations[maxCorrelationPeriod - 1]}; LOG_TRACE(<< "max correlation(" << maxCorrelationPeriod << ") = " << maxCorrelation); sizeWithoutComponent = result.size(); // Exclude if there is a similar period which has significantly higher // correlation for which we haven't yet seen enough repeats. if (maxCorrelation < 0.8 * *std::max_element(correlations.begin() + pad, correlations.begin() + 3 * pad / 2)) { break; } candidatePeriods.clear(); // Prefer shorter periods if the decision is close because the model will // be more accurate. In particular, if we have a divisor of the selected // period whose autocorrelation is within epsilon we'll select that one. double cutoff{0.8 * maxCorrelation}; LOG_TRACE(<< "cutoff = " << cutoff); for (std::size_t i = maxCorrelationPeriod / 2; i >= 2; --i) { std::size_t period{maxCorrelationPeriod / i}; if (maxCorrelationPeriod % period == 0 && correlations[period - 1] > cutoff) { candidatePeriods.push_back(period); break; } } // Check if the selected seasonal component with period p is really an // additive combination of two or more shorter seasonal components, i.e. // if p mod p_i = 0 for all p_i and each component with period p_i has // reasonable autocorrelation. if (candidatePeriods.empty()) { checkForSeasonalDecomposition(correlations, maxCorrelationPeriod, cutoff, maxComponents, candidatePeriods); } LOG_TRACE(<< "candidate periods = " << candidatePeriods); // If we've already selected the candidate components or we've explained // nearly all the variance then stop. if (std::all_of(candidatePeriods.begin(), candidatePeriods.end(), [&](std::size_t period) { return std::find(selectedPeriods.begin(), selectedPeriods.end(), period) != selectedPeriods.end(); }) || varianceWithComponent < eps * varianceWithoutComponent) { break; } selectedPeriods.insert(selectedPeriods.end(), candidatePeriods.begin(), candidatePeriods.end()); valuesToTest.assign(values.begin(), values.begin() + n); // Check for weekend/weekday decomposition if the candidate seasonality // is daily or weekly. if (checkForTradingDayDecomposition( valuesToTest, outlierFraction, day, week, decomposition, components, candidatePeriods, result, startOfWeekOverride, significantPValue)) { valuesToTest.assign(values.begin(), values.begin() + n); } else { for (auto period : candidatePeriods) { appendSeasonalComponentSummary(period, result); } } LOG_TRACE(<< "selected periods = " << result); fitSeasonalComponentsRobust(result, outlierFraction, valuesToTest, components); // Here we use a test of the explained variance vs a null hypothesis // which doesn't include the components. Note that since we find the // maximum the autocorrelation over n = |periods| we expect the chance // of seeing the smallest p-value to be chance that n p-values are all // greater than the observed p-value or 1 - (1 - p)^n. In practice, // this only holds if the p-values are independent, which they clearly // aren't. For any correlation we have morally 1 - (1 - p)^{k n} for // k < 1. We use k = 0.5. auto H1 = residualVarianceStats(valuesToTest, result, components); varianceWithoutComponent = H0.s_ResidualVariance; varianceWithComponent = H1.s_ResidualVariance; pValue = common::CTools::oneMinusPowOneMinusX( nestedDecompositionPValue(H0, H1), 0.5 * static_cast<double>(periods.size())); LOG_TRACE(<< H1.print() << " vs " << H0.print()); LOG_TRACE(<< "p-value = " << pValue << ", p-value to accept = " << significantPValue); H0 = H1; removeComponents(result, components, valuesToTest); } while (pValue < significantPValue && result.size() < maxComponents); result.resize(sizeWithoutComponent); return result; } CSignal::TSeasonalComponentVec CSignal::tradingDayDecomposition(const TFloatMeanAccumulatorVec& values, double outlierFraction, std::size_t week, TOptionalSize startOfWeekOverride, double significantPValue) { using TMeanVarAccumulatorBuffer = boost::circular_buffer<TMeanVarAccumulator>; using TMeanVarAccumulatorBufferVec = std::vector<TMeanVarAccumulatorBuffer>; constexpr std::size_t WEEKEND_DAILY{0}; constexpr std::size_t WEEKDAY_DAILY{1}; if (100 * values.size() < 190 * week || week < 14) { return {}; } std::size_t day{(week + 3) / 7}; std::size_t weekend{(2 * week + 3) / 7}; TFloatMeanAccumulatorVec temporaryValues{values}; // Work on the largest subset of the values which is a multiple week. std::size_t remainder{temporaryValues.size() % week}; TFloatMeanAccumulatorCRng valuesToTest{temporaryValues, 0, temporaryValues.size() - remainder}; std::size_t n{valuesToTest.size()}; LOG_TRACE(<< "number values = " << n << "/" << values.size()); std::size_t startOfWeek{startOfWeekOverride != std::nullopt && *startOfWeekOverride < week ? *startOfWeekOverride : 0}; TSeasonalComponentVec dailyPeriod{seasonalComponentSummary(day)}; TMeanAccumulatorVecVec temporaryComponents; fitSeasonalComponentsRobust(dailyPeriod, outlierFraction, temporaryValues, temporaryComponents); auto dailyHypothesis = residualVarianceStats(temporaryValues, dailyPeriod, temporaryComponents); double epsVariance{ common::CTools::pow2(1000.0 * std::numeric_limits<double>::epsilon()) * [&] { TMeanVarAccumulator moments; for (const auto& value : values) { if (common::CBasicStatistics::count(value) > 0.0) { moments.add(common::CBasicStatistics::mean(value)); } } return common::CBasicStatistics::maximumLikelihoodVariance(moments); }()}; LOG_TRACE(<< "daily variance = " << dailyHypothesis.s_ResidualVariance << " threshold to test = " << epsVariance); if (dailyHypothesis.s_ResidualVariance <= epsVariance) { return {}; } temporaryValues = values; TSeasonalComponentVec weeklyPeriod{seasonalComponentSummary(week)}; TMeanAccumulatorVecVec weeklyComponent; fitSeasonalComponents(weeklyPeriod, values, weeklyComponent); reweightOutliers(weeklyPeriod, weeklyComponent, outlierFraction, temporaryValues); // Compute the partitions' index windows. TSizeSizePr2Vec weekends; TSizeSizePr2Vec weekdays; for (std::size_t i = 0; i < n; i += week) { weekends.emplace_back(i, i + weekend); weekdays.emplace_back(i + weekend, i + week); } std::size_t strides[]{day, day}; TSizeSizePr2Vec partitions[]{weekends, weekdays}; LOG_TRACE(<< "day = " << day << ", weekend = " << weekend); LOG_TRACE(<< "weekends = " << weekends << ", weekdays = " << weekdays); LOG_TRACE(<< "strides = " << strides); LOG_TRACE(<< "partitions = " << partitions); // Initialize the components. TMeanVarAccumulatorBuffer components[]{ TMeanVarAccumulatorBuffer{day, TMeanVarAccumulator{}}, TMeanVarAccumulatorBuffer{day, TMeanVarAccumulator{}}}; TMeanVarAccumulatorBufferVec placeholder(1); auto initialize = [&](const TSizeSizePr& window, TMeanVarAccumulatorBuffer& component) { placeholder[0].swap(component); std::size_t period{placeholder[0].size()}; doFitSeasonalComponents({{period, startOfWeek, week, window}}, valuesToTest, placeholder); placeholder[0].swap(component); }; initialize({0, weekend}, components[WEEKEND_DAILY]); initialize({weekend, week}, components[WEEKDAY_DAILY]); LOG_TRACE(<< "components = " << components); TMeanAccumulator variances[std::size(components)]; for (std::size_t i = 0; i < std::size(components); ++i) { variances[i] = std::accumulate( components[i].begin(), components[i].end(), TMeanAccumulator{}, [](auto variance_, const auto& value) { variance_.add(common::CBasicStatistics::maximumLikelihoodVariance(value), common::CBasicStatistics::count(value)); return variance_; }); } TDoubleVec candidateVariances(week, 0.0); auto captureVarianceAtStartOfWeek = [&](std::size_t i) { candidateVariances[i] = common::CBasicStatistics::mean( variances[WEEKEND_DAILY] + variances[WEEKDAY_DAILY]); }; // Choose the best partition. if (startOfWeekOverride != std::nullopt && *startOfWeekOverride < week) { captureVarianceAtStartOfWeek(startOfWeek); } else { // Compute the variances for each candidate partition. captureVarianceAtStartOfWeek(0); for (std::size_t i = 0; i + 1 < week; ++i) { for (std::size_t j = 0; j < std::size(components); ++j) { TMeanVarAccumulator next; for (const auto& subset : partitions[j]) { for (std::size_t k = i + subset.first + strides[j]; k <= i + subset.second; k += strides[j]) { next.add(common::CBasicStatistics::mean(valuesToTest[k % n]), common::CBasicStatistics::count(valuesToTest[k % n])); } } auto last = components[j].front(); components[j].push_back(next); variances[j] += common::CBasicStatistics::momentsAccumulator( common::CBasicStatistics::count(next), common::CBasicStatistics::maximumLikelihoodVariance(next)); variances[j] -= common::CBasicStatistics::momentsAccumulator( common::CBasicStatistics::count(last), common::CBasicStatistics::maximumLikelihoodVariance(last)); } captureVarianceAtStartOfWeek(i + 1); } double minCost{std::numeric_limits<double>::max()}; startOfWeek = week + 1; // If the choice is marginal, we seek to partition where the time series // value is absolutely small. double varianceThreshold{1.05 * *std::min_element(candidateVariances.begin(), candidateVariances.end())}; for (std::size_t i = 0; i < candidateVariances.size(); ++i) { if (candidateVariances[i] < varianceThreshold) { double cost{0.0}; for (std::size_t k = i; k < i + n; k += week) { double knots[]{ common::CBasicStatistics::mean(valuesToTest[(k + n - 1) % n]), common::CBasicStatistics::mean(valuesToTest[(k + n + 0) % n]), common::CBasicStatistics::mean( valuesToTest[(k + n + weekend - 1) % n]), common::CBasicStatistics::mean( valuesToTest[(k + n + weekend + 0) % n])}; cost += std::fabs(knots[0]) + std::fabs(knots[1]) + std::fabs(knots[2]) + std::fabs(knots[3]) + std::fabs(knots[1] - knots[0]) + std::fabs(knots[3] - knots[2]); } LOG_TRACE(<< "cost(" << i << ") = " << cost); std::tie(minCost, startOfWeek) = std::min( std::make_pair(minCost, startOfWeek), std::make_pair(cost, i)); } } } LOG_TRACE(<< "start of week = " << startOfWeek << "/" << week); // Check if there is reasonable evidence for weekday/weekend partition. TSeasonalComponentVec weekendPeriods{ {week, startOfWeek, week, TSizeSizePr{0 * day, 2 * day}}, {day, startOfWeek, week, TSizeSizePr{2 * day, 7 * day}}}; temporaryValues = values; fitSeasonalComponentsRobust(weekendPeriods, outlierFraction, temporaryValues, temporaryComponents); auto weekendHypothesis = residualVarianceStats(temporaryValues, weekendPeriods, temporaryComponents); double pValue{common::CTools::oneMinusPowOneMinusX( nestedDecompositionPValue(dailyHypothesis, weekendHypothesis), 0.5 * static_cast<double>(week))}; weekendPeriods.emplace_back(day, startOfWeek, week, TSizeSizePr{0 * day, 2 * day}); weekendPeriods.emplace_back(week, startOfWeek, week, TSizeSizePr{2 * day, 7 * day}); std::sort(weekendPeriods.begin(), weekendPeriods.end()); LOG_TRACE(<< "p-value = " << pValue); return pValue >= significantPValue ? TSeasonalComponentVec{} : weekendPeriods; } void CSignal::fitSeasonalComponents(const TSeasonalComponentVec& periods, const TFloatMeanAccumulatorVec& values, TMeanAccumulatorVecVec& components) { doFitSeasonalComponents(periods, values, components); } void CSignal::fitSeasonalComponentsRobust(const TSeasonalComponentVec& periods, double outlierFraction, TFloatMeanAccumulatorVec& values, TMeanAccumulatorVecVec& components) { fitSeasonalComponents(periods, values, components); if (outlierFraction > 0.0) { reweightOutliers(periods, components, outlierFraction, values); fitSeasonalComponents(periods, values, components); } } void CSignal::removeExtremeOutliers(double fraction, TFloatMeanAccumulatorVec& values) { using TMinAccumulator = common::CBasicStatistics::SMin<std::pair<double, std::size_t>>::TAccumulator; using TMaxAccumulator = common::CBasicStatistics::SMax<std::pair<double, std::size_t>>::TAccumulator; using TMinMaxAccumulator = common::CBasicStatistics::CMinMax<double>; std::size_t windowLength{static_cast<std::size_t>( std::ceil(common::CTools::truncate(fraction, 0.0, 1.0) * static_cast<double>(countNotMissing(values))))}; if (windowLength == 0 || windowLength == values.size()) { return; } TMinAccumulator min; TMaxAccumulator max; for (std::size_t i = 0; i < values.size(); ++i) { double x{common::CBasicStatistics::mean(values[i])}; std::size_t n{common::CBasicStatistics::count(values[i]) > 0.0 ? 1U : 0U}; min.add({x, i}, n); max.add({x, i}, n); } double threshold{std::exp(LOG_LARGE_OUTLIER_SCALE)}; TMaxAccumulator outliers; for (auto i : {min[0].second, max[0].second}) { std::size_t end{std::min(i + windowLength, values.size())}; for (std::size_t j = i; j < end; ++j) { TMinMaxAccumulator minmax; for (std::size_t k = 0; k + windowLength <= j; ++k) { minmax.add(common::CBasicStatistics::mean(values[k]), common::CBasicStatistics::count(values[k]) > 0.0 ? 1 : 0); } for (std::size_t k = j + 1; k < values.size(); ++k) { minmax.add(common::CBasicStatistics::mean(values[k]), common::CBasicStatistics::count(values[k]) > 0.0 ? 1 : 0); } if (minmax.initialized()) { TMeanAccumulator level; for (std::size_t k = j - std::min(windowLength - 1, j); k <= j; ++k) { level.add(common::CBasicStatistics::mean(values[k]), common::CBasicStatistics::count(values[k])); } if (common::CBasicStatistics::count(level) > 0.0) { double x{common::CBasicStatistics::mean(level)}; double difference{ (std::max(minmax.min() - x, std::max(x - minmax.max(), 0.0)))}; outliers.add({std::min(difference / minmax.range(), threshold + 1.0), j}); } } } } if (outliers.count() > 0) { auto[size, i] = outliers[0]; if (size > threshold) { for (std::size_t j = i - std::min(windowLength - 1, i); j <= i; ++j) { values[j] = TMeanAccumulator{}; } } } } bool CSignal::reweightOutliers(const TSeasonalComponentVec& periods, const TMeanAccumulatorVecVec& components, double fraction, TFloatMeanAccumulatorVec& values) { auto predictor = [&](std::size_t index) { double value{0.0}; for (std::size_t i = 0; i < components.size(); ++i) { value += periods[i].value(components[i], index); } return value; }; return reweightOutliers(predictor, fraction, values); } bool CSignal::reweightOutliers(const TIndexPredictor& predictor, double fraction, TFloatMeanAccumulatorVec& values) { using TDoubleSizePr = std::pair<double, std::size_t>; using TMaxAccumulator = common::CBasicStatistics::COrderStatisticsHeap<TDoubleSizePr, std::greater<TDoubleSizePr>>; std::size_t numberOutliers{static_cast<std::size_t>([&] { return fraction * static_cast<double>(countNotMissing(values)); }())}; LOG_TRACE(<< "number outliers = " << numberOutliers); if (numberOutliers == 0) { return false; } TMaxAccumulator outliers{2 * numberOutliers}; TMeanAccumulator meanDifference; TMeanVarAccumulator predictionMoments; for (std::size_t i = 0; i < values.size(); ++i) { if (common::CBasicStatistics::count(values[i]) > 0.0) { double prediction{predictor(i)}; double difference{std::fabs(common::CBasicStatistics::mean(values[i]) - prediction)}; outliers.add({difference, i}); meanDifference.add(difference); predictionMoments.add(std::fabs(common::CBasicStatistics::mean(values[i]))); } } if (common::CBasicStatistics::mean(meanDifference) == 0.0) { return false; } outliers.sort(); LOG_TRACE(<< "outliers = " << outliers); TMeanAccumulator meanDifferenceOfOutliers; for (std::size_t i = 0; 4 * i < outliers.count(); ++i) { meanDifferenceOfOutliers.add(outliers[i].first); } meanDifference -= meanDifferenceOfOutliers; double threshold{std::max( 3.0 * common::CBasicStatistics::mean(meanDifference), 0.05 * std::sqrt(common::CBasicStatistics::variance(predictionMoments)))}; LOG_TRACE(<< "threshold = " << common::CBasicStatistics::mean(meanDifference)); bool result{false}; double logThreshold{std::log(threshold)}; for (const auto& outlier : outliers) { double logDifference{std::log(outlier.first)}; double weight{ common::CTools::linearlyInterpolate(logThreshold - LOG_SMALL_OUTLIER_SCALE, logThreshold, 1.0, 0.1, logDifference) * common::CTools::linearlyInterpolate(logThreshold, logThreshold + LOG_LARGE_OUTLIER_SCALE, 1.0, 0.1, logDifference)}; common::CBasicStatistics::count(values[outlier.second]) *= weight; result |= (weight < 1.0); } LOG_TRACE(<< "values - outliers = " << values); return result; } double CSignal::meanNumberRepeatedValues(const TFloatMeanAccumulatorVec& values, const SSeasonalComponentSummary& period) { TDoubleVec counts(period.period(), 0.0); for (std::size_t i = 0; i < values.size(); ++i) { if (common::CBasicStatistics::count(values[i]) > 0.0 && period.contains(i)) { counts[period.offset(i)] += 1.0; } } return common::CBasicStatistics::mean( std::accumulate(counts.begin(), counts.end(), TMeanAccumulator{}, [](TMeanAccumulator mean, double count) { if (count > 0.0) { mean.add(count); } return mean; })); } CSignal::SVarianceStats CSignal::residualVarianceStats(const TFloatMeanAccumulatorVec& values, const TSeasonalComponentVec& periods, const TMeanAccumulatorVecVec& components) { std::size_t numberValues{0}; for (std::size_t i = 0; i < values.size(); ++i) { if (common::CBasicStatistics::count(values[i]) > 0.0 && std::any_of(periods.begin(), periods.end(), [i](const auto& period) { return period.contains(i); })) { ++numberValues; } } std::size_t parameters{0}; for (const auto& component : components) { parameters += countNotMissing(component); } return {residualVariance(values, periods, components, [](const TFloatMeanAccumulator&) { return 1.0; }), residualVariance(values, periods, components), static_cast<double>(numberValues) - static_cast<double>(parameters), parameters}; } double CSignal::residualVariance(const TFloatMeanAccumulatorVec& values, const TSeasonalComponentVec& periods, const TMeanAccumulatorVecVec& components, const TMomentWeightFunc& weight) { TMeanVarAccumulator moments; for (std::size_t i = 0; i < values.size(); ++i) { bool skip{true}; double prediction{0.0}; if (common::CBasicStatistics::count(values[i]) > 0.0) { for (std::size_t j = 0; j < periods.size(); ++j) { if (periods[j].contains(i)) { skip = false; prediction += common::CBasicStatistics::mean( components[j][periods[j].offset(i)]); } } } if (skip == false) { double value{common::CBasicStatistics::mean(values[i])}; moments.add(value - prediction, weight(values[i])); } } return common::CBasicStatistics::maximumLikelihoodVariance(moments); } double CSignal::nestedDecompositionPValue(const SVarianceStats& H0, const SVarianceStats& H1) { if (H1.s_DegreesFreedom <= 0.0 || // Insufficient data to test H1 H1.s_NumberParameters <= H0.s_NumberParameters || // H1 is not nested H0.s_ResidualVariance == 0.0) { // The values were constant return 1.0; } double eps{std::numeric_limits<double>::epsilon()}; double v0[]{H0.s_ResidualVariance, std::max(H0.s_TruncatedResidualVariance, eps * H0.s_ResidualVariance)}; double v1[]{std::max(H1.s_ResidualVariance, eps * v0[0]), std::max(H1.s_TruncatedResidualVariance, eps * v0[1])}; double df[]{static_cast<double>(H1.s_NumberParameters - H0.s_NumberParameters), H1.s_DegreesFreedom}; // This assumes that H1 is nested in H0. return std::min(common::CStatisticalTests::rightTailFTest( std::max(v0[0] - v1[0], 0.0), v1[0], df[0], df[1]), common::CStatisticalTests::rightTailFTest( std::max(v0[1] - v1[1], 0.0), v1[1], df[0], df[1])); } std::size_t CSignal::selectComponentSize(const TFloatMeanAccumulatorVec& values, std::size_t period) { auto interpolate = [&](std::size_t i, const std::size_t* adjacent, const TMeanAccumulatorVec& centres, const TMeanAccumulatorVec& model) { double prediction{0.0}; double adjacentCentres[]{common::CBasicStatistics::mean(centres[adjacent[0]]), common::CBasicStatistics::mean(centres[adjacent[1]]), common::CBasicStatistics::mean(centres[adjacent[2]])}; double distances[]{std::fabs(adjacentCentres[0] - static_cast<double>(i)), std::fabs(adjacentCentres[1] - static_cast<double>(i)), std::fabs(adjacentCentres[2] - static_cast<double>(i))}; double Z{0.0}; for (std::size_t j = 0; j < 3; ++j) { distances[j] = std::min(distances[j], static_cast<double>(period) - distances[j]); double weight{std::exp(-distances[j])}; prediction += weight * common::CBasicStatistics::mean(model[adjacent[j]]); Z += weight; } return prediction / Z; }; auto residualVariance = [&](const TMeanAccumulatorVec& centres, const TMeanAccumulatorVec& model) { TMeanVarAccumulator moments; std::size_t compression{period / model.size()}; for (std::size_t i = 0; i < values.size(); ++i) { std::size_t bucket{(i % period) / compression}; std::size_t indices[]{(bucket + model.size() - 1) % model.size(), (bucket + model.size() + 0) % model.size(), (bucket + model.size() + 1) % model.size()}; double prediction{interpolate(i % period, indices, centres, model)}; moments.add(common::CBasicStatistics::mean(values[i]) - prediction, common::CBasicStatistics::count(values[i])); } return common::CBasicStatistics::maximumLikelihoodVariance(moments); }; TMeanAccumulatorVec centres(period); for (std::size_t i = 0; i < centres.size(); ++i) { centres[i].add(static_cast<double>(i % period)); } TMeanAccumulatorVecVec component; fitSeasonalComponents({seasonalComponentSummary(period)}, values, component); TMeanAccumulatorVec compressedComponent{std::move(component[0])}; std::size_t size{period}; std::size_t H0{1}; double degreesFreedom[]{static_cast<double>(values.size() - period), 0.0}; double variances[]{residualVariance(centres, compressedComponent), 0.0}; for (std::size_t i = 2; i <= period / 2; ++i) { if (period % i == 0) { LOG_TRACE(<< "size = " << period / i); centres.assign(period / i, TMeanAccumulator{}); compressedComponent.assign(period / i, TMeanAccumulator{}); for (std::size_t j = 0; j < values.size(); ++j) { std::size_t bucket{(j % period) / i}; centres[bucket].add(static_cast<double>(j % period), common::CBasicStatistics::count(values[j])); compressedComponent[bucket].add( common::CBasicStatistics::mean(values[j]), common::CBasicStatistics::count(values[j])); } LOG_TRACE(<< "centres = " << centres); degreesFreedom[H0] = static_cast<double>(values.size() - period / i); variances[H0] = residualVariance(centres, compressedComponent); LOG_TRACE(<< "degrees freedom = " << degreesFreedom); LOG_TRACE(<< "variances = " << variances); if (common::CStatisticalTests::rightTailFTest( variances[H0], variances[1 - H0], degreesFreedom[H0], degreesFreedom[1 - H0]) < 0.1) { break; } if (common::CStatisticalTests::rightTailFTest( variances[1 - H0], variances[H0], degreesFreedom[1 - H0], degreesFreedom[H0]) < 0.1) { H0 = 1 - H0; } size = compressedComponent.size(); } } LOG_TRACE(<< "selected size = " << size); return size; } CSignal::TMeanAccumulatorVec CSignal::smoothResample(std::size_t size, TMeanAccumulatorVec component) { if (size >= component.size()) { return component; } // Pass size of zero is really an error. We'll handle this as best we can // and log an error. if (size == 0) { LOG_ERROR(<< "Smooth resample passed component size of zero! Using one."); size = 1; } // Smooth by convolving with a triangle function. int n{static_cast<int>(component.size())}; int width{static_cast<int>((n + size - 1) / size)}; TMeanAccumulatorVec smooth(n); for (int i = 0; i < n; ++i) { if (common::CBasicStatistics::count(component[i]) > 0.0) { double Z{0.0}; for (int j = i - width + 1; j < i + width; ++j) { const auto& value = component[(n + j) % n]; if (common::CBasicStatistics::count(value) > 0.0) { double weight{static_cast<double>( width - (std::max(i, j) - std::min(i, j)))}; smooth[i].add(common::CBasicStatistics::mean(value), weight * common::CBasicStatistics::count(value)); Z += weight; } } common::CBasicStatistics::count(smooth[i]) /= Z; } } return smooth; } CSignal::TPredictor CSignal::bucketPredictor(const TPredictor& predictor, core_t::TTime bucketsStartTime, core_t::TTime bucketLength, core_t::TTime bucketAverageSampleTime, core_t::TTime sampleInterval) { // We assume that we have samples at some approximately fixed interval l and // a bucket length L >= l. We also know // // bucketAverageSampleTime = l / L sum_{0<=i<n}{ t0 + i l } (1) // // where n = L / l. This summation is n t0 + n(n - 1)/2 l and we can use this // to solve (1) for t0. The prediction of the bucket average for the bucket // starting at t is then // // 1 / n sum_{0<=i<n}{ p(t + t0 + i * l) } double n{static_cast<double>(bucketLength) / static_cast<double>(sampleInterval)}; core_t::TTime offset{static_cast<core_t::TTime>( std::max(static_cast<double>(bucketAverageSampleTime) - 0.5 * (n - 1.0) * static_cast<double>(sampleInterval), 0.0))}; return [=](core_t::TTime time) { TMeanAccumulator result; time += bucketsStartTime; for (core_t::TTime dt = offset; dt < bucketLength; dt += sampleInterval) { result.add(predictor(time + dt)); } return common::CBasicStatistics::mean(result); }; } void CSignal::removeComponents(const TSeasonalComponentVec& periods, const TMeanAccumulatorVecVec& components, TFloatMeanAccumulatorVec& values) { for (std::size_t i = 0; i < values.size(); ++i) { if (common::CBasicStatistics::count(values[i]) > 0.0) { for (std::size_t j = 0; j < components.size(); ++j) { common::CBasicStatistics::moment<0>(values[i]) -= periods[j].value(components[j], i); } } } } void CSignal::checkForSeasonalDecomposition(const TDoubleVec& correlations, std::size_t maxCorrelationPeriod, double cutoff, std::size_t maxComponents, TSizeVec& candidatePeriods) { double correlation{0.0}; while (candidatePeriods.size() < maxComponents) { std::size_t nextPeriod{0}; double nextCorrelation{0.0}; for (std::size_t i = maxCorrelationPeriod / 2; i >= 2; --i) { std::size_t period{maxCorrelationPeriod / i}; if (maxCorrelationPeriod % period == 0 && std::find_if(candidatePeriods.begin(), candidatePeriods.end(), [&](const auto& candidatePeriod) { return candidatePeriod % period == 0 || period % candidatePeriod == 0; }) == candidatePeriods.end() && correlations[period - 1] > nextCorrelation) { nextPeriod = period; nextCorrelation = correlations[period - 1]; } } if (nextCorrelation > 0.2 && correlation < cutoff) { candidatePeriods.push_back(nextPeriod); correlation += nextCorrelation; } else { break; } } if (candidatePeriods.empty() || correlation < cutoff) { candidatePeriods.assign(1, maxCorrelationPeriod); } } bool CSignal::checkForTradingDayDecomposition(TFloatMeanAccumulatorVec& values, double outlierFraction, std::size_t day, std::size_t week, TSeasonalComponentVec& decomposition, TMeanAccumulatorVecVec& components, TSizeVec& candidatePeriods, TSeasonalComponentVec& result, TOptionalSize startOfWeekOverride, double significantPValue) { if (std::find_if(candidatePeriods.begin(), candidatePeriods.end(), [&](const auto& period) { return period == week; }) != candidatePeriods.end()) { decomposition.clear(); for (const auto& period : candidatePeriods) { if (period != day && period != week) { appendSeasonalComponentSummary(period, decomposition); } } if (decomposition.size() > 0) { fitSeasonalComponents(decomposition, values, components); removeComponents(decomposition, components, values); } decomposition = tradingDayDecomposition( values, outlierFraction, week, startOfWeekOverride, significantPValue); if (decomposition.size() > 0) { for (std::size_t i = 0; i + 1 < candidatePeriods.size(); ++i) { if (candidatePeriods[i] != day) { appendSeasonalComponentSummary(candidatePeriods[i], result); } else { result.insert(result.end(), decomposition.begin(), decomposition.end()); decomposition.clear(); } } result.insert(result.end(), decomposition.begin(), decomposition.end()); } } return candidatePeriods.back() == week; } template<typename VALUES, typename COMPONENT> void CSignal::doFitSeasonalComponents(const TSeasonalComponentVec& periods, const VALUES& values, std::vector<COMPONENT>& components) { if (periods.empty()) { return; } LOG_TRACE(<< "periods = " << periods); components.resize(periods.size()); for (std::size_t i = 0; i < periods.size(); ++i) { components[i].assign(periods[i].period(), typename COMPONENT::value_type{}); } // The iterative scheme is as follows: // 1. Minimize MSE for period p(i) w.r.t. {values - components(j != i)} // holding other components fixed, // 2. Set i = i + 1 % "number components". // // Note that the iterative refinements are only necessary if the number // of values isn't a multiple of the least common multiple of the periods. // We could check this but the iterations are really cheap so just always // iterating is fast enough. std::size_t iterations{components.size() == 1 ? 1 : 2 * components.size()}; for (std::size_t iteration = 0; iteration < iterations; ++iteration) { std::size_t i{iteration % components.size()}; auto predictor = [&](std::size_t index) { double value{0.0}; for (std::size_t j = 0; j < i; ++j) { value += periods[j].value(components[j], index); } for (std::size_t j = i + 1; j < components.size(); ++j) { value += periods[j].value(components[j], index); } return value; }; fitSeasonalComponentsMinusPrediction(periods[i], predictor, values, components[i]); } } template<typename PREDICTOR, typename VALUES, typename COMPONENT> void CSignal::fitSeasonalComponentsMinusPrediction(const SSeasonalComponentSummary& period, const PREDICTOR& predictor, const VALUES& values, COMPONENT& component) { if (period.period() > 0) { component.assign(period.period(), typename COMPONENT::value_type{}); for (std::size_t i = 0; i < values.size(); ++i) { if (period.contains(i)) { component[period.offset(i)].add( common::CBasicStatistics::mean(values[i]) - predictor(i), common::CBasicStatistics::count(values[i])); } } } } } } }