lib/model/CAnomalyDetectorModel.cc (583 lines of code) (raw):

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <model/CAnomalyDetectorModel.h>

#include <core/CAllocationStrategy.h>
#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/CTimeUtils.h>

#include <maths/common/CChecksum.h>
#include <maths/common/CMultivariatePrior.h>
#include <maths/common/COrderings.h>
#include <maths/common/CRestoreParams.h>

#include <maths/time_series/CModelStateSerialiser.h>

#include <model/CDataGatherer.h>
#include <model/CDetectionRule.h>
#include <model/CHierarchicalResults.h>
#include <model/CMemoryUsageEstimator.h>
#include <model/CPartitioningFields.h>
#include <model/CProbabilityAndInfluenceCalculator.h>
#include <model/CResourceMonitor.h>
#include <model/CSearchKey.h>
#include <model/FrequencyPredicates.h>
#include <model/ModelTypes.h>
#include <model/SModelParams.h>

#include <boost/unordered_set.hpp>

#include <algorithm>
#include <cmath>
#include <functional>
#include <map>
#include <string>

namespace ml {
namespace model {

namespace {
const std::string MODEL_TAG{"a"};
const std::string EMPTY;
const model_t::CResultType SKIP_SAMPLING_RESULT_TYPE;
const double SKIP_SAMPLING_WEIGHT{0.005};
const core_t::TTime APPLIED_DETECTION_RULE_EXPIRATION{31536000}; // 1 year
const CAnomalyDetectorModel::TStr1Vec EMPTY_STRING_LIST;

//! Check whether any rule in \p detectionRules applies \p action for the
//! given feature, result type, person, attribute and time.
//!
//! Evaluation stops at the first rule whose apply() returns true.
bool checkRules(const SModelParams::TDetectionRuleVec& detectionRules,
                const CAnomalyDetectorModel& model,
                model_t::EFeature feature,
                CDetectionRule::ERuleAction action,
                const model_t::CResultType& resultType,
                std::size_t pid,
                std::size_t cid,
                core_t::TTime time) {
    return std::any_of(detectionRules.begin(), detectionRules.end(), [&](const auto& rule) {
        return rule.apply(action, model, feature, resultType, pid, cid, time);
    });
}

//! Check whether the rule of any scheduled event in \p scheduledEvents
//! applies \p action for the given feature, result type, person, attribute
//! and time.
//!
//! Evaluation stops at the first event whose rule returns true.
bool checkScheduledEvents(const SModelParams::TStrDetectionRulePrVec& scheduledEvents,
                          const CAnomalyDetectorModel& model,
                          model_t::EFeature feature,
                          CDetectionRule::ERuleAction action,
                          const model_t::CResultType& resultType,
                          std::size_t pid,
                          std::size_t cid,
                          core_t::TTime time) {
    return std::any_of(scheduledEvents.begin(), scheduledEvents.end(), [&](const auto& event) {
        return event.second.apply(action, model, feature, resultType, pid, cid, time);
    });
}

//! Run the callback of every rule in \p detectionRules against \p model.
void callbackRules(const SModelParams::TDetectionRuleVec& detectionRules,
                   CAnomalyDetectorModel& model,
                   core_t::TTime time) {
    for (const auto& rule : detectionRules) {
        rule.executeCallback(model, time);
    }
}

//! Run the callback of every scheduled event's rule against \p model.
void callbackScheduledEvents(const SModelParams::TStrDetectionRulePrVec& scheduledEvents,
                             CAnomalyDetectorModel& model,
                             core_t::TTime time) {
    for (const auto& scheduledEvent : scheduledEvents) {
        scheduledEvent.second.executeCallback(model, time);
    }
}

//! Compute the prune window factor / decayRate clamped to [0, maximum].
//!
//! The clamp happens in floating point *before* converting to an integer,
//! because converting a double that is out of range for std::size_t is
//! undefined behaviour. For example, a tiny decay rate makes the quotient
//! huge. The result matches std::min(static_cast<std::size_t>(q), maximum)
//! wherever that expression was well defined.
std::size_t pruneWindow(double factor, double decayRate, std::size_t maximum) {
    if (decayRate == 0.0) {
        return maximum;
    }
    double window{factor / decayRate};
    // Written as a negated comparison so that NaN also maps to the maximum.
    if (!(window < static_cast<double>(maximum))) {
        return maximum;
    }
    return window > 0.0 ? static_cast<std::size_t>(window) : 0;
}
}

CAnomalyDetectorModel::CAnomalyDetectorModel(const SModelParams& params,
                                             const TDataGathererPtr& dataGatherer,
                                             const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators)
    : m_Params(params), m_DataGatherer(dataGatherer), m_BucketCount(0.0),
      m_InfluenceCalculators(influenceCalculators) {
    if (!m_DataGatherer) {
        LOG_ABORT(<< "Must provide a data gatherer");
    }
    // Sorted by feature so influenceCalculator() can use binary search.
    for (auto& calculators : m_InfluenceCalculators) {
        std::sort(calculators.begin(), calculators.end(),
                  maths::common::COrderings::SFirstLess());
    }
}

CAnomalyDetectorModel::CAnomalyDetectorModel(bool isForPersistence,
                                             const CAnomalyDetectorModel& other)
    : // The copy of m_DataGatherer is a shallow copy. This would be unacceptable
      // if we were going to persist the data gatherer from within this class.
      // We don't, so that's OK, but the next issue is that another thread will be
      // modifying the data gatherer m_DataGatherer points to whilst this object
      // is being persisted. Therefore, persistence must only call methods on the
      // data gatherer that are invariant.
      m_Params(other.m_Params), m_DataGatherer(other.m_DataGatherer),
      m_PersonBucketCounts(other.m_PersonBucketCounts),
      m_BucketCount(other.m_BucketCount) {
    if (!isForPersistence) {
        LOG_ABORT(<< "This constructor only creates clones for persistence");
    }
    // The applied rule checksums are model state: they feed checksum(), and
    // (presumably) derived classes persist them via appliedRuleChecksums().
    // The persistence clone therefore has to carry them, otherwise it would
    // see an empty list. This is assigned in the body rather than the
    // initializer list to avoid depending on member declaration order.
    m_AppliedRuleChecksums = other.m_AppliedRuleChecksums;
}

std::string CAnomalyDetectorModel::description() const {
    return m_DataGatherer->description();
}

const std::string& CAnomalyDetectorModel::personName(std::size_t pid) const {
    // Falls back to the stringified identifier if the name is unknown.
    return m_DataGatherer->personName(pid, core::CStringUtils::typeToString(pid));
}

const std::string& CAnomalyDetectorModel::personName(std::size_t pid,
                                                     const std::string& fallback) const {
    return m_DataGatherer->personName(pid, fallback);
}

std::string CAnomalyDetectorModel::printPeople(const TSizeVec& pids, std::size_t limit) const {
    if (pids.empty()) {
        return std::string();
    }
    if (limit == 0) {
        return core::CStringUtils::typeToString(pids.size()) + " in total";
    }
    std::string result{this->personName(pids[0])};
    for (std::size_t i = 1; i < std::min(limit, pids.size()); ++i) {
        result += ' ';
        result += this->personName(pids[i]);
    }
    if (limit < pids.size()) {
        result += " and ";
        result += core::CStringUtils::typeToString(pids.size() - limit);
        result += " others";
    }
    return result;
}

std::size_t CAnomalyDetectorModel::numberOfPeople() const {
    return m_DataGatherer->numberActivePeople();
}

const std::string& CAnomalyDetectorModel::attributeName(std::size_t cid) const {
    // Falls back to the stringified identifier if the name is unknown.
    return m_DataGatherer->attributeName(cid, core::CStringUtils::typeToString(cid));
}

const std::string& CAnomalyDetectorModel::attributeName(std::size_t cid,
                                                        const std::string& fallback) const {
    return m_DataGatherer->attributeName(cid, fallback);
}

std::string CAnomalyDetectorModel::printAttributes(const TSizeVec& cids, std::size_t limit) const {
    if (cids.empty()) {
        return std::string();
    }
    if (limit == 0) {
        return core::CStringUtils::typeToString(cids.size()) + " in total";
    }
    std::string result{this->attributeName(cids[0])};
    for (std::size_t i = 1; i < std::min(limit, cids.size()); ++i) {
        result += ' ';
        result += this->attributeName(cids[i]);
    }
    if (limit < cids.size()) {
        result += " and ";
        result += core::CStringUtils::typeToString(cids.size() - limit);
        result += " others";
    }
    return result;
}

//! Runs scheduled event and detection rule callbacks, then updates the
//! exponentially decayed per-person and total bucket counts for each bucket
//! in [startTime, endTime).
void CAnomalyDetectorModel::sample(core_t::TTime startTime,
                                   core_t::TTime endTime,
                                   CResourceMonitor& /*resourceMonitor*/) {
    using TSizeUSet = boost::unordered_set<std::size_t>;

    callbackScheduledEvents(this->params().s_ScheduledEvents.get(), *this, startTime);
    callbackRules(this->params().s_DetectionRules.get(), *this, startTime);

    const CDataGatherer& gatherer{this->dataGatherer()};
    core_t::TTime bucketLength{this->bucketLength()};
    // The per-bucket decay factor does not change inside the loop.
    double alpha{std::exp(-this->params().s_DecayRate)};
    // Reused across buckets (cleared each time) to avoid reallocating.
    TSizeUSet uniquePeople;

    for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
        const auto& counts = gatherer.bucketCounts(time);
        uniquePeople.clear();
        // Each person contributes at most one count per bucket.
        for (const auto& count : counts) {
            std::size_t pid{CDataGatherer::extractPersonId(count)};
            if (uniquePeople.insert(pid).second) {
                m_PersonBucketCounts[pid] += 1.0;
            }
        }
        m_BucketCount += 1.0;

        for (auto& personBucketCount : m_PersonBucketCounts) {
            personBucketCount *= alpha;
        }
        m_BucketCount *= alpha;
    }
}

void CAnomalyDetectorModel::skipSampling(core_t::TTime endTime) {
    CDataGatherer& gatherer{this->dataGatherer()};
    core_t::TTime startTime{gatherer.earliestBucketStartTime()};
    if (!gatherer.validateSampleTimes(startTime, endTime)) {
        return;
    }
    gatherer.skipSampleNow(endTime);
    this->doSkipSampling(startTime, endTime);
    this->currentBucketStartTime(endTime - gatherer.bucketLength());
}

//! Adds a result for each person with data in the bucket starting at
//! \p startTime. Returns false if no bucket stats are available.
bool CAnomalyDetectorModel::addResults(core_t::TTime startTime,
                                       core_t::TTime endTime,
                                       std::size_t numberAttributeProbabilities,
                                       CHierarchicalResults& results) const {
    TSizeVec personIds;
    if (!this->bucketStatsAvailable(startTime)) {
        LOG_TRACE(<< "No stats available for time " << startTime);
        return false;
    }
    this->currentBucketPersonIds(startTime, personIds);
    LOG_TRACE(<< "Outputting results for " << personIds.size() << " people");
    CPartitioningFields partitioningFields(m_DataGatherer->partitionFieldName(),
                                           m_DataGatherer->partitionFieldValue());
    partitioningFields.add(m_DataGatherer->personFieldName(), EMPTY);
    for (auto pid : personIds) {
        if (this->category() == model_t::E_Counting) {
            SAnnotatedProbability annotatedProbability;
            this->computeProbability(pid, startTime, endTime, partitioningFields,
                                     numberAttributeProbabilities, annotatedProbability);
            results.addSimpleCountResult(annotatedProbability, this, startTime);
        } else {
            LOG_TRACE(<< "AddResult, for time [" << startTime << "," << endTime << ")");
            partitioningFields.back().second = std::cref(this->personName(pid));
            std::for_each(m_DataGatherer->beginInfluencers(),
                          m_DataGatherer->endInfluencers(),
                          [&results](const std::string& influencer) {
                              results.addInfluencer(influencer);
                          });
            SAnnotatedProbability annotatedProbability;
            annotatedProbability.s_ResultType = results.resultType();
            if (this->computeProbability(pid, startTime, endTime, partitioningFields,
                                         numberAttributeProbabilities, annotatedProbability)) {
                function_t::EFunction function{m_DataGatherer->function()};
                results.addModelResult(
                    m_DataGatherer->searchKey().detectorIndex(), this->isPopulation(),
                    function_t::name(function), function,
                    m_DataGatherer->partitionFieldName(),
                    m_DataGatherer->partitionFieldValue(),
                    m_DataGatherer->personFieldName(), this->personName(pid),
                    m_DataGatherer->valueFieldName(), annotatedProbability, this, startTime);
            }
        }
    }
    return true;
}

std::size_t CAnomalyDetectorModel::defaultPruneWindow() const {
    // The longest we'll consider keeping priors for is 1M buckets.
    return pruneWindow(this->params().s_PruneWindowScaleMaximum,
                       this->params().s_DecayRate, MAXIMUM_PERMITTED_AGE);
}

std::size_t CAnomalyDetectorModel::minimumPruneWindow() const {
    return pruneWindow(this->params().s_PruneWindowScaleMinimum,
                       this->params().s_DecayRate, MAXIMUM_PERMITTED_AGE);
}

void CAnomalyDetectorModel::prune() {
    this->prune(this->defaultPruneWindow());
}

std::uint64_t CAnomalyDetectorModel::checksum(bool /*includeCurrentBucketStats*/) const {
    using TStrCRefUInt64Map =
        std::map<TStrCRef, std::uint64_t, maths::common::COrderings::SLess>;
    std::uint64_t seed{m_DataGatherer->checksum()};
    seed = maths::common::CChecksum::calculate(seed, m_Params);
    seed = maths::common::CChecksum::calculate(seed, m_BucketCount);
    // Person hashes are keyed by name (ordered map) so the result does not
    // depend on person identifier assignment.
    TStrCRefUInt64Map hashes;
    for (std::size_t pid = 0; pid < m_PersonBucketCounts.size(); ++pid) {
        if (m_DataGatherer->isPersonActive(pid)) {
            std::uint64_t& hash{hashes[std::cref(m_DataGatherer->personName(pid))]};
            hash = maths::common::CChecksum::calculate(hash, m_PersonBucketCounts[pid]);
        }
    }
    seed = maths::common::CChecksum::calculate(seed, m_AppliedRuleChecksums);
    LOG_TRACE(<< "seed = " << seed);
    LOG_TRACE(<< "checksums = " << hashes);
    return maths::common::CChecksum::calculate(seed, hashes);
}

void CAnomalyDetectorModel::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
    mem->setName("CAnomalyDetectorModel");
    core::memory_debug::dynamicSize("m_DataGatherer", m_DataGatherer, mem);
    core::memory_debug::dynamicSize("m_Params", m_Params, mem);
    core::memory_debug::dynamicSize("m_PersonBucketCounts", m_PersonBucketCounts, mem);
    core::memory_debug::dynamicSize("m_InfluenceCalculators", m_InfluenceCalculators, mem);
    core::memory_debug::dynamicSize("m_AppliedRuleChecksums", m_AppliedRuleChecksums, mem);
}

