lib/model/CHierarchicalResultsAggregator.cc

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <model/CHierarchicalResultsAggregator.h>

#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CPersistUtils.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/RestoreMacros.h>

#include <maths/common/CMathsFuncs.h>
#include <maths/common/CTools.h>

#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CAnomalyScore.h>

#include <boost/container/flat_map.hpp>
#include <boost/container/flat_set.hpp>

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

namespace ml {
namespace model {
namespace {

using TOptionalStr = CHierarchicalResults::TOptionalStr;
using TOptionalStrOptionalStrPr = CHierarchicalResults::TOptionalStrOptionalStrPr;
using TOptionalStrOptionalStrPrDoublePr = CHierarchicalResults::TOptionalStrOptionalStrPrDoublePr;
using TOptionalStrOptionalStrPrDoublePrVec = CHierarchicalResults::TOptionalStrOptionalStrPrDoublePrVec;

//! \brief Creates new detector equalizers.
class CDetectorEqualizerFactory {
public:
    CDetectorEqualizer make(const CHierarchicalResults::TNode&, bool) const {
        return CDetectorEqualizer();
    }
};

//! Compute the probability of \p influence.
bool influenceProbability(const TOptionalStrOptionalStrPrDoublePrVec& influences,
                          const TOptionalStr& influencerName,
                          const TOptionalStr& influencerValue,
                          double p,
                          double& result) {
    TOptionalStrOptionalStrPr influence(influencerName, influencerValue);

    std::size_t k{static_cast<std::size_t>(
        std::lower_bound(influences.begin(), influences.end(), influence,
                         maths::common::COrderings::SFirstLess()) -
        influences.begin())};

    if (k < influences.size() && influences[k].first == influence) {
        // The stored influence value acts as an exponent w on the probability,
        // i.e. p^w computed as exp(w * log(p)); w == 1.0 short-circuits the
        // log/exp round trip.
        result = influences[k].second == 1.0
                     ? p
                     : std::exp(influences[k].second * maths::common::CTools::fastLog(p));
        return true;
    }

    return false;
}
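// Hasher used to key children by their partition (and person) field values,
// plus single-character tags which keep the persisted state compact.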
const core::CHashing::CMurmurHash2String HASHER;
const std::string BUCKET_TAG("a");
const std::string INFLUENCER_BUCKET_TAG("b");
const std::string INFLUENCER_TAG("c");
const std::string PARTITION_TAG("d");
const std::string PERSON_TAG("e");
const std::string LEAF_TAG("f");
} // unnamed::

CHierarchicalResultsAggregator::CHierarchicalResultsAggregator(const CAnomalyDetectorModelConfig& modelConfig)
    : TBase(TDetectorEqualizer()), m_Job(E_NoOp),
      m_DecayRate(modelConfig.decayRate()),
      m_MaximumAnomalousProbability(modelConfig.maximumAnomalousProbability()) {
    this->refresh(modelConfig);
}

void CHierarchicalResultsAggregator::setJob(EJob job) {
    m_Job = job;
}

void CHierarchicalResultsAggregator::refresh(const CAnomalyDetectorModelConfig& modelConfig) {
    m_DecayRate = modelConfig.decayRate();
    m_MaximumAnomalousProbability = modelConfig.maximumAnomalousProbability();
    for (std::size_t i = 0; i < model_t::NUMBER_AGGREGATION_STYLES; ++i) {
        for (std::size_t j = 0; j < model_t::NUMBER_AGGREGATION_PARAMS; ++j) {
            m_Parameters[i][j] = modelConfig.aggregationStyleParam(
                static_cast<model_t::EAggregationStyle>(i),
                static_cast<model_t::EAggregationParam>(j));
        }
    }
}

void CHierarchicalResultsAggregator::clear() {
    this->TBase::clear();
}

void CHierarchicalResultsAggregator::visit(const CHierarchicalResults& /*results*/,
                                           const TNode& node,
                                           bool pivot) {
    if (isLeaf(node)) {
        this->aggregateLeaf(node);
    } else {
        this->aggregateNode(node, pivot);
    }
}

void CHierarchicalResultsAggregator::propagateForwardByTime(double time) {
    if (time < 0.0) {
        LOG_ERROR(<< "Can't propagate normalizer backwards in time");
        return;
    }
    double factor{std::exp(-m_DecayRate *
                           CDetectorEqualizer::largestProbabilityToCorrect() * time)};
    this->age(std::bind(&TDetectorEqualizer::age, std::placeholders::_1, factor));
}

void CHierarchicalResultsAggregator::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
    inserter.insertLevel(BUCKET_TAG,
                         std::bind(&TDetectorEqualizer::acceptPersistInserter,
                                   std::cref(this->bucketElement()), std::placeholders::_1));
    core::CPersistUtils::persist(INFLUENCER_BUCKET_TAG, this->influencerBucketSet(), inserter);
    core::CPersistUtils::persist(INFLUENCER_TAG, this->influencerSet(), inserter);
    core::CPersistUtils::persist(PARTITION_TAG, this->partitionSet(), inserter);
    core::CPersistUtils::persist(PERSON_TAG, this->personSet(), inserter);
    core::CPersistUtils::persist(LEAF_TAG, this->leafSet(), inserter);
}

bool CHierarchicalResultsAggregator::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
    do {
        const std::string& name = traverser.name();
        RESTORE(BUCKET_TAG,
                traverser.traverseSubLevel(std::bind(&TDetectorEqualizer::acceptRestoreTraverser,
                                                     std::ref(this->bucketElement()),
                                                     std::placeholders::_1)))
        RESTORE(INFLUENCER_BUCKET_TAG,
                core::CPersistUtils::restore(INFLUENCER_BUCKET_TAG,
                                             this->influencerBucketSet(), traverser));
        RESTORE(INFLUENCER_TAG,
                core::CPersistUtils::restore(INFLUENCER_TAG, this->influencerSet(), traverser));
        RESTORE(PARTITION_TAG,
                core::CPersistUtils::restore(PARTITION_TAG, this->partitionSet(), traverser));
        RESTORE(PERSON_TAG,
                core::CPersistUtils::restore(PERSON_TAG, this->personSet(), traverser));
        RESTORE(LEAF_TAG, core::CPersistUtils::restore(LEAF_TAG, this->leafSet(), traverser));
    } while (traverser.next());
    return true;
}

std::uint64_t CHierarchicalResultsAggregator::checksum() const {
    std::uint64_t seed = static_cast<std::uint64_t>(m_DecayRate);
    seed = maths::common::CChecksum::calculate(seed, m_Parameters);
    seed = maths::common::CChecksum::calculate(seed, m_MaximumAnomalousProbability);
    return this->TBase::checksum(seed);
}
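// A leaf's probability is clamped to [smallestProbability(), 1.0] and, for
// small probabilities, equalized across detectors before being converted to a
// raw anomaly score. A leaf has no children, so its own probability also
// stands in for the smallest child and descendant probabilities.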
void CHierarchicalResultsAggregator::aggregateLeaf(const TNode& node) {
    if (isSimpleCount(node)) {
        return;
    }

    int detector{node.s_Detector};
    double probability{node.probability()};
    if (!maths::common::CMathsFuncs::isFinite(probability)) {
        probability = 1.0;
    }
    probability = maths::common::CTools::truncate(
        probability, maths::common::CTools::smallestProbability(), 1.0);
    probability = this->correctProbability(node, false, detector, probability);
    model_t::EAggregationStyle style{isAttribute(node) ? model_t::E_AggregateAttributes
                                                       : model_t::E_AggregatePeople};

    node.s_AnnotatedProbability.s_Probability = probability;
    node.s_AggregationStyle = style;
    node.s_SmallestChildProbability = probability;
    node.s_SmallestDescendantProbability = probability;
    node.s_RawAnomalyScore = maths::common::CTools::anomalyScore(probability);
}

void CHierarchicalResultsAggregator::aggregateNode(const TNode& node, bool pivot) {
    LOG_TRACE(<< "node = " << node.print() << ", pivot = " << pivot);

    std::size_t numberDetectors;
    TIntSizePrDouble1VecUMap partition[N];
    if (!this->partitionChildProbabilities(node, pivot, numberDetectors, partition)) {
        return;
    }
    LOG_TRACE(<< "partition = " << partition);

    int detector;
    int aggregation;
    TDouble1Vec detectorProbabilities;
    this->detectorProbabilities(node, pivot, numberDetectors, partition,
                                detector, aggregation, detectorProbabilities);
    LOG_TRACE(<< "detector = " << detector << ", aggregation = " << aggregation
              << ", detector probabilities = " << detectorProbabilities);

    const double* params{m_Parameters[model_t::E_AggregateDetectors]};
    CAnomalyScore::compute(params[model_t::E_JointProbabilityWeight],
                           params[model_t::E_ExtremeProbabilityWeight],
                           static_cast<std::size_t>(params[model_t::E_MinExtremeSamples]),
                           static_cast<std::size_t>(params[model_t::E_MaxExtremeSamples]),
                           m_MaximumAnomalousProbability, detectorProbabilities,
                           node.s_RawAnomalyScore, node.s_AnnotatedProbability.s_Probability);
    node.s_Detector = detector;
    node.s_AggregationStyle = aggregation;
    LOG_TRACE(<< "probability = " << node.probability());
}

bool CHierarchicalResultsAggregator::partitionChildProbabilities(
    const TNode& node,
    bool pivot,
    std::size_t& numberDetectors,
    TIntSizePrDouble1VecUMap (&partition)[N]) {
    using TSizeFSet = boost::container::flat_set<std::size_t>;
    using TMinAccumulator = maths::common::CBasicStatistics::SMin<double>::TAccumulator;

    for (std::size_t i = 0; i < N; ++i) {
        partition[i].reserve(node.s_Children.size());
    }

    bool haveResult{false};
    TSizeFSet detectors;
    TMinAccumulator pMinChild;
    TMinAccumulator pMinDescendent;
    for (const auto& child : node.s_Children) {
        if (isSimpleCount(*child)) {
            continue;
        }

        double probability{child->probability()};
        std::size_t key{0};
        if (pivot && !isRoot(node) &&
            !influenceProbability(child->s_AnnotatedProbability.s_Influences,
                                  node.s_Spec.s_PersonFieldName,
                                  node.s_Spec.s_PersonFieldValue, probability, probability)) {
            LOG_ERROR(<< "Couldn't find influence for " << child->print());
            continue;
        } else {
            key = this->hash(*child);
        }

        haveResult = true;
        pMinChild.add(probability);
        if (isTypeForWhichWeWriteResults(*child, pivot)) {
            pMinDescendent.add(probability);
        }
        pMinDescendent.add(child->s_SmallestDescendantProbability);

        model_t::EAggregationStyle style{
            static_cast<model_t::EAggregationStyle>(child->s_AggregationStyle)};
        switch (style) {
        case model_t::E_AggregatePeople:
        case model_t::E_AggregateAttributes:
            detectors.insert(child->s_Detector);
            partition[style][{child->s_Detector, key}].push_back(probability);
            break;
        case model_t::E_AggregateDetectors:
            LOG_ERROR(<< "Unexpected aggregation style for " << child->print());
            continue;
        }
    }

    if (haveResult) {
        node.s_SmallestChildProbability = maths::common::CTools::truncate(
            pMinChild[0], maths::common::CTools::smallestProbability(), 1.0);
        node.s_SmallestDescendantProbability = maths::common::CTools::truncate(
            pMinDescendent[0], maths::common::CTools::smallestProbability(), 1.0);
    }

    numberDetectors = detectors.size();
    LOG_TRACE(<< "detector = " << detectors);

    return haveResult;
}
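// Sentinels used below: "detector" starts at -3 (nothing seen yet), takes the
// first detector id seen, and collapses to -2 when children come from a mix
// of detectors; "aggregation" starts at -1 (or at the people fallback for
// pivots, partitions and population person nodes) and falls back to
// E_AggregatePeople when children use mixed aggregation styles.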
void CHierarchicalResultsAggregator::detectorProbabilities(
    const TNode& node,
    bool pivot,
    std::size_t numberDetectors,
    const TIntSizePrDouble1VecUMap (&partition)[N],
    int& detector,
    int& aggregation,
    TDouble1Vec& probabilities) {
    using TIntDouble1VecFMap = boost::container::flat_map<int, TDouble1Vec>;

    int fallback{static_cast<int>(model_t::E_AggregatePeople)};
    detector = -3;
    aggregation = (pivot || isPartition(node) || (isPopulation(node) && isPerson(node)))
                      ? fallback
                      : -1;

    TIntDouble1VecFMap detectorProbabilities;
    detectorProbabilities.reserve(numberDetectors);
    for (int i = 0; i < static_cast<int>(N); ++i) {
        const double* params{m_Parameters[i]};
        for (const auto& subset : partition[i]) {
            int detector_{subset.first.first};
            double probability;
            if (subset.second.size() == 1) {
                probability = subset.second[0];
            } else {
                double rawAnomalyScore;
                CAnomalyScore::compute(
                    params[model_t::E_JointProbabilityWeight],
                    params[model_t::E_ExtremeProbabilityWeight],
                    static_cast<std::size_t>(params[model_t::E_MinExtremeSamples]),
                    static_cast<std::size_t>(params[model_t::E_MaxExtremeSamples]),
                    m_MaximumAnomalousProbability, subset.second,
                    rawAnomalyScore, probability);
            }
            if (!maths::common::CMathsFuncs::isFinite(probability)) {
                probability = 1.0;
            }

            detectorProbabilities[detector_].push_back(probability);

            switch (detector) {
            case -3:
                detector = detector_; /*first value we've seen*/
                break;
            case -2:
                /*we have a mix of detectors*/
                break;
            default:
                detector = (detector != detector_ ? -2 : detector_);
                break;
            }
            switch (aggregation) {
            case -1:
                aggregation = i; /*first value we've seen*/
                break;
            default:
                aggregation = (aggregation != i ? fallback : i);
                break;
            }
        }
    }

    probabilities.reserve(numberDetectors);
    for (const auto& dp : detectorProbabilities) {
        double probability{dp.second[0]};
        if (dp.second.size() > 1) {
            const double* params{m_Parameters[model_t::E_AggregatePeople]};
            double rawAnomalyScore;
            CAnomalyScore::compute(
                params[model_t::E_JointProbabilityWeight],
                params[model_t::E_ExtremeProbabilityWeight],
                static_cast<std::size_t>(params[model_t::E_MinExtremeSamples]),
                static_cast<std::size_t>(params[model_t::E_MaxExtremeSamples]),
                m_MaximumAnomalousProbability, dp.second, rawAnomalyScore, probability);
        }
        probabilities.push_back(this->correctProbability(node, pivot, dp.first, probability));
    }
}
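// Distinct partition (and, for population analyses, person) field values hash
// to different keys, so their probabilities are aggregated separately in
// partitionChildProbabilities.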
std::size_t CHierarchicalResultsAggregator::hash(const TNode& node) const {
    std::size_t result{HASHER(node.s_Spec.s_PartitionFieldValue)};
    if (node.s_Spec.s_IsPopulation) {
        boost::hash_combine(result, HASHER(node.s_Spec.s_PersonFieldValue));
    }
    return result;
}

double CHierarchicalResultsAggregator::correctProbability(const TNode& node,
                                                          bool pivot,
                                                          int detector,
                                                          double probability) {
    using TMaxAccumulator = maths::common::CBasicStatistics::SMax<double>::TAccumulator;

    if (probability < CDetectorEqualizer::largestProbabilityToCorrect()) {
        TDetectorEqualizerPtrVec equalizers;
        this->elements(node, pivot, CDetectorEqualizerFactory{}, equalizers);
        TMaxAccumulator corrected;
        for (auto& equalizer : equalizers) {
            switch (m_Job) {
            case E_UpdateAndCorrect:
                equalizer->add(detector, probability);
                corrected.add(equalizer->correct(detector, probability));
                break;
            case E_Correct:
                corrected.add(equalizer->correct(detector, probability));
                break;
            case E_NoOp:
                break;
            }
        }
        if (corrected.count() > 0) {
            probability = corrected[0];
        }
    }

    return probability;
}
} // namespace model
} // namespace ml
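// Illustrative usage sketch (an assumed caller, not part of this file): the
// aggregator is a hierarchical-results visitor, so a driver would look
// roughly like
//
//   CHierarchicalResultsAggregator aggregator{modelConfig};
//   aggregator.setJob(CHierarchicalResultsAggregator::E_UpdateAndCorrect);
//   results.bottomUpBreadthFirst(aggregator); // leaves first, then parents
//   aggregator.propagateForwardByTime(1.0);   // age the detector equalizers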