lib/model/CEventRateModel.cc

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <model/CEventRateModel.h>

#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/CoreTypes.h>

#include <maths/common/CChecksum.h>
#include <maths/common/COrderings.h>
#include <maths/common/CRestoreParams.h>
#include <maths/common/CTools.h>
#include <maths/common/ProbabilityAggregators.h>

#include <model/CAnnotatedProbabilityBuilder.h>
#include <model/CAnnotation.h>
#include <model/CAnomalyDetectorModel.h>
#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CDataGatherer.h>
#include <model/CIndividualModelDetail.h>
#include <model/CInterimBucketCorrector.h>
#include <model/CModelDetailsView.h>
#include <model/CModelTools.h>
#include <model/CProbabilityAndInfluenceCalculator.h>
#include <model/CSearchKey.h>
#include <model/FrequencyPredicates.h>

#include <algorithm>
#include <limits>
#include <map>
#include <string>
#include <utility>

namespace ml {
namespace model {

namespace {

using TDouble2Vec = core::CSmallVector<double, 2>;
using TDouble2Vec1Vec = core::CSmallVector<TDouble2Vec, 1>;
using TTime2Vec = core::CSmallVector<core_t::TTime, 2>;

// We use short field names to reduce the state size
const std::string INDIVIDUAL_STATE_TAG("a");
const std::string PROBABILITY_PRIOR_TAG("b");
}

CEventRateModel::CEventRateModel(
    const SModelParams& params,
    const TDataGathererPtr& dataGatherer,
    const TFeatureMathsModelSPtrPrVec& newFeatureModels,
    const TFeatureMultivariatePriorSPtrPrVec& newFeatureCorrelateModelPriors,
    TFeatureCorrelationsPtrPrVec&& featureCorrelatesModels,
    const maths::common::CMultinomialConjugate& probabilityPrior,
    const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators,
    const TInterimBucketCorrectorCPtr& interimBucketCorrector)
    : CIndividualModel(params, dataGatherer, newFeatureModels,
                       newFeatureCorrelateModelPriors,
                       std::move(featureCorrelatesModels), influenceCalculators),
      m_CurrentBucketStats(CAnomalyDetectorModel::TIME_UNSET),
      m_ProbabilityPrior(probabilityPrior),
      m_InterimBucketCorrector(interimBucketCorrector) {
}

CEventRateModel::CEventRateModel(const SModelParams& params,
                                 const TDataGathererPtr& dataGatherer,
                                 const TFeatureMathsModelSPtrPrVec& newFeatureModels,
                                 const TFeatureMultivariatePriorSPtrPrVec& newFeatureCorrelateModelPriors,
                                 TFeatureCorrelationsPtrPrVec&& featureCorrelatesModels,
                                 const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators,
                                 const TInterimBucketCorrectorCPtr& interimBucketCorrector,
                                 core::CStateRestoreTraverser& traverser)
    : CIndividualModel(params, dataGatherer, newFeatureModels,
                       newFeatureCorrelateModelPriors,
                       std::move(featureCorrelatesModels), influenceCalculators),
      m_CurrentBucketStats(CAnomalyDetectorModel::TIME_UNSET),
      m_InterimBucketCorrector(interimBucketCorrector) {
    if (traverser.traverseSubLevel(std::bind(&CEventRateModel::acceptRestoreTraverser,
                                             this, std::placeholders::_1)) == false) {
        traverser.setBadState();
    }
}
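// The following constructor implements the clone-for-persistence pattern used
// by cloneForPersistence below: it copies only the state which is actually
// persisted (the base class state and the probability prior), while
// m_CurrentBucketStats is transient and deliberately left minimally
// constructed. The clone is only valid for persistence, hence the LOG_ABORT
// guard.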
CEventRateModel::CEventRateModel(bool isForPersistence, const CEventRateModel& other)
    : CIndividualModel(isForPersistence, other),
      m_CurrentBucketStats(0), // Not needed for persistence so minimally constructed
      m_ProbabilityPrior(other.m_ProbabilityPrior) {
    if (!isForPersistence) {
        LOG_ABORT(<< "This constructor only creates clones for persistence");
    }
}

void CEventRateModel::persistModelsState(core::CStatePersistInserter& inserter) const {
    this->doPersistModelsState(inserter);
}

void CEventRateModel::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
    inserter.insertLevel(INDIVIDUAL_STATE_TAG,
                         std::bind(&CEventRateModel::doAcceptPersistInserter, this,
                                   std::placeholders::_1));
    inserter.insertLevel(PROBABILITY_PRIOR_TAG,
                         std::bind(&maths::common::CMultinomialConjugate::acceptPersistInserter,
                                   &m_ProbabilityPrior, std::placeholders::_1));
}

bool CEventRateModel::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
    do {
        const std::string& name = traverser.name();
        RESTORE(INDIVIDUAL_STATE_TAG,
                traverser.traverseSubLevel(std::bind(&CEventRateModel::doAcceptRestoreTraverser,
                                                     this, std::placeholders::_1)))
        RESTORE_NO_ERROR(PROBABILITY_PRIOR_TAG,
                         m_ProbabilityPrior = maths::common::CMultinomialConjugate(
                             this->params().distributionRestoreParams(maths_t::E_DiscreteData),
                             traverser))
    } while (traverser.next());
    return true;
}

CAnomalyDetectorModel* CEventRateModel::cloneForPersistence() const {
    return new CEventRateModel(true, *this);
}

model_t::EModelType CEventRateModel::category() const {
    return model_t::E_EventRateOnline;
}

bool CEventRateModel::isEventRate() const {
    return true;
}

bool CEventRateModel::isMetric() const {
    return false;
}

void CEventRateModel::currentBucketPersonIds(core_t::TTime time, TSizeVec& result) const {
    this->CIndividualModel::currentBucketPersonIds(
        time, m_CurrentBucketStats.s_FeatureData, result);
}

CEventRateModel::TOptionalDouble CEventRateModel::baselineBucketCount(std::size_t /*pid*/) const {
    return TOptionalDouble();
}

CEventRateModel::TDouble1Vec CEventRateModel::currentBucketValue(model_t::EFeature feature,
                                                                 std::size_t pid,
                                                                 std::size_t /*cid*/,
                                                                 core_t::TTime time) const {
    const TFeatureData* data = this->featureData(feature, pid, time);
    if (data) {
        return TDouble1Vec(1, static_cast<double>(data->s_Count));
    }
    return TDouble1Vec();
}
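// baselineBucketMean returns the typical (predicted) bucket value for the
// feature and person. Roughly, each coordinate is computed as
//   truncate(P(person) * inverseOffsetCountToZero(feature, model->predict(time)))
// where P(person), looked up from the cached category probabilities, only
// differs from 1.0 for "constant" features, and the result is truncated to
// the feature's support after any interim correction is applied.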
CEventRateModel::TDouble1Vec
CEventRateModel::baselineBucketMean(model_t::EFeature feature,
                                    std::size_t pid,
                                    std::size_t cid,
                                    model_t::CResultType type,
                                    const TSizeDoublePr1Vec& correlated,
                                    core_t::TTime time) const {
    const maths::common::CModel* model{this->model(feature, pid)};
    if (!model) {
        return TDouble1Vec();
    }
    static const TSizeDoublePr1Vec NO_CORRELATED;
    TDouble2Vec hint;
    if (model_t::isDiurnal(feature)) {
        hint = this->currentBucketValue(feature, pid, cid, time);
    }
    TDouble1Vec result(model->predict(
        time, type.isUnconditional() ? NO_CORRELATED : correlated, hint));

    double probability = 1.0;
    if (model_t::isConstant(feature) && !m_Probabilities.lookup(pid, probability)) {
        probability = 1.0;
    }
    for (auto& coord : result) {
        coord = probability * model_t::inverseOffsetCountToZero(feature, coord);
    }
    this->correctBaselineForInterim(feature, pid, type, correlated,
                                    this->currentBucketInterimCorrections(), result);

    TDouble1VecDouble1VecPr support{model_t::support(feature)};
    return maths::common::CTools::truncate(result, support.first, support.second);
}

void CEventRateModel::sampleBucketStatistics(core_t::TTime startTime,
                                             core_t::TTime endTime,
                                             CResourceMonitor& resourceMonitor) {
    this->createUpdateNewModels(startTime, resourceMonitor);
    this->currentBucketInterimCorrections().clear();
    this->CIndividualModel::sampleBucketStatistics(
        startTime, endTime, this->personFilter(),
        m_CurrentBucketStats.s_FeatureData, resourceMonitor);
}
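// sample() is the main update entry point: for each bucket in [startTime,
// endTime) it feeds the observed counts into the per-person residual models.
// Two down-weightings combine in the update below: initialCountWeight, which
// damps values hit by rules or scheduled events, and emptyBucketWeight, which
// damps samples from empty buckets for sparse data. The effective weight is
//   scaledCountWeight = emptyBucketWeight * initialCountWeight * learnRate(feature).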
void CEventRateModel::sample(core_t::TTime startTime,
                             core_t::TTime endTime,
                             CResourceMonitor& resourceMonitor) {
    CDataGatherer& gatherer = this->dataGatherer();
    core_t::TTime bucketLength = gatherer.bucketLength();
    if (!gatherer.validateSampleTimes(startTime, endTime)) {
        return;
    }

    this->createUpdateNewModels(startTime, resourceMonitor);
    this->currentBucketInterimCorrections().clear();
    m_CurrentBucketStats.s_Annotations.clear();

    for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
        LOG_TRACE(<< "Sampling [" << time << "," << time + bucketLength << ")");

        gatherer.sampleNow(time);
        gatherer.featureData(time, bucketLength, m_CurrentBucketStats.s_FeatureData);

        const CIndividualModel::TTimeVec& preSampleLastBucketTimes = this->lastBucketTimes();
        CIndividualModel::TSizeTimeUMap lastBucketTimesMap;
        for (const auto& featureData : m_CurrentBucketStats.s_FeatureData) {
            for (const auto& data : featureData.second) {
                std::size_t pid = data.first;
                lastBucketTimesMap[pid] = preSampleLastBucketTimes[pid];
            }
        }

        this->CIndividualModel::sample(time, time + bucketLength, resourceMonitor);

        // Declared outside the loop to minimize the number of times they are created.
        maths::common::CModel::TTimeDouble2VecSizeTrVec values;
        maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec trendWeights;
        maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec priorWeights;

        for (auto& featureData : m_CurrentBucketStats.s_FeatureData) {
            model_t::EFeature feature = featureData.first;
            TSizeFeatureDataPrVec& data = featureData.second;
            std::size_t dimension = model_t::dimension(feature);
            LOG_TRACE(<< model_t::print(feature) << ": " << data);

            if (feature == model_t::E_IndividualTotalBucketCountByPerson) {
                for (const auto& data_ : data) {
                    if (data_.second.s_Count > 0) {
                        LOG_TRACE(<< "person = " << this->personName(data_.first));
                        m_ProbabilityPrior.addSamples({static_cast<double>(data_.first)},
                                                      maths_t::CUnitWeights::SINGLE_UNIT);
                    }
                }
                if (!data.empty()) {
                    m_ProbabilityPrior.propagateForwardsByTime(1.0);
                }
                continue;
            }
            if (model_t::isCategorical(feature)) {
                continue;
            }

            this->applyFilter(model_t::E_XF_By, true, this->personFilter(), data);

            for (const auto& data_ : data) {
                std::size_t pid = data_.first;
                maths::common::CModel* model = this->model(feature, pid);
                if (model == nullptr) {
                    LOG_ERROR(<< "Missing model for " << this->personName(pid));
                    continue;
                }

                // initialCountWeight returns a weight value as a double:
                //   - 0.0 if checkScheduledEvents is true
                //   - 1.0 if both checkScheduledEvents and checkRules are false
                //   - a small weight, 0.005, if checkRules is true.
                // This weight is applied to countWeight (and therefore to
                // scaledCountWeight) as a multiplier. This reduces the impact of
                // the values affected by the skip_model_update rule on the model
                // while not completely ignoring them. The model can still learn
                // from the affected values, addressing points 1 and 2 in
                // https://github.com/elastic/ml-cpp/issues/1272, namely:
                //   1. If you apply it from the start of the modelling it can stop
                //      the model learning anything at all.
                //   2. It can stop the model ever adapting to some change in data
                //      characteristics.
                core_t::TTime sampleTime = model_t::sampleTime(feature, time, bucketLength);
                double initialCountWeight = this->initialCountWeight(
                    feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, sampleTime);
                if (initialCountWeight == 0.0) {
                    model->skipTime(sampleTime - lastBucketTimesMap[pid]);
                    continue;
                }

                // For sparse data we reduce the impact of samples from empty buckets.
                // In effect, we smoothly transition to modeling only values from
                // non-empty buckets as the data becomes sparse.
                double emptyBucketWeight = this->emptyBucketWeight(feature, pid, time);
                if (emptyBucketWeight == 0.0) {
                    continue;
                }

                double count = model_t::offsetCountToZero(
                    feature, static_cast<double>(data_.second.s_Count));
                TDouble2Vec value{count};
                double outlierWeightDerate = this->derate(pid, sampleTime);
                double countWeight = initialCountWeight * this->learnRate(feature);

                // Note we need to scale the amount of data we'll "age out" of the
                // residual model in one bucket by the empty bucket weight so the
                // posterior doesn't end up too flat.
                double scaledInterval = emptyBucketWeight;
                double scaledCountWeight = emptyBucketWeight * countWeight;

                LOG_TRACE(<< "Bucket = " << this->printCurrentBucket()
                          << ", feature = " << model_t::print(feature)
                          << ", count = " << count << ", person = " << this->personName(pid)
                          << ", empty bucket weight = " << emptyBucketWeight
                          << ", count weight = " << countWeight
                          << ", scaled count weight = " << scaledCountWeight
                          << ", scaled interval = " << scaledInterval);

                values.assign(1, core::make_triple(sampleTime, value,
                                                   model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID));
                trendWeights.resize(1, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
                priorWeights.resize(1, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
                model->countWeights(sampleTime, value, countWeight, scaledCountWeight,
                                    outlierWeightDerate,
                                    1.0, // count variance scale
                                    trendWeights[0], priorWeights[0]);

                auto annotationCallback = [&](const std::string& annotation) {
                    if (this->params().s_AnnotationsEnabled) {
                        m_CurrentBucketStats.s_Annotations.emplace_back(
                            time, CAnnotation::E_ModelChange, annotation,
                            gatherer.searchKey().detectorIndex(),
                            gatherer.searchKey().partitionFieldName(),
                            gatherer.partitionFieldValue(),
                            gatherer.searchKey().overFieldName(), EMPTY_STRING,
                            gatherer.searchKey().byFieldName(), gatherer.personName(pid));
                    }
                };

                maths::common::CModelAddSamplesParams params;
                auto circuitBreaker = CMemoryCircuitBreaker(resourceMonitor);
                params.isInteger(true)
                    .isNonNegative(true)
                    .propagationInterval(scaledInterval)
                    .trendWeights(trendWeights)
                    .priorWeights(priorWeights)
                    .bucketOccupancy(model_t::includeEmptyBuckets(feature)
                                         ? this->personFrequency(pid)
                                         : 1.0)
                    .firstValueTime(pid < this->firstBucketTimes().size()
                                        ? this->firstBucketTimes()[pid]
                                        : std::numeric_limits<core_t::TTime>::min())
                    .annotationCallback([&](const std::string& annotation) {
                        annotationCallback(annotation);
                    })
                    .memoryCircuitBreaker(circuitBreaker);

                if (model->addSamples(params, values) == maths::common::CModel::E_Reset) {
                    gatherer.resetSampleCount(pid);
                }
            }
        }

        this->sampleCorrelateModels();
        m_Probabilities = TCategoryProbabilityCache(m_ProbabilityPrior);
    }
}
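// computeProbability aggregates the per-feature probabilities for one person
// and one bucket into a single annotated result. Feature probabilities are
// combined with the CJointProbabilityOfLessLikelySamples and
// CProbabilityOfExtremeSample aggregators; the person's rarity, read from the
// cached multinomial prior probabilities, is then folded into the joint
// probability when the person was actually seen in the bucket.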
bool CEventRateModel::computeProbability(std::size_t pid,
                                         core_t::TTime startTime,
                                         core_t::TTime endTime,
                                         CPartitioningFields& partitioningFields,
                                         std::size_t /*numberAttributeProbabilities*/,
                                         SAnnotatedProbability& result) const {
    const CDataGatherer& gatherer = this->dataGatherer();
    core_t::TTime bucketLength = gatherer.bucketLength();

    if (endTime != startTime + bucketLength) {
        LOG_ERROR(<< "Can only compute probability for single bucket");
        return false;
    }
    if (pid >= this->firstBucketTimes().size()) {
        LOG_ERROR(<< "No first time for person = " << gatherer.personName(pid));
        return false;
    }

    CAnnotatedProbabilityBuilder resultBuilder(result,
                                               1, // # attribute probabilities
                                               function_t::function(gatherer.features()));

    CProbabilityAndInfluenceCalculator pJoint(this->params().s_InfluenceCutoff);
    pJoint.addAggregator(maths::common::CJointProbabilityOfLessLikelySamples());

    CProbabilityAndInfluenceCalculator pFeatures(this->params().s_InfluenceCutoff);
    pFeatures.addAggregator(maths::common::CJointProbabilityOfLessLikelySamples());
    pFeatures.addAggregator(maths::common::CProbabilityOfExtremeSample());

    bool addPersonProbability{false};
    bool skippedResults{false};

    for (std::size_t i = 0u, n = gatherer.numberFeatures(); i < n; ++i) {
        model_t::EFeature feature = gatherer.feature(i);
        if (model_t::isCategorical(feature)) {
            continue;
        }
        const TFeatureData* data = this->featureData(feature, pid, startTime);
        if (!data) {
            continue;
        }
        if (this->shouldSkipUpdate(feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID,
                                   model_t::sampleTime(feature, startTime, bucketLength))) {
            result.s_ShouldUpdateQuantiles = false;
        }
        if (this->shouldIgnoreResult(feature, result.s_ResultType, pid,
                                     model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID,
                                     model_t::sampleTime(feature, startTime, bucketLength))) {
            skippedResults = true;
            continue;
        }
        addPersonProbability = true;
        LOG_TRACE(<< "value(" << this->personName(pid) << ") = " << data->print());

        if (this->correlates(feature, pid, startTime)) {
            CProbabilityAndInfluenceCalculator::SCorrelateParams params(partitioningFields);
            TStrCRefDouble1VecDouble1VecPrPrVecVecVec influenceValues;
            this->fill(feature, pid, startTime, result.isInterim(), params, influenceValues);
            this->addProbabilityAndInfluences(pid, params, influenceValues,
                                              pFeatures, resultBuilder);
        } else {
            CProbabilityAndInfluenceCalculator::SParams params(partitioningFields);
            if (this->fill(feature, pid, startTime, result.isInterim(), params)) {
                this->addProbabilityAndInfluences(pid, params, data->s_InfluenceValues,
                                                  pFeatures, resultBuilder);
            }
        }
    }

    TOptionalUInt64 count = this->currentBucketCount(pid, startTime);

    pJoint.add(pFeatures);
    if (addPersonProbability && count && *count != 0) {
        double p;
        if (m_Probabilities.lookup(pid, p)) {
            LOG_TRACE(<< "P(" << gatherer.personName(pid) << ") = " << p);
            pJoint.addProbability(p);
        }
    }

    double p{1.0};
    if (skippedResults && pJoint.empty()) {
        // This means we have skipped results for all features. We leave the
        // probability at its initial value of 1.0 here to ensure the quantiles
        // are updated accordingly.
    } else if (pJoint.empty()) {
        LOG_TRACE(<< "No samples in [" << startTime << "," << endTime << ")");
        return false;
    } else if (!pJoint.calculate(p, result.s_Influences)) {
        LOG_ERROR(<< "Failed to compute probability");
        return false;
    }
    LOG_TRACE(<< "probability(" << this->personName(pid) << ") = " << p);

    resultBuilder.probability(p);

    double multiBucketImpact{-1.0 * CAnomalyDetectorModelConfig::MAXIMUM_MULTI_BUCKET_IMPACT_MAGNITUDE};
    if (pJoint.calculateMultiBucketImpact(multiBucketImpact)) {
        resultBuilder.multiBucketImpact(multiBucketImpact);
    }

    resultBuilder.anomalyScoreExplanation() = pJoint.anomalyScoreExplanation();
    auto& anomalyScoreExplanation{resultBuilder.anomalyScoreExplanation()};

    bool everSeenBefore = this->firstBucketTimes()[pid] != startTime;
    auto typicalConcentration = m_Probabilities.medianConcentration();
    double actualConcentration;
    if (m_ProbabilityPrior.concentration(pid, actualConcentration) &&
        typicalConcentration.has_value()) {
        anomalyScoreExplanation.s_ByFieldActualConcentration = actualConcentration;
        anomalyScoreExplanation.s_ByFieldTypicalConcentration = typicalConcentration.value();
    }
    anomalyScoreExplanation.s_ByFieldFirstOccurrence = !everSeenBefore;

    resultBuilder.build();

    return true;
}
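// Note: the per-person hashes below are keyed by person name rather than
// person id, presumably so the checksum stays stable if person ids are
// remapped when pruned people are recycled.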
std::uint64_t CEventRateModel::checksum(bool includeCurrentBucketStats) const {
    using TStrCRefUInt64Map =
        std::map<TStrCRef, std::uint64_t, maths::common::COrderings::SLess>;

    std::uint64_t seed = this->CIndividualModel::checksum(includeCurrentBucketStats);

    TStrCRefUInt64Map hashes;
    const TDoubleVec& categories = m_ProbabilityPrior.categories();
    const TDoubleVec& concentrations = m_ProbabilityPrior.concentrations();
    for (std::size_t i = 0; i < categories.size(); ++i) {
        std::uint64_t& hash =
            hashes[std::cref(this->personName(static_cast<std::size_t>(categories[i])))];
        hash = maths::common::CChecksum::calculate(hash, concentrations[i]);
    }
    if (includeCurrentBucketStats) {
        for (const auto& featureData_ : m_CurrentBucketStats.s_FeatureData) {
            for (const auto& data : featureData_.second) {
                std::uint64_t& hash = hashes[std::cref(this->personName(data.first))];
                hash = maths::common::CChecksum::calculate(hash, data.second.s_Count);
            }
        }
    }

    LOG_TRACE(<< "seed = " << seed);
    LOG_TRACE(<< "hashes = " << hashes);

    return maths::common::CChecksum::calculate(seed, hashes);
}

void CEventRateModel::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
    mem->setName("CEventRateModel");
    this->CIndividualModel::debugMemoryUsage(mem->addChild());
    core::memory_debug::dynamicSize("m_CurrentBucketStats.s_PersonCounts",
                                    m_CurrentBucketStats.s_PersonCounts, mem);
    core::memory_debug::dynamicSize("m_CurrentBucketStats.s_FeatureData",
                                    m_CurrentBucketStats.s_FeatureData, mem);
    core::memory_debug::dynamicSize("m_CurrentBucketStats.s_InterimCorrections",
                                    m_CurrentBucketStats.s_InterimCorrections, mem);
    core::memory_debug::dynamicSize("m_CurrentBucketStats.s_Annotations",
                                    m_CurrentBucketStats.s_Annotations, mem);
    core::memory_debug::dynamicSize("s_Probabilities", m_Probabilities, mem);
    core::memory_debug::dynamicSize("m_ProbabilityPrior", m_ProbabilityPrior, mem);
    core::memory_debug::dynamicSize("m_InterimBucketCorrector",
                                    m_InterimBucketCorrector, mem);
}

std::size_t CEventRateModel::memoryUsage() const {
    return this->CIndividualModel::memoryUsage();
}

std::size_t CEventRateModel::staticSize() const {
    return sizeof(*this);
}

std::size_t CEventRateModel::computeMemoryUsage() const {
    std::size_t mem = this->CIndividualModel::computeMemoryUsage();
    mem += core::memory::dynamicSize(m_CurrentBucketStats.s_PersonCounts);
    mem += core::memory::dynamicSize(m_CurrentBucketStats.s_FeatureData);
    mem += core::memory::dynamicSize(m_CurrentBucketStats.s_InterimCorrections);
    mem += core::memory::dynamicSize(m_CurrentBucketStats.s_Annotations);
    mem += core::memory::dynamicSize(m_Probabilities);
    mem += core::memory::dynamicSize(m_ProbabilityPrior);
    mem += core::memory::dynamicSize(m_InterimBucketCorrector);
    return mem;
}

CEventRateModel::TModelDetailsViewUPtr CEventRateModel::details() const {
    return TModelDetailsViewUPtr(new CEventRateModelDetailsView(*this));
}

const CEventRateModel::TFeatureData*
CEventRateModel::featureData(model_t::EFeature feature, std::size_t pid, core_t::TTime time) const {
    return this->CIndividualModel::featureData(feature, pid, time,
                                               m_CurrentBucketStats.s_FeatureData);
}

const CEventRateModel::TAnnotationVec& CEventRateModel::annotations() const {
    return m_CurrentBucketStats.s_Annotations;
}

core_t::TTime CEventRateModel::currentBucketStartTime() const {
    return m_CurrentBucketStats.s_StartTime;
}

void CEventRateModel::currentBucketStartTime(core_t::TTime time) {
    m_CurrentBucketStats.s_StartTime = time;
}

const CEventRateModel::TSizeUInt64PrVec& CEventRateModel::currentBucketPersonCounts() const {
    return m_CurrentBucketStats.s_PersonCounts;
}

CEventRateModel::TSizeUInt64PrVec& CEventRateModel::currentBucketPersonCounts() {
    return m_CurrentBucketStats.s_PersonCounts;
}

CIndividualModel::TFeatureSizeSizeTrDouble1VecUMap&
CEventRateModel::currentBucketInterimCorrections() const {
    return m_CurrentBucketStats.s_InterimCorrections;
}
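// clearPrunedResources is called after pruning: the recycled people are
// removed from the data gatherer and their categories are removed from the
// multinomial probability prior, after which the cached category
// probabilities are rebuilt to match.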
void CEventRateModel::clearPrunedResources(const TSizeVec& people, const TSizeVec& attributes) {
    CDataGatherer& gatherer = this->dataGatherer();

    // Stop collecting for these people and add them to the free list.
    gatherer.recyclePeople(people);
    if (gatherer.dataAvailable(m_CurrentBucketStats.s_StartTime)) {
        gatherer.featureData(m_CurrentBucketStats.s_StartTime, gatherer.bucketLength(),
                             m_CurrentBucketStats.s_FeatureData);
    }

    TDoubleVec categoriesToRemove;
    categoriesToRemove.reserve(people.size());
    for (std::size_t i = 0; i < people.size(); ++i) {
        categoriesToRemove.push_back(static_cast<double>(people[i]));
    }
    m_ProbabilityPrior.removeCategories(categoriesToRemove);
    m_Probabilities = TCategoryProbabilityCache(m_ProbabilityPrior);

    this->CIndividualModel::clearPrunedResources(people, attributes);
}

const CInterimBucketCorrector& CEventRateModel::interimValueCorrector() const {
    return *m_InterimBucketCorrector;
}

bool CEventRateModel::correlates(model_t::EFeature feature, std::size_t pid, core_t::TTime time) const {
    if (model_t::dimension(feature) > 1 || !this->params().s_MultivariateByFields) {
        return false;
    }
    const maths::common::CModel* model{this->model(feature, pid)};
    for (const auto& correlate : model->correlates()) {
        if (this->featureData(feature,
                              pid == correlate[0] ? correlate[1] : correlate[0], time)) {
            return true;
        }
    }
    return false;
}
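// The two fill() overloads prepare the inputs for the probability
// calculation: this one handles the common univariate case, while the
// overload after it handles the case where the person's model is correlated
// with other people's models. Both apply the interim value correction when
// results are computed mid-bucket.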
bool CEventRateModel::fill(model_t::EFeature feature,
                           std::size_t pid,
                           core_t::TTime bucketTime,
                           bool interim,
                           CProbabilityAndInfluenceCalculator::SParams& params) const {
    const TFeatureData* data{this->featureData(feature, pid, bucketTime)};
    if (data == nullptr) {
        LOG_TRACE(<< "Feature data unexpectedly null");
        return false;
    }
    const maths::common::CModel* model{this->model(feature, pid)};
    if (model == nullptr) {
        LOG_TRACE(<< "Model unexpectedly null");
        return false;
    }
    core_t::TTime time{model_t::sampleTime(feature, bucketTime, this->bucketLength())};
    double value{model_t::offsetCountToZero(feature, static_cast<double>(data->s_Count))};
    maths_t::TDouble2VecWeightsAry weights{[&] {
        TDouble2Vec result;
        model->seasonalWeight(maths::common::DEFAULT_SEASONAL_CONFIDENCE_INTERVAL, time, result);
        return maths_t::seasonalVarianceScaleWeight(result);
    }()};
    double initialCountWeight{this->initialCountWeight(
        feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, time)};

    params.s_Feature = feature;
    params.s_Model = model;
    params.s_ElapsedTime = bucketTime - this->firstBucketTimes()[pid];
    params.s_Time.assign(1, TTime2Vec{time});
    params.s_Value.assign(1, TDouble2Vec{value});
    if (interim && model_t::requiresInterimResultAdjustment(feature)) {
        double mode{params.s_Model->mode(time, weights)[0]};
        TDouble2Vec correction{this->interimValueCorrector().corrections(mode, value)};
        params.s_Value[0] += correction;
        this->currentBucketInterimCorrections().emplace(
            core::make_triple(feature, pid, pid), correction);
    }
    params.s_Count = 1.0;
    params.s_ComputeProbabilityParams
        .addCalculation(model_t::probabilityCalculation(feature))
        .addWeights(weights)
        .initialCountWeight(initialCountWeight);

    return true;
}
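// Note on indexing in the correlate case: variables[0] is this person's
// position within each pairwise correlate model and variables[1] is the
// correlated person's position, so the seasonal weights and values below are
// assembled in the model's component order regardless of which side of the
// pair this person is on.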
void CEventRateModel::fill(model_t::EFeature feature,
                           std::size_t pid,
                           core_t::TTime bucketTime,
                           bool interim,
                           CProbabilityAndInfluenceCalculator::SCorrelateParams& params,
                           TStrCRefDouble1VecDouble1VecPrPrVecVecVec& influenceValues) const {
    using TStrCRefDouble1VecDoublePrPr = std::pair<TStrCRef, TDouble1VecDoublePr>;

    const CDataGatherer& gatherer{this->dataGatherer()};
    const maths::common::CModel* model{this->model(feature, pid)};
    const TSize2Vec1Vec& correlates{model->correlates()};
    const TTimeVec& firstBucketTimes{this->firstBucketTimes()};
    core_t::TTime time{model_t::sampleTime(feature, bucketTime, gatherer.bucketLength())};
    double initialCountWeight{this->initialCountWeight(
        feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, time)};

    params.s_Feature = feature;
    params.s_Model = model;
    params.s_ElapsedTime = std::numeric_limits<core_t::TTime>::max();
    params.s_Times.resize(correlates.size());
    params.s_Values.resize(correlates.size());
    params.s_Counts.resize(correlates.size());
    params.s_Variables.resize(correlates.size());
    params.s_CorrelatedLabels.resize(correlates.size());
    params.s_Correlated.resize(correlates.size());
    params.s_ComputeProbabilityParams
        .addCalculation(model_t::probabilityCalculation(feature))
        .initialCountWeight(initialCountWeight);

    // These are indexed as follows:
    //   influenceValues["influencer name"]["correlate"]["influence value"]
    // This is because we aren't guaranteed that each influence is present for
    // each feature.
    influenceValues.resize(
        this->featureData(feature, pid, bucketTime)->s_InfluenceValues.size(),
        TStrCRefDouble1VecDouble1VecPrPrVecVec(correlates.size()));

    // Declared outside the loop to minimize the number of times it is created.
    TDouble1VecDouble1VecPr value;

    TDouble2Vec seasonalWeights[2];

    for (std::size_t i = 0; i < correlates.size(); ++i) {
        TSize2Vec variables = pid == correlates[i][0] ? TSize2Vec{0, 1} : TSize2Vec{1, 0};
        params.s_CorrelatedLabels[i] = gatherer.personName(correlates[i][variables[1]]);
        params.s_Correlated[i] = correlates[i][variables[1]];
        params.s_Variables[i] = variables;
        const maths::common::CModel* models[]{
            model, this->model(feature, correlates[i][variables[1]])};
        models[0]->seasonalWeight(maths::common::DEFAULT_SEASONAL_CONFIDENCE_INTERVAL,
                                  time, seasonalWeights[0]);
        models[1]->seasonalWeight(maths::common::DEFAULT_SEASONAL_CONFIDENCE_INTERVAL,
                                  time, seasonalWeights[1]);
        maths_t::TDouble2Vec seasonalWeight(2);
        seasonalWeight[variables[0]] = seasonalWeights[0][0];
        seasonalWeight[variables[1]] = seasonalWeights[1][0];
        params.s_ComputeProbabilityParams.addWeights(
            maths_t::seasonalVarianceScaleWeight(seasonalWeight));

        const TFeatureData* data[2];
        data[0] = this->featureData(feature, correlates[i][0], bucketTime);
        data[1] = this->featureData(feature, correlates[i][1], bucketTime);
        if (data[0] && data[1]) {
            params.s_ElapsedTime = std::min(
                params.s_ElapsedTime, bucketTime - firstBucketTimes[correlates[i][0]]);
            params.s_ElapsedTime = std::min(
                params.s_ElapsedTime, bucketTime - firstBucketTimes[correlates[i][1]]);
            params.s_Times[i] = TTime2Vec(2, time);
            params.s_Values[i] = TDouble2Vec{
                model_t::offsetCountToZero(feature, static_cast<double>(data[0]->s_Count)),
                model_t::offsetCountToZero(feature, static_cast<double>(data[1]->s_Count))};
            for (std::size_t j = 0; j < data[0]->s_InfluenceValues.size(); ++j) {
                for (const auto& influenceValue : data[0]->s_InfluenceValues[j]) {
                    TStrCRef influence = influenceValue.first;
                    std::size_t match = static_cast<std::size_t>(
                        std::find_if(data[1]->s_InfluenceValues[j].begin(),
                                     data[1]->s_InfluenceValues[j].end(),
                                     [influence](const TStrCRefDouble1VecDoublePrPr& value_) {
                                         return value_.first.get() == influence.get();
                                     }) -
                        data[1]->s_InfluenceValues[j].begin());
                    if (match < data[1]->s_InfluenceValues[j].size()) {
                        const TDouble1VecDoublePr& value0 = influenceValue.second;
                        const TDouble1VecDoublePr& value1 =
                            data[1]->s_InfluenceValues[j][match].second;
                        value.first = TDouble1Vec{value0.first[0], value1.first[0]};
                        value.second = TDouble1Vec{value0.second, value1.second};
                        influenceValues[j][i].emplace_back(influence, value);
                    }
                }
            }
        }
    }

    if (interim && model_t::requiresInterimResultAdjustment(feature)) {
        TDouble2Vec1Vec modes = params.s_Model->correlateModes(
            time, params.s_ComputeProbabilityParams.weights());
        for (std::size_t i = 0; i < modes.size(); ++i) {
            TDouble2Vec& value_ = params.s_Values[i];
            if (!value_.empty()) {
                TDouble2Vec correction(
                    this->interimValueCorrector().corrections(modes[i], value_));
                value_ += correction;
                this->currentBucketInterimCorrections().emplace(
                    core::make_triple(feature, pid, params.s_Correlated[i]),
                    TDouble1Vec{correction[params.s_Variables[i][0]]});
            }
        }
    }
}

void CEventRateModel::addAnnotation(core_t::TTime time,
                                    CAnnotation::EEvent type,
                                    const std::string& annotation) {
    m_CurrentBucketStats.s_Annotations.emplace_back(
        time, type, annotation, this->dataGatherer().searchKey().detectorIndex(),
        EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING);
}

////////// CEventRateModel::SBucketStats Implementation //////////

CEventRateModel::SBucketStats::SBucketStats(core_t::TTime startTime)
    : s_StartTime(startTime), s_InterimCorrections(1) {
}

} // namespace model
} // namespace ml