std::size_t CAnomalyDetectorModel::memoryUsage() const {
    std::size_t mem{core::memory::dynamicSize(m_Params)};
    mem += core::memory::dynamicSize(m_DataGatherer);
    mem += core::memory::dynamicSize(m_PersonBucketCounts);
    mem += core::memory::dynamicSize(m_InfluenceCalculators);
    mem += core::memory::dynamicSize(m_AppliedRuleChecksums);
    return mem;
}

CAnomalyDetectorModel::TOptionalSize
CAnomalyDetectorModel::estimateMemoryUsage(std::size_t numberPeople,
                                           std::size_t numberAttributes,
                                           std::size_t numberCorrelations) const {
    CMemoryUsageEstimator::TSizeArray predictors;
    predictors[CMemoryUsageEstimator::E_People] = numberPeople;
    predictors[CMemoryUsageEstimator::E_Attributes] = numberAttributes;
    predictors[CMemoryUsageEstimator::E_Correlations] = numberCorrelations;
    return this->memoryUsageEstimator()->estimate(predictors);
}

//! Returns the estimator's prediction if it has one; otherwise computes the
//! memory usage exactly and records it as a new estimator sample.
std::size_t CAnomalyDetectorModel::estimateMemoryUsageOrComputeAndUpdate(std::size_t numberPeople,
                                                                         std::size_t numberAttributes,
                                                                         std::size_t numberCorrelations) {
    TOptionalSize estimate{this->estimateMemoryUsage(numberPeople, numberAttributes,
                                                     numberCorrelations)};
    if (estimate) {
        return *estimate;
    }

    std::size_t computed{this->computeMemoryUsage()};
    CMemoryUsageEstimator::TSizeArray predictors;
    predictors[CMemoryUsageEstimator::E_People] = numberPeople;
    predictors[CMemoryUsageEstimator::E_Attributes] = numberAttributes;
    predictors[CMemoryUsageEstimator::E_Correlations] = numberCorrelations;
    this->memoryUsageEstimator()->addValue(predictors, computed);
    return computed;
}

