lib/model/CAnomalyScore.cc

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <model/CAnomalyScore.h>

#include <core/CContainerPrinter.h>
#include <core/CIEEE754.h>
#include <core/CJsonStatePersistInserter.h>
#include <core/CJsonStateRestoreTraverser.h>
#include <core/CLogger.h>
#include <core/CPersistUtils.h>
#include <core/Constants.h>
#include <core/RestoreMacros.h>

#include <maths/common/CBasicStatistics.h>
#include <maths/common/CBasicStatisticsPersist.h>
#include <maths/common/CChecksum.h>
#include <maths/common/COrderings.h>
#include <maths/common/CTools.h>
#include <maths/common/Constants.h>
#include <maths/common/ProbabilityAggregators.h>

#include <model/CAnomalyDetectorModelConfig.h>

#include <cstdint>
#include <sstream>
#include <vector>

namespace ml {
namespace model {
namespace {

using TDoubleVec = std::vector<double>;

//! Add valid \p probabilities to \p aggregator and return the
//! number of valid probabilities.
template<typename AGGREGATOR>
std::size_t addProbabilities(const TDoubleVec& probabilities, AGGREGATOR& aggregator) {
    std::size_t n = 0;
    for (auto p : probabilities) {
        if (!(p >= 0.0 && p <= 1.0)) {
            LOG_ERROR(<< "Invalid probability " << p);
        } else {
            ++n;
            aggregator.add(p);
        }
    }
    return n;
}

//! The function to convert probabilities to *raw* scores.
double probabilityToScore(double probability) {
    return maths::common::CTools::anomalyScore(probability);
}

//! The function to convert *raw* scores to probabilities.
double scoreToProbability(double score) {
    return maths::common::CTools::inverseAnomalyScore(score);
}

// We use short field names to reduce the state size
const std::string HIGH_PERCENTILE_SCORE_TAG("a");
const std::string HIGH_PERCENTILE_COUNT_TAG("b");
//const std::string MAX_SCORE_TAG("c"); Used in versions < 6.5
const std::string RAW_SCORE_QUANTILE_SUMMARY("d");
const std::string RAW_SCORE_HIGH_QUANTILE_SUMMARY("e");
const std::string TIME_TO_QUANTILE_DECAY_TAG("f");
// Since version >= 6.5
const std::string MAX_SCORES_PER_PARTITION_TAG("g");
const std::string IS_FOR_MEMBERS_OF_POPULATION_TAG("h");
// Since version >= 8.4
const std::string COUNT_SINCE_LAST_SAMPLE_TAG("i");
const std::string SAMPLE_TAG("j");
// Nested
const std::string MAX_SCORE_TAG("a");
const std::string TIME_SINCE_LAST_SCORE_TAG("b");

const std::string EMPTY_STRING;

// This is the version to assume for the format that doesn't contain a version
// attribute - NEVER CHANGE THIS
const std::string MISSING_VERSION_FORMAT_VERSION("1");

const std::string TIME_INFLUENCER("bucket_time");
}

const std::string CAnomalyScore::MLCUE_ATTRIBUTE("mlcue");
const std::string CAnomalyScore::MLKEY_ATTRIBUTE("mlkey");
const std::string CAnomalyScore::MLQUANTILESDESCRIPTION_ATTRIBUTE("mlquantilesdescription");
const std::string CAnomalyScore::MLVERSION_ATTRIBUTE("mlversion");
const std::string CAnomalyScore::TIME_ATTRIBUTE("time");

// Increment this every time a change to the format is made that requires
// existing state to be discarded
const std::string CAnomalyScore::CURRENT_FORMAT_VERSION("3");

const std::string CAnomalyScore::WARNING_SEVERITY("warning");
const std::string CAnomalyScore::MINOR_SEVERITY("minor");
const std::string CAnomalyScore::MAJOR_SEVERITY("major");
const std::string CAnomalyScore::CRITICAL_SEVERITY("critical");

bool CAnomalyScore::compute(double jointProbabilityWeight,
                            double extremeProbabilityWeight,
                            std::size_t minExtremeSamples,
                            std::size_t maxExtremeSamples,
                            double maximumAnomalousProbability,
                            const TDoubleVec& probabilities,
                            double& overallAnomalyScore,
                            double& overallProbability) {
    overallAnomalyScore = 0.0;
    overallProbability = 1.0;

    if (probabilities.empty()) {
        // Nothing to do.
        return true;
    }

    maths::common::CLogJointProbabilityOfLessLikelySamples logPJointCalculator;
    std::size_t n = std::min(addProbabilities(probabilities, logPJointCalculator),
                             maxExtremeSamples);

    // Note the upper bound is significantly tighter, so we just
    // use that in the following calculation.
    double logPJoint;
    if (!logPJointCalculator.calculateUpperBound(logPJoint)) {
        LOG_ERROR(<< "Unable to calculate anomaly score"
                  << ", probabilities = " << probabilities);
        return false;
    }

    // Sanity check that the probability is not greater than 1.0.
    if (logPJoint > 0.0) {
        LOG_ERROR(<< "Invalid log joint probability " << logPJoint
                  << ", probabilities = " << probabilities);
        return false;
    }

    double logPExtreme = 0.0;
    for (std::size_t m = 1, i = maths::common::CTools::truncate(minExtremeSamples, m, n);
         i <= n; ++i) {
        maths::common::CLogProbabilityOfMFromNExtremeSamples logPExtremeCalculator(i);
        addProbabilities(probabilities, logPExtremeCalculator);
        double logPi;
        if (!logPExtremeCalculator.calibrated(logPi)) {
            LOG_ERROR(<< "Unable to calculate anomaly score"
                      << ", probabilities = " << probabilities);
            return false;
        }
        if (logPi < logPExtreme) {
            logPExtreme = logPi;
        }
    }

    // Sanity check that the probability is in the range [0, 1].
    if (logPExtreme > 0.0) {
        LOG_ERROR(<< "Invalid log extreme probability " << logPExtreme
                  << ", probabilities = " << probabilities);
        return false;
    }

    double logMaximumAnomalousProbability = std::log(maximumAnomalousProbability);
    if (logPJoint > logMaximumAnomalousProbability &&
        logPExtreme > logMaximumAnomalousProbability) {
        overallProbability = std::exp(jointProbabilityWeight * logPJoint) *
                             std::exp(extremeProbabilityWeight * logPExtreme);
        return true;
    }

    // The following extends the range of the joint probability to
    // [e^-100000, 1].
    static const double NORMAL_RANGE_SCORE_FRACTION = 0.8;
    static const double LOG_SMALLEST_PROBABILITY =
        std::log(maths::common::CTools::smallestProbability());
    static const double SMALLEST_PROBABILITY_DEVIATION =
        probabilityToScore(maths::common::CTools::smallestProbability());
    static const double SMALLEST_LOG_JOINT_PROBABILTY = -100000.0;
    static const double SMALLEST_LOG_EXTREME_PROBABILTY = -1500.0;

    if (logPJoint < LOG_SMALLEST_PROBABILITY) {
        double interpolate = std::min((logPJoint - LOG_SMALLEST_PROBABILITY) /
                                          (SMALLEST_LOG_JOINT_PROBABILTY - LOG_SMALLEST_PROBABILITY),
                                      1.0);
        overallAnomalyScore = (NORMAL_RANGE_SCORE_FRACTION +
                               (1.0 - NORMAL_RANGE_SCORE_FRACTION) * interpolate) *
                              jointProbabilityWeight * SMALLEST_PROBABILITY_DEVIATION;
    } else {
        overallAnomalyScore = NORMAL_RANGE_SCORE_FRACTION * jointProbabilityWeight *
                              probabilityToScore(std::exp(logPJoint));
    }

    if (logPExtreme < LOG_SMALLEST_PROBABILITY) {
        double interpolate = std::min((logPExtreme - LOG_SMALLEST_PROBABILITY) /
                                          (SMALLEST_LOG_EXTREME_PROBABILTY - LOG_SMALLEST_PROBABILITY),
                                      1.0);
        overallAnomalyScore += (NORMAL_RANGE_SCORE_FRACTION +
                                (1.0 - NORMAL_RANGE_SCORE_FRACTION) * interpolate) *
                               extremeProbabilityWeight * SMALLEST_PROBABILITY_DEVIATION;
    } else {
        overallAnomalyScore += NORMAL_RANGE_SCORE_FRACTION * extremeProbabilityWeight *
                               probabilityToScore(std::exp(logPExtreme));
    }

    // Invert the deviation in the region it is 1-to-1, otherwise fall back
    // to the weighted geometric mean of the two probabilities.
    overallProbability = overallAnomalyScore > 0.0 ?
        scoreToProbability(std::min(overallAnomalyScore / NORMAL_RANGE_SCORE_FRACTION,
                                    SMALLEST_PROBABILITY_DEVIATION)) :
        std::exp(jointProbabilityWeight * logPJoint) *
            std::exp(extremeProbabilityWeight * logPExtreme);

    LOG_TRACE(<< "logJointProbability = " << logPJoint
              << ", jointProbabilityWeight = " << jointProbabilityWeight
              << ", logExtremeProbability = " << logPExtreme
              << ", extremeProbabilityWeight = " << extremeProbabilityWeight
              << ", overallProbability = " << overallProbability
              << ", overallAnomalyScore = " << overallAnomalyScore
              << ", # probabilities = " << probabilities.size()
              << ", probabilities = " << probabilities);

    return true;
}

CAnomalyScore::CComputer::CComputer(double jointProbabilityWeight,
                                    double extremeProbabilityWeight,
                                    std::size_t minExtremeSamples,
                                    std::size_t maxExtremeSamples,
                                    double maximumAnomalousProbability)
    : m_JointProbabilityWeight(jointProbabilityWeight),
      m_ExtremeProbabilityWeight(extremeProbabilityWeight),
      m_MinExtremeSamples(std::min(minExtremeSamples, maxExtremeSamples)),
      m_MaxExtremeSamples(maxExtremeSamples),
      m_MaximumAnomalousProbability(maximumAnomalousProbability) {
}

bool CAnomalyScore::CComputer::operator()(const TDoubleVec& probabilities,
                                          double& overallAnomalyScore,
                                          double& overallProbability) const {
    return CAnomalyScore::compute(m_JointProbabilityWeight, m_ExtremeProbabilityWeight,
                                  m_MinExtremeSamples, m_MaxExtremeSamples,
                                  m_MaximumAnomalousProbability, probabilities,
                                  overallAnomalyScore, overallProbability);
}

CAnomalyScore::CNormalizer::CNormalizer(const CAnomalyDetectorModelConfig& config)
    : m_NoisePercentile(config.noisePercentile()),
      m_NoiseMultiplier(config.noiseMultiplier()),
      m_NormalizedScoreKnotPoints(config.normalizedScoreKnotPoints()),
      m_HighPercentileScore(std::numeric_limits<std::uint32_t>::max()),
      m_CountPerSample(core::constants::HOUR /
                       maths::common::CTools::truncate(2 * config.bucketLength(),
                                                       5 * core::constants::MINUTE,
                                                       core::constants::HOUR)),
      m_RawScoreQuantileSummary(201, config.decayRate()),
      m_RawScoreHighQuantileSummary(201, config.decayRate()),
      m_DecayRate(config.decayRate() *
                  std::max(static_cast<double>(config.bucketLength()) /
                               static_cast<double>(CAnomalyDetectorModelConfig::STANDARD_BUCKET_LENGTH),
                           1.0)),
      m_TimeToQuantileDecay(QUANTILE_DECAY_TIME) {
}

bool CAnomalyScore::CNormalizer::canNormalize() const {
    return m_RawScoreQuantileSummary.n() > 0;
}

bool CAnomalyScore::CNormalizer::normalize(const CMaximumScoreScope& scope, double& score) const {
    if (score == 0.0) {
        // Nothing to do.
        return true;
    }

    if (m_RawScoreQuantileSummary.n() == 0) {
        score = 0.0;
        return true;
    }

    LOG_TRACE(<< "Normalising " << score);

    double normalizedScores[]{m_MaximumNormalizedScore, m_MaximumNormalizedScore,
                              m_MaximumNormalizedScore, m_MaximumNormalizedScore};

    std::uint32_t discreteScore = this->discreteScore(score);

    // Our normalized score is the minimum of a set of different
    // score ceilings. The idea is that we have a number of factors
    // which can reduce the score based on the other score values
    // observed so far and the absolute score for some of our
    // functions. We want the following properties:
    //   1) The noise like scores, i.e. the smallest scores which
    //      occur relatively frequently, to have low normalized scores.
    //   2) Higher resolution of the highest scores we've seen.
    //   3) The highest raw score ever observed has the highest
    //      normalized score.
    //   4) We don't want to generate significant anomalies for
    //      moderately unusual events before we have enough history
    //      to assess their probability accurately.
    //
    // To ensure 1) we use a ceiling for the score that is a
    // multiple of a low(ish) percentile score. To ensure 2) we
    // use a non-linear (actually piecewise linear) mapping from
    // percentiles to scores, i.e. low percentiles map to a small
    // normalized score range and high percentiles map to a large
    // normalized score range. Finally, to ensure 3) we use a
    // ceiling which is a linear interpolation between a raw score
    // of zero and the maximum raw score ever seen for the partition.
    // To achieve 4), we cap the maximum score such that probabilities
    // near the cutoff don't generate large normalized scores.

    // Compute the noise ceiling. Note that since the scores are
    // logarithms (base e) the difference corresponds to the scaled,
    // 10 / log(10), signal strength in dB. By default a signal of
    // 75dB corresponds to a normalized score of 75. Note that if
    // the noise percentile is zero then really it is unknown,
    // since scores are truncated to zero. In this case we don't
    // want this term to constrain the normalized score. However,
    // we also want this term to be smooth, i.e. the score should
    // be nearly continuous when F(0) = pn. Here, F(.) denotes the
    // c.d.f. of the score and pn the noise percentile. We achieve
    // this by adding
    //   "max score" * max(2 * min(F(0) / "noise percentile", 1) - 1, 0)
    // to the score.
    std::uint32_t noiseScore;
    m_RawScoreQuantileSummary.quantile(m_NoisePercentile / 100.0, noiseScore);
    auto knotPoint = std::lower_bound(m_NormalizedScoreKnotPoints.begin(),
                                      m_NormalizedScoreKnotPoints.end(),
                                      TDoubleDoublePr(m_NoisePercentile, 0.0));
    double signalStrength = m_NoiseMultiplier * 10.0 / DISCRETIZATION_FACTOR *
                            (static_cast<double>(discreteScore) -
                             static_cast<double>(noiseScore));
    double l0;
    double u0;
    m_RawScoreQuantileSummary.cdf(0, 0.0, l0, u0);
    normalizedScores[0] =
        knotPoint->second * std::max(1.0 + signalStrength, 0.0) +
        m_MaximumNormalizedScore *
            std::max(2.0 * std::min(50.0 * (l0 + u0) / m_NoisePercentile, 1.0) - 1.0, 0.0);
    LOG_TRACE(<< "normalizedScores[0] = " << normalizedScores[0]
              << ", knotPoint = " << knotPoint->second
              << ", discreteScore = " << discreteScore << ", noiseScore = " << noiseScore
              << ", l(0) = " << l0 << ", u(0) = " << u0
              << ", signalStrength = " << signalStrength);

    // Compute the raw normalized score percentile compared to historic values.
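    // The percentile is estimated as a (70%) confidence interval from the
    // quantile sketch; both ends of the interval are pushed through the
    // piecewise linear knot point mapping below and the two mapped values
    // are averaged to give this ceiling.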
    double lowerPercentile;
    double upperPercentile;
    this->quantile(score, 70.0 /*confidence interval*/, lowerPercentile, upperPercentile);
    lowerPercentile *= 100.0;
    upperPercentile *= 100.0;
    if (lowerPercentile > upperPercentile) {
        std::swap(lowerPercentile, upperPercentile);
    }
    lowerPercentile = maths::common::CTools::truncate(lowerPercentile, 0.0, 100.0);
    upperPercentile = maths::common::CTools::truncate(upperPercentile, 0.0, 100.0);

    std::size_t lowerKnotPoint =
        std::max(std::lower_bound(m_NormalizedScoreKnotPoints.begin(),
                                  m_NormalizedScoreKnotPoints.end(), lowerPercentile,
                                  maths::common::COrderings::SFirstLess()) -
                     m_NormalizedScoreKnotPoints.begin(),
                 std::ptrdiff_t(1));
    std::size_t upperKnotPoint =
        std::max(std::lower_bound(m_NormalizedScoreKnotPoints.begin(),
                                  m_NormalizedScoreKnotPoints.end(), upperPercentile,
                                  maths::common::COrderings::SFirstLess()) -
                     m_NormalizedScoreKnotPoints.begin(),
                 std::ptrdiff_t(1));

    if (lowerKnotPoint < m_NormalizedScoreKnotPoints.size()) {
        const TDoubleDoublePr& left = m_NormalizedScoreKnotPoints[lowerKnotPoint - 1];
        const TDoubleDoublePr& right = m_NormalizedScoreKnotPoints[lowerKnotPoint];
        normalizedScores[1] = maths::common::CTools::linearlyInterpolate(
            left.first, right.first, left.second, right.second, lowerPercentile);
    } else {
        normalizedScores[1] = m_MaximumNormalizedScore;
    }
    if (upperKnotPoint < m_NormalizedScoreKnotPoints.size()) {
        const TDoubleDoublePr& left = m_NormalizedScoreKnotPoints[upperKnotPoint - 1];
        const TDoubleDoublePr& right = m_NormalizedScoreKnotPoints[upperKnotPoint];
        normalizedScores[1] =
            (normalizedScores[1] + maths::common::CTools::linearlyInterpolate(
                                       left.first, right.first, left.second,
                                       right.second, upperPercentile)) /
            2.0;
    } else {
        normalizedScores[1] = (normalizedScores[1] + m_MaximumNormalizedScore) / 2.0;
    }
    LOG_TRACE(<< "normalizedScores[1] = " << normalizedScores[1]
              << ", lowerPercentile = " << lowerPercentile
              << ", upperPercentile = " << upperPercentile);

    double maxScore{0.0};
    bool hasValidMaxScore = this->maxScore(scope, maxScore);
    LOG_TRACE(<< "hasValidMaxScore = " << hasValidMaxScore << ", scope = " << scope.print());
    if (hasValidMaxScore) {
        // Compute the maximum score ceiling
        double ratio = score / maxScore;
        normalizedScores[2] = m_MaximumNormalizedScore *
                              std::min({0.0 + 1.5 * ratio, 0.5 + 0.5 * ratio});
        LOG_TRACE(<< "normalizedScores[2] = " << normalizedScores[2] << ", score = " << score
                  << ", maxScore = " << maxScore << ", ratio = " << ratio);
    }

    // Logarithmically interpolate the maximum score between the
    // largest significant and small probability.
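    // With M and C as defined below this ceiling is
    //   "max normalized score" * (0.95 * M * (log(p) - C) + 0.05),
    // i.e. it is 5% of the maximum normalized score when the probability p
    // equals LARGEST_SIGNIFICANT_PROBABILITY and grows linearly in -log(p)
    // as the probability decreases towards SMALL_PROBABILITY.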
    static const double M =
        (probabilityToScore(maths::common::SMALL_PROBABILITY) -
         probabilityToScore(maths::common::LARGEST_SIGNIFICANT_PROBABILITY)) /
        (std::log(maths::common::SMALL_PROBABILITY) -
         std::log(maths::common::LARGEST_SIGNIFICANT_PROBABILITY));
    static const double C = std::log(maths::common::LARGEST_SIGNIFICANT_PROBABILITY);
    normalizedScores[3] = m_MaximumNormalizedScore *
                          (0.95 * M * (std::log(scoreToProbability(score)) - C) + 0.05);
    LOG_TRACE(<< "normalizedScores[3] = " << normalizedScores[3] << ", score = " << score
              << ", probability = " << scoreToProbability(score));

    score = std::min(*std::min_element(std::begin(normalizedScores), std::end(normalizedScores)),
                     m_MaximumNormalizedScore);
    LOG_TRACE(<< "normalizedScore = " << score << ", scope = " << scope.print());

    return true;
}

void CAnomalyScore::CNormalizer::quantile(double score,
                                          double confidence,
                                          double& lowerBound,
                                          double& upperBound) const {
    std::uint32_t discreteScore = this->discreteScore(score);
    double n = static_cast<double>(m_RawScoreQuantileSummary.n());
    double lowerQuantile = (100.0 - confidence) / 200.0;
    double upperQuantile = (100.0 + confidence) / 200.0;

    double h = static_cast<double>(m_HighPercentileCount);
    double f = h / n;
    if (!(f >= 0.0 && f <= 1.0)) {
        f = maths::common::CTools::truncate(f, 0.0, 1.0);
        LOG_ERROR(<< "h = " << h << ", n = " << n);
    }
    double fl = maths::common::CQDigest::cdfQuantile(n, f, lowerQuantile);
    double fu = maths::common::CQDigest::cdfQuantile(n, f, upperQuantile);

    if (discreteScore <= m_HighPercentileScore || m_RawScoreHighQuantileSummary.n() == 0) {
        m_RawScoreQuantileSummary.cdf(discreteScore, 0.0, lowerBound, upperBound);
        double pdfLowerBound;
        double pdfUpperBound;
        m_RawScoreQuantileSummary.pdf(discreteScore, 0.0, pdfLowerBound, pdfUpperBound);
        lowerBound = maths::common::CTools::truncate(lowerBound - pdfUpperBound, 0.0, fl);
        upperBound = maths::common::CTools::truncate(upperBound - pdfLowerBound, 0.0, fu);
        if (!(lowerBound >= 0.0 && lowerBound <= 1.0) ||
            !(upperBound >= 0.0 && upperBound <= 1.0)) {
            LOG_ERROR(<< "score = " << score << ", cdf = [" << lowerBound << "," << upperBound << "]"
                      << ", pdf = [" << pdfLowerBound << "," << pdfUpperBound << "]");
        }
        lowerBound = maths::common::CQDigest::cdfQuantile(n, lowerBound, lowerQuantile);
        upperBound = maths::common::CQDigest::cdfQuantile(n, upperBound, upperQuantile);
        LOG_TRACE(<< "score = " << score << ", cdf = [" << lowerBound << "," << upperBound << "]"
                  << ", pdf = [" << pdfLowerBound << "," << pdfUpperBound << "]");
        return;
    }

    // Note that if cutoffCdfUpperBound were ever equal to one to working
    // precision then lowerBound would be zero. The same is true for
    // cutoffCdfLowerBound and upperBound. In practice neither situation
    // should ever happen, but we trap them to avoid NaNs in the following
    // calculation.
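    // For scores above the cutoff the c.d.f. is stitched together from the
    // two sketches, roughly as
    //   F(x) = f + (1 - f) * (F_H(x) - F_H(cutoff)) / (1 - F_H(cutoff)),
    // where f is the fraction of samples at or below the cutoff score and
    // F_H is the c.d.f. of the high quantile summary. The code below applies
    // this to the lower and upper bounds separately, correcting by the
    // sketch's p.d.f. bounds.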
    m_RawScoreHighQuantileSummary.cdf(discreteScore, 0.0, lowerBound, upperBound);

    double cutoffCdfLowerBound;
    double cutoffCdfUpperBound;
    m_RawScoreHighQuantileSummary.cdf(m_HighPercentileScore, 0.0, cutoffCdfLowerBound, cutoffCdfUpperBound);

    double pdfLowerBound;
    double pdfUpperBound;
    m_RawScoreHighQuantileSummary.pdf(discreteScore, 0.0, pdfLowerBound, pdfUpperBound);

    lowerBound = fl + (1.0 - fl) *
                          std::max(lowerBound - cutoffCdfUpperBound - pdfUpperBound, 0.0) /
                          std::max(1.0 - cutoffCdfUpperBound,
                                   std::numeric_limits<double>::epsilon());
    upperBound = fu + (1.0 - fu) *
                          std::max(upperBound - cutoffCdfLowerBound - pdfLowerBound, 0.0) /
                          std::max(1.0 - cutoffCdfLowerBound,
                                   std::numeric_limits<double>::epsilon());
    if (!(lowerBound >= 0.0 && lowerBound <= 1.0) ||
        !(upperBound >= 0.0 && upperBound <= 1.0)) {
        LOG_ERROR(<< "score = " << score << ", cdf = [" << lowerBound << "," << upperBound << "]"
                  << ", cutoff = [" << cutoffCdfLowerBound << "," << cutoffCdfUpperBound << "]"
                  << ", pdf = [" << pdfLowerBound << "," << pdfUpperBound << "]"
                  << ", f = " << f);
    }
    lowerBound = maths::common::CQDigest::cdfQuantile(n, lowerBound, lowerQuantile);
    upperBound = maths::common::CQDigest::cdfQuantile(n, upperBound, upperQuantile);

    LOG_TRACE(<< "score = " << score << ", cdf = [" << lowerBound << "," << upperBound << "]"
              << ", cutoff = [" << cutoffCdfLowerBound << "," << cutoffCdfUpperBound << "]"
              << ", pdf = [" << pdfLowerBound << "," << pdfUpperBound << "]"
              << ", f = " << f);
}

bool CAnomalyScore::CNormalizer::updateQuantiles(const CMaximumScoreScope& scope, double score) {
    using TUInt32UInt64Pr = std::pair<std::uint32_t, std::uint64_t>;
    using TUInt32UInt64PrVec = std::vector<TUInt32UInt64Pr>;

    CMaxScore& maxScore = m_MaxScores[scope.key(m_IsForMembersOfPopulation, m_Dictionary)];
    double oldMaxScore{maxScore.score()};
    maxScore.add(score);
    bool bigChange{maxScore.score() > BIG_CHANGE_FACTOR * oldMaxScore};
    LOG_TRACE(<< "updateQuantiles: scope = " << scope.print()
              << ", oldMaxScore = " << oldMaxScore << ", score = " << score
              << ", newMaxScore = " << maxScore.score() << ", bigChange = " << bigChange);

    m_Sample = std::max(m_Sample, score);
    if (++m_CountSinceLastSample < m_CountPerSample) {
        return bigChange;
    }

    std::uint32_t discreteScore = this->discreteScore(m_Sample);
    LOG_TRACE(<< "score = " << m_Sample << ", discreteScore = " << discreteScore
              << ", maxScore = " << maxScore.score() << ", scope = " << scope.print());
    m_CountSinceLastSample = 0;
    m_Sample = 0.0;

    std::uint64_t n = m_RawScoreQuantileSummary.n();
    std::uint64_t k = m_RawScoreQuantileSummary.k();
    LOG_TRACE(<< "n = " << n << ", k = " << k);

    // We are about to compress the q-digest; at the moment it comprises
    // the unique values we have seen so far. So we extract the values
    // greater than the HIGH_PERCENTILE percentile at this point to
    // initialize the fine grain high quantile summary.
    if ((n + 1) == k) {
        LOG_TRACE(<< "Initializing H");
        TUInt32UInt64PrVec L;
        m_RawScoreQuantileSummary.summary(L);
        if (L.empty()) {
            LOG_ERROR(<< "High quantile summary is empty: "
                      << m_RawScoreQuantileSummary.print());
        } else {
            auto highPercentileCount = static_cast<std::uint64_t>(
                (HIGH_PERCENTILE / 100.0) * static_cast<double>(n) + 0.5);

            // Estimate the high percentile score and update the count.
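            // L holds (score, cumulative count) pairs in increasing score
            // order: scan for the first entry whose cumulative count exceeds
            // the target count and take the preceding entry as the cutoff
            // score and count.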
            std::size_t i = 1;
            m_HighPercentileScore = L[0].first;
            m_HighPercentileCount = L[0].second;
            for (/**/; i < L.size(); ++i) {
                if (L[i].second > highPercentileCount) {
                    m_HighPercentileScore = L[i - 1].first;
                    m_HighPercentileCount = L[i - 1].second;
                    break;
                }
            }
            if (m_HighPercentileCount > n) {
                LOG_ERROR(<< "Invalid c(H) " << m_HighPercentileCount);
                LOG_ERROR(<< "target " << highPercentileCount);
                LOG_ERROR(<< "L " << L);
                m_HighPercentileCount = n;
            }
            LOG_TRACE(<< "s(H) = " << m_HighPercentileScore
                      << ", c(H) = " << m_HighPercentileCount << ", percentile = "
                      << 100.0 * static_cast<double>(m_HighPercentileCount) / static_cast<double>(n)
                      << "%, desired c(H) = " << highPercentileCount);

            // Populate the high quantile summary.
            for (/**/; i < L.size(); ++i) {
                std::uint32_t x = L[i].first;
                std::uint64_t m = L[i].second - L[i - 1].second;
                LOG_TRACE(<< "Adding (" << x << ", " << m << ") to H");
                m_RawScoreHighQuantileSummary.add(x, m);
            }
        }
    }

    m_RawScoreQuantileSummary.add(discreteScore);
    if (discreteScore <= m_HighPercentileScore) {
        ++m_HighPercentileCount;
    } else {
        m_RawScoreHighQuantileSummary.add(discreteScore);
    }
    LOG_TRACE(<< "percentile = "
              << static_cast<double>(m_HighPercentileCount) / static_cast<double>(n + 1));

    // Periodically refresh the high percentile score.
    if ((n + 1) > k && (n + 1) % k == 0) {
        LOG_TRACE(<< "Refreshing high quantile summary");

        auto highPercentileCount = static_cast<std::uint64_t>(
            (HIGH_PERCENTILE / 100.0) * static_cast<double>(n + 1) + 0.5);
        LOG_TRACE(<< "s(H) = " << m_HighPercentileScore << ", c(H) = " << m_HighPercentileCount
                  << ", desired c(H) = " << highPercentileCount);

        if (m_HighPercentileCount > highPercentileCount) {
            TUInt32UInt64PrVec L;
            m_RawScoreQuantileSummary.summary(L);
            TUInt32UInt64PrVec H;
            m_RawScoreHighQuantileSummary.summary(H);

            std::size_t i0 = std::min(
                static_cast<std::size_t>(
                    std::lower_bound(L.begin(), L.end(), highPercentileCount,
                                     maths::common::COrderings::SSecondLess()) -
                    L.begin()),
                L.size() - 1);
            std::size_t j = std::min(
                static_cast<std::size_t>(
                    std::upper_bound(H.begin(), H.end(), L[i0],
                                     maths::common::COrderings::SFirstLess()) -
                    H.begin()),
                H.size() - 1);

            std::uint64_t r = L[i0].second;
            for (std::size_t i = i0 + 1;
                 i < L.size() && L[i0].second + m_RawScoreHighQuantileSummary.n() < n + 1; ++i) {
                for (/**/; j < H.size() && H[j].first <= L[i].first; ++j) {
                    r += (H[j].second - (j == 0 ? static_cast<std::uint64_t>(0) : H[j - 1].second));
                }
                std::uint32_t x = L[i].first;
                std::uint64_t m = r < L[i].second ?
                                      L[i].second - r : static_cast<std::uint64_t>(0);
                r += m;
                if (m > 0) {
                    LOG_TRACE(<< "Adding (" << x << ',' << m << ") to H");
                    m_RawScoreHighQuantileSummary.add(x, m);
                }
            }
            m_HighPercentileScore = L[i0].first;
            m_HighPercentileCount = L[i0].second;
            if (m_HighPercentileCount > n + 1) {
                LOG_ERROR(<< "Invalid c(H) " << m_HighPercentileCount);
                LOG_ERROR(<< "target " << highPercentileCount);
                LOG_ERROR(<< "L " << L);
                m_HighPercentileCount = n;
            }
            LOG_TRACE(<< "s(H) = " << m_HighPercentileScore
                      << ", c(H) = " << m_HighPercentileCount << ", percentile = "
                      << 100.0 * static_cast<double>(m_HighPercentileCount) / static_cast<double>(n + 1)
                      << "%");
        } else {
            m_RawScoreQuantileSummary.quantile(HIGH_PERCENTILE / 100.0, m_HighPercentileScore);
            double lowerBound;
            double upperBound;
            m_RawScoreQuantileSummary.cdf(m_HighPercentileScore, 0.0, lowerBound, upperBound);
            m_HighPercentileCount = static_cast<std::uint64_t>(
                static_cast<double>(n + 1) * lowerBound + 0.5);
            LOG_TRACE(<< "s(H) = " << m_HighPercentileScore << ", c(H) = " << m_HighPercentileCount
                      << ", percentile = " << 100.0 * lowerBound << "%");
        }
    }

    return bigChange;
}

void CAnomalyScore::CNormalizer::propagateForwardByTime(double time) {
    if (time < 0.0) {
        LOG_ERROR(<< "Can't propagate normalizer backwards in time");
        return;
    }

    double alpha = std::exp(-2.0 * m_DecayRate * time);
    for (auto i = m_MaxScores.begin(); i != m_MaxScores.end();
         i->second.forget(time) ? i = m_MaxScores.erase(i) : ++i) {
        i->second.age(alpha);
    }

    // Quantiles age at a much slower rate than everything else so that
    // we can accurately estimate high quantiles. We achieve this by only
    // aging them a certain fraction of the time.
    m_TimeToQuantileDecay -= time / static_cast<double>(m_CountPerSample);
    if (m_TimeToQuantileDecay <= 0.0) {
        time = std::floor((QUANTILE_DECAY_TIME - m_TimeToQuantileDecay) / QUANTILE_DECAY_TIME);

        std::uint64_t n = m_RawScoreQuantileSummary.n();
        m_RawScoreQuantileSummary.propagateForwardsByTime(time);
        m_RawScoreHighQuantileSummary.propagateForwardsByTime(time);
        if (n > 0) {
            m_HighPercentileCount = static_cast<std::uint64_t>(
                static_cast<double>(m_RawScoreQuantileSummary.n()) / static_cast<double>(n) *
                    static_cast<double>(m_HighPercentileCount) +
                0.5);
        }
        m_TimeToQuantileDecay += QUANTILE_DECAY_TIME +
                                 std::floor(-m_TimeToQuantileDecay / QUANTILE_DECAY_TIME);
    }
}

void CAnomalyScore::CNormalizer::isForMembersOfPopulation(bool resultIsForMemberOfPopulation) {
    if (m_IsForMembersOfPopulation == std::nullopt) {
        m_IsForMembersOfPopulation.emplace(resultIsForMemberOfPopulation);
    } else {
        m_IsForMembersOfPopulation = *m_IsForMembersOfPopulation || resultIsForMemberOfPopulation;
    }
}

bool CAnomalyScore::CNormalizer::isUpgradable(const std::string& fromVersion,
                                              const std::string& toVersion) {
    // Any changes to this method need to be reflected in the upgrade() method
    // below to prevent an inconsistency where this method says an upgrade is
    // possible but the upgrade() method can't do it.
    return (fromVersion == "1" && toVersion == "2") ||
           (fromVersion == "1" && toVersion == "3") ||
           (fromVersion == "2" && toVersion == "3");
}

bool CAnomalyScore::CNormalizer::upgrade(const std::string& loadedVersion,
                                         const std::string& currentVersion) {
    if (loadedVersion == currentVersion) {
        // No upgrade required.
        return true;
    }

    // We know how to upgrade between versions 1, 2 and 3.
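    // These tables are indexed [loadedVersion - 1][currentVersion - 1]: the
    // first gives the factor applied to the per-partition maximum scores and
    // the second the factor by which the Q digest quantile summaries are
    // scaled.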
    static const double HIGH_SCORE_UPGRADE_FACTOR[][3] = {
        {1.0, 0.3, 0.3}, {1.0 / 0.3, 1.0, 1.0}, {1.0 / 0.3, 1.0, 1.0}};
    static const double Q_DIGEST_UPGRADE_FACTOR[][3] = {
        {1.0, 3.0, 30.0}, {1.0 / 3.0, 1.0, 10.0}, {1.0 / 30.0, 1.0 / 10.0, 1.0}};

    std::size_t i;
    std::size_t j;
    if (!core::CStringUtils::stringToType(loadedVersion, i) ||
        !core::CStringUtils::stringToType(currentVersion, j) ||
        i - 1 >= std::size(HIGH_SCORE_UPGRADE_FACTOR) ||
        j - 1 >= std::size(HIGH_SCORE_UPGRADE_FACTOR[0])) {
        LOG_ERROR(<< "Don't know how to upgrade quantiles from version "
                  << loadedVersion << " to version " << currentVersion);
        return false;
    }

    double highScoreUpgradeFactor = HIGH_SCORE_UPGRADE_FACTOR[i - 1][j - 1];
    double qDigestUpgradeFactor = Q_DIGEST_UPGRADE_FACTOR[i - 1][j - 1];
    LOG_INFO(<< "Upgrading quantiles from version " << loadedVersion
             << " to version " << currentVersion << " - will scale highest score by "
             << highScoreUpgradeFactor << " and Q digest min/max values by "
             << qDigestUpgradeFactor);

    // For the maximum score, aging is equivalent to scaling.
    for (auto& element : m_MaxScores) {
        element.second.age(highScoreUpgradeFactor);
    }

    if (m_RawScoreQuantileSummary.scale(qDigestUpgradeFactor) == false) {
        LOG_ERROR(<< "Failed to scale raw score quantiles");
        return false;
    }

    if (m_RawScoreHighQuantileSummary.scale(qDigestUpgradeFactor) == false) {
        LOG_ERROR(<< "Failed to scale raw score high quantiles");
        return false;
    }

    return true;
}

void CAnomalyScore::CNormalizer::clear() {
    m_HighPercentileScore = std::numeric_limits<std::uint32_t>::max();
    m_HighPercentileCount = 0ULL;
    m_IsForMembersOfPopulation.reset();
    m_MaxScores.clear();
    m_CountSinceLastSample = 0;
    m_Sample = 0.0;
    m_RawScoreQuantileSummary.clear();
    m_RawScoreHighQuantileSummary.clear();
    m_TimeToQuantileDecay = QUANTILE_DECAY_TIME;
}

void CAnomalyScore::CNormalizer::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
    inserter.insertValue(HIGH_PERCENTILE_SCORE_TAG, m_HighPercentileScore);
    inserter.insertValue(HIGH_PERCENTILE_COUNT_TAG, m_HighPercentileCount);
    inserter.insertValue(COUNT_SINCE_LAST_SAMPLE_TAG, m_CountSinceLastSample);
    inserter.insertValue(SAMPLE_TAG, m_Sample, core::CIEEE754::E_DoublePrecision);
    inserter.insertLevel(RAW_SCORE_QUANTILE_SUMMARY, [this](auto& inserter_) {
        m_RawScoreQuantileSummary.acceptPersistInserter(inserter_);
    });
    inserter.insertLevel(RAW_SCORE_HIGH_QUANTILE_SUMMARY, [this](auto& inserter_) {
        m_RawScoreHighQuantileSummary.acceptPersistInserter(inserter_);
    });
    inserter.insertValue(TIME_TO_QUANTILE_DECAY_TAG, m_TimeToQuantileDecay);
    if (m_IsForMembersOfPopulation != std::nullopt) {
        inserter.insertValue(IS_FOR_MEMBERS_OF_POPULATION_TAG,
                             *m_IsForMembersOfPopulation ?
                                 1 : 0);
    }
    core::CPersistUtils::persist(MAX_SCORES_PER_PARTITION_TAG, m_MaxScores, inserter);
}

bool CAnomalyScore::CNormalizer::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
    do {
        const std::string& name = traverser.name();
        LOG_TRACE(<< "name: " << name);
        // This used to be 64 bit but is now 32 bit, so may need adjusting
        // on restoration
        RESTORE_SETUP_TEARDOWN(
            HIGH_PERCENTILE_SCORE_TAG, std::uint64_t highPercentileScore64(0),
            core::CStringUtils::stringToType(traverser.value(), highPercentileScore64),
            m_HighPercentileScore = static_cast<std::uint32_t>(std::min(
                highPercentileScore64,
                static_cast<std::uint64_t>(std::numeric_limits<std::uint32_t>::max()))));
        RESTORE(HIGH_PERCENTILE_COUNT_TAG,
                core::CStringUtils::stringToType(traverser.value(), m_HighPercentileCount));
        RESTORE_BUILT_IN(COUNT_SINCE_LAST_SAMPLE_TAG, m_CountSinceLastSample)
        RESTORE_BUILT_IN(SAMPLE_TAG, m_Sample)
        RESTORE(RAW_SCORE_QUANTILE_SUMMARY, traverser.traverseSubLevel([this](auto& traverser_) {
                    return m_RawScoreQuantileSummary.acceptRestoreTraverser(traverser_);
                }))
        RESTORE(RAW_SCORE_HIGH_QUANTILE_SUMMARY, traverser.traverseSubLevel([this](auto& traverser_) {
                    return m_RawScoreHighQuantileSummary.acceptRestoreTraverser(traverser_);
                }))
        RESTORE_SETUP_TEARDOWN(IS_FOR_MEMBERS_OF_POPULATION_TAG, int value,
                               core::CStringUtils::stringToType(traverser.value(), value),
                               m_IsForMembersOfPopulation.emplace(value == 1))
        core::CPersistUtils::restore(MAX_SCORES_PER_PARTITION_TAG, m_MaxScores, traverser);
    } while (traverser.next());

    return true;
}

std::uint64_t CAnomalyScore::CNormalizer::checksum() const {
    auto seed = static_cast<std::uint64_t>(m_NoisePercentile);
    seed = maths::common::CChecksum::calculate(seed, m_NoiseMultiplier);
    seed = maths::common::CChecksum::calculate(seed, m_NormalizedScoreKnotPoints);
    seed = maths::common::CChecksum::calculate(seed, m_MaximumNormalizedScore);
    seed = maths::common::CChecksum::calculate(seed, m_HighPercentileScore);
    seed = maths::common::CChecksum::calculate(seed, m_HighPercentileCount);
    seed = maths::common::CChecksum::calculate(seed, m_IsForMembersOfPopulation);
    seed = maths::common::CChecksum::calculate(seed, m_CountSinceLastSample);
    seed = maths::common::CChecksum::calculate(seed, m_CountPerSample);
    seed = maths::common::CChecksum::calculate(seed, m_Sample);
    seed = maths::common::CChecksum::calculate(seed, m_MaxScores);
    seed = maths::common::CChecksum::calculate(seed, m_RawScoreQuantileSummary);
    seed = maths::common::CChecksum::calculate(seed, m_RawScoreHighQuantileSummary);
    seed = maths::common::CChecksum::calculate(seed, m_DecayRate);
    return maths::common::CChecksum::calculate(seed, m_TimeToQuantileDecay);
}

std::uint32_t CAnomalyScore::CNormalizer::discreteScore(double rawScore) const {
    return static_cast<std::uint32_t>(DISCRETIZATION_FACTOR * rawScore + 0.5);
}

double CAnomalyScore::CNormalizer::rawScore(std::uint32_t discreteScore) const {
    return static_cast<double>(discreteScore) / DISCRETIZATION_FACTOR;
}

bool CAnomalyScore::CNormalizer::maxScore(const CMaximumScoreScope& scope, double& maxScore) const {
    auto itr = m_MaxScores.find(scope.key(m_IsForMembersOfPopulation, m_Dictionary));
    if (itr == m_MaxScores.end()) {
        return false;
    }
    maxScore = itr->second.score();
    return maxScore != 0.0;
}

CAnomalyScore::CNormalizer::CMaximumScoreScope::CMaximumScoreScope(
    TOptionalStrCRef partitionFieldName,
    TOptionalStrCRef partitionFieldValue,
    TOptionalStrCRef personFieldName,
    TOptionalStrCRef personFieldValue)
    : m_PartitionFieldName{std::move(partitionFieldName)},
      m_PartitionFieldValue{std::move(partitionFieldValue)},
      m_PersonFieldName{std::move(personFieldName)},
      m_PersonFieldValue{std::move(personFieldValue)} {
}

CAnomalyScore::CNormalizer::TWord
CAnomalyScore::CNormalizer::CMaximumScoreScope::key(TOptionalBool isPopulationAnalysis,
                                                    const TDictionary& dictionary) const {
    if (isPopulationAnalysis == std::nullopt) {
        LOG_ERROR(<< "Using normalizer without refreshing settings");
    } else if (*isPopulationAnalysis) {
        return TWord{};
    }

    // The influencer named 'bucket_time' corresponds to the root level
    // normalizer for which we have keyed the maximum score on a blank
    // personName.
    const TOptionalStr& personFieldName = (m_PersonFieldName.get() == TIME_INFLUENCER)
                                              ? EMPTY_STRING
                                              : m_PersonFieldName.get();

    return dictionary.word(m_PartitionFieldName.get().value_or(""),
                           m_PartitionFieldValue.get().value_or(""),
                           personFieldName.value_or(""),
                           m_PersonFieldValue.get().value_or(""));
}

std::string CAnomalyScore::CNormalizer::CMaximumScoreScope::print() const {
    return "\"" + m_PartitionFieldName.get().value_or("") + "_" +
           m_PartitionFieldValue.get().value_or("") + "_" +
           m_PersonFieldName.get().value_or("") + "_" +
           m_PersonFieldValue.get().value_or("") + "\"";
}

void CAnomalyScore::CNormalizer::CMaxScore::add(double score) {
    m_Score.add(score);
    m_TimeSinceLastScore = 0.0;
}

double CAnomalyScore::CNormalizer::CMaxScore::score() const {
    return m_Score.count() == 0 ? 0.0 : m_Score[0];
}

void CAnomalyScore::CNormalizer::CMaxScore::age(double alpha) {
    m_Score.age(alpha);
}

bool CAnomalyScore::CNormalizer::CMaxScore::forget(double time) {
    m_TimeSinceLastScore += time;
    return m_TimeSinceLastScore > FORGET_MAX_SCORE_INTERVAL;
}

void CAnomalyScore::CNormalizer::CMaxScore::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
    inserter.insertValue(MAX_SCORE_TAG, m_Score.toDelimited());
    inserter.insertValue(TIME_SINCE_LAST_SCORE_TAG, m_TimeSinceLastScore,
                         core::CIEEE754::E_SinglePrecision);
}

bool CAnomalyScore::CNormalizer::CMaxScore::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
    do {
        const std::string& name{traverser.name()};
        RESTORE(MAX_SCORE_TAG, m_Score.fromDelimited(traverser.value()))
        RESTORE_BUILT_IN(TIME_SINCE_LAST_SCORE_TAG, m_TimeSinceLastScore)
    } while (traverser.next());
    return true;
}

std::uint64_t CAnomalyScore::CNormalizer::CMaxScore::checksum() const {
    std::uint64_t seed = maths::common::CChecksum::calculate(0, m_Score);
    return maths::common::CChecksum::calculate(seed, m_TimeSinceLastScore);
}

// We use short field names to reduce the state size
namespace {
const std::string NORMALIZER_TAG("a");
}

const std::string& CAnomalyScore::normalizedScoreToSeverity(double normalizedScore) {
    if (normalizedScore < 25.0) {
        return WARNING_SEVERITY;
    }
    if (normalizedScore < 50.0) {
        return MINOR_SEVERITY;
    }
    if (normalizedScore < 75.0) {
        return MAJOR_SEVERITY;
    }
    return CRITICAL_SEVERITY;
}

bool CAnomalyScore::normalizerFromJson(const std::string& json, CNormalizer& normalizer) {
    // This only works with uncompressed JSON - compression must be handled in
    // an outer layer. The method below works more efficiently with compressed
    // input.
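    // Note: this restores state written by normalizerToJson() below; the
    // descriptive attributes (cue, key, description, time) are simply skipped
    // on restore.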
    std::istringstream iss(json);
    core::CJsonStateRestoreTraverser traverser(iss);
    return normalizerFromJson(traverser, normalizer);
}

bool CAnomalyScore::normalizerFromJson(core::CStateRestoreTraverser& traverser,
                                       CNormalizer& normalizer) {
    bool restoredNormalizer(false);
    std::string restoredVersion(MISSING_VERSION_FORMAT_VERSION);

    while (traverser.next()) {
        const std::string& name = traverser.name();
        if (name == MLVERSION_ATTRIBUTE) {
            restoredVersion = traverser.value();
            if (restoredVersion != CURRENT_FORMAT_VERSION) {
                if (normalizer.isUpgradable(restoredVersion, CURRENT_FORMAT_VERSION)) {
                    LOG_DEBUG(<< "Restored quantiles JSON version is " << restoredVersion
                              << "; current JSON version is " << CURRENT_FORMAT_VERSION
                              << " - will upgrade quantiles");
                } else {
                    // If the version has changed and the format is too different to
                    // even upgrade then start again from scratch - this counts as a
                    // successful load
                    LOG_INFO(<< "Restored quantiles JSON version is " << restoredVersion
                             << "; current JSON version is " << CURRENT_FORMAT_VERSION
                             << " - will restart quantiles from scratch");
                    return true;
                }
            }
        } else if (name == NORMALIZER_TAG) {
            restoredNormalizer = traverser.traverseSubLevel([&](auto& traverser_) {
                return normalizer.acceptRestoreTraverser(traverser_);
            });
            if (!restoredNormalizer) {
                LOG_ERROR(<< "Unable to restore quantiles to the normaliser");
            }
        }
    }

    if (restoredNormalizer && restoredVersion != CURRENT_FORMAT_VERSION) {
        LOG_INFO(<< "Restored quantiles JSON version is " << restoredVersion
                 << "; current JSON version is " << CURRENT_FORMAT_VERSION
                 << " - will attempt upgrade");
        if (normalizer.upgrade(restoredVersion, CURRENT_FORMAT_VERSION) == false) {
            LOG_ERROR(<< "Failed to upgrade quantiles from version " << restoredVersion
                      << " to version " << CURRENT_FORMAT_VERSION);
            return false;
        }
    }

    return restoredNormalizer;
}

void CAnomalyScore::normalizerToJson(const CNormalizer& normalizer,
                                     const std::string& searchKey,
                                     const std::string& cue,
                                     const std::string& description,
                                     core_t::TTime time,
                                     std::ostream& strm) {
    core::CJsonStatePersistInserter inserter(strm);

    // Important: Even though some of these fields appear to be unnecessary,
    // the CHierarchicalResultsNormalizer requires them to exist in this
    // order. Do not change the fields or the ordering.
    inserter.insertValue(MLCUE_ATTRIBUTE, cue);
    inserter.insertValue(MLKEY_ATTRIBUTE, searchKey);
    inserter.insertValue(MLQUANTILESDESCRIPTION_ATTRIBUTE, description);
    inserter.insertValue(MLVERSION_ATTRIBUTE, CURRENT_FORMAT_VERSION);
    inserter.insertValue(TIME_ATTRIBUTE, core::CStringUtils::typeToString(time));

    inserter.insertLevel(NORMALIZER_TAG,
                         std::bind(&CNormalizer::acceptPersistInserter, &normalizer,
                                   std::placeholders::_1));
}
}
}