lib/model/CIndividualModel.cc (559 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <model/CIndividualModel.h>
#include <core/CAllocationStrategy.h>
#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CProgramCounters.h>
#include <core/Constants.h>
#include <core/RestoreMacros.h>
#include <maths/common/CChecksum.h>
#include <maths/common/CMultivariatePrior.h>
#include <maths/common/COrderings.h>
#include <model/CAnnotatedProbability.h>
#include <model/CDataGatherer.h>
#include <model/CResourceMonitor.h>
#include <model/FrequencyPredicates.h>
#include <algorithm>
#include <map>
namespace ml {
namespace model {
namespace {
// File-local type aliases used by the checksum implementation below.
using TDouble2Vec = core::CSmallVector<double, 2>;
using TStrCRef = std::reference_wrapper<const std::string>;
using TStrCRefUInt64Map = std::map<TStrCRef, std::uint64_t, maths::common::COrderings::SLess>;
using TStrCRefStrCRefPr = std::pair<TStrCRef, TStrCRef>;
using TStrCRefStrCRefPrUInt64Map =
    std::map<TStrCRefStrCRefPr, std::uint64_t, maths::common::COrderings::SLess>;

//! Update \p hashes with the hashes of the active people in \p values.
//!
//! Only people the gatherer reports as active contribute; each active
//! person's hash is keyed by their name and folded with the existing
//! entry (a fresh map entry starts at zero).
template<typename T>
void hashActive(const CDataGatherer& gatherer,
                const std::vector<T>& values,
                TStrCRefUInt64Map& hashes) {
    for (std::size_t pid = 0; pid < values.size(); ++pid) {
        if (gatherer.isPersonActive(pid)) {
            std::uint64_t& hash = hashes[std::cref(gatherer.personName(pid))];
            hash = maths::common::CChecksum::calculate(hash, values[pid]);
        }
    }
}

//! Number of people whose models are created per batch in
//! createUpdateNewModels before memory usage is re-checked.
const std::size_t CHUNK_SIZE = 500;

// We use short field names to reduce the state size
const std::string WINDOW_BUCKET_COUNT_TAG("a");
const std::string PERSON_BUCKET_COUNT_TAG("b");
const std::string FIRST_BUCKET_TIME_TAG("c");
const std::string LAST_BUCKET_TIME_TAG("d");
const std::string FEATURE_MODELS_TAG("e");
const std::string FEATURE_CORRELATE_MODELS_TAG("f");
//const std::string INTERIM_BUCKET_CORRECTOR_TAG("h");
const std::string MEMORY_ESTIMATOR_TAG("i");
const std::string UPGRADING_PRE_7_5_STATE("j");
const std::string APPLIED_DETECTION_RULE_CHECKSUMS_TAG("k");
}
// Construct an individual model: one model container per feature, and, when
// multivariate by-field analysis is enabled, one correlate model container
// per feature. Both collections are kept sorted by feature.
CIndividualModel::CIndividualModel(const SModelParams& params,
                                   const TDataGathererPtr& dataGatherer,
                                   const TFeatureMathsModelSPtrPrVec& newFeatureModels,
                                   const TFeatureMultivariatePriorSPtrPrVec& newFeatureCorrelateModelPriors,
                                   TFeatureCorrelationsPtrPrVec&& featureCorrelatesModels,
                                   const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators)
    : CAnomalyDetectorModel(params, dataGatherer, influenceCalculators) {

    // Shared comparator: order the per-feature collections by feature.
    auto byFeature = [](const auto& lhs, const auto& rhs) {
        return lhs.s_Feature < rhs.s_Feature;
    };

    m_FeatureModels.reserve(newFeatureModels.size());
    for (const auto& featureAndModel : newFeatureModels) {
        m_FeatureModels.emplace_back(featureAndModel.first, featureAndModel.second);
    }
    std::sort(m_FeatureModels.begin(), m_FeatureModels.end(), byFeature);

    if (this->params().s_MultivariateByFields) {
        // The i'th correlates entry is paired with the i'th prototype prior.
        m_FeatureCorrelatesModels.reserve(featureCorrelatesModels.size());
        for (std::size_t i = 0; i < featureCorrelatesModels.size(); ++i) {
            m_FeatureCorrelatesModels.emplace_back(
                featureCorrelatesModels[i].first,
                newFeatureCorrelateModelPriors[i].second,
                std::move(featureCorrelatesModels[i].second));
        }
        std::sort(m_FeatureCorrelatesModels.begin(),
                  m_FeatureCorrelatesModels.end(), byFeature);
    }
}
// Create a lightweight clone of \p other suitable only for persisting its
// state: the feature and correlate models are deep-copied via
// cloneForPersistence. Aborts if misused for any other purpose.
CIndividualModel::CIndividualModel(bool isForPersistence, const CIndividualModel& other)
    : CAnomalyDetectorModel(isForPersistence, other),
      m_FirstBucketTimes(other.m_FirstBucketTimes),
      m_LastBucketTimes(other.m_LastBucketTimes),
      m_MemoryEstimator(other.m_MemoryEstimator) {
    if (!isForPersistence) {
        LOG_ABORT(<< "This constructor only creates clones for persistence");
    }
    // BUG FIX: reserve based on the source model's size. This previously read
    // m_FeatureModels.size(), i.e. this object's own still-empty container,
    // making the reserve a no-op and allowing reallocations while copying.
    m_FeatureModels.reserve(other.m_FeatureModels.size());
    for (const auto& feature : other.m_FeatureModels) {
        m_FeatureModels.emplace_back(feature.s_Feature, feature.s_NewModel);
        m_FeatureModels.back().s_Models.reserve(feature.s_Models.size());
        for (const auto& model : feature.s_Models) {
            m_FeatureModels.back().s_Models.emplace_back(model->cloneForPersistence());
        }
    }
    m_FeatureCorrelatesModels.reserve(other.m_FeatureCorrelatesModels.size());
    for (const auto& feature : other.m_FeatureCorrelatesModels) {
        m_FeatureCorrelatesModels.emplace_back(
            feature.s_Feature, feature.s_ModelPrior,
            TCorrelationsPtr(feature.s_Models->cloneForPersistence()));
    }
}
// Individual models analyse each by-field value independently, so this is
// never a population model.
bool CIndividualModel::isPopulation() const {
    return false;
}
// Get \p pid's event count in the bucket containing \p time, or an empty
// optional (with a warning) if \p time falls outside the single bucket
// currently held in memory. An active person absent from the bucket's
// non-zero counts yields zero.
CIndividualModel::TOptionalUInt64
CIndividualModel::currentBucketCount(std::size_t pid, core_t::TTime time) const {
    if (this->bucketStatsAvailable(time) == false) {
        LOG_WARN(<< "No statistics at " << time << " for " << this->description()
                 << ", current bucket = " << this->printCurrentBucket()
                 << ", partitionFieldValue = " << this->dataGatherer().partitionFieldValue()
                 << ", personName = " << this->dataGatherer().personName(pid));
        return TOptionalUInt64();
    }
    const auto& personCounts = this->currentBucketPersonCounts();
    // The counts are sorted by person id so we can binary search.
    auto itr = std::lower_bound(personCounts.begin(), personCounts.end(), pid,
                                maths::common::COrderings::SFirstLess());
    if (itr != personCounts.end() && itr->first == pid) {
        return itr->second;
    }
    return static_cast<std::uint64_t>(0);
}
// Check whether \p time falls inside the half-open interval
// [start, start + bucketLength) of the single bucket held in memory.
bool CIndividualModel::bucketStatsAvailable(core_t::TTime time) const {
    core_t::TTime bucketStart = this->currentBucketStartTime();
    core_t::TTime bucketEnd = bucketStart + this->bucketLength();
    return (time >= bucketStart) && (time < bucketEnd);
}
// Refresh the per-person non-zero counts for each bucket in
// [\p startTime, \p endTime) without updating any models. No-op when the
// gatherer has no data for \p startTime.
void CIndividualModel::sampleBucketStatistics(core_t::TTime startTime,
                                              core_t::TTime endTime,
                                              CResourceMonitor& /*resourceMonitor*/) {
    CDataGatherer& gatherer = this->dataGatherer();
    if (gatherer.dataAvailable(startTime) == false) {
        return;
    }
    core_t::TTime bucketLength = gatherer.bucketLength();
    for (core_t::TTime bucketStart = startTime; bucketStart < endTime;
         bucketStart += bucketLength) {
        // Only a single bucket's statistics are retained at any time.
        this->currentBucketStartTime(bucketStart);
        TSizeUInt64PrVec& personCounts = this->currentBucketPersonCounts();
        gatherer.personNonZeroCounts(bucketStart, personCounts);
        this->applyFilter(model_t::E_XF_By, false, this->personFilter(), personCounts);
    }
}
// Sample each bucket in [\p startTime, \p endTime): delegate to the base
// class, refresh the per-person counts and record each active person's
// first and most recent bucket time.
void CIndividualModel::sample(core_t::TTime startTime,
                              core_t::TTime endTime,
                              CResourceMonitor& resourceMonitor) {
    const CDataGatherer& gatherer = this->dataGatherer();
    core_t::TTime bucketLength = gatherer.bucketLength();
    for (core_t::TTime bucketStart = startTime; bucketStart < endTime;
         bucketStart += bucketLength) {
        this->CAnomalyDetectorModel::sample(bucketStart, bucketStart + bucketLength,
                                            resourceMonitor);
        this->currentBucketStartTime(bucketStart);
        TSizeUInt64PrVec& personCounts = this->currentBucketPersonCounts();
        gatherer.personNonZeroCounts(bucketStart, personCounts);
        for (const auto& personCount : personCounts) {
            std::size_t pid = personCount.first;
            // Remember when we first and last saw each person.
            if (CAnomalyDetectorModel::isTimeUnset(m_FirstBucketTimes[pid])) {
                m_FirstBucketTimes[pid] = bucketStart;
            }
            m_LastBucketTimes[pid] = bucketStart;
        }
        this->applyFilter(model_t::E_XF_By, true, this->personFilter(), personCounts);
    }
}
void CIndividualModel::prune(std::size_t maximumAge) {
core_t::TTime time = this->currentBucketStartTime();
if (time <= 0) {
return;
}
CDataGatherer& gatherer = this->dataGatherer();
TSizeVec peopleToRemove;
for (std::size_t pid = 0; pid < m_LastBucketTimes.size(); ++pid) {
if (gatherer.isPersonActive(pid) &&
!CAnomalyDetectorModel::isTimeUnset(m_LastBucketTimes[pid])) {
std::size_t bucketsSinceLastEvent = static_cast<std::size_t>(
(time - m_LastBucketTimes[pid]) / gatherer.bucketLength());
if (bucketsSinceLastEvent > maximumAge) {
LOG_TRACE(<< gatherer.personName(pid) << ", bucketsSinceLastEvent = " << bucketsSinceLastEvent
<< ", maximumAge = " << maximumAge);
peopleToRemove.push_back(pid);
}
}
}
if (peopleToRemove.empty()) {
return;
}
std::sort(peopleToRemove.begin(), peopleToRemove.end());
LOG_DEBUG(<< "Removing people {" << this->printPeople(peopleToRemove, 20) << '}');
// We clear large state objects from removed people's model
// and reinitialize it when they are recycled.
this->clearPrunedResources(peopleToRemove, TSizeVec());
}
// Individual models have no attribute probabilities to aggregate, so this
// always clears the outputs and reports success.
bool CIndividualModel::computeTotalProbability(const std::string& /*person*/,
                                               std::size_t /*numberAttributeProbabilities*/,
                                               TOptionalDouble& probability,
                                               TAttributeProbability1Vec& attributeProbabilities) const {
    probability = TOptionalDouble();
    attributeProbabilities.clear();
    return true;
}
std::uint64_t CIndividualModel::checksum(bool includeCurrentBucketStats) const {
std::uint64_t seed = this->CAnomalyDetectorModel::checksum(includeCurrentBucketStats);
TStrCRefUInt64Map hashes1;
const CDataGatherer& gatherer = this->dataGatherer();
hashActive(gatherer, m_FirstBucketTimes, hashes1);
hashActive(gatherer, m_LastBucketTimes, hashes1);
for (const auto& feature : m_FeatureModels) {
hashActive(gatherer, feature.s_Models, hashes1);
}
TStrCRefStrCRefPrUInt64Map hashes2;
for (const auto& feature : m_FeatureCorrelatesModels) {
for (const auto& model : feature.s_Models->correlationModels()) {
std::size_t pids[]{model.first.first, model.first.second};
if (gatherer.isPersonActive(pids[0]) && gatherer.isPersonActive(pids[1])) {
std::uint64_t& hash =
hashes2[{std::cref(this->personName(pids[0])), std::cref(this->personName(pids[1]))}];
hash = maths::common::CChecksum::calculate(hash, model.second);
}
}
}
if (includeCurrentBucketStats) {
seed = maths::common::CChecksum::calculate(seed, this->currentBucketStartTime());
const TSizeUInt64PrVec& personCounts = this->currentBucketPersonCounts();
for (const auto& count : personCounts) {
std::uint64_t& hash = hashes1[std::cref(this->personName(count.first))];
hash = maths::common::CChecksum::calculate(hash, count.second);
}
}
LOG_TRACE(<< "seed = " << seed);
LOG_TRACE(<< "hashes1 = " << hashes1);
LOG_TRACE(<< "hashes2 = " << hashes2);
seed = maths::common::CChecksum::calculate(seed, hashes1);
return maths::common::CChecksum::calculate(seed, hashes2);
}
// Record a named breakdown of this model's dynamic memory usage; the names
// passed to dynamicSize appear in the debug memory report.
void CIndividualModel::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
    mem->setName("CIndividualModel");
    this->CAnomalyDetectorModel::debugMemoryUsage(mem->addChild());
    core::memory_debug::dynamicSize("m_FirstBucketTimes", m_FirstBucketTimes, mem);
    core::memory_debug::dynamicSize("m_LastBucketTimes", m_LastBucketTimes, mem);
    core::memory_debug::dynamicSize("m_FeatureModels", m_FeatureModels, mem);
    core::memory_debug::dynamicSize("m_FeatureCorrelatesModels",
                                    m_FeatureCorrelatesModels, mem);
    core::memory_debug::dynamicSize("m_MemoryEstimator", m_MemoryEstimator, mem);
}
// Get the model's memory usage, preferring the cheap estimate derived from
// the active people/attribute/correlation counts and falling back to a full
// computation when no estimate is available.
std::size_t CIndividualModel::memoryUsage() const {
    const CDataGatherer& gatherer = this->dataGatherer();
    TOptionalSize estimate = this->estimateMemoryUsage(
        gatherer.numberActivePeople(), gatherer.numberActiveAttributes(),
        this->numberCorrelations());
    if (estimate) {
        return *estimate;
    }
    return this->computeMemoryUsage();
}
// Compute the model's exact memory usage: the base class usage plus the
// dynamic size of every member container.
std::size_t CIndividualModel::computeMemoryUsage() const {
    return this->CAnomalyDetectorModel::memoryUsage() +
           core::memory::dynamicSize(m_FirstBucketTimes) +
           core::memory::dynamicSize(m_LastBucketTimes) +
           core::memory::dynamicSize(m_FeatureModels) +
           core::memory::dynamicSize(m_FeatureCorrelatesModels) +
           core::memory::dynamicSize(m_MemoryEstimator);
}
// Expose the memory usage estimator so estimates can be refreshed.
// NOTE(review): returning a mutable pointer from a const member implies
// m_MemoryEstimator is declared mutable in the header - confirm there.
CMemoryUsageEstimator* CIndividualModel::memoryUsageEstimator() const {
    return &m_MemoryEstimator;
}
// The object's static footprint; dynamic allocations are accounted for
// separately by computeMemoryUsage.
std::size_t CIndividualModel::staticSize() const {
    return sizeof(*this);
}
// The time of the first bucket in which each person (indexed by id) was seen.
const CIndividualModel::TTimeVec& CIndividualModel::firstBucketTimes() const {
    return m_FirstBucketTimes;
}
// The time of the most recent bucket in which each person (indexed by id)
// was seen.
const CIndividualModel::TTimeVec& CIndividualModel::lastBucketTimes() const {
    return m_LastBucketTimes;
}
// Persist just the feature models' state (used for the model state export,
// not the full detector state).
void CIndividualModel::doPersistModelsState(core::CStatePersistInserter& inserter) const {
    for (const auto& featureModels : m_FeatureModels) {
        featureModels.persistModelsState(inserter);
    }
}
bool CIndividualModel::shouldPersist() const {
return std::any_of(m_FeatureModels.begin(), m_FeatureModels.end(),
[](const auto& model) { return model.shouldPersist(); });
}
// Persist the model's state. Tag names and insertion order must stay in
// sync with doAcceptRestoreTraverser: in particular the feature (correlate)
// model levels are restored positionally, in the order inserted here.
void CIndividualModel::doAcceptPersistInserter(core::CStatePersistInserter& inserter) const {
    inserter.insertValue(WINDOW_BUCKET_COUNT_TAG, this->windowBucketCount(),
                         core::CIEEE754::E_SinglePrecision);
    core::CPersistUtils::persist(PERSON_BUCKET_COUNT_TAG,
                                 this->personBucketCounts(), inserter);
    core::CPersistUtils::persist(FIRST_BUCKET_TIME_TAG, m_FirstBucketTimes, inserter);
    core::CPersistUtils::persist(LAST_BUCKET_TIME_TAG, m_LastBucketTimes, inserter);
    // Lambdas rather than std::bind: clearer, and no <functional> machinery.
    for (const auto& feature : m_FeatureModels) {
        inserter.insertLevel(FEATURE_MODELS_TAG,
                             [&feature](core::CStatePersistInserter& inserter_) {
                                 feature.acceptPersistInserter(inserter_);
                             });
    }
    for (const auto& feature : m_FeatureCorrelatesModels) {
        inserter.insertLevel(FEATURE_CORRELATE_MODELS_TAG,
                             [&feature](core::CStatePersistInserter& inserter_) {
                                 feature.acceptPersistInserter(inserter_);
                             });
    }
    core::CPersistUtils::persist(MEMORY_ESTIMATOR_TAG, m_MemoryEstimator, inserter);
    // Written as false so a restore of this state skips the pre-7.5 upgrade
    // resampling in doAcceptRestoreTraverser.
    inserter.insertValue(UPGRADING_PRE_7_5_STATE, false);
    core::CPersistUtils::persist(APPLIED_DETECTION_RULE_CHECKSUMS_TAG,
                                 this->appliedRuleChecksums(), inserter);
}
// Restore the model's state written by doAcceptPersistInserter. Feature and
// correlate model levels are matched positionally to m_FeatureModels /
// m_FeatureCorrelatesModels via the i/j cursors. After restoring, models are
// patched up: correlations are re-attached and, when the state predates 7.5
// (the flag defaults to true and is only overwritten if present), rarely
// seen people get a synthetic empty-bucket sample.
bool CIndividualModel::doAcceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
    // i/j index the next feature (correlate) model to restore; extra levels
    // beyond the configured models are deliberately ignored.
    std::size_t i{0}, j{0};
    bool upgradingPre7p5State{true};
    do {
        const std::string& name = traverser.name();
        RESTORE_SETUP_TEARDOWN(WINDOW_BUCKET_COUNT_TAG, double count,
                               core::CStringUtils::stringToType(traverser.value(), count),
                               this->windowBucketCount(count))
        RESTORE(PERSON_BUCKET_COUNT_TAG,
                core::CPersistUtils::restore(name, this->personBucketCounts(), traverser))
        RESTORE(FIRST_BUCKET_TIME_TAG,
                core::CPersistUtils::restore(name, m_FirstBucketTimes, traverser))
        RESTORE(LAST_BUCKET_TIME_TAG,
                core::CPersistUtils::restore(name, m_LastBucketTimes, traverser))
        RESTORE(FEATURE_MODELS_TAG,
                i == m_FeatureModels.size() ||
                    traverser.traverseSubLevel(std::bind(
                        &SFeatureModels::acceptRestoreTraverser, &m_FeatureModels[i++],
                        std::cref(this->params()), std::placeholders::_1)))
        RESTORE(FEATURE_CORRELATE_MODELS_TAG,
                j == m_FeatureCorrelatesModels.size() ||
                    traverser.traverseSubLevel(std::bind(
                        &SFeatureCorrelateModels::acceptRestoreTraverser,
                        &m_FeatureCorrelatesModels[j++],
                        std::cref(this->params()), std::placeholders::_1)))
        RESTORE(MEMORY_ESTIMATOR_TAG,
                core::CPersistUtils::restore(MEMORY_ESTIMATOR_TAG, m_MemoryEstimator, traverser))
        RESTORE_BUILT_IN(UPGRADING_PRE_7_5_STATE, upgradingPre7p5State)
        RESTORE(APPLIED_DETECTION_RULE_CHECKSUMS_TAG,
                core::CPersistUtils::restore(name, this->appliedRuleChecksums(), traverser));
    } while (traverser.next());
    if (traverser.haveBadState()) {
        return false;
    }

    // People seen in fewer than this fraction of buckets get a synthetic
    // empty-bucket sample when upgrading pre-7.5 state.
    const double DEFAULT_CUTOFF_TO_MODEL_EMPTY_BUCKETS{0.2};
    for (auto& feature : m_FeatureModels) {
        std::size_t dimension{model_t::dimension(feature.s_Feature)};
        // A single heavily weighted (count 50) zero sample per person.
        maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec weights{
            maths_t::CUnitWeights::unit<TDouble2Vec>(dimension)};
        maths_t::setCount(TDouble2Vec(dimension, 50.0), weights[0]);
        maths::common::CModelAddSamplesParams params;
        params.isInteger(true)
            .isNonNegative(true)
            .propagationInterval(1.0)
            .trendWeights(weights)
            .priorWeights(weights);
        for (std::size_t pid = 0; pid < feature.s_Models.size(); ++pid) {
            if (upgradingPre7p5State &&
                this->personFrequency(pid) < DEFAULT_CUTOFF_TO_MODEL_EMPTY_BUCKETS) {
                maths::common::CModel::TTimeDouble2VecSizeTrVec value{core::make_triple(
                    this->lastBucketTimes()[pid], TDouble2Vec(dimension, 0.0),
                    model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID)};
                feature.s_Models[pid]->addSamples(params, value);
            }
            // Correlations aren't persisted per model; re-attach them here.
            for (const auto& correlates : m_FeatureCorrelatesModels) {
                if (feature.s_Feature == correlates.s_Feature) {
                    feature.s_Models[pid]->modelCorrelations(*correlates.s_Models);
                }
            }
        }
    }

    VIOLATES_INVARIANT(m_FirstBucketTimes.size(), !=, m_LastBucketTimes.size());
    return true;
}
// Create models for any people the gatherer knows about but we don't yet
// model, creating them in batches of CHUNK_SIZE and re-checking estimated
// memory usage against the allocation limit between batches. People who
// can't be accommodated are removed from the gatherer and counted as model
// creation failures. Finally refresh the correlate models under the same
// resource limit.
void CIndividualModel::createUpdateNewModels(core_t::TTime time,
                                             CResourceMonitor& resourceMonitor) {
    this->updateRecycledModels();
    CDataGatherer& gatherer = this->dataGatherer();

    std::size_t numberExistingPeople = m_FirstBucketTimes.size();
    std::size_t numberCorrelations = this->numberCorrelations();
    TOptionalSize usageEstimate = this->estimateMemoryUsage(
        std::min(numberExistingPeople, gatherer.numberActivePeople()),
        0, // # attributes
        numberCorrelations);
    std::size_t ourUsage = usageEstimate ? *usageEstimate : this->computeMemoryUsage();
    // The budget is current usage plus whatever the monitor still allows.
    std::size_t resourceLimit = ourUsage + resourceMonitor.allocationLimit();
    std::size_t numberNewPeople = gatherer.numberPeople();
    numberNewPeople = numberNewPeople > numberExistingPeople ? numberNewPeople - numberExistingPeople
                                                             : 0;

    while (numberNewPeople > 0 && resourceMonitor.areAllocationsAllowed() &&
           (resourceMonitor.haveNoLimit() || ourUsage < resourceLimit)) {
        // We batch people in CHUNK_SIZE (500) and create models in chunks
        // and test usage after each chunk.
        std::size_t numberToCreate = std::min(numberNewPeople, CHUNK_SIZE);
        LOG_TRACE(<< "Creating batch of " << numberToCreate
                  << " people of remaining " << numberNewPeople << ". "
                  << resourceLimit - ourUsage << " free bytes remaining");
        this->createNewModels(numberToCreate, 0);
        numberExistingPeople += numberToCreate;
        numberNewPeople -= numberToCreate;
        // Only re-estimate when there's more to create and a limit to check.
        if (numberNewPeople > 0 && resourceMonitor.haveNoLimit() == false) {
            ourUsage = this->estimateMemoryUsageOrComputeAndUpdate(
                numberExistingPeople, 0, numberCorrelations);
        }
    }

    this->estimateMemoryUsageOrComputeAndUpdate(numberExistingPeople, 0, numberCorrelations);

    if (numberNewPeople > 0) {
        // Couldn't create them all: drop the unmodelled people from the
        // gatherer and record the failure.
        resourceMonitor.acceptAllocationFailureResult(time);
        LOG_DEBUG(<< "Not enough memory to create models");
        core::CProgramCounters::counter(
            counter_t::E_TSADNumberMemoryLimitModelCreationFailures) += numberNewPeople;
        std::size_t toRemove = gatherer.numberPeople() - numberNewPeople;
        gatherer.removePeople(toRemove);
    }

    this->refreshCorrelationModels(resourceLimit, resourceMonitor);
}
// Grow the per-person state by \p n people: extend the bucket time vectors
// with unset times and clone a fresh model per feature for each new person,
// attaching the matching correlate models. \p m (attributes) is handled by
// the base class.
void CIndividualModel::createNewModels(std::size_t n, std::size_t m) {
    if (n > 0) {
        std::size_t total = m_FirstBucketTimes.size() + n;
        core::CAllocationStrategy::resize(m_FirstBucketTimes, total,
                                          CAnomalyDetectorModel::TIME_UNSET);
        core::CAllocationStrategy::resize(m_LastBucketTimes, total,
                                          CAnomalyDetectorModel::TIME_UNSET);
        for (auto& feature : m_FeatureModels) {
            core::CAllocationStrategy::reserve(feature.s_Models, total);
            while (feature.s_Models.size() < total) {
                std::size_t pid = feature.s_Models.size();
                feature.s_Models.emplace_back(feature.s_NewModel->clone(pid));
                for (const auto& correlates : m_FeatureCorrelatesModels) {
                    if (feature.s_Feature == correlates.s_Feature) {
                        feature.s_Models.back()->modelCorrelations(*correlates.s_Models);
                    }
                }
            }
        }
    }
    this->CAnomalyDetectorModel::createNewModels(n, m);
}
// Reset the state of any person ids the gatherer has recycled so their
// slots can be reused for new people: unset the bucket times and replace
// each feature model with a fresh clone wired up to the correlate models.
void CIndividualModel::updateRecycledModels() {
    for (auto pid : this->dataGatherer().recycledPersonIds()) {
        if (pid < m_FirstBucketTimes.size()) {
            m_FirstBucketTimes[pid] = CAnomalyDetectorModel::TIME_UNSET;
            m_LastBucketTimes[pid] = CAnomalyDetectorModel::TIME_UNSET;
            for (auto& feature : m_FeatureModels) {
                feature.s_Models[pid].reset(feature.s_NewModel->clone(pid));
                for (const auto& correlates : m_FeatureCorrelatesModels) {
                    if (feature.s_Feature == correlates.s_Feature) {
                        // BUG FIX: attach correlations to the recycled
                        // person's new model. This previously used
                        // s_Models.back(), i.e. the last person's model,
                        // leaving the recycled model without correlations
                        // (compare createNewModels, where back() is the
                        // just-appended model, and the identical fix-up in
                        // doAcceptRestoreTraverser which uses s_Models[pid]).
                        feature.s_Models[pid]->modelCorrelations(*correlates.s_Models);
                    }
                }
            }
        }
    }
    this->CAnomalyDetectorModel::updateRecycledModels();
}
void CIndividualModel::refreshCorrelationModels(std::size_t resourceLimit,
CResourceMonitor& resourceMonitor) {
std::size_t n = this->numberOfPeople();
double maxNumberCorrelations = this->params().s_CorrelationModelsOverhead *
static_cast<double>(n);
auto memoryUsage = std::bind(&CAnomalyDetectorModel::estimateMemoryUsageOrComputeAndUpdate,
this, n, 0, std::placeholders::_1);
CTimeSeriesCorrelateModelAllocator allocator(
resourceMonitor, memoryUsage, resourceLimit,
static_cast<std::size_t>(maxNumberCorrelations));
for (auto& feature : m_FeatureCorrelatesModels) {
allocator.prototypePrior(feature.s_ModelPrior);
feature.s_Models->refresh(allocator);
}
}
// Swap each pruned person's feature models for a tiny placeholder model;
// the full model is recreated if the person id is ever recycled.
void CIndividualModel::clearPrunedResources(const TSizeVec& people,
                                            const TSizeVec& /*attributes*/) {
    for (auto& feature : m_FeatureModels) {
        for (auto pid : people) {
            if (pid < feature.s_Models.size()) {
                feature.s_Models[pid].reset(this->tinyModel());
            }
        }
    }
}
// Weight applied to \p pid's result for \p feature in the bucket containing
// \p time: 1.0 normally, but down-weighted by the person's frequency when
// the feature models empty buckets and this bucket is empty (or no count is
// available) for the person.
double CIndividualModel::emptyBucketWeight(model_t::EFeature feature,
                                           std::size_t pid,
                                           core_t::TTime time) const {
    if (model_t::includeEmptyBuckets(feature) == false) {
        return 1.0;
    }
    TOptionalUInt64 count{this->currentBucketCount(pid, time)};
    if (count != std::nullopt && *count > 0) {
        return 1.0;
    }
    return maths::common::CModel::emptyBucketWeight(this->personFrequency(pid));
}
// Const overload: forwards to the non-const lookup, which doesn't mutate
// any state, and returns the result as a pointer-to-const.
const maths::common::CModel* CIndividualModel::model(model_t::EFeature feature,
                                                     std::size_t pid) const {
    return const_cast<CIndividualModel*>(this)->model(feature, pid);
}
// Look up \p pid's model for \p feature, or nullptr if the feature isn't
// modelled or the person id is out of range. Linear scan: the number of
// features is small.
maths::common::CModel* CIndividualModel::model(model_t::EFeature feature, std::size_t pid) {
    for (auto& featureModels : m_FeatureModels) {
        if (featureModels.s_Feature == feature) {
            return pid < featureModels.s_Models.size()
                       ? featureModels.s_Models[pid].get()
                       : nullptr;
        }
    }
    return nullptr;
}
// Flush any samples buffered in the correlate models.
void CIndividualModel::sampleCorrelateModels() {
    for (const auto& featureCorrelates : m_FeatureCorrelatesModels) {
        featureCorrelates.s_Models->processSamples();
    }
}
// For interim results of features needing adjustment, subtract the
// applicable interim correction from \p result. Corrections are keyed by
// (feature, pid, correlated pid); for an unconditional result, or when no
// correlated series is present, the third key element is \p pid itself.
void CIndividualModel::correctBaselineForInterim(model_t::EFeature feature,
                                                 std::size_t pid,
                                                 model_t::CResultType type,
                                                 const TSizeDoublePr1Vec& correlated,
                                                 const TFeatureSizeSizeTrDouble1VecUMap& corrections,
                                                 TDouble1Vec& result) const {
    if (type.isInterim() == false ||
        model_t::requiresInterimResultAdjustment(feature) == false) {
        return;
    }
    TFeatureSizeSizeTr key(feature, pid, pid);
    if (type.asConditionalOrUnconditional() == model_t::CResultType::E_Conditional &&
        correlated.empty() == false) {
        key.third = correlated[0].first;
    }
    auto correction = corrections.find(key);
    if (correction != corrections.end()) {
        result -= correction->second;
    }
}
// Factor in [0, 1] which decays linearly from 1 to 0 over the three weeks
// following \p pid's first observed bucket.
double CIndividualModel::derate(std::size_t pid, core_t::TTime time) const {
    double age = static_cast<double>(time - m_FirstBucketTimes[pid]);
    double horizon = static_cast<double>(3 * core::constants::WEEK);
    return std::max(1.0 - age / horizon, 0.0);
}
std::string CIndividualModel::printCurrentBucket() const {
std::ostringstream result;
result << "[" << this->currentBucketStartTime() << ","
<< this->currentBucketStartTime() + this->bucketLength() << ")";
return result.str();
}
std::size_t CIndividualModel::numberCorrelations() const {
std::size_t result = 0;
for (const auto& feature : m_FeatureCorrelatesModels) {
result += feature.s_Models->correlationModels().size();
}
return result;
}
// Individual models have no attributes; report full frequency.
double CIndividualModel::attributeFrequency(std::size_t /*cid*/) const {
    return 1.0;
}
// Advance the model past the skipped interval [\p startTime, \p endTime):
// shift every set last-bucket time forward by the gap and tell each
// person's model to skip the same amount of time.
void CIndividualModel::doSkipSampling(core_t::TTime startTime, core_t::TTime endTime) {
    core_t::TTime gap = endTime - startTime;
    for (auto& lastBucketTime : m_LastBucketTimes) {
        if (CAnomalyDetectorModel::isTimeUnset(lastBucketTime) == false) {
            lastBucketTime += gap;
        }
    }
    for (auto& feature : m_FeatureModels) {
        for (auto& model : feature.s_Models) {
            model->skipTime(gap);
        }
    }
}
// Apply a time shift of \p shift seconds at \p time to every person's model
// for every feature, and record an annotation describing the change.
void CIndividualModel::shiftTime(core_t::TTime time, core_t::TTime shift) {
    for (auto& featureModels : m_FeatureModels) {
        for (auto& model : featureModels.s_Models) {
            model->shiftTime(time, shift);
        }
    }
    this->addAnnotation(time, CAnnotation::E_ModelChange,
                        "Model shifted time by " + std::to_string(shift) + " seconds");
}
}
}