const CDataGatherer& CAnomalyDetectorModel::dataGatherer() const {
    return *m_DataGatherer;
}

CDataGatherer& CAnomalyDetectorModel::dataGatherer() {
    return *m_DataGatherer;
}

core_t::TTime CAnomalyDetectorModel::bucketLength() const {
    return m_DataGatherer->bucketLength();
}

//! Fraction of (decayed) buckets in which person \p pid appeared; 0.5 before
//! any bucket has been counted.
double CAnomalyDetectorModel::personFrequency(std::size_t pid) const {
    return m_BucketCount <= 0.0 ? 0.5 : m_PersonBucketCounts[pid] / m_BucketCount;
}

bool CAnomalyDetectorModel::isTimeUnset(core_t::TTime time) {
    return time == TIME_UNSET;
}

CPersonFrequencyGreaterThan CAnomalyDetectorModel::personFilter() const {
    return CPersonFrequencyGreaterThan(*this, m_Params.get().s_ExcludePersonFrequency);
}

CAttributeFrequencyGreaterThan CAnomalyDetectorModel::attributeFilter() const {
    return CAttributeFrequencyGreaterThan(*this, m_Params.get().s_ExcludeAttributeFrequency);
}

const SModelParams& CAnomalyDetectorModel::params() const {
    return m_Params;
}

