/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <model/CAnomalyScore.h>
#include <core/CContainerPrinter.h>
#include <core/CIEEE754.h>
#include <core/CJsonStatePersistInserter.h>
#include <core/CJsonStateRestoreTraverser.h>
#include <core/CLogger.h>
#include <core/CPersistUtils.h>
#include <core/Constants.h>
#include <core/RestoreMacros.h>
#include <maths/common/CBasicStatistics.h>
#include <maths/common/CBasicStatisticsPersist.h>
#include <maths/common/CChecksum.h>
#include <maths/common/COrderings.h>
#include <maths/common/CTools.h>
#include <maths/common/Constants.h>
#include <maths/common/ProbabilityAggregators.h>
#include <model/CAnomalyDetectorModelConfig.h>
#include <cstdint>
#include <sstream>
#include <vector>
namespace ml {
namespace model {
namespace {
using TDoubleVec = std::vector<double>;
//! Add valid \p probabilities to \p aggregator and return the
//! number of valid probabilities.
template<typename AGGREGATOR>
std::size_t addProbabilities(const TDoubleVec& probabilities, AGGREGATOR& aggregator) {
std::size_t n = 0;
for (auto p : probabilities) {
if (!(p >= 0.0 && p <= 1.0)) {
LOG_ERROR(<< "Invalid probability " << p);
} else {
++n;
aggregator.add(p);
}
}
return n;
}
//! The function to convert probabilities to *raw* scores.
double probabilityToScore(double probability) {
return maths::common::CTools::anomalyScore(probability);
}
//! The function to convert *raw* scores to probabilities.
double scoreToProbability(double score) {
return maths::common::CTools::inverseAnomalyScore(score);
}
// We use short field names to reduce the state size
const std::string HIGH_PERCENTILE_SCORE_TAG("a");
const std::string HIGH_PERCENTILE_COUNT_TAG("b");
//const std::string MAX_SCORE_TAG("c"); Used in versions < 6.5
const std::string RAW_SCORE_QUANTILE_SUMMARY("d");
const std::string RAW_SCORE_HIGH_QUANTILE_SUMMARY("e");
const std::string TIME_TO_QUANTILE_DECAY_TAG("f");
// Since version >= 6.5
const std::string MAX_SCORES_PER_PARTITION_TAG("g");
const std::string IS_FOR_MEMBERS_OF_POPULATION_TAG("h");
// Since version >= 8.4
const std::string COUNT_SINCE_LAST_SAMPLE_TAG("i");
const std::string SAMPLE_TAG("j");
// Nested
const std::string MAX_SCORE_TAG("a");
const std::string TIME_SINCE_LAST_SCORE_TAG("b");
const std::string EMPTY_STRING;
// This is the version to assume for the format that doesn't contain a version
// attribute - NEVER CHANGE THIS
const std::string MISSING_VERSION_FORMAT_VERSION("1");
const std::string TIME_INFLUENCER("bucket_time");
}
const std::string CAnomalyScore::MLCUE_ATTRIBUTE("mlcue");
const std::string CAnomalyScore::MLKEY_ATTRIBUTE("mlkey");
const std::string CAnomalyScore::MLQUANTILESDESCRIPTION_ATTRIBUTE("mlquantilesdescription");
const std::string CAnomalyScore::MLVERSION_ATTRIBUTE("mlversion");
const std::string CAnomalyScore::TIME_ATTRIBUTE("time");
// Increment this every time a change to the format is made that requires
// existing state to be discarded
const std::string CAnomalyScore::CURRENT_FORMAT_VERSION("3");
const std::string CAnomalyScore::WARNING_SEVERITY("warning");
const std::string CAnomalyScore::MINOR_SEVERITY("minor");
const std::string CAnomalyScore::MAJOR_SEVERITY("major");
const std::string CAnomalyScore::CRITICAL_SEVERITY("critical");
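// The overall anomaly score blends two aggregate statistics of the bucket
// probabilities: the joint probability of seeing a less likely collection of
// samples and the probability of the most extreme samples, minimized over
// candidate extreme set sizes. Each is mapped to a raw score and the two are
// combined using the supplied weights; the overall probability is recovered
// by inverting that mapping where it is one-to-one.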
bool CAnomalyScore::compute(double jointProbabilityWeight,
double extremeProbabilityWeight,
std::size_t minExtremeSamples,
std::size_t maxExtremeSamples,
double maximumAnomalousProbability,
const TDoubleVec& probabilities,
double& overallAnomalyScore,
double& overallProbability) {
overallAnomalyScore = 0.0;
overallProbability = 1.0;
if (probabilities.empty()) {
// Nothing to do.
return true;
}
maths::common::CLogJointProbabilityOfLessLikelySamples logPJointCalculator;
std::size_t n = std::min(addProbabilities(probabilities, logPJointCalculator),
maxExtremeSamples);
// Note the upper bound is significantly tighter than the lower bound,
// so we just use that in the following calculation.
double logPJoint;
if (!logPJointCalculator.calculateUpperBound(logPJoint)) {
LOG_ERROR(<< "Unable to calculate anomaly score"
<< ", probabilities = " << probabilities);
return false;
}
// Sanity check that the probability is not greater than 1.0.
if (logPJoint > 0.0) {
LOG_ERROR(<< "Invalid log joint probability " << logPJoint
<< ", probabilities = " << probabilities);
return false;
}
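// Compute the log probability of the most extreme samples, taking the
// minimum over candidate extreme set sizes from minExtremeSamples up to
// the number of valid probabilities.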
double logPExtreme = 0.0;
for (std::size_t m = 1, i = maths::common::CTools::truncate(minExtremeSamples, m, n);
i <= n; ++i) {
maths::common::CLogProbabilityOfMFromNExtremeSamples logPExtremeCalculator(i);
addProbabilities(probabilities, logPExtremeCalculator);
double logPi;
if (!logPExtremeCalculator.calculate(logPi)) {
LOG_ERROR(<< "Unable to calculate anomaly score"
<< ", probabilities = " << probabilities);
return false;
}
if (logPi < logPExtreme) {
logPExtreme = logPi;
}
}
// Sanity check that the probability is in the range [0, 1].
if (logPExtreme > 0.0) {
LOG_ERROR(<< "Invalid log extreme probability " << logPExtreme
<< ", probabilities = " << probabilities);
return false;
}
double logMaximumAnomalousProbability = std::log(maximumAnomalousProbability);
if (logPJoint > logMaximumAnomalousProbability && logPExtreme > logMaximumAnomalousProbability) {
overallProbability = std::exp(jointProbabilityWeight * logPJoint) *
std::exp(extremeProbabilityWeight * logPExtreme);
return true;
}
// The following extends the range of the joint probability to
// [e^-100000, 1].
static const double NORMAL_RANGE_SCORE_FRACTION = 0.8;
static const double LOG_SMALLEST_PROBABILITY =
std::log(maths::common::CTools::smallestProbability());
static const double SMALLEST_PROBABILITY_DEVIATION =
probabilityToScore(maths::common::CTools::smallestProbability());
static const double SMALLEST_LOG_JOINT_PROBABILITY = -100000.0;
static const double SMALLEST_LOG_EXTREME_PROBABILITY = -1500.0;
if (logPJoint < LOG_SMALLEST_PROBABILITY) {
double interpolate = std::min((logPJoint - LOG_SMALLEST_PROBABILITY) /
(SMALLEST_LOG_JOINT_PROBABILITY - LOG_SMALLEST_PROBABILITY),
1.0);
overallAnomalyScore = (NORMAL_RANGE_SCORE_FRACTION +
(1.0 - NORMAL_RANGE_SCORE_FRACTION) * interpolate) *
jointProbabilityWeight * SMALLEST_PROBABILITY_DEVIATION;
} else {
overallAnomalyScore = NORMAL_RANGE_SCORE_FRACTION * jointProbabilityWeight *
probabilityToScore(std::exp(logPJoint));
}
if (logPExtreme < LOG_SMALLEST_PROBABILITY) {
double interpolate = std::min((logPExtreme - LOG_SMALLEST_PROBABILITY) /
(SMALLEST_LOG_EXTREME_PROBABILITY - LOG_SMALLEST_PROBABILITY),
1.0);
overallAnomalyScore += (NORMAL_RANGE_SCORE_FRACTION +
(1.0 - NORMAL_RANGE_SCORE_FRACTION) * interpolate) *
extremeProbabilityWeight * SMALLEST_PROBABILITY_DEVIATION;
} else {
overallAnomalyScore += NORMAL_RANGE_SCORE_FRACTION * extremeProbabilityWeight *
probabilityToScore(std::exp(logPExtreme));
}
// Invert the deviation in the region it is 1-to-1 otherwise
// use the weighted harmonic mean.
overallProbability =
overallAnomalyScore > 0.0
? scoreToProbability(std::min(overallAnomalyScore / NORMAL_RANGE_SCORE_FRACTION,
SMALLEST_PROBABILITY_DEVIATION))
: std::exp(jointProbabilityWeight * logPJoint) *
std::exp(extremeProbabilityWeight * logPExtreme);
LOG_TRACE(<< "logJointProbability = " << logPJoint << ", jointProbabilityWeight = "
<< jointProbabilityWeight << ", logExtremeProbability = " << logPExtreme
<< ", extremeProbabilityWeight = " << extremeProbabilityWeight
<< ", overallProbability = " << overallProbability
<< ", overallAnomalyScore = " << overallAnomalyScore << ", # probabilities = "
<< probabilities.size() << ", probabilities = " << probabilities);
return true;
}
CAnomalyScore::CComputer::CComputer(double jointProbabilityWeight,
double extremeProbabilityWeight,
std::size_t minExtremeSamples,
std::size_t maxExtremeSamples,
double maximumAnomalousProbability)
: m_JointProbabilityWeight(jointProbabilityWeight),
m_ExtremeProbabilityWeight(extremeProbabilityWeight),
m_MinExtremeSamples(std::min(minExtremeSamples, maxExtremeSamples)),
m_MaxExtremeSamples(maxExtremeSamples),
m_MaximumAnomalousProbability(maximumAnomalousProbability) {
}
bool CAnomalyScore::CComputer::operator()(const TDoubleVec& probabilities,
double& overallAnomalyScore,
double& overallProbability) const {
return CAnomalyScore::compute(m_JointProbabilityWeight, m_ExtremeProbabilityWeight,
m_MinExtremeSamples, m_MaxExtremeSamples,
m_MaximumAnomalousProbability, probabilities,
overallAnomalyScore, overallProbability);
}
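// The normalizer maintains two q-digest sketches of the raw scores seen so
// far: a coarse summary of all scores and a finer summary of the scores above
// a high percentile cutoff. Together with the per-partition maximum scores,
// these are used to map raw scores onto the [0, m_MaximumNormalizedScore]
// range.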
CAnomalyScore::CNormalizer::CNormalizer(const CAnomalyDetectorModelConfig& config)
: m_NoisePercentile(config.noisePercentile()),
m_NoiseMultiplier(config.noiseMultiplier()),
m_NormalizedScoreKnotPoints(config.normalizedScoreKnotPoints()),
m_HighPercentileScore(std::numeric_limits<std::uint32_t>::max()),
m_CountPerSample(core::constants::HOUR /
maths::common::CTools::truncate(2 * config.bucketLength(),
5 * core::constants::MINUTE,
core::constants::HOUR)),
m_RawScoreQuantileSummary(201, config.decayRate()),
m_RawScoreHighQuantileSummary(201, config.decayRate()),
m_DecayRate(config.decayRate() *
std::max(static_cast<double>(config.bucketLength()) /
static_cast<double>(CAnomalyDetectorModelConfig::STANDARD_BUCKET_LENGTH),
1.0)),
m_TimeToQuantileDecay(QUANTILE_DECAY_TIME) {
}
bool CAnomalyScore::CNormalizer::canNormalize() const {
return m_RawScoreQuantileSummary.n() > 0;
}
bool CAnomalyScore::CNormalizer::normalize(const CMaximumScoreScope& scope,
double& score) const {
if (score == 0.0) {
// Nothing to do.
return true;
}
if (m_RawScoreQuantileSummary.n() == 0) {
score = 0.0;
return true;
}
LOG_TRACE(<< "Normalising " << score);
double normalizedScores[]{m_MaximumNormalizedScore, m_MaximumNormalizedScore,
m_MaximumNormalizedScore, m_MaximumNormalizedScore};
std::uint32_t discreteScore = this->discreteScore(score);
// Our normalized score is the minimum of a set of different
// score ceilings. The idea is that we have a number of factors
// which can reduce the score based on the other score values
// observed so far and the absolute score for some of our
// functions. We want the following properties:
// 1) The noise-like scores, i.e. the smallest scores which occur
// relatively frequently, to have a low normalized score.
// 2) Higher resolution of the highest scores we've seen.
// 3) The highest raw score ever observed has the highest
// normalized score.
// 4) We don't want to generate significant anomalies for
// moderately unusual events before we have enough history
// to assess their probability accurately.
//
// To ensure 1) we use a ceiling for the score that is a
// multiple of a low(ish) percentile score. To ensure 2) we
// use non-linear (actually piecewise linear) mapping from
// percentiles to scores, i.e. low percentile map to a small
// normalized score range and high percentiles map to a large
// normalized score range. Finally to ensure 3) we
// use a ceiling which is linear interpolation between a raw
// score of zero and the maximum ever raw score for the partition.
// To achieve 4), we cap the maximum score such that probabilities
// near the cutoff don't generate large normalized scores.
// Compute the noise ceiling. Note that since the scores are
// logarithms (base e) the difference corresponds to the scaled,
// 10 / log(10), signal strength in dB. By default a signal of
// 75dB corresponds to a normalized score of 75. Note that if
// the noise percentile is zero then really it is unknown,
// since scores are truncated to zero. In this case we don't
// want this term to constrain the normalized score. However,
// we also want this term to be smooth, i.e. the score should
// be nearly continuous when F(0) = pn. Here, F(.) denotes the
// c.d.f. of the score and pn the noise percentile. We achieve
// this by adding "max score" * max(2 * min(F(0) / pn, 1) - 1, 0)
// to the score, with F(0) expressed as a percentile.
std::uint32_t noiseScore;
m_RawScoreQuantileSummary.quantile(m_NoisePercentile / 100.0, noiseScore);
auto knotPoint = std::lower_bound(m_NormalizedScoreKnotPoints.begin(),
m_NormalizedScoreKnotPoints.end(),
TDoubleDoublePr(m_NoisePercentile, 0.0));
double signalStrength =
m_NoiseMultiplier * 10.0 / DISCRETIZATION_FACTOR *
(static_cast<double>(discreteScore) - static_cast<double>(noiseScore));
double l0;
double u0;
m_RawScoreQuantileSummary.cdf(0, 0.0, l0, u0);
normalizedScores[0] =
knotPoint->second * std::max(1.0 + signalStrength, 0.0) +
m_MaximumNormalizedScore *
std::max(2.0 * std::min(50.0 * (l0 + u0) / m_NoisePercentile, 1.0) - 1.0, 0.0);
LOG_TRACE(<< "normalizedScores[0] = " << normalizedScores[0]
<< ", knotPoint = " << knotPoint->second << ", discreteScore = " << discreteScore
<< ", noiseScore = " << noiseScore << ", l(0) = " << l0
<< ", u(0) = " << u0 << ", signalStrength = " << signalStrength);
// Compute the raw normalized score percentile compared to historic values.
double lowerPercentile;
double upperPercentile;
this->quantile(score, 70.0 /*confidence interval*/, lowerPercentile, upperPercentile);
lowerPercentile *= 100.0;
upperPercentile *= 100.0;
if (lowerPercentile > upperPercentile) {
std::swap(lowerPercentile, upperPercentile);
}
lowerPercentile = maths::common::CTools::truncate(lowerPercentile, 0.0, 100.0);
upperPercentile = maths::common::CTools::truncate(upperPercentile, 0.0, 100.0);
std::size_t lowerKnotPoint =
std::max(std::lower_bound(m_NormalizedScoreKnotPoints.begin(),
m_NormalizedScoreKnotPoints.end(), lowerPercentile,
maths::common::COrderings::SFirstLess()) -
m_NormalizedScoreKnotPoints.begin(),
std::ptrdiff_t(1));
std::size_t upperKnotPoint =
std::max(std::lower_bound(m_NormalizedScoreKnotPoints.begin(),
m_NormalizedScoreKnotPoints.end(), upperPercentile,
maths::common::COrderings::SFirstLess()) -
m_NormalizedScoreKnotPoints.begin(),
std::ptrdiff_t(1));
if (lowerKnotPoint < m_NormalizedScoreKnotPoints.size()) {
const TDoubleDoublePr& left = m_NormalizedScoreKnotPoints[lowerKnotPoint - 1];
const TDoubleDoublePr& right = m_NormalizedScoreKnotPoints[lowerKnotPoint];
normalizedScores[1] = maths::common::CTools::linearlyInterpolate(
left.first, right.first, left.second, right.second, lowerPercentile);
} else {
normalizedScores[1] = m_MaximumNormalizedScore;
}
if (upperKnotPoint < m_NormalizedScoreKnotPoints.size()) {
const TDoubleDoublePr& left = m_NormalizedScoreKnotPoints[upperKnotPoint - 1];
const TDoubleDoublePr& right = m_NormalizedScoreKnotPoints[upperKnotPoint];
normalizedScores[1] =
(normalizedScores[1] + maths::common::CTools::linearlyInterpolate(
left.first, right.first, left.second,
right.second, upperPercentile)) /
2.0;
} else {
normalizedScores[1] = (normalizedScores[1] + m_MaximumNormalizedScore) / 2.0;
}
LOG_TRACE(<< "normalizedScores[1] = " << normalizedScores[1] << ", lowerPercentile = "
<< lowerPercentile << ", upperPercentile = " << upperPercentile);
double maxScore{0.0};
bool hasValidMaxScore = this->maxScore(scope, maxScore);
LOG_TRACE(<< "hasValidMaxScore = " << hasValidMaxScore
<< ", scope = " << scope.print());
if (hasValidMaxScore) {
// Compute the maximum score ceiling
double ratio = score / maxScore;
normalizedScores[2] = m_MaximumNormalizedScore *
std::min({0.0 + 1.5 * ratio, 0.5 + 0.5 * ratio});
LOG_TRACE(<< "normalizedScores[2] = " << normalizedScores[2] << ", score = " << score
<< ", maxScore = " << maxScore << ", ratio = " << ratio);
}
// Logarithmically interpolate the maximum score between the
// largest significant and small probability.
static const double M =
(probabilityToScore(maths::common::SMALL_PROBABILITY) -
probabilityToScore(maths::common::LARGEST_SIGNIFICANT_PROBABILITY)) /
(std::log(maths::common::SMALL_PROBABILITY) -
std::log(maths::common::LARGEST_SIGNIFICANT_PROBABILITY));
static const double C = std::log(maths::common::LARGEST_SIGNIFICANT_PROBABILITY);
normalizedScores[3] = m_MaximumNormalizedScore *
(0.95 * M * (std::log(scoreToProbability(score)) - C) + 0.05);
LOG_TRACE(<< "normalizedScores[3] = " << normalizedScores[3] << ", score = " << score
<< ", probability = " << scoreToProbability(score));
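// The normalized score is the smallest of the ceilings computed above,
// capped at the maximum normalized score.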
score = std::min(*std::min_element(std::begin(normalizedScores), std::end(normalizedScores)),
m_MaximumNormalizedScore);
LOG_TRACE(<< "normalizedScore = " << score << ", scope = " << scope.print());
return true;
}
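// Estimate a confidence interval for the percentile of a raw score. Scores at
// or below the high percentile cutoff are resolved against the coarse quantile
// summary; higher scores are resolved against the fine-grained high quantile
// summary, rescaled onto the tail above the cutoff.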
void CAnomalyScore::CNormalizer::quantile(double score,
double confidence,
double& lowerBound,
double& upperBound) const {
std::uint32_t discreteScore = this->discreteScore(score);
double n = static_cast<double>(m_RawScoreQuantileSummary.n());
double lowerQuantile = (100.0 - confidence) / 200.0;
double upperQuantile = (100.0 + confidence) / 200.0;
double h = static_cast<double>(m_HighPercentileCount);
double f = h / n;
if (!(f >= 0.0 && f <= 1.0)) {
f = maths::common::CTools::truncate(f, 0.0, 1.0);
LOG_ERROR(<< "h = " << h << ", n = " << n);
}
double fl = maths::common::CQDigest::cdfQuantile(n, f, lowerQuantile);
double fu = maths::common::CQDigest::cdfQuantile(n, f, upperQuantile);
if (discreteScore <= m_HighPercentileScore || m_RawScoreHighQuantileSummary.n() == 0) {
m_RawScoreQuantileSummary.cdf(discreteScore, 0.0, lowerBound, upperBound);
double pdfLowerBound;
double pdfUpperBound;
m_RawScoreQuantileSummary.pdf(discreteScore, 0.0, pdfLowerBound, pdfUpperBound);
lowerBound = maths::common::CTools::truncate(lowerBound - pdfUpperBound, 0.0, fl);
upperBound = maths::common::CTools::truncate(upperBound - pdfLowerBound, 0.0, fu);
if (!(lowerBound >= 0.0 && lowerBound <= 1.0) ||
!(upperBound >= 0.0 && upperBound <= 1.0)) {
LOG_ERROR(<< "score = " << score << ", cdf = [" << lowerBound << ","
<< upperBound << "]"
<< ", pdf = [" << pdfLowerBound << "," << pdfUpperBound << "]");
}
lowerBound = maths::common::CQDigest::cdfQuantile(n, lowerBound, lowerQuantile);
upperBound = maths::common::CQDigest::cdfQuantile(n, upperBound, upperQuantile);
LOG_TRACE(<< "score = " << score << ", cdf = [" << lowerBound << "," << upperBound << "]"
<< ", pdf = [" << pdfLowerBound << "," << pdfUpperBound << "]");
return;
}
// Note that if cutoffCdfUpperBound were ever equal to one at working
// precision then lowerBound would be zero. The same is true for
// cutoffCdfLowerBound and upperBound. In practice neither situation
// should ever happen, but we trap them to avoid NaNs in the following
// calculation.
m_RawScoreHighQuantileSummary.cdf(discreteScore, 0.0, lowerBound, upperBound);
double cutoffCdfLowerBound;
double cutoffCdfUpperBound;
m_RawScoreHighQuantileSummary.cdf(m_HighPercentileScore, 0.0,
cutoffCdfLowerBound, cutoffCdfUpperBound);
double pdfLowerBound;
double pdfUpperBound;
m_RawScoreHighQuantileSummary.pdf(discreteScore, 0.0, pdfLowerBound, pdfUpperBound);
lowerBound = fl + (1.0 - fl) *
std::max(lowerBound - cutoffCdfUpperBound - pdfUpperBound, 0.0) /
std::max(1.0 - cutoffCdfUpperBound,
std::numeric_limits<double>::epsilon());
upperBound = fu + (1.0 - fu) *
std::max(upperBound - cutoffCdfLowerBound - pdfLowerBound, 0.0) /
std::max(1.0 - cutoffCdfLowerBound,
std::numeric_limits<double>::epsilon());
if (!(lowerBound >= 0.0 && lowerBound <= 1.0) ||
!(upperBound >= 0.0 && upperBound <= 1.0)) {
LOG_ERROR(<< "score = " << score << ", cdf = [" << lowerBound << "," << upperBound << "]"
<< ", cutoff = [" << cutoffCdfLowerBound << "," << cutoffCdfUpperBound << "]"
<< ", pdf = [" << pdfLowerBound << "," << pdfUpperBound << "]"
<< ", f = " << f);
}
lowerBound = maths::common::CQDigest::cdfQuantile(n, lowerBound, lowerQuantile);
upperBound = maths::common::CQDigest::cdfQuantile(n, upperBound, upperQuantile);
LOG_TRACE(<< "score = " << score << ", cdf = [" << lowerBound << "," << upperBound << "]"
<< ", cutoff = [" << cutoffCdfLowerBound << "," << cutoffCdfUpperBound << "]"
<< ", pdf = [" << pdfLowerBound << "," << pdfUpperBound << "]"
<< ", f = " << f);
}
bool CAnomalyScore::CNormalizer::updateQuantiles(const CMaximumScoreScope& scope, double score) {
using TUInt32UInt64Pr = std::pair<std::uint32_t, std::uint64_t>;
using TUInt32UInt64PrVec = std::vector<TUInt32UInt64Pr>;
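// Update the maximum score for this scope and report whether it has grown
// by more than BIG_CHANGE_FACTOR.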
CMaxScore& maxScore = m_MaxScores[scope.key(m_IsForMembersOfPopulation, m_Dictionary)];
double oldMaxScore{maxScore.score()};
maxScore.add(score);
bool bigChange{maxScore.score() > BIG_CHANGE_FACTOR * oldMaxScore};
LOG_TRACE(<< "updateQuantiles: scope = " << scope.print()
<< ", oldMaxScore = " << oldMaxScore << ", score = " << score
<< ", newMaxScore = " << maxScore.score() << ", bigChange = " << bigChange);
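// Down-sample the scores added to the quantile sketches: only the largest
// score in each window of m_CountPerSample results is added, which bounds
// the rate at which the sketches are updated for short bucket lengths.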
m_Sample = std::max(m_Sample, score);
if (++m_CountSinceLastSample < m_CountPerSample) {
return bigChange;
}
std::uint32_t discreteScore = this->discreteScore(m_Sample);
LOG_TRACE(<< "score = " << m_Sample << ", discreteScore = " << discreteScore
<< ", maxScore = " << maxScore.score() << ", scope = " << scope.print());
m_CountSinceLastSample = 0;
m_Sample = 0.0;
std::uint64_t n = m_RawScoreQuantileSummary.n();
std::uint64_t k = m_RawScoreQuantileSummary.k();
LOG_TRACE(<< "n = " << n << ", k = " << k);
// We are about to compress the q-digest; at the moment it comprises
// the unique values we have seen so far. So we extract the values
// greater than the HIGH_PERCENTILE percentile at this point to
// initialize the fine-grained high quantile summary.
if ((n + 1) == k) {
LOG_TRACE(<< "Initializing H");
TUInt32UInt64PrVec L;
m_RawScoreQuantileSummary.summary(L);
if (L.empty()) {
LOG_ERROR(<< "High quantile summary is empty: "
<< m_RawScoreQuantileSummary.print());
} else {
auto highPercentileCount = static_cast<std::uint64_t>(
(HIGH_PERCENTILE / 100.0) * static_cast<double>(n) + 0.5);
// Estimate the high percentile score and update the count.
std::size_t i = 1;
m_HighPercentileScore = L[0].first;
m_HighPercentileCount = L[0].second;
for (/**/; i < L.size(); ++i) {
if (L[i].second > highPercentileCount) {
m_HighPercentileScore = L[i - 1].first;
m_HighPercentileCount = L[i - 1].second;
break;
}
}
if (m_HighPercentileCount > n) {
LOG_ERROR(<< "Invalid c(H) " << m_HighPercentileCount);
LOG_ERROR(<< "target " << highPercentileCount);
LOG_ERROR(<< "L " << L);
m_HighPercentileCount = n;
}
LOG_TRACE(<< "s(H) = " << m_HighPercentileScore
<< ", c(H) = " << m_HighPercentileCount << ", percentile = "
<< 100.0 * static_cast<double>(m_HighPercentileCount) /
static_cast<double>(n)
<< "%, desired c(H) = " << highPercentileCount);
// Populate the high quantile summary.
for (/**/; i < L.size(); ++i) {
std::uint32_t x = L[i].first;
std::uint64_t m = L[i].second - L[i - 1].second;
LOG_TRACE(<< "Adding (" << x << ", " << m << ") to H");
m_RawScoreHighQuantileSummary.add(x, m);
}
}
}
m_RawScoreQuantileSummary.add(discreteScore);
if (discreteScore <= m_HighPercentileScore) {
++m_HighPercentileCount;
} else {
m_RawScoreHighQuantileSummary.add(discreteScore);
}
LOG_TRACE(<< "percentile = "
<< static_cast<double>(m_HighPercentileCount) / static_cast<double>(n + 1));
// Periodically refresh the high percentile score.
if ((n + 1) > k && (n + 1) % k == 0) {
LOG_TRACE(<< "Refreshing high quantile summary");
auto highPercentileCount = static_cast<std::uint64_t>(
(HIGH_PERCENTILE / 100.0) * static_cast<double>(n + 1) + 0.5);
LOG_TRACE(<< "s(H) = " << m_HighPercentileScore << ", c(H) = " << m_HighPercentileCount
<< ", desired c(H) = " << highPercentileCount);
if (m_HighPercentileCount > highPercentileCount) {
TUInt32UInt64PrVec L;
m_RawScoreQuantileSummary.summary(L);
TUInt32UInt64PrVec H;
m_RawScoreHighQuantileSummary.summary(H);
std::size_t i0 = std::min(
static_cast<std::size_t>(
std::lower_bound(L.begin(), L.end(), highPercentileCount,
maths::common::COrderings::SSecondLess()) -
L.begin()),
L.size() - 1);
std::size_t j = std::min(
static_cast<std::size_t>(
std::upper_bound(H.begin(), H.end(), L[i0],
maths::common::COrderings::SFirstLess()) -
H.begin()),
H.size() - 1);
std::uint64_t r = L[i0].second;
for (std::size_t i = i0 + 1;
i < L.size() && L[i0].second + m_RawScoreHighQuantileSummary.n() < n + 1;
++i) {
for (/**/; j < H.size() && H[j].first <= L[i].first; ++j) {
r += (H[j].second - (j == 0 ? static_cast<std::uint64_t>(0)
: H[j - 1].second));
}
std::uint32_t x = L[i].first;
std::uint64_t m = r < L[i].second ? L[i].second - r
: static_cast<std::uint64_t>(0);
r += m;
if (m > 0) {
LOG_TRACE(<< "Adding (" << x << ',' << m << ") to H");
m_RawScoreHighQuantileSummary.add(x, m);
}
}
m_HighPercentileScore = L[i0].first;
m_HighPercentileCount = L[i0].second;
if (m_HighPercentileCount > n + 1) {
LOG_ERROR(<< "Invalid c(H) " << m_HighPercentileCount);
LOG_ERROR(<< "target " << highPercentileCount);
LOG_ERROR(<< "L " << L);
m_HighPercentileCount = n;
}
LOG_TRACE(<< "s(H) = " << m_HighPercentileScore
<< ", c(H) = " << m_HighPercentileCount << ", percentile = "
<< 100.0 * static_cast<double>(m_HighPercentileCount) /
static_cast<double>(n + 1)
<< "%");
} else {
m_RawScoreQuantileSummary.quantile(HIGH_PERCENTILE / 100.0, m_HighPercentileScore);
double lowerBound;
double upperBound;
m_RawScoreQuantileSummary.cdf(m_HighPercentileScore, 0.0, lowerBound, upperBound);
m_HighPercentileCount = static_cast<std::uint64_t>(
static_cast<double>(n + 1) * lowerBound + 0.5);
LOG_TRACE(<< "s(H) = " << m_HighPercentileScore << ", c(H) = " << m_HighPercentileCount
<< ", percentile = " << 100.0 * lowerBound << "%");
}
}
return bigChange;
}
void CAnomalyScore::CNormalizer::propagateForwardByTime(double time) {
if (time < 0.0) {
LOG_ERROR(<< "Can't propagate normalizer backwards in time");
return;
}
double alpha = std::exp(-2.0 * m_DecayRate * time);
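// Age the per-partition maximum scores and erase any which haven't received
// a new score within FORGET_MAX_SCORE_INTERVAL.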
for (auto i = m_MaxScores.begin(); i != m_MaxScores.end();
i->second.forget(time) ? i = m_MaxScores.erase(i) : ++i) {
i->second.age(alpha);
}
// Quantiles age at a much slower rate than everything else so that
// we can accurately estimate high quantiles. We achieve this by only
// aging them a certain fraction of the time.
m_TimeToQuantileDecay -= time / static_cast<double>(m_CountPerSample);
if (m_TimeToQuantileDecay <= 0.0) {
time = std::floor((QUANTILE_DECAY_TIME - m_TimeToQuantileDecay) / QUANTILE_DECAY_TIME);
std::uint64_t n = m_RawScoreQuantileSummary.n();
m_RawScoreQuantileSummary.propagateForwardsByTime(time);
m_RawScoreHighQuantileSummary.propagateForwardsByTime(time);
if (n > 0) {
m_HighPercentileCount = static_cast<std::uint64_t>(
static_cast<double>(m_RawScoreQuantileSummary.n()) /
static_cast<double>(n) * static_cast<double>(m_HighPercentileCount) +
0.5);
}
m_TimeToQuantileDecay += QUANTILE_DECAY_TIME +
std::floor(-m_TimeToQuantileDecay / QUANTILE_DECAY_TIME);
}
}
void CAnomalyScore::CNormalizer::isForMembersOfPopulation(bool resultIsForMemberOfPopulation) {
if (m_IsForMembersOfPopulation == std::nullopt) {
m_IsForMembersOfPopulation.emplace(resultIsForMemberOfPopulation);
} else {
m_IsForMembersOfPopulation = *m_IsForMembersOfPopulation || resultIsForMemberOfPopulation;
}
}
bool CAnomalyScore::CNormalizer::isUpgradable(const std::string& fromVersion,
const std::string& toVersion) {
// Any changes to this method need to be reflected in the upgrade() method
// below to prevent an inconsistency where this method says an upgrade is
// possible but the upgrade() method can't do it.
return (fromVersion == "1" && toVersion == "2") ||
(fromVersion == "1" && toVersion == "3") ||
(fromVersion == "2" && toVersion == "3");
}
bool CAnomalyScore::CNormalizer::upgrade(const std::string& loadedVersion,
const std::string& currentVersion) {
if (loadedVersion == currentVersion) {
// No upgrade required.
return true;
}
// We know how to upgrade between versions 1, 2 and 3.
static const double HIGH_SCORE_UPGRADE_FACTOR[][3] = {
{1.0, 0.3, 0.3}, {1.0 / 0.3, 1.0, 1.0}, {1.0 / 0.3, 1.0, 1.0}};
static const double Q_DIGEST_UPGRADE_FACTOR[][3] = {
{1.0, 3.0, 30.0}, {1.0 / 3.0, 1.0, 10.0}, {1.0 / 30.0, 1.0 / 10.0, 1.0}};
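// The factors are indexed by [loaded version - 1][target version - 1].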
std::size_t i;
std::size_t j;
if (!core::CStringUtils::stringToType(loadedVersion, i) ||
!core::CStringUtils::stringToType(currentVersion, j) ||
i - 1 >= std::size(HIGH_SCORE_UPGRADE_FACTOR) ||
j - 1 >= std::size(HIGH_SCORE_UPGRADE_FACTOR[0])) {
LOG_ERROR(<< "Don't know how to upgrade quantiles from version "
<< loadedVersion << " to version " << currentVersion);
return false;
}
double highScoreUpgradeFactor = HIGH_SCORE_UPGRADE_FACTOR[i - 1][j - 1];
double qDigestUpgradeFactor = Q_DIGEST_UPGRADE_FACTOR[i - 1][j - 1];
LOG_INFO(<< "Upgrading quantiles from version " << loadedVersion << " to version "
<< currentVersion << " - will scale highest score by " << highScoreUpgradeFactor
<< " and Q digest min/max values by " << qDigestUpgradeFactor);
// For the maximum score aging is equivalent to scaling.
for (auto& element : m_MaxScores) {
element.second.age(highScoreUpgradeFactor);
}
if (m_RawScoreQuantileSummary.scale(qDigestUpgradeFactor) == false) {
LOG_ERROR(<< "Failed to scale raw score quantiles");
return false;
}
if (m_RawScoreHighQuantileSummary.scale(qDigestUpgradeFactor) == false) {
LOG_ERROR(<< "Failed to scale raw score high quantiles");
return false;
}
return true;
}
void CAnomalyScore::CNormalizer::clear() {
m_HighPercentileScore = std::numeric_limits<std::uint32_t>::max();
m_HighPercentileCount = 0ULL;
m_IsForMembersOfPopulation.reset();
m_MaxScores.clear();
m_CountSinceLastSample = 0;
m_Sample = 0.0;
m_RawScoreQuantileSummary.clear();
m_RawScoreHighQuantileSummary.clear();
m_TimeToQuantileDecay = QUANTILE_DECAY_TIME;
}
void CAnomalyScore::CNormalizer::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(HIGH_PERCENTILE_SCORE_TAG, m_HighPercentileScore);
inserter.insertValue(HIGH_PERCENTILE_COUNT_TAG, m_HighPercentileCount);
inserter.insertValue(COUNT_SINCE_LAST_SAMPLE_TAG, m_CountSinceLastSample);
inserter.insertValue(SAMPLE_TAG, m_Sample, core::CIEEE754::E_DoublePrecision);
inserter.insertLevel(RAW_SCORE_QUANTILE_SUMMARY, [this](auto& inserter_) {
m_RawScoreQuantileSummary.acceptPersistInserter(inserter_);
});
inserter.insertLevel(RAW_SCORE_HIGH_QUANTILE_SUMMARY, [this](auto& inserter_) {
m_RawScoreHighQuantileSummary.acceptPersistInserter(inserter_);
});
inserter.insertValue(TIME_TO_QUANTILE_DECAY_TAG, m_TimeToQuantileDecay);
if (m_IsForMembersOfPopulation != std::nullopt) {
inserter.insertValue(IS_FOR_MEMBERS_OF_POPULATION_TAG,
*m_IsForMembersOfPopulation ? 1 : 0);
}
core::CPersistUtils::persist(MAX_SCORES_PER_PARTITION_TAG, m_MaxScores, inserter);
}
bool CAnomalyScore::CNormalizer::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
do {
const std::string& name = traverser.name();
LOG_TRACE(<< "name: " << name);
// This used to be 64 bit but is now 32 bit, so may need adjusting
// on restoration
RESTORE_SETUP_TEARDOWN(
HIGH_PERCENTILE_SCORE_TAG, std::uint64_t highPercentileScore64(0),
core::CStringUtils::stringToType(traverser.value(), highPercentileScore64),
m_HighPercentileScore = static_cast<std::uint32_t>(std::min(
highPercentileScore64,
static_cast<std::uint64_t>(std::numeric_limits<std::uint32_t>::max()))));
RESTORE(HIGH_PERCENTILE_COUNT_TAG,
core::CStringUtils::stringToType(traverser.value(), m_HighPercentileCount));
RESTORE_BUILT_IN(COUNT_SINCE_LAST_SAMPLE_TAG, m_CountSinceLastSample)
RESTORE_BUILT_IN(SAMPLE_TAG, m_Sample)
RESTORE(RAW_SCORE_QUANTILE_SUMMARY, traverser.traverseSubLevel([this](auto& traverser_) {
return m_RawScoreQuantileSummary.acceptRestoreTraverser(traverser_);
}))
RESTORE(RAW_SCORE_HIGH_QUANTILE_SUMMARY,
traverser.traverseSubLevel([this](auto& traverser_) {
return m_RawScoreHighQuantileSummary.acceptRestoreTraverser(traverser_);
}))
RESTORE_SETUP_TEARDOWN(IS_FOR_MEMBERS_OF_POPULATION_TAG, int value,
core::CStringUtils::stringToType(traverser.value(), value),
m_IsForMembersOfPopulation.emplace(value == 1))
core::CPersistUtils::restore(MAX_SCORES_PER_PARTITION_TAG, m_MaxScores, traverser);
} while (traverser.next());
return true;
}
std::uint64_t CAnomalyScore::CNormalizer::checksum() const {
auto seed = static_cast<std::uint64_t>(m_NoisePercentile);
seed = maths::common::CChecksum::calculate(seed, m_NoiseMultiplier);
seed = maths::common::CChecksum::calculate(seed, m_NormalizedScoreKnotPoints);
seed = maths::common::CChecksum::calculate(seed, m_MaximumNormalizedScore);
seed = maths::common::CChecksum::calculate(seed, m_HighPercentileScore);
seed = maths::common::CChecksum::calculate(seed, m_HighPercentileCount);
seed = maths::common::CChecksum::calculate(seed, m_IsForMembersOfPopulation);
seed = maths::common::CChecksum::calculate(seed, m_CountSinceLastSample);
seed = maths::common::CChecksum::calculate(seed, m_CountPerSample);
seed = maths::common::CChecksum::calculate(seed, m_Sample);
seed = maths::common::CChecksum::calculate(seed, m_MaxScores);
seed = maths::common::CChecksum::calculate(seed, m_RawScoreQuantileSummary);
seed = maths::common::CChecksum::calculate(seed, m_RawScoreHighQuantileSummary);
seed = maths::common::CChecksum::calculate(seed, m_DecayRate);
return maths::common::CChecksum::calculate(seed, m_TimeToQuantileDecay);
}
std::uint32_t CAnomalyScore::CNormalizer::discreteScore(double rawScore) const {
return static_cast<std::uint32_t>(DISCRETIZATION_FACTOR * rawScore + 0.5);
}
double CAnomalyScore::CNormalizer::rawScore(std::uint32_t discreteScore) const {
return static_cast<double>(discreteScore) / DISCRETIZATION_FACTOR;
}
bool CAnomalyScore::CNormalizer::maxScore(const CMaximumScoreScope& scope,
double& maxScore) const {
auto itr = m_MaxScores.find(scope.key(m_IsForMembersOfPopulation, m_Dictionary));
if (itr == m_MaxScores.end()) {
return false;
}
maxScore = itr->second.score();
return maxScore != 0.0;
}
CAnomalyScore::CNormalizer::CMaximumScoreScope::CMaximumScoreScope(
TOptionalStrCRef partitionFieldName,
TOptionalStrCRef partitionFieldValue,
TOptionalStrCRef personFieldName,
TOptionalStrCRef personFieldValue)
: m_PartitionFieldName{std::move(partitionFieldName)}, m_PartitionFieldValue{std::move(
partitionFieldValue)},
m_PersonFieldName{std::move(personFieldName)}, m_PersonFieldValue{std::move(personFieldValue)} {
}
CAnomalyScore::CNormalizer::TWord
CAnomalyScore::CNormalizer::CMaximumScoreScope::key(TOptionalBool isPopulationAnalysis,
const TDictionary& dictionary) const {
if (isPopulationAnalysis == std::nullopt) {
LOG_ERROR(<< "Using normalizer without refreshing settings");
} else if (*isPopulationAnalysis) {
return TWord{};
}
// The influencer named 'bucket_time' corresponds to the root level
// normalizer for which we have keyed the maximum score on a blank
// personName.
const TOptionalStr& personFieldName = (m_PersonFieldName.get() == TIME_INFLUENCER)
? EMPTY_STRING
: m_PersonFieldName.get();
return dictionary.word(m_PartitionFieldName.get().value_or(""),
m_PartitionFieldValue.get().value_or(""),
personFieldName.value_or(""),
m_PersonFieldValue.get().value_or(""));
}
std::string CAnomalyScore::CNormalizer::CMaximumScoreScope::print() const {
return "\"" + m_PartitionFieldName.get().value_or("") + "_" +
m_PartitionFieldValue.get().value_or("") + "_" +
m_PersonFieldName.get().value_or("") + "_" +
m_PersonFieldValue.get().value_or("") + "\"";
}
void CAnomalyScore::CNormalizer::CMaxScore::add(double score) {
m_Score.add(score);
m_TimeSinceLastScore = 0.0;
}
double CAnomalyScore::CNormalizer::CMaxScore::score() const {
return m_Score.count() == 0 ? 0.0 : m_Score[0];
}
void CAnomalyScore::CNormalizer::CMaxScore::age(double alpha) {
m_Score.age(alpha);
}
bool CAnomalyScore::CNormalizer::CMaxScore::forget(double time) {
m_TimeSinceLastScore += time;
return m_TimeSinceLastScore > FORGET_MAX_SCORE_INTERVAL;
}
void CAnomalyScore::CNormalizer::CMaxScore::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(MAX_SCORE_TAG, m_Score.toDelimited());
inserter.insertValue(TIME_SINCE_LAST_SCORE_TAG, m_TimeSinceLastScore,
core::CIEEE754::E_SinglePrecision);
}
bool CAnomalyScore::CNormalizer::CMaxScore::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
do {
const std::string& name{traverser.name()};
RESTORE(MAX_SCORE_TAG, m_Score.fromDelimited(traverser.value()))
RESTORE_BUILT_IN(TIME_SINCE_LAST_SCORE_TAG, m_TimeSinceLastScore)
} while (traverser.next());
return true;
}
std::uint64_t CAnomalyScore::CNormalizer::CMaxScore::checksum() const {
std::uint64_t seed = maths::common::CChecksum::calculate(0, m_Score);
return maths::common::CChecksum::calculate(seed, m_TimeSinceLastScore);
}
// We use short field names to reduce the state size
namespace {
const std::string NORMALIZER_TAG("a");
}
const std::string& CAnomalyScore::normalizedScoreToSeverity(double normalizedScore) {
if (normalizedScore < 25.0) {
return WARNING_SEVERITY;
}
if (normalizedScore < 50.0) {
return MINOR_SEVERITY;
}
if (normalizedScore < 75.0) {
return MAJOR_SEVERITY;
}
return CRITICAL_SEVERITY;
}
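// A minimal persist/restore round trip looks like the following sketch. The
// names normalizer, config and bucketTime are illustrative placeholders, not
// part of this file:
//
//   std::ostringstream persisted;
//   CAnomalyScore::normalizerToJson(normalizer, "job", "cue", "description",
//                                   bucketTime, persisted);
//   CAnomalyScore::CNormalizer restored{config};
//   CAnomalyScore::normalizerFromJson(persisted.str(), restored);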
bool CAnomalyScore::normalizerFromJson(const std::string& json, CNormalizer& normalizer) {
// This only works with uncompressed JSON - compression must be handled in
// an outer layer. The method below works more efficiently with compressed
// input.
std::istringstream iss(json);
core::CJsonStateRestoreTraverser traverser(iss);
return normalizerFromJson(traverser, normalizer);
}
bool CAnomalyScore::normalizerFromJson(core::CStateRestoreTraverser& traverser,
CNormalizer& normalizer) {
bool restoredNormalizer(false);
std::string restoredVersion(MISSING_VERSION_FORMAT_VERSION);
while (traverser.next()) {
const std::string& name = traverser.name();
if (name == MLVERSION_ATTRIBUTE) {
restoredVersion = traverser.value();
if (restoredVersion != CURRENT_FORMAT_VERSION) {
if (normalizer.isUpgradable(restoredVersion, CURRENT_FORMAT_VERSION)) {
LOG_DEBUG(<< "Restored quantiles JSON version is " << restoredVersion
<< "; current JSON version is " << CURRENT_FORMAT_VERSION
<< " - will upgrade quantiles");
} else {
// If the version has changed and the format is too different to
// even upgrade then start again from scratch - this counts as a
// successful load
LOG_INFO(<< "Restored quantiles JSON version is " << restoredVersion
<< "; current JSON version is " << CURRENT_FORMAT_VERSION
<< " - will restart quantiles from scratch");
return true;
}
}
} else if (name == NORMALIZER_TAG) {
restoredNormalizer = traverser.traverseSubLevel([&](auto& traverser_) {
return normalizer.acceptRestoreTraverser(traverser_);
});
if (!restoredNormalizer) {
LOG_ERROR(<< "Unable to restore quantiles to the normaliser");
}
}
}
if (restoredNormalizer && restoredVersion != CURRENT_FORMAT_VERSION) {
LOG_INFO(<< "Restored quantiles JSON version is " << restoredVersion << "; current JSON version is "
<< CURRENT_FORMAT_VERSION << " - will attempt upgrade");
if (normalizer.upgrade(restoredVersion, CURRENT_FORMAT_VERSION) == false) {
LOG_ERROR(<< "Failed to upgrade quantiles from version " << restoredVersion
<< " to version " << CURRENT_FORMAT_VERSION);
return false;
}
}
return restoredNormalizer;
}
void CAnomalyScore::normalizerToJson(const CNormalizer& normalizer,
const std::string& searchKey,
const std::string& cue,
const std::string& description,
core_t::TTime time,
std::ostream& strm) {
core::CJsonStatePersistInserter inserter(strm);
// Important: Even though some of these fields appear to be unnecessary,
// the CHierarchicalResultsNormalizer requires them to exist in this
// order. Do not change the fields or the ordering.
inserter.insertValue(MLCUE_ATTRIBUTE, cue);
inserter.insertValue(MLKEY_ATTRIBUTE, searchKey);
inserter.insertValue(MLQUANTILESDESCRIPTION_ATTRIBUTE, description);
inserter.insertValue(MLVERSION_ATTRIBUTE, CURRENT_FORMAT_VERSION);
inserter.insertValue(TIME_ATTRIBUTE, core::CStringUtils::typeToString(time));
inserter.insertLevel(NORMALIZER_TAG, std::bind(&CNormalizer::acceptPersistInserter,
&normalizer, std::placeholders::_1));
}
}
}