lib/model/CAnomalyDetectorModel.cc (583 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <model/CAnomalyDetectorModel.h>
#include <core/CAllocationStrategy.h>
#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/CTimeUtils.h>
#include <maths/common/CChecksum.h>
#include <maths/common/CMultivariatePrior.h>
#include <maths/common/COrderings.h>
#include <maths/common/CRestoreParams.h>
#include <maths/time_series/CModelStateSerialiser.h>
#include <model/CDataGatherer.h>
#include <model/CDetectionRule.h>
#include <model/CHierarchicalResults.h>
#include <model/CMemoryUsageEstimator.h>
#include <model/CPartitioningFields.h>
#include <model/CProbabilityAndInfluenceCalculator.h>
#include <model/CResourceMonitor.h>
#include <model/CSearchKey.h>
#include <model/FrequencyPredicates.h>
#include <model/ModelTypes.h>
#include <model/SModelParams.h>
#include <boost/unordered_set.hpp>
#include <algorithm>
namespace ml {
namespace model {
namespace {
// Tag under which each individual model is persisted and restored.
const std::string MODEL_TAG{"a"};
// Empty string used as the placeholder person field value in addResults.
const std::string EMPTY;
// Default constructed result type which is passed to the rule checks when
// computing the initial count weight.
const model_t::CResultType SKIP_SAMPLING_RESULT_TYPE;
// The initial count weight returned when a detection rule asks to skip the
// model update.
const double SKIP_SAMPLING_WEIGHT{0.005};
// Applied rule checksums older than this are dropped by markRuleApplied.
const core_t::TTime APPLIED_DETECTION_RULE_EXPIRATION{31536000}; // 1 year
// The empty list returned by scheduledEventDescriptions.
const CAnomalyDetectorModel::TStr1Vec EMPTY_STRING_LIST;
//! Check if any rule in \p detectionRules applies \p action for the supplied
//! feature, result type, person, attribute and time.
//!
//! Rules are tried in order and no further rules are evaluated once one of
//! them has applied.
//!
//! \return True if some rule applied and false otherwise.
bool checkRules(const SModelParams::TDetectionRuleVec& detectionRules,
                const CAnomalyDetectorModel& model,
                model_t::EFeature feature,
                CDetectionRule::ERuleAction action,
                const model_t::CResultType& resultType,
                std::size_t pid,
                std::size_t cid,
                core_t::TTime time) {
    for (const auto& candidate : detectionRules) {
        if (candidate.apply(action, model, feature, resultType, pid, cid, time)) {
            return true;
        }
    }
    return false;
}
//! Check if the rule of any scheduled event in \p scheduledEvents applies
//! \p action for the supplied feature, result type, person, attribute and time.
//!
//! Events are tried in order and no further events are evaluated once the
//! rule of one of them has applied.
//!
//! \return True if some event's rule applied and false otherwise.
bool checkScheduledEvents(const SModelParams::TStrDetectionRulePrVec& scheduledEvents,
                          const CAnomalyDetectorModel& model,
                          model_t::EFeature feature,
                          CDetectionRule::ERuleAction action,
                          const model_t::CResultType& resultType,
                          std::size_t pid,
                          std::size_t cid,
                          core_t::TTime time) {
    for (const auto& scheduledEvent : scheduledEvents) {
        // Each entry pairs a string with the rule; only the rule is used here.
        const auto& eventRule = scheduledEvent.second;
        if (eventRule.apply(action, model, feature, resultType, pid, cid, time)) {
            return true;
        }
    }
    return false;
}
//! Execute the callback of every rule in \p detectionRules, in order, on
//! \p model at \p time.
void callbackRules(const SModelParams::TDetectionRuleVec& detectionRules,
                   CAnomalyDetectorModel& model,
                   core_t::TTime time) {
    std::for_each(detectionRules.begin(), detectionRules.end(),
                  [&model, time](const auto& rule) {
                      rule.executeCallback(model, time);
                  });
}
//! Execute the callback of the rule of every scheduled event in
//! \p scheduledEvents, in order, on \p model at \p time.
void callbackScheduledEvents(const SModelParams::TStrDetectionRulePrVec& scheduledEvents,
                             CAnomalyDetectorModel& model,
                             core_t::TTime time) {
    std::for_each(scheduledEvents.begin(), scheduledEvents.end(),
                  [&model, time](const auto& event) {
                      // The rule is the second element of each pair.
                      event.second.executeCallback(model, time);
                  });
}
}
//! Construct a model with zero bucket count.
//!
//! \param[in] params The model parameters.
//! \param[in] dataGatherer The data gatherer; this must not be null.
//! \param[in] influenceCalculators The per influencer collections of
//! (feature, influence calculator) pairs.
CAnomalyDetectorModel::CAnomalyDetectorModel(const SModelParams& params,
                                             const TDataGathererPtr& dataGatherer,
                                             const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators)
    : m_Params(params), m_DataGatherer(dataGatherer), m_BucketCount(0.0),
      m_InfluenceCalculators(influenceCalculators) {
    if (!m_DataGatherer) {
        LOG_ABORT(<< "Must provide a data gatherer");
    }
    // Keep each influencer's calculators ordered by feature: influenceCalculator
    // looks them up with a binary search which uses the same ordering.
    std::for_each(m_InfluenceCalculators.begin(), m_InfluenceCalculators.end(),
                  [](auto& featureCalculators) {
                      std::sort(featureCalculators.begin(), featureCalculators.end(),
                                maths::common::COrderings::SFirstLess());
                  });
}
//! Construct a clone of \p other which is only suitable for persistence.
//!
//! The parameters, person bucket counts, bucket count and applied rule
//! checksums are copied and the data gatherer is shared with \p other. The
//! influence calculators are not copied.
//!
//! \param[in] isForPersistence Must be true; the constructor aborts otherwise.
//! \param[in] other The model to clone.
CAnomalyDetectorModel::CAnomalyDetectorModel(bool isForPersistence,
                                             const CAnomalyDetectorModel& other)
    : // The copy of m_DataGatherer is a shallow copy. This would be unacceptable
      // if we were going to persist the data gatherer from within this class.
      // We don't, so that's OK, but the next issue is that another thread will be
      // modifying the data gatherer m_DataGatherer points to whilst this object
      // is being persisted. Therefore, persistence must only call methods on the
      // data gatherer that are invariant.
      m_Params(other.m_Params), m_DataGatherer(other.m_DataGatherer),
      m_PersonBucketCounts(other.m_PersonBucketCounts),
      m_BucketCount(other.m_BucketCount) {
    if (!isForPersistence) {
        LOG_ABORT(<< "This constructor only creates clones for persistence");
    }
    // The applied rule checksums are part of the model state (they contribute
    // to checksum()), so the clone must carry them or anything persisted from
    // it would forget which rules have already been applied. This is assigned
    // here, rather than in the initialiser list, so that it does not depend on
    // the order in which the members are declared.
    m_AppliedRuleChecksums = other.m_AppliedRuleChecksums;
}
//! Get the description of the model, which is the data gatherer's description.
std::string CAnomalyDetectorModel::description() const {
    return m_DataGatherer->description();
}
//! Get the name of the person identified by \p pid from the data gatherer,
//! supplying the string representation of \p pid as the fallback.
//!
//! NOTE(review): the fallback is a temporary which is destroyed before this
//! function returns. If the gatherer hands back the fallback by reference
//! (presumably when \p pid is unknown - confirm) the returned reference
//! dangles.
const std::string& CAnomalyDetectorModel::personName(std::size_t pid) const {
    return m_DataGatherer->personName(pid, core::CStringUtils::typeToString(pid));
}
//! Get the name of the person identified by \p pid from the data gatherer,
//! supplying \p fallback as the fallback name.
const std::string& CAnomalyDetectorModel::personName(std::size_t pid,
                                                     const std::string& fallback) const {
    return m_DataGatherer->personName(pid, fallback);
}
//! Print the names of the people identified by \p pids.
//!
//! \param[in] pids The identifiers of the people to print.
//! \param[in] limit The maximum number of names to write out.
//! \return The empty string if \p pids is empty, "<count> in total" if
//! \p limit is zero and otherwise up to \p limit space separated names
//! followed by " and <count> others" if any were left out.
std::string CAnomalyDetectorModel::printPeople(const TSizeVec& pids, std::size_t limit) const {
    const std::size_t total{pids.size()};
    if (total == 0) {
        return std::string();
    }
    if (limit == 0) {
        return core::CStringUtils::typeToString(total) + " in total";
    }

    const std::size_t shown{std::min(limit, total)};
    std::string names{this->personName(pids[0])};
    for (std::size_t i = 1; i < shown; ++i) {
        names.append(1, ' ');
        names.append(this->personName(pids[i]));
    }

    if (total > limit) {
        names.append(" and ");
        names.append(core::CStringUtils::typeToString(total - limit));
        names.append(" others");
    }
    return names;
}
//! Get the number of active people as reported by the data gatherer.
std::size_t CAnomalyDetectorModel::numberOfPeople() const {
    return m_DataGatherer->numberActivePeople();
}
//! Get the name of the attribute identified by \p cid from the data gatherer,
//! supplying the string representation of \p cid as the fallback.
//!
//! NOTE(review): the fallback is a temporary which is destroyed before this
//! function returns. If the gatherer hands back the fallback by reference
//! (presumably when \p cid is unknown - confirm) the returned reference
//! dangles.
const std::string& CAnomalyDetectorModel::attributeName(std::size_t cid) const {
    return m_DataGatherer->attributeName(cid, core::CStringUtils::typeToString(cid));
}
//! Get the name of the attribute identified by \p cid from the data gatherer,
//! supplying \p fallback as the fallback name.
const std::string& CAnomalyDetectorModel::attributeName(std::size_t cid,
                                                        const std::string& fallback) const {
    return m_DataGatherer->attributeName(cid, fallback);
}
//! Print the names of the attributes identified by \p cids.
//!
//! \param[in] cids The identifiers of the attributes to print.
//! \param[in] limit The maximum number of names to write out.
//! \return The empty string if \p cids is empty, "<count> in total" if
//! \p limit is zero and otherwise up to \p limit space separated names
//! followed by " and <count> others" if any were left out.
std::string CAnomalyDetectorModel::printAttributes(const TSizeVec& cids,
                                                   std::size_t limit) const {
    const std::size_t total{cids.size()};
    if (total == 0) {
        return std::string();
    }
    if (limit == 0) {
        return core::CStringUtils::typeToString(total) + " in total";
    }

    const std::size_t shown{std::min(limit, total)};
    std::string names{this->attributeName(cids[0])};
    for (std::size_t i = 1; i < shown; ++i) {
        names.append(1, ' ');
        names.append(this->attributeName(cids[i]));
    }

    if (total > limit) {
        names.append(" and ");
        names.append(core::CStringUtils::typeToString(total - limit));
        names.append(" others");
    }
    return names;
}
//! Update the person bucket counts and the bucket count for each bucket
//! starting in [\p startTime, \p endTime).
//!
//! The scheduled event and detection rule callbacks are executed first, at
//! \p startTime. Then, for each bucket, every distinct person with a count in
//! the bucket has its bucket count incremented, the overall bucket count is
//! incremented and finally all of the counts are aged by exp(-decay rate).
void CAnomalyDetectorModel::sample(core_t::TTime startTime,
                                   core_t::TTime endTime,
                                   CResourceMonitor& /*resourceMonitor*/) {
    using TSizeUSet = boost::unordered_set<std::size_t>;

    callbackScheduledEvents(this->params().s_ScheduledEvents.get(), *this, startTime);
    callbackRules(this->params().s_DetectionRules.get(), *this, startTime);

    const CDataGatherer& gatherer{this->dataGatherer()};
    const core_t::TTime step{this->bucketLength()};

    core_t::TTime bucketStart{startTime};
    while (bucketStart < endTime) {
        // A person is counted at most once per bucket however many counts
        // they have in it.
        TSizeUSet seen;
        for (const auto& count : gatherer.bucketCounts(bucketStart)) {
            const std::size_t pid{CDataGatherer::extractPersonId(count)};
            if (seen.insert(pid).second) {
                m_PersonBucketCounts[pid] += 1.0;
            }
        }
        m_BucketCount += 1.0;

        // Age the counts, including the contribution from this bucket.
        const double alpha{std::exp(-this->params().s_DecayRate)};
        for (auto& personBucketCount : m_PersonBucketCounts) {
            personBucketCount *= alpha;
        }
        m_BucketCount *= alpha;

        bucketStart += step;
    }
}
//! Skip sampling up to \p endTime.
//!
//! Nothing is done if the gatherer fails to validate the sample times, which
//! start from its earliest bucket start time. Otherwise, the gatherer and then
//! the model skip sampling and the current bucket start time is set to one
//! bucket length before \p endTime.
void CAnomalyDetectorModel::skipSampling(core_t::TTime endTime) {
    CDataGatherer& gatherer{this->dataGatherer()};
    core_t::TTime startTime{gatherer.earliestBucketStartTime()};
    if (gatherer.validateSampleTimes(startTime, endTime)) {
        gatherer.skipSampleNow(endTime);
        this->doSkipSampling(startTime, endTime);
        const core_t::TTime lastBucketStart{endTime - gatherer.bucketLength()};
        this->currentBucketStartTime(lastBucketStart);
    }
}
//! Add the results for the bucket [\p startTime, \p endTime) to \p results.
//!
//! For counting models a simple count result is added for each person in the
//! bucket. For all other models, the influencers are added to \p results and
//! a model result is added for each person whose probability was computed
//! successfully.
//!
//! \return False if no bucket statistics are available for \p startTime and
//! true otherwise.
bool CAnomalyDetectorModel::addResults(core_t::TTime startTime,
                                       core_t::TTime endTime,
                                       std::size_t numberAttributeProbabilities,
                                       CHierarchicalResults& results) const {
    if (!this->bucketStatsAvailable(startTime)) {
        LOG_TRACE(<< "No stats available for time " << startTime);
        return false;
    }

    TSizeVec personIds;
    this->currentBucketPersonIds(startTime, personIds);
    LOG_TRACE(<< "Outputting results for " << personIds.size() << " people");

    CPartitioningFields partitioningFields(m_DataGatherer->partitionFieldName(),
                                           m_DataGatherer->partitionFieldValue());
    // The person field value is a placeholder which is overwritten with each
    // person's name in the loop below.
    partitioningFields.add(m_DataGatherer->personFieldName(), EMPTY);

    for (auto pid : personIds) {
        if (this->category() == model_t::E_Counting) {
            SAnnotatedProbability annotatedProbability;
            this->computeProbability(pid, startTime, endTime, partitioningFields,
                                     numberAttributeProbabilities, annotatedProbability);
            results.addSimpleCountResult(annotatedProbability, this, startTime);
            continue;
        }

        LOG_TRACE(<< "AddResult, for time [" << startTime << "," << endTime << ")");
        partitioningFields.back().second = std::cref(this->personName(pid));

        for (auto influencer = m_DataGatherer->beginInfluencers(),
                  lastInfluencer = m_DataGatherer->endInfluencers();
             influencer != lastInfluencer; ++influencer) {
            results.addInfluencer(*influencer);
        }

        SAnnotatedProbability annotatedProbability;
        annotatedProbability.s_ResultType = results.resultType();
        const bool computed{this->computeProbability(
            pid, startTime, endTime, partitioningFields,
            numberAttributeProbabilities, annotatedProbability)};
        if (computed) {
            function_t::EFunction function{m_DataGatherer->function()};
            results.addModelResult(
                m_DataGatherer->searchKey().detectorIndex(),
                this->isPopulation(), function_t::name(function), function,
                m_DataGatherer->partitionFieldName(),
                m_DataGatherer->partitionFieldValue(),
                m_DataGatherer->personFieldName(), this->personName(pid),
                m_DataGatherer->valueFieldName(), annotatedProbability, this, startTime);
        }
    }

    return true;
}
//! Get the default prune window.
//!
//! \return MAXIMUM_PERMITTED_AGE if the decay rate is zero, otherwise the
//! maximum prune window scale divided by the decay rate, truncated to an
//! integer and capped at MAXIMUM_PERMITTED_AGE.
std::size_t CAnomalyDetectorModel::defaultPruneWindow() const {
    // The longest we'll consider keeping priors for is 1M buckets.
    double decayRate{this->params().s_DecayRate};
    double factor{this->params().s_PruneWindowScaleMaximum};
    if (decayRate == 0.0) {
        return MAXIMUM_PERMITTED_AGE;
    }

    // Range check in floating point *before* converting: converting a double
    // whose truncated value isn't representable as a std::size_t (very large,
    // negative or NaN) is undefined behaviour. Every value for which the
    // conversion is well defined gives exactly the result it always did.
    const double window{factor / decayRate};
    const double maximumAge{static_cast<double>(MAXIMUM_PERMITTED_AGE)};
    if (window > -1.0 && window < maximumAge) {
        return static_cast<std::size_t>(window);
    }
    return MAXIMUM_PERMITTED_AGE;
}
//! Get the minimum prune window.
//!
//! \return MAXIMUM_PERMITTED_AGE if the decay rate is zero, otherwise the
//! minimum prune window scale divided by the decay rate, truncated to an
//! integer and capped at MAXIMUM_PERMITTED_AGE.
std::size_t CAnomalyDetectorModel::minimumPruneWindow() const {
    double decayRate{this->params().s_DecayRate};
    double factor{this->params().s_PruneWindowScaleMinimum};
    if (decayRate == 0.0) {
        return MAXIMUM_PERMITTED_AGE;
    }

    // Range check in floating point *before* converting: converting a double
    // whose truncated value isn't representable as a std::size_t (very large,
    // negative or NaN) is undefined behaviour. Every value for which the
    // conversion is well defined gives exactly the result it always did.
    const double window{factor / decayRate};
    const double maximumAge{static_cast<double>(MAXIMUM_PERMITTED_AGE)};
    if (window > -1.0 && window < maximumAge) {
        return static_cast<std::size_t>(window);
    }
    return MAXIMUM_PERMITTED_AGE;
}
//! Prune the model using the default prune window.
void CAnomalyDetectorModel::prune() {
    const std::size_t window{this->defaultPruneWindow()};
    this->prune(window);
}
std::uint64_t CAnomalyDetectorModel::checksum(bool /*includeCurrentBucketStats*/) const {
using TStrCRefUInt64Map =
std::map<TStrCRef, std::uint64_t, maths::common::COrderings::SLess>;
std::uint64_t seed{m_DataGatherer->checksum()};
seed = maths::common::CChecksum::calculate(seed, m_Params);
seed = maths::common::CChecksum::calculate(seed, m_BucketCount);
TStrCRefUInt64Map hashes;
for (std::size_t pid = 0; pid < m_PersonBucketCounts.size(); ++pid) {
if (m_DataGatherer->isPersonActive(pid)) {
std::uint64_t& hash{hashes[std::cref(m_DataGatherer->personName(pid))]};
hash = maths::common::CChecksum::calculate(hash, m_PersonBucketCounts[pid]);
}
}
seed = maths::common::CChecksum::calculate(seed, m_AppliedRuleChecksums);
LOG_TRACE(<< "seed = " << seed);
LOG_TRACE(<< "checksums = " << hashes);
return maths::common::CChecksum::calculate(seed, hashes);
}
//! Record the memory used by the model in \p mem under the name
//! "CAnomalyDetectorModel". This reports the same members which
//! memoryUsage sums.
void CAnomalyDetectorModel::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
    mem->setName("CAnomalyDetectorModel");
    core::memory_debug::dynamicSize("m_DataGatherer", m_DataGatherer, mem);
    core::memory_debug::dynamicSize("m_Params", m_Params, mem);
    core::memory_debug::dynamicSize("m_PersonBucketCounts", m_PersonBucketCounts, mem);
    core::memory_debug::dynamicSize("m_InfluenceCalculators", m_InfluenceCalculators, mem);
    core::memory_debug::dynamicSize("m_AppliedRuleChecksums", m_AppliedRuleChecksums, mem);
}
//! Get the total dynamic size of the parameters, the data gatherer, the
//! person bucket counts, the influence calculators and the applied rule
//! checksums.
std::size_t CAnomalyDetectorModel::memoryUsage() const {
    return core::memory::dynamicSize(m_Params) +
           core::memory::dynamicSize(m_DataGatherer) +
           core::memory::dynamicSize(m_PersonBucketCounts) +
           core::memory::dynamicSize(m_InfluenceCalculators) +
           core::memory::dynamicSize(m_AppliedRuleChecksums);
}
//! Ask the memory usage estimator for an estimate of the model memory usage
//! given the numbers of people, attributes and correlations.
//!
//! \return Whatever the estimator returns for these predictors; callers in
//! this file treat an unset optional as "no estimate available".
CAnomalyDetectorModel::TOptionalSize
CAnomalyDetectorModel::estimateMemoryUsage(std::size_t numberPeople,
                                           std::size_t numberAttributes,
                                           std::size_t numberCorrelations) const {
    CMemoryUsageEstimator::TSizeArray predictors;
    predictors[CMemoryUsageEstimator::E_People] = numberPeople;
    predictors[CMemoryUsageEstimator::E_Attributes] = numberAttributes;
    predictors[CMemoryUsageEstimator::E_Correlations] = numberCorrelations;
    return this->memoryUsageEstimator()->estimate(predictors);
}
std::size_t
CAnomalyDetectorModel::estimateMemoryUsageOrComputeAndUpdate(std::size_t numberPeople,
std::size_t numberAttributes,
std::size_t numberCorrelations) {
TOptionalSize estimate{this->estimateMemoryUsage(numberPeople, numberAttributes,
numberCorrelations)};
if (estimate) {
return *estimate;
}
std::size_t computed{this->computeMemoryUsage()};
CMemoryUsageEstimator::TSizeArray predictors;
predictors[CMemoryUsageEstimator::E_People] = numberPeople;
predictors[CMemoryUsageEstimator::E_Attributes] = numberAttributes;
predictors[CMemoryUsageEstimator::E_Correlations] = numberCorrelations;
this->memoryUsageEstimator()->addValue(predictors, computed);
return computed;
}
//! Get a constant reference to the data gatherer.
const CDataGatherer& CAnomalyDetectorModel::dataGatherer() const {
    return *m_DataGatherer;
}
//! Get a writable reference to the data gatherer.
CDataGatherer& CAnomalyDetectorModel::dataGatherer() {
    return *m_DataGatherer;
}
//! Get the bucket length, which is the data gatherer's bucket length.
core_t::TTime CAnomalyDetectorModel::bucketLength() const {
    return m_DataGatherer->bucketLength();
}
//! Get the ratio of the bucket count of the person identified by \p pid to
//! the overall bucket count.
//!
//! \return 0.5 if the overall bucket count isn't positive.
double CAnomalyDetectorModel::personFrequency(std::size_t pid) const {
    if (m_BucketCount <= 0.0) {
        return 0.5;
    }
    return m_PersonBucketCounts[pid] / m_BucketCount;
}
//! Check if \p time is equal to the TIME_UNSET sentinel.
bool CAnomalyDetectorModel::isTimeUnset(core_t::TTime time) {
    return time == TIME_UNSET;
}
//! Get a person frequency predicate for this model which is constructed with
//! the exclude person frequency parameter.
CPersonFrequencyGreaterThan CAnomalyDetectorModel::personFilter() const {
    const SModelParams& params{m_Params.get()};
    return CPersonFrequencyGreaterThan(*this, params.s_ExcludePersonFrequency);
}
//! Get an attribute frequency predicate for this model which is constructed
//! with the exclude attribute frequency parameter.
CAttributeFrequencyGreaterThan CAnomalyDetectorModel::attributeFilter() const {
    const SModelParams& params{m_Params.get()};
    return CAttributeFrequencyGreaterThan(*this, params.s_ExcludeAttributeFrequency);
}
//! Get the model parameters.
const SModelParams& CAnomalyDetectorModel::params() const {
    return m_Params;
}
//! Get the learn rate for \p feature given the model parameters.
double CAnomalyDetectorModel::learnRate(model_t::EFeature feature) const {
    return model_t::learnRate(feature, m_Params);
}
//! Get the influence calculator for \p feature and the influencer identified
//! by \p iid.
//!
//! \return Null, after logging an error, if \p iid is out of range and null
//! if the influencer has no calculator for \p feature.
const CInfluenceCalculator*
CAnomalyDetectorModel::influenceCalculator(model_t::EFeature feature, std::size_t iid) const {
    if (iid >= m_InfluenceCalculators.size()) {
        LOG_ERROR(<< "Influencer identifier " << iid << " out of range");
        return nullptr;
    }

    // The calculators were sorted by feature in the constructor so a binary
    // search finds the match, if there is one.
    const TFeatureInfluenceCalculatorCPtrPrVec& candidates{m_InfluenceCalculators[iid]};
    auto position = std::lower_bound(candidates.begin(), candidates.end(), feature,
                                     maths::common::COrderings::SFirstLess());
    if (position == candidates.end()) {
        return nullptr;
    }
    if (position->first == feature) {
        return position->second.get();
    }
    return nullptr;
}
//! Get a constant reference to the person bucket counts, which are indexed
//! by person identifier.
const CAnomalyDetectorModel::TDoubleVec& CAnomalyDetectorModel::personBucketCounts() const {
    return m_PersonBucketCounts;
}
//! Get a writable reference to the person bucket counts, which are indexed
//! by person identifier.
CAnomalyDetectorModel::TDoubleVec& CAnomalyDetectorModel::personBucketCounts() {
    return m_PersonBucketCounts;
}
//! Set the bucket count to \p windowBucketCount.
void CAnomalyDetectorModel::windowBucketCount(double windowBucketCount) {
    m_BucketCount = windowBucketCount;
}
//! Get the bucket count.
double CAnomalyDetectorModel::windowBucketCount() const {
    return m_BucketCount;
}
//! Extend the person bucket counts by \p n zero initialised entries. This
//! is a no-op if \p n is zero. The second argument is unused.
void CAnomalyDetectorModel::createNewModels(std::size_t n, std::size_t /*m*/) {
    if (n == 0) {
        return;
    }
    const std::size_t newSize{m_PersonBucketCounts.size() + n};
    core::CAllocationStrategy::resize(m_PersonBucketCounts, newSize, 0.0);
}
void CAnomalyDetectorModel::updateRecycledModels() {
TSizeVec& people{m_DataGatherer->recycledPersonIds()};
for (auto pid : people) {
if (pid < m_PersonBucketCounts.size()) {
m_PersonBucketCounts[pid] = 0.0;
} else {
LOG_ERROR(<< "Recycled person identifier '" << pid << "' out-of-range [,"
<< m_PersonBucketCounts.size() << ")");
}
}
people.clear();
}
//! Check if a scheduled event or detection rule asks to skip the result for
//! the supplied feature, result type, person, attribute and time.
//!
//! The scheduled events are checked first and the detection rules are only
//! checked if none of the scheduled events applied.
bool CAnomalyDetectorModel::shouldIgnoreResult(model_t::EFeature feature,
                                               const model_t::CResultType& resultType,
                                               std::size_t pid,
                                               std::size_t cid,
                                               core_t::TTime time) const {
    if (checkScheduledEvents(this->params().s_ScheduledEvents.get(), *this, feature,
                             CDetectionRule::E_SkipResult, resultType, pid, cid, time)) {
        return true;
    }
    return checkRules(this->params().s_DetectionRules.get(), *this, feature,
                      CDetectionRule::E_SkipResult, resultType, pid, cid, time);
}
//! Get the initial count weight for the supplied feature, person, attribute
//! and time.
//!
//! \return 0.0 if a scheduled event asks to skip the model update, otherwise
//! SKIP_SAMPLING_WEIGHT if a detection rule asks to skip the model update and
//! otherwise 1.0.
double CAnomalyDetectorModel::initialCountWeight(model_t::EFeature feature,
                                                 std::size_t pid,
                                                 std::size_t cid,
                                                 core_t::TTime time) const {
    // Scheduled events take precedence over detection rules.
    const bool skippedByEvent{checkScheduledEvents(
        this->params().s_ScheduledEvents.get(), *this, feature,
        CDetectionRule::E_SkipModelUpdate, SKIP_SAMPLING_RESULT_TYPE, pid, cid, time)};
    if (skippedByEvent) {
        return 0.0;
    }

    const bool skippedByRule{checkRules(
        this->params().s_DetectionRules.get(), *this, feature,
        CDetectionRule::E_SkipModelUpdate, SKIP_SAMPLING_RESULT_TYPE, pid, cid, time)};
    if (skippedByRule) {
        return SKIP_SAMPLING_WEIGHT;
    }

    return 1.0;
}
//! Check if the model update should be skipped, i.e. if the initial count
//! weight for the supplied feature, person, attribute and time isn't 1.0.
bool CAnomalyDetectorModel::shouldSkipUpdate(model_t::EFeature feature,
                                             std::size_t pid,
                                             std::size_t cid,
                                             core_t::TTime time) const {
    // The weight is always one of a small set of exact values so the exact
    // floating point comparison is safe.
    const double weight{this->initialCountWeight(feature, pid, cid, time)};
    return weight != 1.0;
}
//! Get the scheduled event descriptions. This implementation ignores the
//! time and always returns an empty list.
const CAnomalyDetectorModel::TStr1Vec&
CAnomalyDetectorModel::scheduledEventDescriptions(core_t::TTime /*time*/) const {
    return EMPTY_STRING_LIST;
}
//! Create a new stub model.
//!
//! NOTE(review): this returns a raw pointer to a heap allocated object;
//! presumably the caller takes ownership - confirm.
maths::common::CModel* CAnomalyDetectorModel::tinyModel() {
    return new maths::common::CModelStub;
}
// The upper bound on the prune windows returned by defaultPruneWindow and
// minimumPruneWindow.
const std::size_t CAnomalyDetectorModel::MAXIMUM_PERMITTED_AGE(1000000);
// The sentinel which isTimeUnset compares against.
const core_t::TTime CAnomalyDetectorModel::TIME_UNSET(-1);
// An empty string constant.
const std::string CAnomalyDetectorModel::EMPTY_STRING;
//! Construct with the feature and the model stored as s_NewModel. No
//! models are added to s_Models.
CAnomalyDetectorModel::SFeatureModels::SFeatureModels(model_t::EFeature feature,
                                                      TMathsModelSPtr newModel)
    : s_Feature(feature), s_NewModel(newModel) {
}
bool CAnomalyDetectorModel::SFeatureModels::acceptRestoreTraverser(const SModelParams& params_,
core::CStateRestoreTraverser& traverser) {
maths_t::EDataType dataType{s_NewModel->dataType()};
maths::common::SModelRestoreParams params{
s_NewModel->params(), params_.decompositionRestoreParams(dataType),
params_.distributionRestoreParams(dataType)};
do {
if (traverser.name() == MODEL_TAG) {
TMathsModelUPtr model;
if (!traverser.traverseSubLevel(std::bind<bool>(
maths::time_series::CModelStateSerialiser(),
std::cref(params), std::ref(model), std::placeholders::_1))) {
return false;
}
s_Models.push_back(std::move(model));
}
} while (traverser.next());
return true;
}
//! Ask each model in s_Models, in order, to persist its models' state to
//! \p inserter.
void CAnomalyDetectorModel::SFeatureModels::persistModelsState(core::CStatePersistInserter& inserter) const {
    std::for_each(s_Models.begin(), s_Models.end(), [&inserter](const auto& model) {
        model->persistModelsState(inserter);
    });
}
//! Persist each model in s_Models, in order, by passing \p inserter
//! information to the model state serialiser. Each model is written to
//! its own level which is tagged MODEL_TAG.
void CAnomalyDetectorModel::SFeatureModels::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
    for (const auto& model : s_Models) {
        inserter.insertLevel(
            MODEL_TAG, std::bind<void>(maths::time_series::CModelStateSerialiser(),
                                       std::cref(*model), std::placeholders::_1));
    }
}
//! Record the memory used by s_NewModel and s_Models in \p mem under the
//! name "SFeatureModels".
void CAnomalyDetectorModel::SFeatureModels::debugMemoryUsage(
    const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
    mem->setName("SFeatureModels");
    core::memory_debug::dynamicSize("s_NewModel", s_NewModel, mem);
    core::memory_debug::dynamicSize("s_Models", s_Models, mem);
}
//! Get the total dynamic size of s_NewModel and s_Models.
std::size_t CAnomalyDetectorModel::SFeatureModels::memoryUsage() const {
    std::size_t mem{core::memory::dynamicSize(s_NewModel)};
    mem += core::memory::dynamicSize(s_Models);
    return mem;
}
//! Check if any model in s_Models should be persisted.
//!
//! \return False if there are no models.
bool CAnomalyDetectorModel::SFeatureModels::shouldPersist() const {
    for (const auto& model : s_Models) {
        if (model->shouldPersist()) {
            return true;
        }
    }
    return false;
}
//! Construct with the feature, the prior stored as s_ModelPrior and the
//! correlations model, which is moved into s_Models.
CAnomalyDetectorModel::SFeatureCorrelateModels::SFeatureCorrelateModels(
    model_t::EFeature feature,
    const TMultivariatePriorSPtr& modelPrior,
    TCorrelationsPtr&& model)
    : s_Feature(feature), s_ModelPrior(modelPrior), s_Models(std::move(model)) {
}
// The destructor, move constructor and move assignment operator are all
// compiler generated; they are defaulted here, out of line, in the
// implementation file.
CAnomalyDetectorModel::SFeatureCorrelateModels::~SFeatureCorrelateModels() = default;
CAnomalyDetectorModel::SFeatureCorrelateModels::SFeatureCorrelateModels(SFeatureCorrelateModels&&) = default;
CAnomalyDetectorModel::SFeatureCorrelateModels& CAnomalyDetectorModel::SFeatureCorrelateModels::
operator=(SFeatureCorrelateModels&&) = default;
//! Restore the correlations model by reading state from \p traverser.
//!
//! The state of s_Models is restored from the element whose name is
//! MODEL_TAG. All other elements are ignored.
//!
//! \return False if restoring s_Models fails or if a second element named
//! MODEL_TAG is restored and true otherwise.
bool CAnomalyDetectorModel::SFeatureCorrelateModels::acceptRestoreTraverser(
    const SModelParams& params_,
    core::CStateRestoreTraverser& traverser) {
    maths_t::EDataType dataType{s_ModelPrior->dataType()};
    maths::common::SDistributionRestoreParams params{
        params_.distributionRestoreParams(dataType)};
    bool alreadyRestored{false};
    do {
        if (traverser.name() != MODEL_TAG) {
            continue;
        }
        if (!traverser.traverseSubLevel(std::bind(
                &maths::time_series::CTimeSeriesCorrelations::acceptRestoreTraverser,
                s_Models.get(), std::cref(params), std::placeholders::_1))) {
            return false;
        }
        // Only a single correlations model is expected in the state.
        if (alreadyRestored) {
            return false;
        }
        alreadyRestored = true;
    } while (traverser.next());
    return true;
}
//! Persist the correlations model, s_Models, to a level of \p inserter
//! which is tagged MODEL_TAG.
void CAnomalyDetectorModel::SFeatureCorrelateModels::acceptPersistInserter(
    core::CStatePersistInserter& inserter) const {
    inserter.insertLevel(MODEL_TAG,
                         std::bind(&maths::time_series::CTimeSeriesCorrelations::acceptPersistInserter,
                                   s_Models.get(), std::placeholders::_1));
}
//! Record the memory used by s_ModelPrior and s_Models in \p mem under the
//! name "SFeatureCorrelateModels".
void CAnomalyDetectorModel::SFeatureCorrelateModels::debugMemoryUsage(
    const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
    mem->setName("SFeatureCorrelateModels");
    core::memory_debug::dynamicSize("s_ModelPrior", s_ModelPrior, mem);
    core::memory_debug::dynamicSize("s_Models", s_Models, mem);
}
//! Get the total dynamic size of s_ModelPrior and s_Models.
std::size_t CAnomalyDetectorModel::SFeatureCorrelateModels::memoryUsage() const {
    std::size_t mem{core::memory::dynamicSize(s_ModelPrior)};
    mem += core::memory::dynamicSize(s_Models);
    return mem;
}
//! Construct the allocator.
//!
//! \param[in] resourceMonitor The resource monitor; its address is stored so
//! it must outlive this object.
//! \param[in] memoryUsage A callable which is invoked with a number of
//! correlations by exceedsLimit.
//! \param[in] resourceLimit The value exceedsLimit compares the result of
//! \p memoryUsage against.
//! \param[in] maxNumberCorrelations The value returned by maxNumberCorrelations.
CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::CTimeSeriesCorrelateModelAllocator(
    CResourceMonitor& resourceMonitor,
    TMemoryUsage memoryUsage,
    std::size_t resourceLimit,
    std::size_t maxNumberCorrelations)
    : m_ResourceMonitor(&resourceMonitor), m_MemoryUsage(memoryUsage),
      m_ResourceLimit(resourceLimit), m_MaxNumberCorrelations(maxNumberCorrelations) {
}
//! Check if allocations are allowed according to the resource monitor.
bool CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::areAllocationsAllowed() const {
    return m_ResourceMonitor->areAllocationsAllowed();
}
//! Check if the memory usage for \p correlations reaches or exceeds the
//! resource limit.
//!
//! \return False, without computing the memory usage, if the resource
//! monitor reports that there is no limit.
bool CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::exceedsLimit(std::size_t correlations) const {
    if (m_ResourceMonitor->haveNoLimit()) {
        return false;
    }
    return m_MemoryUsage(correlations) >= m_ResourceLimit;
}
//! Get the maximum number of correlations supplied to the constructor.
std::size_t CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::maxNumberCorrelations() const {
    return m_MaxNumberCorrelations;
}
//! Get the chunk size, which is fixed at 500.
std::size_t CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::chunkSize() const {
    return 500;
}
//! Create a new prior by cloning the prototype prior.
//!
//! NOTE(review): the prototype is dereferenced without a null check;
//! presumably prototypePrior must have been called with a non-null prior
//! first - confirm.
CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::TMultivariatePriorUPtr
CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::newPrior() const {
    return TMultivariatePriorUPtr(m_PrototypePrior->clone());
}
//! Set the prototype prior which newPrior clones.
void CAnomalyDetectorModel::CTimeSeriesCorrelateModelAllocator::prototypePrior(
    const TMultivariatePriorSPtr& prior) {
    m_PrototypePrior = prior;
}
//! Check if allocations are allowed according to the resource monitor.
bool CMemoryCircuitBreaker::areAllocationsAllowed() const {
    return m_ResourceMonitor->areAllocationsAllowed();
}
//! Get a writable reference to the (rule checksum, time applied) pairs.
CAnomalyDetectorModel::TUint64TTimePrVec& CAnomalyDetectorModel::appliedRuleChecksums() {
    return m_AppliedRuleChecksums;
}
//! Get a constant reference to the (rule checksum, time applied) pairs.
const CAnomalyDetectorModel::TUint64TTimePrVec&
CAnomalyDetectorModel::appliedRuleChecksums() const {
    return m_AppliedRuleChecksums;
}
bool CAnomalyDetectorModel::checkRuleApplied(const CDetectionRule& rule) const {
auto checksum = rule.checksum();
return std::find_if(m_AppliedRuleChecksums.begin(),
m_AppliedRuleChecksums.end(), [checksum](const auto& pair) {
return pair.first == checksum;
}) != m_AppliedRuleChecksums.end();
}
void CAnomalyDetectorModel::markRuleApplied(const CDetectionRule& rule) {
auto currentTime = core::CTimeUtils::now();
m_AppliedRuleChecksums.emplace_back(rule.checksum(), currentTime);
// Remove all rules that are older than the expiration time
m_AppliedRuleChecksums.erase(
std::remove_if(m_AppliedRuleChecksums.begin(), m_AppliedRuleChecksums.end(),
[currentTime](const auto& pair) {
return currentTime - pair.second > APPLIED_DETECTION_RULE_EXPIRATION;
}),
m_AppliedRuleChecksums.end());
}
}
}