double CAnomalyDetectorModel::learnRate(model_t::EFeature feature) const {
    return model_t::learnRate(feature, m_Params);
}

//! Look up the influence calculator for \p feature and influencer \p iid.
//! Returns nullptr if \p iid is out of range or no calculator is registered.
const CInfluenceCalculator*
CAnomalyDetectorModel::influenceCalculator(model_t::EFeature feature, std::size_t iid) const {
    if (iid >= m_InfluenceCalculators.size()) {
        LOG_ERROR(<< "Influencer identifier " << iid << " out of range");
        return nullptr;
    }
    const TFeatureInfluenceCalculatorCPtrPrVec& calculators{m_InfluenceCalculators[iid]};
    auto result = std::lower_bound(calculators.begin(), calculators.end(), feature,
                                   maths::common::COrderings::SFirstLess());
    return result != calculators.end() && result->first == feature
               ? result->second.get()
               : nullptr;
}

const CAnomalyDetectorModel::TDoubleVec& CAnomalyDetectorModel::personBucketCounts() const {
    return m_PersonBucketCounts;
}

CAnomalyDetectorModel::TDoubleVec& CAnomalyDetectorModel::personBucketCounts() {
    return m_PersonBucketCounts;
}

void CAnomalyDetectorModel::windowBucketCount(double windowBucketCount) {
    m_BucketCount = windowBucketCount;
}

