lib/model/CMetricPopulationModel.cc (892 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #include <model/CMetricPopulationModel.h> #include <core/CAllocationStrategy.h> #include <core/CLogger.h> #include <core/CMemoryDefStd.h> #include <core/CStatePersistInserter.h> #include <core/CStateRestoreTraverser.h> #include <core/RestoreMacros.h> #include <maths/common/CChecksum.h> #include <maths/common/CMultivariatePrior.h> #include <maths/common/COrderings.h> #include <maths/common/CTools.h> #include <maths/common/ProbabilityAggregators.h> #include <model/CAnnotatedProbabilityBuilder.h> #include <model/CAnnotation.h> #include <model/CDataGatherer.h> #include <model/CGathererTools.h> #include <model/CInterimBucketCorrector.h> #include <model/CModelDetailsView.h> #include <model/CModelFactory.h> #include <model/CPartitioningFields.h> #include <model/CPopulationModelDetail.h> #include <model/CProbabilityAndInfluenceCalculator.h> #include <model/CSearchKey.h> #include <model/FrequencyPredicates.h> #include <boost/unordered_map.hpp> #include <map> #include <optional> namespace ml { namespace model { namespace { using TDouble2Vec = core::CSmallVector<double, 2>; using TDouble2Vec1Vec = core::CSmallVector<TDouble2Vec, 1>; using TTime2Vec = core::CSmallVector<core_t::TTime, 2>; using TOptionalSample = std::optional<CSample>; using TSizeSizePrFeatureDataPrVec = CMetricPopulationModel::TSizeSizePrFeatureDataPrVec; using TFeatureSizeSizePrFeatureDataPrVecPr = std::pair<model_t::EFeature, TSizeSizePrFeatureDataPrVec>; using TFeatureSizeSizePrFeatureDataPrVecPrVec = std::vector<TFeatureSizeSizePrFeatureDataPrVecPr>; using TSizeFuzzyDeduplicateUMap = boost::unordered_map<std::size_t, CModelTools::CFuzzyDeduplicate>; //! \brief The values and weights for an attribute. struct SValuesAndWeights { SValuesAndWeights() : s_IsInteger(false), s_IsNonNegative(false) {} bool s_IsInteger, s_IsNonNegative; maths::common::CModel::TTimeDouble2VecSizeTrVec s_BucketValues; maths::common::CModel::TTimeDouble2VecSizeTrVec s_Values; maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec s_TrendWeights; maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec s_ResidualWeights; }; using TSizeValuesAndWeightsUMap = boost::unordered_map<std::size_t, SValuesAndWeights>; // We use short field names to reduce the state size const std::string POPULATION_STATE_TAG("a"); const std::string FEATURE_MODELS_TAG("b"); const std::string FEATURE_CORRELATE_MODELS_TAG("c"); const std::string MEMORY_ESTIMATOR_TAG("d"); } // unnamed:: CMetricPopulationModel::CMetricPopulationModel( const SModelParams& params, const TDataGathererPtr& dataGatherer, const TFeatureMathsModelSPtrPrVec& newFeatureModels, const TFeatureMultivariatePriorSPtrPrVec& newFeatureCorrelateModelPriors, TFeatureCorrelationsPtrPrVec&& featureCorrelatesModels, const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators, const TInterimBucketCorrectorCPtr& interimBucketCorrector) : CPopulationModel(params, dataGatherer, influenceCalculators), m_CurrentBucketStats(dataGatherer->currentBucketStartTime() - dataGatherer->bucketLength()), m_InterimBucketCorrector(interimBucketCorrector), m_Probabilities(0.05) { this->initialize(newFeatureModels, newFeatureCorrelateModelPriors, std::move(featureCorrelatesModels)); } CMetricPopulationModel::CMetricPopulationModel( const SModelParams& params, const TDataGathererPtr& dataGatherer, const TFeatureMathsModelSPtrPrVec& newFeatureModels, const TFeatureMultivariatePriorSPtrPrVec& newFeatureCorrelateModelPriors, TFeatureCorrelationsPtrPrVec&& featureCorrelatesModels, const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators, const TInterimBucketCorrectorCPtr& interimBucketCorrector, core::CStateRestoreTraverser& traverser) : CPopulationModel(params, dataGatherer, influenceCalculators), m_CurrentBucketStats(dataGatherer->currentBucketStartTime() - dataGatherer->bucketLength()), m_InterimBucketCorrector(interimBucketCorrector), m_Probabilities(0.05) { this->initialize(newFeatureModels, newFeatureCorrelateModelPriors, std::move(featureCorrelatesModels)); if (traverser.traverseSubLevel(std::bind(&CMetricPopulationModel::acceptRestoreTraverser, this, std::placeholders::_1)) == false) { traverser.setBadState(); } } void CMetricPopulationModel::initialize(const TFeatureMathsModelSPtrPrVec& newFeatureModels, const TFeatureMultivariatePriorSPtrPrVec& newFeatureCorrelateModelPriors, TFeatureCorrelationsPtrPrVec&& featureCorrelatesModels) { m_FeatureModels.reserve(newFeatureModels.size()); for (const auto& model : newFeatureModels) { m_FeatureModels.emplace_back(model.first, model.second); } std::sort(m_FeatureModels.begin(), m_FeatureModels.end(), [](const SFeatureModels& lhs, const SFeatureModels& rhs) { return lhs.s_Feature < rhs.s_Feature; }); if (this->params().s_MultivariateByFields) { m_FeatureCorrelatesModels.reserve(featureCorrelatesModels.size()); for (std::size_t i = 0; i < featureCorrelatesModels.size(); ++i) { m_FeatureCorrelatesModels.emplace_back( featureCorrelatesModels[i].first, newFeatureCorrelateModelPriors[i].second, std::move(featureCorrelatesModels[i].second)); } std::sort(m_FeatureCorrelatesModels.begin(), m_FeatureCorrelatesModels.end(), [](const SFeatureCorrelateModels& lhs, const SFeatureCorrelateModels& rhs) { return lhs.s_Feature < rhs.s_Feature; }); } } CMetricPopulationModel::CMetricPopulationModel(bool isForPersistence, const CMetricPopulationModel& other) : CPopulationModel(isForPersistence, other), m_CurrentBucketStats(0), // Not needed for persistence so minimally constructed m_Probabilities(0.05), // Not needed for persistence so minimally construct m_MemoryEstimator(other.m_MemoryEstimator) { if (!isForPersistence) { LOG_ABORT(<< "This constructor only creates clones for persistence"); } m_FeatureModels.reserve(m_FeatureModels.size()); for (const auto& feature : other.m_FeatureModels) { m_FeatureModels.emplace_back(feature.s_Feature, feature.s_NewModel); m_FeatureModels.back().s_Models.reserve(feature.s_Models.size()); for (const auto& model : feature.s_Models) { m_FeatureModels.back().s_Models.emplace_back(model->cloneForPersistence()); } } m_FeatureCorrelatesModels.reserve(other.m_FeatureCorrelatesModels.size()); for (const auto& feature : other.m_FeatureCorrelatesModels) { m_FeatureCorrelatesModels.emplace_back( feature.s_Feature, feature.s_ModelPrior, TCorrelationsPtr(feature.s_Models->cloneForPersistence())); } } bool CMetricPopulationModel::shouldPersist() const { return std::any_of(m_FeatureModels.begin(), m_FeatureModels.end(), [](const auto& model) { return model.shouldPersist(); }); } void CMetricPopulationModel::acceptPersistInserter(core::CStatePersistInserter& inserter) const { inserter.insertLevel(POPULATION_STATE_TAG, std::bind(&CMetricPopulationModel::doAcceptPersistInserter, this, std::placeholders::_1)); for (const auto& feature : m_FeatureModels) { inserter.insertLevel(FEATURE_MODELS_TAG, std::bind(&SFeatureModels::acceptPersistInserter, &feature, std::placeholders::_1)); } for (const auto& feature : m_FeatureCorrelatesModels) { inserter.insertLevel(FEATURE_CORRELATE_MODELS_TAG, std::bind(&SFeatureCorrelateModels::acceptPersistInserter, &feature, std::placeholders::_1)); } core::CPersistUtils::persist(MEMORY_ESTIMATOR_TAG, m_MemoryEstimator, inserter); } bool CMetricPopulationModel::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) { std::size_t i = 0u, j = 0; do { const std::string& name = traverser.name(); RESTORE(POPULATION_STATE_TAG, traverser.traverseSubLevel(std::bind(&CMetricPopulationModel::doAcceptRestoreTraverser, this, std::placeholders::_1))) RESTORE(FEATURE_MODELS_TAG, i == m_FeatureModels.size() || traverser.traverseSubLevel(std::bind( &SFeatureModels::acceptRestoreTraverser, &m_FeatureModels[i++], std::cref(this->params()), std::placeholders::_1))) RESTORE(FEATURE_CORRELATE_MODELS_TAG, j == m_FeatureCorrelatesModels.size() || traverser.traverseSubLevel(std::bind( &SFeatureCorrelateModels::acceptRestoreTraverser, &m_FeatureCorrelatesModels[j++], std::cref(this->params()), std::placeholders::_1))) RESTORE(MEMORY_ESTIMATOR_TAG, core::CPersistUtils::restore(MEMORY_ESTIMATOR_TAG, m_MemoryEstimator, traverser)) } while (traverser.next()); for (auto& feature : m_FeatureModels) { for (auto& model : feature.s_Models) { for (const auto& correlates : m_FeatureCorrelatesModels) { if (feature.s_Feature == correlates.s_Feature) { model->modelCorrelations(*correlates.s_Models); } } } } return true; } CAnomalyDetectorModel* CMetricPopulationModel::cloneForPersistence() const { return new CMetricPopulationModel(true, *this); } model_t::EModelType CMetricPopulationModel::category() const { return model_t::E_MetricOnline; } bool CMetricPopulationModel::isEventRate() const { return false; } bool CMetricPopulationModel::isMetric() const { return true; } CMetricPopulationModel::TDouble1Vec CMetricPopulationModel::currentBucketValue(model_t::EFeature feature, std::size_t pid, std::size_t cid, core_t::TTime time) const { const TSizeSizePrFeatureDataPrVec& featureData = this->featureData(feature, time); auto i = find(featureData, pid, cid); return i != featureData.end() ? extractValue(feature, *i) : TDouble1Vec(); } CMetricPopulationModel::TDouble1Vec CMetricPopulationModel::baselineBucketMean(model_t::EFeature feature, std::size_t pid, std::size_t cid, model_t::CResultType type, const TSizeDoublePr1Vec& correlated, core_t::TTime time) const { const maths::common::CModel* model{this->model(feature, cid)}; if (model == nullptr) { return TDouble1Vec(); } static const TSizeDoublePr1Vec NO_CORRELATED; TDouble1Vec result(model->predict(time, type.isUnconditional() ? NO_CORRELATED : correlated)); this->correctBaselineForInterim(feature, pid, cid, type, correlated, this->currentBucketInterimCorrections(), result); TDouble1VecDouble1VecPr support = model_t::support(feature); return maths::common::CTools::truncate(result, support.first, support.second); } bool CMetricPopulationModel::bucketStatsAvailable(core_t::TTime time) const { return time >= m_CurrentBucketStats.s_StartTime && time < m_CurrentBucketStats.s_StartTime + this->bucketLength(); } void CMetricPopulationModel::sampleBucketStatistics(core_t::TTime startTime, core_t::TTime endTime, CResourceMonitor& resourceMonitor) { CDataGatherer& gatherer = this->dataGatherer(); core_t::TTime bucketLength = gatherer.bucketLength(); if (!gatherer.dataAvailable(startTime)) { return; } this->createUpdateNewModels(startTime, resourceMonitor); this->currentBucketInterimCorrections().clear(); for (core_t::TTime time = startTime; time < endTime; time += bucketLength) { // Currently, we only remember one bucket. m_CurrentBucketStats.s_StartTime = time; TSizeUInt64PrVec& personCounts = m_CurrentBucketStats.s_PersonCounts; gatherer.personNonZeroCounts(time, personCounts); this->applyFilter(model_t::E_XF_Over, false, this->personFilter(), personCounts); TFeatureSizeSizePrFeatureDataPrVecPrVec featureData; gatherer.featureData(time, bucketLength, featureData); for (auto& featureData_ : featureData) { model_t::EFeature feature = featureData_.first; TSizeSizePrFeatureDataPrVec& data = m_CurrentBucketStats.s_FeatureData[feature]; data.swap(featureData_.second); LOG_TRACE(<< model_t::print(feature) << ": " << data); this->applyFilters(false, this->personFilter(), this->attributeFilter(), data); } } } void CMetricPopulationModel::sample(core_t::TTime startTime, core_t::TTime endTime, CResourceMonitor& resourceMonitor) { CDataGatherer& gatherer = this->dataGatherer(); core_t::TTime bucketLength = gatherer.bucketLength(); if (!gatherer.validateSampleTimes(startTime, endTime)) { return; } this->createUpdateNewModels(startTime, resourceMonitor); this->currentBucketInterimCorrections().clear(); m_CurrentBucketStats.s_Annotations.clear(); for (core_t::TTime time = startTime; time < endTime; time += bucketLength) { LOG_TRACE(<< "Sampling [" << time << "," << time + bucketLength << ")"); gatherer.sampleNow(time); TFeatureSizeSizePrFeatureDataPrVecPrVec featureData; gatherer.featureData(time, bucketLength, featureData); const TTimeVec& preSampleAttributeLastBucketTimes = this->attributeLastBucketTimes(); TSizeTimeUMap attributeLastBucketTimesMap; for (const auto& featureData_ : featureData) { TSizeSizePrFeatureDataPrVec& data = m_CurrentBucketStats.s_FeatureData[featureData_.first]; for (const auto& data_ : data) { std::size_t cid = CDataGatherer::extractAttributeId(data_); attributeLastBucketTimesMap[cid] = preSampleAttributeLastBucketTimes[cid]; } } this->CPopulationModel::sample(time, time + bucketLength, resourceMonitor); // Currently, we only remember one bucket. m_CurrentBucketStats.s_StartTime = time; TSizeUInt64PrVec& personCounts = m_CurrentBucketStats.s_PersonCounts; gatherer.personNonZeroCounts(time, personCounts); this->applyFilter(model_t::E_XF_Over, true, this->personFilter(), personCounts); const TTimeVec& attributeLastBucketTimes = this->attributeLastBucketTimes(); for (auto& featureData_ : featureData) { model_t::EFeature feature = featureData_.first; std::size_t dimension = model_t::dimension(feature); TSizeSizePrFeatureDataPrVec& data = m_CurrentBucketStats.s_FeatureData[feature]; data.swap(featureData_.second); LOG_TRACE(<< model_t::print(feature) << ": " << data); this->applyFilters(true, this->personFilter(), this->attributeFilter(), data); TSizeValuesAndWeightsUMap attributeValuesAndWeights; TSizeFuzzyDeduplicateUMap duplicates; if (data.size() >= this->params().s_MinimumToFuzzyDeduplicate) { // Set up fuzzy de-duplication. for (const auto& data_ : data) { std::size_t cid = CDataGatherer::extractAttributeId(data_); const CGathererTools::TSampleVec& samples = CDataGatherer::extractData(data_).s_Samples; for (const auto& sample : samples) { duplicates[cid].add(TDouble2Vec(sample.value(dimension))); } } for (auto& attribute : duplicates) { attribute.second.computeEpsilons( bucketLength, this->params().s_MinimumToFuzzyDeduplicate); } } for (const auto& data_ : data) { std::size_t pid = CDataGatherer::extractPersonId(data_); std::size_t cid = CDataGatherer::extractAttributeId(data_); maths::common::CModel* model{this->model(feature, cid)}; if (model == nullptr) { LOG_ERROR(<< "Missing model for " << this->attributeName(cid)); continue; } // initialCountWeight returns a weight value as double: // 0.0 if checkScheduledEvents is true // 1.0 if both checkScheduledEvents and checkRules are false // A small weight - 0.005 - if checkRules is true. // This weight is applied to countWeight (and therefore scaledCountWeight) as multiplier. // This reduces the impact of the values affected by the skip_model_update rule // on the model while not completely ignoring them. This still allows the model to // learn from the affected values - addressing point 1. and 2. in // https://github.com/elastic/ml-cpp/issues/1272, Namely // 1. If you apply it from the start of the modelling it can stop the model learning anything at all. // 2. It can stop the model ever adapting to some change in data characteristics core_t::TTime sampleTime = model_t::sampleTime(feature, time, bucketLength); double initialCountWeight{ this->initialCountWeight(feature, pid, cid, sampleTime)}; if (initialCountWeight == 0.0) { core_t::TTime skipTime = sampleTime - attributeLastBucketTimesMap[cid]; if (skipTime > 0) { model->skipTime(skipTime); // Update the last time so we don't advance the same model // multiple times (once per person) attributeLastBucketTimesMap[cid] = sampleTime; } continue; } const TOptionalSample& bucket = CDataGatherer::extractData(data_).s_BucketValue; const CGathererTools::TSampleVec& samples = CDataGatherer::extractData(data_).s_Samples; bool isInteger = CDataGatherer::extractData(data_).s_IsInteger; bool isNonNegative = CDataGatherer::extractData(data_).s_IsNonNegative; core_t::TTime cutoff = attributeLastBucketTimes[cid] - this->params().s_SamplingAgeCutoff; LOG_TRACE(<< "Adding " << CDataGatherer::extractData(data_) << " for person = " << gatherer.personName(pid) << " and attribute = " << gatherer.attributeName(cid)); SValuesAndWeights& attribute = attributeValuesAndWeights[cid]; attribute.s_IsInteger &= isInteger; attribute.s_IsNonNegative &= isNonNegative; if (model_t::isSampled(feature) && bucket) { attribute.s_BucketValues.emplace_back( bucket->time(), TDouble2Vec(bucket->value(dimension)), pid); } std::size_t n = std::count_if(samples.begin(), samples.end(), [cutoff](const CSample& sample) { return sample.time() >= cutoff; }); double updatesPerBucket = this->params().s_MaximumUpdatesPerBucket; double countWeight = initialCountWeight * this->sampleRateWeight(pid, cid) * this->learnRate(feature) * (updatesPerBucket > 0.0 && n > 0 ? updatesPerBucket / static_cast<double>(n) : 1.0); LOG_TRACE(<< "countWeight = " << countWeight); for (const auto& sample : samples) { if (sample.time() < cutoff) { continue; } double countVarianceScale = sample.varianceScale(); TDouble2Vec value(sample.value(dimension)); std::size_t duplicate = duplicates[cid].duplicate(sample.time(), value); if (duplicate < attribute.s_Values.size()) { model->addCountWeights(sample.time(), countWeight, countWeight, countVarianceScale, attribute.s_TrendWeights[duplicate], attribute.s_ResidualWeights[duplicate]); } else { attribute.s_Values.emplace_back(sample.time(), value, pid); attribute.s_TrendWeights.push_back( maths_t::CUnitWeights::unit<TDouble2Vec>(dimension)); attribute.s_ResidualWeights.push_back( maths_t::CUnitWeights::unit<TDouble2Vec>(dimension)); auto& trendWeight = attribute.s_TrendWeights.back(); auto& residualWeight = attribute.s_ResidualWeights.back(); model->countWeights(sample.time(), value, countWeight, countWeight, 1.0, // outlier weight derate countVarianceScale, trendWeight, residualWeight); } } } for (auto& attribute : attributeValuesAndWeights) { std::size_t cid = attribute.first; core_t::TTime latest = std::numeric_limits<core_t::TTime>::lowest(); for (const auto& value : attribute.second.s_Values) { latest = std::max(latest, value.first); } auto annotationCallback = [&](const std::string& annotation) { if (this->params().s_AnnotationsEnabled) { m_CurrentBucketStats.s_Annotations.emplace_back( time, CAnnotation::E_ModelChange, annotation, gatherer.searchKey().detectorIndex(), gatherer.searchKey().partitionFieldName(), gatherer.partitionFieldValue(), gatherer.searchKey().overFieldName(), gatherer.attributeName(cid), gatherer.searchKey().byFieldName(), EMPTY_STRING); } }; maths::common::CModelAddSamplesParams params; auto circuitBreaker = CMemoryCircuitBreaker(resourceMonitor); params.isInteger(attribute.second.s_IsInteger) .isNonNegative(attribute.second.s_IsNonNegative) .propagationInterval(this->propagationTime(cid, latest)) .trendWeights(attribute.second.s_TrendWeights) .priorWeights(attribute.second.s_ResidualWeights) .firstValueTime(cid < this->attributeFirstBucketTimes().size() ? this->attributeFirstBucketTimes()[cid] : std::numeric_limits<core_t::TTime>::min()) .annotationCallback([&](const std::string& annotation) { annotationCallback(annotation); }) .memoryCircuitBreaker(circuitBreaker); maths::common::CModel* model{this->model(feature, cid)}; if (model == nullptr) { LOG_TRACE(<< "Model unexpectedly null"); return; } if (model->addSamples(params, attribute.second.s_Values) == maths::common::CModel::E_Reset) { gatherer.resetSampleCount(cid); } } } for (const auto& feature : m_FeatureCorrelatesModels) { feature.s_Models->processSamples(); } m_Probabilities.clear(); } } void CMetricPopulationModel::prune(std::size_t maximumAge) { CDataGatherer& gatherer = this->dataGatherer(); TSizeVec peopleToRemove; TSizeVec attributesToRemove; this->peopleAndAttributesToRemove(m_CurrentBucketStats.s_StartTime, maximumAge, peopleToRemove, attributesToRemove); if (peopleToRemove.empty() && attributesToRemove.empty()) { return; } std::sort(peopleToRemove.begin(), peopleToRemove.end()); std::sort(attributesToRemove.begin(), attributesToRemove.end()); LOG_DEBUG(<< "Removing people {" << this->printPeople(peopleToRemove, 20) << '}'); LOG_DEBUG(<< "Removing attributes {" << this->printAttributes(attributesToRemove, 20) << '}'); // Stop collecting for these people/attributes and add them // to the free list. gatherer.recyclePeople(peopleToRemove); gatherer.recycleAttributes(attributesToRemove); if (gatherer.dataAvailable(m_CurrentBucketStats.s_StartTime)) { TFeatureSizeSizePrFeatureDataPrVecPrVec featureData; gatherer.featureData(m_CurrentBucketStats.s_StartTime, gatherer.bucketLength(), featureData); for (auto& feature : featureData) { m_CurrentBucketStats.s_FeatureData[feature.first].swap(feature.second); } } this->clearPrunedResources(peopleToRemove, attributesToRemove); this->removePeople(peopleToRemove); } bool CMetricPopulationModel::computeProbability(std::size_t pid, core_t::TTime startTime, core_t::TTime endTime, CPartitioningFields& partitioningFields, std::size_t numberAttributeProbabilities, SAnnotatedProbability& result) const { const CDataGatherer& gatherer = this->dataGatherer(); core_t::TTime bucketLength = gatherer.bucketLength(); if (endTime != startTime + bucketLength) { LOG_ERROR(<< "Can only compute probability for single bucket"); return false; } if (pid > gatherer.numberPeople()) { LOG_TRACE(<< "No person for pid = " << pid); return false; } using TOptionalStr = std::optional<std::string>; using TOptionalStr1Vec = core::CSmallVector<TOptionalStr, 1>; static const TOptionalStr1Vec NO_CORRELATED_ATTRIBUTES; static const TSizeDoublePr1Vec NO_CORRELATES; partitioningFields.add(gatherer.attributeFieldName(), EMPTY_STRING); CAnnotatedProbabilityBuilder resultBuilder( result, std::max(numberAttributeProbabilities, std::size_t(1)), function_t::function(gatherer.features())); LOG_TRACE(<< "computeProbability(" << gatherer.personName(pid) << ")"); CProbabilityAndInfluenceCalculator pJoint(this->params().s_InfluenceCutoff); pJoint.addAggregator(maths::common::CJointProbabilityOfLessLikelySamples()); pJoint.addAggregator(maths::common::CProbabilityOfExtremeSample()); if (this->params().s_CacheProbabilities) { pJoint.addCache(m_Probabilities); } for (std::size_t i = 0; i < gatherer.numberFeatures(); ++i) { model_t::EFeature feature = gatherer.feature(i); if (model_t::isCategorical(feature)) { continue; } LOG_TRACE(<< "feature = " << model_t::print(feature)); const TSizeSizePrFeatureDataPrVec& featureData = this->featureData(feature, startTime); TSizeSizePr range = personRange(featureData, pid); for (std::size_t j = range.first; j < range.second; ++j) { std::size_t cid = CDataGatherer::extractAttributeId(featureData[j]); partitioningFields.back().second = TStrCRef(gatherer.attributeName(cid)); const TOptionalSample& bucket = CDataGatherer::extractData(featureData[j]).s_BucketValue; if (!bucket) { LOG_ERROR(<< "Expected a value for feature = " << model_t::print(feature) << ", person = " << gatherer.personName(pid) << ", attribute = " << gatherer.attributeName(cid)); continue; } if (this->shouldSkipUpdate(feature, pid, cid, model_t::sampleTime(feature, startTime, bucketLength))) { result.s_ShouldUpdateQuantiles = false; } if (this->shouldIgnoreResult(feature, result.s_ResultType, pid, cid, model_t::sampleTime(feature, startTime, bucketLength, bucket->time()))) { continue; } if (this->correlates(feature, pid, cid, startTime)) { // TODO } else { CProbabilityAndInfluenceCalculator::SParams params(partitioningFields); if (this->fill(feature, pid, cid, startTime, result.isInterim(), params) == false) { continue; } model_t::CResultType type; TSize1Vec mostAnomalousCorrelate; if (pJoint.addProbability(feature, cid, *params.s_Model, params.s_ElapsedTime, params.s_ComputeProbabilityParams, params.s_Time, params.s_Value, params.s_Probability, params.s_Tail, type, mostAnomalousCorrelate)) { LOG_TRACE(<< "P(" << params.describe() << ", attribute = " << gatherer.attributeName(cid) << ", person = " << this->personName(pid) << ") = " << params.s_Probability); const auto& influenceValues = CDataGatherer::extractData(featureData[j]).s_InfluenceValues; for (std::size_t k = 0; k < influenceValues.size(); ++k) { if (const CInfluenceCalculator* influenceCalculator = this->influenceCalculator(feature, k)) { pJoint.plugin(*influenceCalculator); pJoint.addInfluences(*(gatherer.beginInfluencers() + k), influenceValues[k], params); } } resultBuilder.addAttributeProbability( cid, gatherer.attributeName(cid), params.s_Probability, model_t::CResultType::E_Unconditional, feature, NO_CORRELATED_ATTRIBUTES, NO_CORRELATES); } else { LOG_ERROR(<< "Failed to compute P(" << params.describe() << ", attribute = " << gatherer.attributeName(cid) << ", person = " << this->personName(pid) << ")"); } } } } if (pJoint.empty()) { LOG_TRACE(<< "No samples in [" << startTime << "," << endTime << ")"); return false; } double p; if (!pJoint.calculate(p, result.s_Influences)) { LOG_ERROR(<< "Failed to compute probability of " << this->personName(pid)); return false; } LOG_TRACE(<< "probability(" << this->personName(pid) << ") = " << p); resultBuilder.probability(p); resultBuilder.build(); return true; } bool CMetricPopulationModel::computeTotalProbability(const std::string& /*person*/, std::size_t /*numberAttributeProbabilities*/, TOptionalDouble& probability, TAttributeProbability1Vec& attributeProbabilities) const { probability = TOptionalDouble(); attributeProbabilities.clear(); return true; } std::uint64_t CMetricPopulationModel::checksum(bool includeCurrentBucketStats) const { std::uint64_t seed = this->CPopulationModel::checksum(includeCurrentBucketStats); if (includeCurrentBucketStats) { seed = maths::common::CChecksum::calculate(seed, m_CurrentBucketStats.s_StartTime); } using TStrCRefStrCRefPr = std::pair<TStrCRef, TStrCRef>; using TStrCRefStrCRefPrUInt64Map = std::map<TStrCRefStrCRefPr, std::uint64_t, maths::common::COrderings::SLess>; const CDataGatherer& gatherer = this->dataGatherer(); TStrCRefStrCRefPrUInt64Map hashes; for (const auto& feature : m_FeatureModels) { for (std::size_t cid = 0; cid < feature.s_Models.size(); ++cid) { if (gatherer.isAttributeActive(cid)) { std::uint64_t& hash = hashes[{std::cref(EMPTY_STRING), std::cref(gatherer.attributeName(cid))}]; hash = maths::common::CChecksum::calculate(hash, feature.s_Models[cid]); } } } for (const auto& feature : m_FeatureCorrelatesModels) { for (const auto& model : feature.s_Models->correlationModels()) { std::size_t cids[]{model.first.first, model.first.second}; if (gatherer.isAttributeActive(cids[0]) && gatherer.isAttributeActive(cids[1])) { std::uint64_t& hash = hashes[{std::cref(gatherer.attributeName(cids[0])), std::cref(gatherer.attributeName(cids[1]))}]; hash = maths::common::CChecksum::calculate(hash, model.second); } } } if (includeCurrentBucketStats) { for (const auto& personCount : this->personCounts()) { std::uint64_t& hash = hashes[{std::cref(gatherer.personName(personCount.first)), std::cref(EMPTY_STRING)}]; hash = maths::common::CChecksum::calculate(hash, personCount.second); } for (const auto& feature : m_CurrentBucketStats.s_FeatureData) { for (const auto& data_ : feature.second) { std::size_t pid = CDataGatherer::extractPersonId(data_); std::size_t cid = CDataGatherer::extractAttributeId(data_); const TFeatureData& data = CDataGatherer::extractData(data_); std::uint64_t& hash = hashes[{std::cref(this->personName(pid)), std::cref(this->attributeName(cid))}]; hash = maths::common::CChecksum::calculate(hash, data.s_BucketValue); hash = maths::common::CChecksum::calculate(hash, data.s_IsInteger); hash = maths::common::CChecksum::calculate(hash, data.s_Samples); } } } LOG_TRACE(<< "seed = " << seed); LOG_TRACE(<< "hashes = " << hashes); return maths::common::CChecksum::calculate(seed, hashes); } void CMetricPopulationModel::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const { mem->setName("CMetricPopulationModel"); this->CPopulationModel::debugMemoryUsage(mem->addChild()); core::memory_debug::dynamicSize("m_CurrentBucketStats.s_PersonCounts", m_CurrentBucketStats.s_PersonCounts, mem); core::memory_debug::dynamicSize("m_CurrentBucketStats.s_FeatureData", m_CurrentBucketStats.s_FeatureData, mem); core::memory_debug::dynamicSize("m_CurrentBucketStats.s_InterimCorrections", m_CurrentBucketStats.s_InterimCorrections, mem); core::memory_debug::dynamicSize("m_CurrentBucketStats.s_Annotations", m_CurrentBucketStats.s_Annotations, mem); core::memory_debug::dynamicSize("m_FeatureModels", m_FeatureModels, mem); core::memory_debug::dynamicSize("m_FeatureCorrelatesModels", m_FeatureCorrelatesModels, mem); core::memory_debug::dynamicSize("m_InterimBucketCorrector", m_InterimBucketCorrector, mem); core::memory_debug::dynamicSize("m_MemoryEstimator", m_MemoryEstimator, mem); } std::size_t CMetricPopulationModel::memoryUsage() const { const CDataGatherer& gatherer = this->dataGatherer(); TOptionalSize estimate = this->estimateMemoryUsage( gatherer.numberActivePeople(), gatherer.numberActiveAttributes(), 0); // # correlations return estimate ? *estimate : this->computeMemoryUsage(); } std::size_t CMetricPopulationModel::computeMemoryUsage() const { std::size_t mem = this->CPopulationModel::memoryUsage(); mem += core::memory::dynamicSize(m_CurrentBucketStats.s_PersonCounts); mem += core::memory::dynamicSize(m_CurrentBucketStats.s_FeatureData); mem += core::memory::dynamicSize(m_CurrentBucketStats.s_InterimCorrections); mem += core::memory::dynamicSize(m_CurrentBucketStats.s_Annotations); mem += core::memory::dynamicSize(m_FeatureModels); mem += core::memory::dynamicSize(m_FeatureCorrelatesModels); mem += core::memory::dynamicSize(m_InterimBucketCorrector); mem += core::memory::dynamicSize(m_MemoryEstimator); return mem; } const CMetricPopulationModel::TAnnotationVec& CMetricPopulationModel::annotations() const { return m_CurrentBucketStats.s_Annotations; } CMemoryUsageEstimator* CMetricPopulationModel::memoryUsageEstimator() const { return &m_MemoryEstimator; } std::size_t CMetricPopulationModel::staticSize() const { return sizeof(*this); } CMetricPopulationModel::TModelDetailsViewUPtr CMetricPopulationModel::details() const { return TModelDetailsViewUPtr(new CMetricPopulationModelDetailsView(*this)); } const TSizeSizePrFeatureDataPrVec& CMetricPopulationModel::featureData(model_t::EFeature feature, core_t::TTime time) const { static const TSizeSizePrFeatureDataPrVec EMPTY; if (!this->bucketStatsAvailable(time)) { LOG_ERROR(<< "No statistics at " << time << ", current bucket = [" << m_CurrentBucketStats.s_StartTime << "," << m_CurrentBucketStats.s_StartTime + this->bucketLength() << ")"); return EMPTY; } auto result = m_CurrentBucketStats.s_FeatureData.find(feature); return result == m_CurrentBucketStats.s_FeatureData.end() ? EMPTY : result->second; } core_t::TTime CMetricPopulationModel::currentBucketStartTime() const { return m_CurrentBucketStats.s_StartTime; } void CMetricPopulationModel::currentBucketStartTime(core_t::TTime startTime) { m_CurrentBucketStats.s_StartTime = startTime; } const CMetricPopulationModel::TSizeUInt64PrVec& CMetricPopulationModel::personCounts() const { return m_CurrentBucketStats.s_PersonCounts; } CPopulationModel::TCorrectionKeyDouble1VecUMap& CMetricPopulationModel::currentBucketInterimCorrections() const { return m_CurrentBucketStats.s_InterimCorrections; } void CMetricPopulationModel::createNewModels(std::size_t n, std::size_t m) { if (m > 0) { for (auto& feature : m_FeatureModels) { std::size_t newM = feature.s_Models.size() + m; core::CAllocationStrategy::reserve(feature.s_Models, newM); for (std::size_t cid = feature.s_Models.size(); cid < newM; ++cid) { feature.s_Models.emplace_back(feature.s_NewModel->clone(cid)); for (const auto& correlates : m_FeatureCorrelatesModels) { if (feature.s_Feature == correlates.s_Feature) { feature.s_Models.back()->modelCorrelations(*correlates.s_Models); } } } } } this->CPopulationModel::createNewModels(n, m); } void CMetricPopulationModel::updateRecycledModels() { CDataGatherer& gatherer = this->dataGatherer(); for (auto cid : gatherer.recycledAttributeIds()) { for (auto& feature : m_FeatureModels) { if (cid < feature.s_Models.size()) { feature.s_Models[cid].reset(feature.s_NewModel->clone(cid)); for (const auto& correlates : m_FeatureCorrelatesModels) { if (feature.s_Feature == correlates.s_Feature) { feature.s_Models.back()->modelCorrelations(*correlates.s_Models); } } } } } this->CPopulationModel::updateRecycledModels(); } void CMetricPopulationModel::refreshCorrelationModels(std::size_t resourceLimit, CResourceMonitor& resourceMonitor) { std::size_t n = this->numberOfPeople(); double maxNumberCorrelations = this->params().s_CorrelationModelsOverhead * static_cast<double>(n); auto memoryUsage = std::bind(&CAnomalyDetectorModel::estimateMemoryUsageOrComputeAndUpdate, this, n, 0, std::placeholders::_1); CTimeSeriesCorrelateModelAllocator allocator( resourceMonitor, memoryUsage, resourceLimit, static_cast<std::size_t>(maxNumberCorrelations + 0.5)); for (auto& feature : m_FeatureCorrelatesModels) { allocator.prototypePrior(feature.s_ModelPrior); feature.s_Models->refresh(allocator); } } void CMetricPopulationModel::clearPrunedResources(const TSizeVec& /*people*/, const TSizeVec& /*attributes*/) { CDataGatherer& gatherer = this->dataGatherer(); for (auto cid : gatherer.recycledAttributeIds()) { for (auto& feature : m_FeatureModels) { if (cid < feature.s_Models.size()) { feature.s_Models[cid].reset(feature.s_NewModel->clone(cid)); for (const auto& correlates : m_FeatureCorrelatesModels) { if (feature.s_Feature == correlates.s_Feature) { feature.s_Models.back()->modelCorrelations(*correlates.s_Models); } } } } } } const CInterimBucketCorrector& CMetricPopulationModel::interimValueCorrector() const { return *m_InterimBucketCorrector; } void CMetricPopulationModel::doSkipSampling(core_t::TTime startTime, core_t::TTime endTime) { core_t::TTime gap = endTime - startTime; for (auto& feature : m_FeatureModels) { for (auto& model : feature.s_Models) { model->skipTime(gap); } } this->CPopulationModel::doSkipSampling(startTime, endTime); } const maths::common::CModel* CMetricPopulationModel::model(model_t::EFeature feature, std::size_t cid) const { return const_cast<CMetricPopulationModel*>(this)->model(feature, cid); } maths::common::CModel* CMetricPopulationModel::model(model_t::EFeature feature, std::size_t cid) { auto i = std::find_if(m_FeatureModels.begin(), m_FeatureModels.end(), [feature](const SFeatureModels& model) { return model.s_Feature == feature; }); return i != m_FeatureModels.end() && cid < i->s_Models.size() ? i->s_Models[cid].get() : nullptr; } bool CMetricPopulationModel::correlates(model_t::EFeature feature, std::size_t pid, std::size_t cid, core_t::TTime time) const { if (model_t::dimension(feature) > 1 || !this->params().s_MultivariateByFields) { return false; } const maths::common::CModel* model{this->model(feature, cid)}; if (model == nullptr) { LOG_TRACE(<< "Model unexpectedly null"); return false; } const TSizeSizePrFeatureDataPrVec& data = this->featureData(feature, time); TSizeSizePr range = personRange(data, pid); for (std::size_t j = range.first; j < range.second; ++j) { std::size_t cids[]{cid, CDataGatherer::extractAttributeId(data[j])}; for (const auto& correlate : model->correlates()) { if ((cids[0] == correlate[0] && cids[1] == correlate[1]) || (cids[1] == correlate[0] && cids[0] == correlate[1])) { return true; } } } return false; } bool CMetricPopulationModel::fill(model_t::EFeature feature, std::size_t pid, std::size_t cid, core_t::TTime bucketTime, bool interim, CProbabilityAndInfluenceCalculator::SParams& params) const { std::size_t dimension{model_t::dimension(feature)}; auto data = find(this->featureData(feature, bucketTime), pid, cid); const maths::common::CModel* model{this->model(feature, cid)}; if (model == nullptr) { LOG_TRACE(<< "Model unexpectedly null"); return false; } const TOptionalSample& bucket{CDataGatherer::extractData(*data).s_BucketValue}; core_t::TTime time{model_t::sampleTime(feature, bucketTime, this->bucketLength(), bucket->time())}; maths_t::TDouble2VecWeightsAry weights(maths_t::CUnitWeights::unit<TDouble2Vec>(dimension)); TDouble2Vec seasonalWeight; model->seasonalWeight(maths::common::DEFAULT_SEASONAL_CONFIDENCE_INTERVAL, time, seasonalWeight); maths_t::setSeasonalVarianceScale(seasonalWeight, weights); maths_t::setCountVarianceScale(TDouble2Vec(dimension, bucket->varianceScale()), weights); double initialCountWeight{this->initialCountWeight(feature, pid, cid, time)}; params.s_Feature = feature; params.s_Model = model; params.s_ElapsedTime = time - this->attributeFirstBucketTimes()[cid]; params.s_Time.assign(1, {time}); params.s_Value.assign(1, bucket->value()); if (interim && model_t::requiresInterimResultAdjustment(feature)) { TDouble2Vec mode(params.s_Model->mode(time, weights)); TDouble2Vec correction(this->interimValueCorrector().corrections( mode, bucket->value(dimension))); params.s_Value[0] += correction; this->currentBucketInterimCorrections().emplace( CCorrectionKey(feature, pid, cid), correction); } params.s_Count = bucket->count(); params.s_ComputeProbabilityParams .addCalculation(model_t::probabilityCalculation(feature)) .addWeights(weights) .initialCountWeight(initialCountWeight); return true; } void CMetricPopulationModel::addAnnotation(core_t::TTime time, CAnnotation::EEvent type, const std::string& annotation) { m_CurrentBucketStats.s_Annotations.emplace_back( time, type, annotation, this->dataGatherer().searchKey().detectorIndex(), EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING); } void CMetricPopulationModel::shiftTime(core_t::TTime time, core_t::TTime shift) { for (auto& feature : m_FeatureModels) { for (auto& model : feature.s_Models) { model->shiftTime(time, shift); } } this->addAnnotation(time, CAnnotation::E_ModelChange, "Model shifted time by " + std::to_string(shift) + " seconds"); } ////////// CMetricPopulationModel::SBucketStats Implementation ////////// CMetricPopulationModel::SBucketStats::SBucketStats(core_t::TTime startTime) : s_StartTime(startTime), s_InterimCorrections(1) { } } }