// lib/model/CHierarchicalResultsNormalizer.cc
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <model/CHierarchicalResultsNormalizer.h>

#include <core/CBase64Filter.h>
#include <core/CJsonStateRestoreTraverser.h>
#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CStringUtils.h>

#include <maths/common/CChecksum.h>
#include <maths/common/CTools.h>

#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CLimits.h>

#include <boost/iostreams/filter/gzip.hpp>
#include <boost/iostreams/filtering_stream.hpp>

#include <memory>
#include <sstream>
namespace ml {
namespace model {
namespace {
// The bucket cue is "sysChange" for historical reasons. Do NOT tidy this up
// unless you fully understand the implications. In particular, existing
// persisted quantiles would all need to be regenerated.
const std::string BUCKET_CUE("sysChange");
const std::string INFLUENCER_BUCKET_CUE_PREFIX("inflb");
const std::string INFLUENCER_CUE_PREFIX("infl");
const std::string PARTITION_CUE_PREFIX("part");
const std::string PERSON_CUE_PREFIX("per");
const std::string LEAF_CUE_PREFIX("leaf");
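// A non-bucket cue is one of these prefixes followed by the decimal 64-bit
// hash of the corresponding dictionary word: for illustration, a leaf cue
// might look like "leaf14013740589483479".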
}
namespace hierarchical_results_normalizer_detail {
SNormalizer::SNormalizer(const std::string& description, const TNormalizerPtr& normalizer)
: s_Description(description), s_Normalizer(normalizer) {
}
void SNormalizer::clear() {
s_Normalizer->clear();
}
void SNormalizer::propagateForwardByTime(double time) {
s_Normalizer->propagateForwardByTime(time);
}
std::uint64_t SNormalizer::checksum() const {
std::uint64_t seed = maths::common::CChecksum::calculate(0, s_Description);
return maths::common::CChecksum::calculate(seed, s_Normalizer);
}
void SNormalizer::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("SNormalizer Memory Usage");
core::memory_debug::dynamicSize("s_Description", s_Description, mem);
core::memory_debug::dynamicSize("s_Normalizer", s_Normalizer, mem);
}
std::size_t SNormalizer::memoryUsage() const {
std::size_t mem = 0;
mem += core::memory::dynamicSize(s_Description);
mem += core::memory::dynamicSize(s_Normalizer);
return mem;
}
}
void CHierarchicalResultsNormalizer::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
    mem->setName("Hierarchical Results Normalizer Memory Usage");
this->CHierarchicalResultsLevelSet::debugMemoryUsage(mem->addChild());
}
std::size_t CHierarchicalResultsNormalizer::memoryUsage() const {
return this->CHierarchicalResultsLevelSet::memoryUsage();
}
std::size_t CHierarchicalResultsNormalizer::staticSize() const {
return sizeof(*this);
}
void CHierarchicalResultsNormalizer::updateModelSizeStats(
CResourceMonitor::SModelSizeStats& /*modelSizeStats*/) const {
// do nothing
}
CHierarchicalResultsNormalizer::CHierarchicalResultsNormalizer(CLimits& limits,
const CAnomalyDetectorModelConfig& modelConfig)
: TBase(TNormalizer(std::string(),
std::make_shared<CAnomalyScore::CNormalizer>(modelConfig))),
m_Limits(limits), m_ModelConfig(modelConfig) {
limits.resourceMonitor().registerComponent(*this);
}
CHierarchicalResultsNormalizer::~CHierarchicalResultsNormalizer() {
m_Limits.resourceMonitor().unRegisterComponent(*this); // NOSONAR
}
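// Selects which of the jobs subsequent calls to visit() perform: refreshing
// normalizer settings, updating quantiles or normalizing scores.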
void CHierarchicalResultsNormalizer::setJob(EJob job) {
m_Job = job;
}
void CHierarchicalResultsNormalizer::clear() {
this->TBase::clear();
m_HasLastUpdateCausedBigChange = false;
}
void CHierarchicalResultsNormalizer::resetBigChange() {
m_HasLastUpdateCausedBigChange = false;
}
void CHierarchicalResultsNormalizer::visit(const CHierarchicalResults& /*results*/,
const TNode& node,
bool pivot) {
TNormalizerPtrVec normalizers;
this->elements(node, pivot, CNormalizerFactory{m_ModelConfig}, normalizers);
if (normalizers.empty()) {
return;
}
    // This has to use the deviation of the probability rather than the
    // anomaly score stored on the bucket because the latter is scaled so
    // that it sums to the bucket anomaly score.
    double score = node.probability() > m_ModelConfig.maximumAnomalousProbability()
                       ? 0.0
                       : maths::common::CTools::anomalyScore(node.probability());

    CAnomalyScore::CNormalizer::CMaximumScoreScope scope{
        node.s_Spec.s_PartitionFieldName, node.s_Spec.s_PartitionFieldValue,
        node.s_Spec.s_PersonFieldName, node.s_Spec.s_PersonFieldValue};

    switch (m_Job) {
    case E_RefreshSettings:
        // We need to reset this flag if the normalizer is to be used for
        // results for members of a population analysis.
        for (auto& normalizer : normalizers) {
            normalizer->s_Normalizer->isForMembersOfPopulation(isMemberOfPopulation(node));
        }
        break;
case E_UpdateQuantiles:
if (node.s_AnnotatedProbability.s_ShouldUpdateQuantiles == false) {
LOG_TRACE(<< "NOT Updating quantiles for " << node.s_Spec.print()
<< ", scope = '" << scope.print() << "', score = " << score);
return;
}
for (auto& normalizer : normalizers) {
m_HasLastUpdateCausedBigChange |=
normalizer->s_Normalizer->updateQuantiles(scope, score);
}
break;
case E_NormalizeScores:
// Normalize with the lowest suitable normalizer.
if (normalizers[0]->s_Normalizer->normalize(scope, score) == false) {
LOG_ERROR(<< "Failed to normalize " << score << " for "
<< node.s_Spec.print());
}
node.s_NormalizedAnomalyScore = score;
break;
case E_NoOp:
LOG_ERROR(<< "Calling normalize without setting job");
break;
}
}
void CHierarchicalResultsNormalizer::propagateForwardByTime(double time) {
if (time < 0.0) {
LOG_ERROR(<< "Can't propagate normalizer backwards in time");
return;
}
    this->age([time](auto& normalizer) { normalizer.propagateForwardByTime(time); });
}
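// True if any quantile update since the last reset was reported as a big
// change by CAnomalyScore::CNormalizer::updateQuantiles.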
bool CHierarchicalResultsNormalizer::hasLastUpdateCausedBigChange() const {
return m_HasLastUpdateCausedBigChange;
}
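// The persisted state is a comma-separated sequence of JSON documents,
// optionally wrapped in a JSON array, gzip compressed and then base64 encoded.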
void CHierarchicalResultsNormalizer::toJson(core_t::TTime time,
const std::string& key,
std::string& json,
bool makeArray) const {
std::ostringstream compressedStream;
using TFilteredOutput = boost::iostreams::filtering_stream<boost::iostreams::output>;
{
TFilteredOutput outFilter;
outFilter.push(boost::iostreams::gzip_compressor());
outFilter.push(core::CBase64Encoder());
outFilter.push(compressedStream);
if (makeArray) {
outFilter << '[';
}
        for (const auto& [word, normalizer] : this->leafSet()) {
            CAnomalyScore::normalizerToJson(*normalizer.s_Normalizer, key, leafCue(word),
                                            normalizer.s_Description, time, outFilter);
            outFilter << ',';
        }
        for (const auto& [word, normalizer] : this->personSet()) {
            CAnomalyScore::normalizerToJson(*normalizer.s_Normalizer, key, personCue(word),
                                            normalizer.s_Description, time, outFilter);
            outFilter << ',';
        }
        for (const auto& [word, normalizer] : this->partitionSet()) {
            CAnomalyScore::normalizerToJson(*normalizer.s_Normalizer, key, partitionCue(word),
                                            normalizer.s_Description, time, outFilter);
            outFilter << ',';
        }
        for (const auto& [word, normalizer] : this->influencerSet()) {
            CAnomalyScore::normalizerToJson(*normalizer.s_Normalizer, key, influencerCue(word),
                                            normalizer.s_Description, time, outFilter);
            outFilter << ',';
        }
        for (const auto& [word, normalizer] : this->influencerBucketSet()) {
            CAnomalyScore::normalizerToJson(*normalizer.s_Normalizer, key,
                                            influencerBucketCue(word),
                                            normalizer.s_Description, time, outFilter);
            outFilter << ',';
        }
// Put the bucket normalizer last so that incomplete restorations can be
// detected by checking whether the bucket normalizer is restored
CAnomalyScore::normalizerToJson(*this->bucketElement().s_Normalizer, key,
bucketCue(), "root", time, outFilter);
if (makeArray) {
outFilter << ']';
}
}
json = compressedStream.str();
}
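// Restores normalizer state, compressed or uncompressed. The bucket
// normalizer is always persisted last, so E_Incomplete is returned if it
// was not seen.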
CHierarchicalResultsNormalizer::ERestoreOutcome
CHierarchicalResultsNormalizer::fromJsonStream(std::istream& inputStream) {
bool isBucketNormalizerRestored = false;
this->TBase::clear();
// The state may be compressed or uncompressed. We can distinguish because
// the first character of a base64 encoded zlib compressed stream will never
// be a bracket.
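    // (The gzip magic bytes 0x1f 0x8b encode in base64 to "H4sI...", so a
    // compressed stream always starts with 'H'.)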
using TFilteredInput = boost::iostreams::filtering_stream<boost::iostreams::input>;
TFilteredInput filteredInput;
std::istream* streamToTraverse = nullptr;
switch (inputStream.peek()) {
case EOF:
LOG_DEBUG(<< "No normalizer state to restore");
// this is not an error
return E_Ok;
case '[':
case '{':
LOG_DEBUG(<< "Detected uncompressed normalizer state");
streamToTraverse = &inputStream;
break;
default:
LOG_DEBUG(<< "Detected compressed normalizer state");
filteredInput.push(boost::iostreams::gzip_decompressor());
filteredInput.push(core::CBase64Decoder{});
filteredInput.push(inputStream);
streamToTraverse = &filteredInput;
break;
}
core::CJsonStateRestoreTraverser traverser(*streamToTraverse);
do {
// Call name() to prime the traverser if it hasn't started
if (traverser.name().empty()) {
if (traverser.isEof()) {
LOG_DEBUG(<< "No normalizer state to restore");
// this is not an error
return E_Ok;
}
}
// The MLCUE_ATTRIBUTE should always be the first field
if (traverser.name() != CAnomalyScore::MLCUE_ATTRIBUTE) {
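            // If the traverser cannot advance, the stream held no state at
            // all, which is benign; otherwise the leading field is not the
            // cue and the state cannot be used.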
if (!traverser.next()) {
LOG_INFO(<< "No normalizer state to restore");
return E_Ok;
}
            LOG_ERROR(<< "Expected " << CAnomalyScore::MLCUE_ATTRIBUTE
                      << " field in quantiles JSON, got " << traverser.name()
                      << " = " << traverser.value());
return E_Corrupt;
}
const std::string cue(traverser.value());
if (cue == BUCKET_CUE) {
if (CAnomalyScore::normalizerFromJson(
traverser, *this->bucketElement().s_Normalizer) == false) {
LOG_ERROR(<< "Unable to restore bucket normalizer");
return E_Corrupt;
}
isBucketNormalizerRestored = true;
} else {
TWordNormalizerPrVec* normalizerVec = nullptr;
TDictionary::TUInt64Array hashArray;
if (!this->parseCue(cue, normalizerVec, hashArray)) {
return E_Corrupt;
}
if (normalizerVec != nullptr) {
if (!traverser.next()) {
LOG_ERROR(<< "Cannot restore hierarchical normalizer - end of object reached when "
<< CAnomalyScore::MLKEY_ATTRIBUTE << " was expected");
return E_Corrupt;
}
if (!traverser.next()) {
LOG_ERROR(<< "Cannot restore hierarchical normalizer - end of object reached when "
<< CAnomalyScore::MLQUANTILESDESCRIPTION_ATTRIBUTE
<< " was expected");
return E_Corrupt;
}
if (traverser.name() != CAnomalyScore::MLQUANTILESDESCRIPTION_ATTRIBUTE) {
LOG_ERROR(<< "Cannot restore hierarchical normalizer - "
<< CAnomalyScore::MLQUANTILESDESCRIPTION_ATTRIBUTE << " element expected but found "
<< traverser.name() << '=' << traverser.value());
return E_Corrupt;
}
std::string quantileDesc(traverser.value());
auto normalizer = std::make_shared<CAnomalyScore::CNormalizer>(m_ModelConfig);
normalizerVec->emplace_back(TWord(hashArray),
TNormalizer(quantileDesc, normalizer));
if (CAnomalyScore::normalizerFromJson(traverser, *normalizer) == false) {
LOG_ERROR(<< "Unable to restore normalizer with cue " << cue);
return E_Corrupt;
}
}
}
} while (traverser.nextObject());
this->sort();
LOG_DEBUG(<< this->influencerBucketSet().size() << " influencer bucket normalizers, "
<< this->influencerSet().size() << " influencer normalizers, "
<< this->partitionSet().size() << " partition normalizers, "
<< this->personSet().size() << " person normalizers and "
<< this->leafSet().size() << " leaf normalizers restored from JSON stream");
return isBucketNormalizerRestored ? E_Ok : E_Incomplete;
}
const CAnomalyScore::CNormalizer& CHierarchicalResultsNormalizer::bucketNormalizer() const {
return *this->bucketElement().s_Normalizer;
}
const CAnomalyScore::CNormalizer*
CHierarchicalResultsNormalizer::influencerBucketNormalizer(const std::string& influencerFieldName) const {
const TNormalizer* normalizer = this->influencerBucketElement(influencerFieldName);
return normalizer ? normalizer->s_Normalizer.get() : nullptr;
}
const CAnomalyScore::CNormalizer*
CHierarchicalResultsNormalizer::influencerNormalizer(const std::string& influencerFieldName) const {
const TNormalizer* normalizer = this->influencerElement(influencerFieldName);
return normalizer ? normalizer->s_Normalizer.get() : nullptr;
}
const CAnomalyScore::CNormalizer*
CHierarchicalResultsNormalizer::partitionNormalizer(const std::string& partitionFieldName) const {
const TNormalizer* normalizer = this->partitionElement(partitionFieldName);
return normalizer ? normalizer->s_Normalizer.get() : nullptr;
}
const CAnomalyScore::CNormalizer*
CHierarchicalResultsNormalizer::leafNormalizer(const std::string& partitionFieldName,
const std::string& personFieldName,
const std::string& functionName,
const std::string& valueFieldName) const {
const TNormalizer* normalizer = this->leafElement(
partitionFieldName, personFieldName, functionName, valueFieldName);
return normalizer ? normalizer->s_Normalizer.get() : nullptr;
}
bool CHierarchicalResultsNormalizer::isMemberOfPopulation(const TNode& node,
std::function<bool(const TNode&)> test) {
if (isPopulation(node)) {
return true;
}
    // Check whether the node has a distinct person field name and value
    // and, if it does, whether these match any of its descendant results
    // which are from a population analysis. Note that "test" is null only
    // the first time this function is invoked for the node under test, so
    // the person field name and value captured by the lambda are those of
    // that node. The string references are initialised here, in this scope,
    // because they are used by the "test" lambda below.
const std::string& personName = node.s_Spec.s_PersonFieldName.value_or("");
const std::string& personValue = node.s_Spec.s_PersonFieldValue.value_or("");
if (test == nullptr) {
if (personName.empty() || personValue.empty()) {
return false;
}
test = [&personName, &personValue](const TNode& child) {
return isPopulation(child) && child.s_Spec.s_PersonFieldName == personName &&
child.s_Spec.s_PersonFieldValue == personValue;
};
}
for (const auto& child : node.s_Children) {
if (test(*child) || (isLeaf(*child) == false && isMemberOfPopulation(*child))) {
return true;
}
}
return false;
}
bool CHierarchicalResultsNormalizer::parseCue(const std::string& cue,
TWordNormalizerPrVec*& normalizers,
TDictionary::TUInt64Array& hashArray) {
normalizers = nullptr;
std::size_t hashStartPos = 0;
if (cue.compare(0, INFLUENCER_BUCKET_CUE_PREFIX.length(),
INFLUENCER_BUCKET_CUE_PREFIX) == 0) {
normalizers = &this->influencerBucketSet();
hashStartPos = INFLUENCER_BUCKET_CUE_PREFIX.length();
} else if (cue.compare(0, INFLUENCER_CUE_PREFIX.length(), INFLUENCER_CUE_PREFIX) == 0) {
normalizers = &this->influencerSet();
hashStartPos = INFLUENCER_CUE_PREFIX.length();
} else if (cue.compare(0, PARTITION_CUE_PREFIX.length(), PARTITION_CUE_PREFIX) == 0) {
normalizers = &this->partitionSet();
hashStartPos = PARTITION_CUE_PREFIX.length();
} else if (cue.compare(0, PERSON_CUE_PREFIX.length(), PERSON_CUE_PREFIX) == 0) {
normalizers = &this->personSet();
hashStartPos = PERSON_CUE_PREFIX.length();
} else if (cue.compare(0, LEAF_CUE_PREFIX.length(), LEAF_CUE_PREFIX) == 0) {
normalizers = &this->leafSet();
hashStartPos = LEAF_CUE_PREFIX.length();
} else {
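        // Returning true while leaving "normalizers" null makes the caller
        // skip this cue rather than treat the state as corrupt.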
LOG_WARN(<< "Did not understand normalizer cue " << cue);
return true;
}
LOG_TRACE(<< "cue = " << cue << ", hash = " << cue.substr(hashStartPos));
if (core::CStringUtils::stringToType(cue.substr(hashStartPos), hashArray[0]) == false) {
LOG_ERROR(<< "Unable to parse normalizer hash from cue " << cue
<< " starting at position " << hashStartPos);
return false;
}
return true;
}
const std::string& CHierarchicalResultsNormalizer::bucketCue() {
return BUCKET_CUE;
}
std::string CHierarchicalResultsNormalizer::influencerBucketCue(const TWord& word) {
return INFLUENCER_BUCKET_CUE_PREFIX + core::CStringUtils::typeToString(word.hash64());
}
std::string CHierarchicalResultsNormalizer::influencerCue(const TWord& word) {
return INFLUENCER_CUE_PREFIX + core::CStringUtils::typeToString(word.hash64());
}
std::string CHierarchicalResultsNormalizer::partitionCue(const TWord& word) {
return PARTITION_CUE_PREFIX + core::CStringUtils::typeToString(word.hash64());
}
std::string CHierarchicalResultsNormalizer::personCue(const TWord& word) {
return PERSON_CUE_PREFIX + core::CStringUtils::typeToString(word.hash64());
}
std::string CHierarchicalResultsNormalizer::leafCue(const TWord& word) {
return LEAF_CUE_PREFIX + core::CStringUtils::typeToString(word.hash64());
}
CHierarchicalResultsNormalizer::CNormalizerFactory::CNormalizerFactory(const CAnomalyDetectorModelConfig& modelConfig)
: m_ModelConfig{modelConfig} {
}
CHierarchicalResultsNormalizer::TNormalizer
CHierarchicalResultsNormalizer::CNormalizerFactory::make(const TNode& node, bool) const {
return {node.s_Spec.s_PartitionFieldName.value_or("") + ' ' +
node.s_Spec.s_PersonFieldName.value_or("") + ' ' +
node.s_Spec.s_FunctionName.value_or("") + ' ' +
node.s_Spec.s_ValueFieldName.value_or(""),
std::make_shared<CAnomalyScore::CNormalizer>(m_ModelConfig)};
}
}
}