double CAnomalyDetectorModel::windowBucketCount() const {
    return m_BucketCount;
}

void CAnomalyDetectorModel::createNewModels(std::size_t n, std::size_t /*m*/) {
    if (n > 0) {
        n += m_PersonBucketCounts.size();
        core::CAllocationStrategy::resize(m_PersonBucketCounts, n, 0.0);
    }
}

//! Reset the bucket counts of people the data gatherer has recycled, then
//! clear the gatherer's recycled list.
void CAnomalyDetectorModel::updateRecycledModels() {
    TSizeVec& people{m_DataGatherer->recycledPersonIds()};
    for (auto pid : people) {
        if (pid < m_PersonBucketCounts.size()) {
            m_PersonBucketCounts[pid] = 0.0;
        } else {
            LOG_ERROR(<< "Recycled person identifier '" << pid << "' out-of-range [0,"
                      << m_PersonBucketCounts.size() << ")");
        }
    }
    people.clear();
}

bool CAnomalyDetectorModel::shouldIgnoreResult(model_t::EFeature feature,
                                               const model_t::CResultType& resultType,
                                               std::size_t pid,
                                               std::size_t cid,
                                               core_t::TTime time) const {
    bool shouldIgnore =
        checkScheduledEvents(this->params().s_ScheduledEvents.get(), *this, feature,
                             CDetectionRule::E_SkipResult, resultType, pid, cid, time) ||
        checkRules(this->params().s_DetectionRules.get(), *this, feature,
                   CDetectionRule::E_SkipResult, resultType, pid, cid, time);

    return shouldIgnore;
}

//! Weight applied to a sample's count: 0 if a scheduled event skips the model
//! update, SKIP_SAMPLING_WEIGHT if a detection rule does, otherwise 1.
double CAnomalyDetectorModel::initialCountWeight(model_t::EFeature feature,
                                                 std::size_t pid,
                                                 std::size_t cid,
                                                 core_t::TTime time) const {
    if (checkScheduledEvents(this->params().s_ScheduledEvents.get(), *this, feature,
                             CDetectionRule::E_SkipModelUpdate,
                             SKIP_SAMPLING_RESULT_TYPE, pid, cid, time)) {
        return 0.0;
    }
    if (checkRules(this->params().s_DetectionRules.get(), *this, feature,
                   CDetectionRule::E_SkipModelUpdate, SKIP_SAMPLING_RESULT_TYPE,
                   pid, cid, time)) {
        return SKIP_SAMPLING_WEIGHT;
    }
    return 1.0;
}

bool CAnomalyDetectorModel::shouldSkipUpdate(model_t::EFeature feature,
                                             std::size_t pid,
                                             std::size_t cid,
                                             core_t::TTime time) const {
    return this->initialCountWeight(feature, pid, cid, time) != 1.0;
}

const CAnomalyDetectorModel::TStr1Vec&
CAnomalyDetectorModel::scheduledEventDescriptions(core_t::TTime /*time*/) const {
    return EMPTY_STRING_LIST;
}

maths::common::CModel* CAnomalyDetectorModel::tinyModel() {
    return new maths::common::CModelStub;
}

const std::size_t CAnomalyDetectorModel::MAXIMUM_PERMITTED_AGE(1000000);
const core_t::TTime CAnomalyDetectorModel::TIME_UNSET(-1);
const std::string CAnomalyDetectorModel::EMPTY_STRING;

CAnomalyDetectorModel::SFeatureModels::SFeatureModels(model_t::EFeature feature,
                                                      TMathsModelSPtr newModel)
    : s_Feature(feature), s_NewModel(newModel) {
}

//! Restore one model per MODEL_TAG element, using s_NewModel's parameters.
//! Returns false if any model fails to restore.
bool CAnomalyDetectorModel::SFeatureModels::acceptRestoreTraverser(const SModelParams& params_,
                                                                   core::CStateRestoreTraverser& traverser) {
    maths_t::EDataType dataType{s_NewModel->dataType()};
    maths::common::SModelRestoreParams params{
        s_NewModel->params(), params_.decompositionRestoreParams(dataType),
        params_.distributionRestoreParams(dataType)};
    do {
        if (traverser.name() == MODEL_TAG) {
            TMathsModelUPtr model;
            if (!traverser.traverseSubLevel(std::bind<bool>(
                    maths::time_series::CModelStateSerialiser(),
                    std::cref(params), std::ref(model), std::placeholders::_1))) {
                return false;
            }
            s_Models.push_back(std::move(model));
        }
    } while (traverser.next());
    return true;
}

void CAnomalyDetectorModel::SFeatureModels::persistModelsState(core::CStatePersistInserter& inserter) const {
    for (const auto& model : s_Models) {
        model->persistModelsState(inserter);
    }
}

void CAnomalyDetectorModel::SFeatureModels::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
    for (const auto& model : s_Models) {
        inserter.insertLevel(MODEL_TAG, std::bind<void>(maths::time_series::CModelStateSerialiser(),
                                                        std::cref(*model), std::placeholders::_1));
    }
}

void CAnomalyDetectorModel::SFeatureModels::debugMemoryUsage(
    const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
    mem->setName("SFeatureModels");
    core::memory_debug::dynamicSize("s_NewModel", s_NewModel, mem);
    core::memory_debug::dynamicSize("s_Models", s_Models, mem);
}

std::size_t CAnomalyDetectorModel::SFeatureModels::memoryUsage() const {
    return core::memory::dynamicSize(s_NewModel) + core::memory::dynamicSize(s_Models);
}

bool CAnomalyDetectorModel::SFeatureModels::shouldPersist() const {
    return std::any_of(s_Models.begin(), s_Models.end(),
                       [](const auto& model) { return model->shouldPersist(); });
}

CAnomalyDetectorModel::SFeatureCorrelateModels::SFeatureCorrelateModels(
    model_t::EFeature feature,
    const TMultivariatePriorSPtr& modelPrior,
    TCorrelationsPtr&& model)
    : s_Feature(feature), s_ModelPrior(modelPrior), s_Models(std::move(model)) {
}

CAnomalyDetectorModel::SFeatureCorrelateModels::~SFeatureCorrelateModels() = default;
CAnomalyDetectorModel::SFeatureCorrelateModels::SFeatureCorrelateModels(SFeatureCorrelateModels&&) = default;
CAnomalyDetectorModel::SFeatureCorrelateModels& CAnomalyDetectorModel::SFeatureCorrelateModels::
operator=(SFeatureCorrelateModels&&) = default;

//! Restore the correlations model. Exactly one MODEL_TAG element is
//! accepted; a failed restore or a second MODEL_TAG returns false.
bool CAnomalyDetectorModel::SFeatureCorrelateModels::acceptRestoreTraverser(
    const SModelParams& params_,
    core::CStateRestoreTraverser& traverser) {
    maths_t::EDataType dataType{s_ModelPrior->dataType()};
    maths::common::SDistributionRestoreParams params{params_.distributionRestoreParams(dataType)};
    std::size_t count{0u};
    do {
        if (traverser.name() == MODEL_TAG) {
            if (!traverser.traverseSubLevel(std::bind(
                    &maths::time_series::CTimeSeriesCorrelations::acceptRestoreTraverser,
                    s_Models.get(), std::cref(params), std::placeholders::_1)) ||
                count++ > 0) {
                return false;
            }
        }
    } while (traverser.next());
    return true;
}

void CAnomalyDetectorModel::SFeatureCorrelateModels::acceptPersistInserter(
    core::CStatePersistInserter& inserter) const {
    inserter.insertLevel(MODEL_TAG, std::bind(&maths::time_series::CTimeSeriesCorrelations::acceptPersistInserter,
                                              s_Models.get(), std::placeholders::_1));
}

void CAnomalyDetectorModel::SFeatureCorrelateModels::debugMemoryUsage(
    const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
    mem->setName("SFeatureCorrelateModels");
    core::memory_debug::dynamicSize("s_ModelPrior", s_ModelPrior, mem);
    core::memory_debug::dynamicSize("s_Models", s_Models, mem);
}

std::size_t CAnomalyDetectorModel::SFeatureCorrelateModels::memoryUsage() const {
    return core::memory::dynamicSize(s_ModelPrior) + core::memory::dynamicSize(s_Models);
}

CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::CTimeSeriesCorrelateModelAllocator(
    CResourceMonitor& resourceMonitor,
    TMemoryUsage memoryUsage,
    std::size_t resourceLimit,
    std::size_t maxNumberCorrelations)
    : m_ResourceMonitor(&resourceMonitor), m_MemoryUsage(memoryUsage),
      m_ResourceLimit(resourceLimit), m_MaxNumberCorrelations(maxNumberCorrelations) {
}

bool CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::areAllocationsAllowed() const {
    return m_ResourceMonitor->areAllocationsAllowed();
}

bool CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::exceedsLimit(std::size_t correlations) const {
    return !m_ResourceMonitor->haveNoLimit() && m_MemoryUsage(correlations) >= m_ResourceLimit;
}

std::size_t CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::maxNumberCorrelations() const {
    return m_MaxNumberCorrelations;
}

std::size_t CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::chunkSize() const {
    return 500;
}

CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::TMultivariatePriorUPtr
CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::newPrior() const {
    // NOTE(review): assumes prototypePrior() has been called; m_PrototypePrior
    // is dereferenced without a null check.
    return TMultivariatePriorUPtr(m_PrototypePrior->clone());
}

void CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::prototypePrior(
    const TMultivariatePriorSPtr& prior) {
    m_PrototypePrior = prior;
}

bool CMemoryCircuitBreaker::areAllocationsAllowed() const {
    return m_ResourceMonitor->areAllocationsAllowed();
}

CAnomalyDetectorModel::TUint64TTimePrVec& CAnomalyDetectorModel::appliedRuleChecksums() {
    return m_AppliedRuleChecksums;
}

const CAnomalyDetectorModel::TUint64TTimePrVec& CAnomalyDetectorModel::appliedRuleChecksums() const {
    return m_AppliedRuleChecksums;
}

//! True if a rule with the same checksum as \p rule has been recorded by
//! markRuleApplied() and not yet expired.
bool CAnomalyDetectorModel::checkRuleApplied(const CDetectionRule& rule) const {
    auto checksum = rule.checksum();
    return std::any_of(m_AppliedRuleChecksums.begin(), m_AppliedRuleChecksums.end(),
                       [checksum](const auto& pair) { return pair.first == checksum; });
}

//! Record \p rule's checksum with the current wall-clock time and drop
//! entries older than APPLIED_DETECTION_RULE_EXPIRATION.
void CAnomalyDetectorModel::markRuleApplied(const CDetectionRule& rule) {
    auto currentTime = core::CTimeUtils::now();
    m_AppliedRuleChecksums.emplace_back(rule.checksum(), currentTime);

    // Remove all rules that are older than the expiration time
    m_AppliedRuleChecksums.erase(
        std::remove_if(m_AppliedRuleChecksums.begin(), m_AppliedRuleChecksums.end(),
                       [currentTime](const auto& pair) {
                           return currentTime - pair.second > APPLIED_DETECTION_RULE_EXPIRATION;
                       }),
        m_AppliedRuleChecksums.end());
}
}
}