lib/model/CEventRateModel.cc
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <model/CEventRateModel.h>

#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/CoreTypes.h>

#include <maths/common/CChecksum.h>
#include <maths/common/COrderings.h>
#include <maths/common/CRestoreParams.h>
#include <maths/common/CTools.h>
#include <maths/common/ProbabilityAggregators.h>

#include <model/CAnnotatedProbabilityBuilder.h>
#include <model/CAnnotation.h>
#include <model/CAnomalyDetectorModel.h>
#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CDataGatherer.h>
#include <model/CIndividualModelDetail.h>
#include <model/CInterimBucketCorrector.h>
#include <model/CModelDetailsView.h>
#include <model/CModelTools.h>
#include <model/CProbabilityAndInfluenceCalculator.h>
#include <model/CSearchKey.h>
#include <model/FrequencyPredicates.h>

#include <algorithm>
#include <limits>
#include <map>
#include <string>
#include <utility>

namespace ml {
namespace model {
namespace {
using TDouble2Vec = core::CSmallVector<double, 2>;
using TDouble2Vec1Vec = core::CSmallVector<TDouble2Vec, 1>;
using TTime2Vec = core::CSmallVector<core_t::TTime, 2>;
// We use short field names to reduce the state size
const std::string INDIVIDUAL_STATE_TAG("a");
const std::string PROBABILITY_PRIOR_TAG("b");
}
CEventRateModel::CEventRateModel(
const SModelParams& params,
const TDataGathererPtr& dataGatherer,
const TFeatureMathsModelSPtrPrVec& newFeatureModels,
const TFeatureMultivariatePriorSPtrPrVec& newFeatureCorrelateModelPriors,
TFeatureCorrelationsPtrPrVec&& featureCorrelatesModels,
const maths::common::CMultinomialConjugate& probabilityPrior,
const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators,
const TInterimBucketCorrectorCPtr& interimBucketCorrector)
: CIndividualModel(params,
dataGatherer,
newFeatureModels,
newFeatureCorrelateModelPriors,
std::move(featureCorrelatesModels),
influenceCalculators),
m_CurrentBucketStats(CAnomalyDetectorModel::TIME_UNSET),
m_ProbabilityPrior(probabilityPrior),
m_InterimBucketCorrector(interimBucketCorrector) {
}
CEventRateModel::CEventRateModel(const SModelParams& params,
const TDataGathererPtr& dataGatherer,
const TFeatureMathsModelSPtrPrVec& newFeatureModels,
const TFeatureMultivariatePriorSPtrPrVec& newFeatureCorrelateModelPriors,
TFeatureCorrelationsPtrPrVec&& featureCorrelatesModels,
const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators,
const TInterimBucketCorrectorCPtr& interimBucketCorrector,
core::CStateRestoreTraverser& traverser)
: CIndividualModel(params,
dataGatherer,
newFeatureModels,
newFeatureCorrelateModelPriors,
std::move(featureCorrelatesModels),
influenceCalculators),
m_CurrentBucketStats(CAnomalyDetectorModel::TIME_UNSET),
m_InterimBucketCorrector(interimBucketCorrector) {
if (traverser.traverseSubLevel(std::bind(&CEventRateModel::acceptRestoreTraverser,
this, std::placeholders::_1)) == false) {
traverser.setBadState();
}
}
CEventRateModel::CEventRateModel(bool isForPersistence, const CEventRateModel& other)
: CIndividualModel(isForPersistence, other),
m_CurrentBucketStats(0), // Not needed for persistence so minimally constructed
m_ProbabilityPrior(other.m_ProbabilityPrior) {
if (!isForPersistence) {
LOG_ABORT(<< "This constructor only creates clones for persistence");
}
}
void CEventRateModel::persistModelsState(core::CStatePersistInserter& inserter) const {
this->doPersistModelsState(inserter);
}
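// Persist the base individual model state and the person probability prior
// under separate tags; acceptRestoreTraverser below restores the same two
// sub-levels.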
void CEventRateModel::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertLevel(INDIVIDUAL_STATE_TAG,
std::bind(&CEventRateModel::doAcceptPersistInserter,
this, std::placeholders::_1));
inserter.insertLevel(PROBABILITY_PRIOR_TAG,
std::bind(&maths::common::CMultinomialConjugate::acceptPersistInserter,
&m_ProbabilityPrior, std::placeholders::_1));
}
bool CEventRateModel::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
do {
const std::string& name = traverser.name();
RESTORE(INDIVIDUAL_STATE_TAG,
traverser.traverseSubLevel(std::bind(&CEventRateModel::doAcceptRestoreTraverser,
this, std::placeholders::_1)))
RESTORE_NO_ERROR(
PROBABILITY_PRIOR_TAG,
m_ProbabilityPrior = maths::common::CMultinomialConjugate(
this->params().distributionRestoreParams(maths_t::E_DiscreteData), traverser))
} while (traverser.next());
return true;
}
CAnomalyDetectorModel* CEventRateModel::cloneForPersistence() const {
return new CEventRateModel(true, *this);
}
model_t::EModelType CEventRateModel::category() const {
return model_t::E_EventRateOnline;
}
bool CEventRateModel::isEventRate() const {
return true;
}
bool CEventRateModel::isMetric() const {
return false;
}
void CEventRateModel::currentBucketPersonIds(core_t::TTime time, TSizeVec& result) const {
this->CIndividualModel::currentBucketPersonIds(
time, m_CurrentBucketStats.s_FeatureData, result);
}
CEventRateModel::TOptionalDouble CEventRateModel::baselineBucketCount(std::size_t /*pid*/) const {
return TOptionalDouble();
}
CEventRateModel::TDouble1Vec CEventRateModel::currentBucketValue(model_t::EFeature feature,
std::size_t pid,
std::size_t /*cid*/,
core_t::TTime time) const {
const TFeatureData* data = this->featureData(feature, pid, time);
if (data) {
return TDouble1Vec(1, static_cast<double>(data->s_Count));
}
return TDouble1Vec();
}
CEventRateModel::TDouble1Vec
CEventRateModel::baselineBucketMean(model_t::EFeature feature,
std::size_t pid,
std::size_t cid,
model_t::CResultType type,
const TSizeDoublePr1Vec& correlated,
core_t::TTime time) const {
const maths::common::CModel* model{this->model(feature, pid)};
if (!model) {
return TDouble1Vec();
}
static const TSizeDoublePr1Vec NO_CORRELATED;
TDouble2Vec hint;
if (model_t::isDiurnal(feature)) {
hint = this->currentBucketValue(feature, pid, cid, time);
}
TDouble1Vec result(model->predict(
time, type.isUnconditional() ? NO_CORRELATED : correlated, hint));
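// For "constant" features the prediction is scaled by the person's cached
// category probability; for all other features the scale stays at 1.0.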
double probability = 1.0;
if (model_t::isConstant(feature) && !m_Probabilities.lookup(pid, probability)) {
probability = 1.0;
}
for (auto& coord : result) {
coord = probability * model_t::inverseOffsetCountToZero(feature, coord);
}
this->correctBaselineForInterim(feature, pid, type, correlated,
this->currentBucketInterimCorrections(), result);
TDouble1VecDouble1VecPr support{model_t::support(feature)};
return maths::common::CTools::truncate(result, support.first, support.second);
}
void CEventRateModel::sampleBucketStatistics(core_t::TTime startTime,
core_t::TTime endTime,
CResourceMonitor& resourceMonitor) {
this->createUpdateNewModels(startTime, resourceMonitor);
this->currentBucketInterimCorrections().clear();
this->CIndividualModel::sampleBucketStatistics(
startTime, endTime, this->personFilter(),
m_CurrentBucketStats.s_FeatureData, resourceMonitor);
}
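// For each bucket in [startTime, endTime) this samples the gatherer's feature
// data, updates the multinomial prior over people for the categorical total
// count feature and adds each person's (offset) bucket count to their residual
// model, with weights reflecting rules, bucket emptiness and the learn rate.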
void CEventRateModel::sample(core_t::TTime startTime,
core_t::TTime endTime,
CResourceMonitor& resourceMonitor) {
CDataGatherer& gatherer = this->dataGatherer();
core_t::TTime bucketLength = gatherer.bucketLength();
if (!gatherer.validateSampleTimes(startTime, endTime)) {
return;
}
this->createUpdateNewModels(startTime, resourceMonitor);
this->currentBucketInterimCorrections().clear();
m_CurrentBucketStats.s_Annotations.clear();
for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
LOG_TRACE(<< "Sampling [" << time << "," << time + bucketLength << ")");
gatherer.sampleNow(time);
gatherer.featureData(time, bucketLength, m_CurrentBucketStats.s_FeatureData);
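// Capture each person's last bucket time before the base class sample
// updates it: the pre-sample value is needed below to skip the correct
// interval when the update is suppressed (initialCountWeight == 0).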
const CIndividualModel::TTimeVec& preSampleLastBucketTimes = this->lastBucketTimes();
CIndividualModel::TSizeTimeUMap lastBucketTimesMap;
for (const auto& featureData : m_CurrentBucketStats.s_FeatureData) {
for (const auto& data : featureData.second) {
std::size_t pid = data.first;
lastBucketTimesMap[pid] = preSampleLastBucketTimes[pid];
}
}
this->CIndividualModel::sample(time, time + bucketLength, resourceMonitor);
// Declared outside the loop to minimize the number of times they are created.
maths::common::CModel::TTimeDouble2VecSizeTrVec values;
maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec trendWeights;
maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec priorWeights;
for (auto& featureData : m_CurrentBucketStats.s_FeatureData) {
model_t::EFeature feature = featureData.first;
TSizeFeatureDataPrVec& data = featureData.second;
std::size_t dimension = model_t::dimension(feature);
LOG_TRACE(<< model_t::print(feature) << ": " << data);
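// The total bucket count by person feature is categorical: each person seen
// in this bucket is added as a category of the multinomial prior, which is
// later used to assess how unusual it is to see them at all.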
if (feature == model_t::E_IndividualTotalBucketCountByPerson) {
for (const auto& data_ : data) {
if (data_.second.s_Count > 0) {
LOG_TRACE(<< "person = " << this->personName(data_.first));
m_ProbabilityPrior.addSamples({static_cast<double>(data_.first)},
maths_t::CUnitWeights::SINGLE_UNIT);
}
}
if (!data.empty()) {
m_ProbabilityPrior.propagateForwardsByTime(1.0);
}
continue;
}
if (model_t::isCategorical(feature)) {
continue;
}
this->applyFilter(model_t::E_XF_By, true, this->personFilter(), data);
for (const auto& data_ : data) {
std::size_t pid = data_.first;
maths::common::CModel* model = this->model(feature, pid);
if (model == nullptr) {
LOG_ERROR(<< "Missing model for " << this->personName(pid));
continue;
}
// initialCountWeight returns a weight as a double:
//   0.0 if checkScheduledEvents is true,
//   1.0 if both checkScheduledEvents and checkRules are false,
//   a small weight, 0.005, if checkRules is true.
// This weight is applied to countWeight (and therefore scaledCountWeight) as a
// multiplier. It reduces the impact of values affected by the skip_model_update
// rule on the model without ignoring them completely, so the model can still
// learn from the affected values. This addresses points 1 and 2 in
// https://github.com/elastic/ml-cpp/issues/1272, namely:
//   1. Applying the rule from the start of modelling can stop the model learning anything at all.
//   2. It can stop the model ever adapting to some change in data characteristics.
core_t::TTime sampleTime = model_t::sampleTime(feature, time, bucketLength);
double initialCountWeight = this->initialCountWeight(
feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, sampleTime);
if (initialCountWeight == 0.0) {
model->skipTime(sampleTime - lastBucketTimesMap[pid]);
continue;
}
// For sparse data we reduce the impact of samples from empty buckets.
// In effect, we smoothly transition to modeling only values from non-empty
// buckets as the data becomes sparse.
double emptyBucketWeight = this->emptyBucketWeight(feature, pid, time);
if (emptyBucketWeight == 0.0) {
continue;
}
double count = model_t::offsetCountToZero(
feature, static_cast<double>(data_.second.s_Count));
TDouble2Vec value{count};
double outlierWeightDerate = this->derate(pid, sampleTime);
double countWeight = initialCountWeight * this->learnRate(feature);
// Note we need to scale the amount of data we'll "age out" of the residual
// model in one bucket by the empty bucket weight so the posterior doesn't
// end up too flat.
double scaledInterval = emptyBucketWeight;
double scaledCountWeight = emptyBucketWeight * countWeight;
LOG_TRACE(<< "Bucket = " << this->printCurrentBucket()
<< ", feature = " << model_t::print(feature) << ", count = "
<< count << ", person = " << this->personName(pid)
<< ", empty bucket weight = " << emptyBucketWeight
<< ", count weight = " << countWeight
<< ", scaled count weight = " << scaledCountWeight
<< ", scaled interval = " << scaledInterval);
values.assign(1, core::make_triple(sampleTime, value, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID));
trendWeights.resize(1, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
priorWeights.resize(1, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
model->countWeights(sampleTime, value, countWeight, scaledCountWeight,
outlierWeightDerate, 1.0, // count variance scale
trendWeights[0], priorWeights[0]);
auto annotationCallback = [&](const std::string& annotation) {
if (this->params().s_AnnotationsEnabled) {
m_CurrentBucketStats.s_Annotations.emplace_back(
time, CAnnotation::E_ModelChange, annotation,
gatherer.searchKey().detectorIndex(),
gatherer.searchKey().partitionFieldName(),
gatherer.partitionFieldValue(),
gatherer.searchKey().overFieldName(), EMPTY_STRING,
gatherer.searchKey().byFieldName(), gatherer.personName(pid));
}
};
maths::common::CModelAddSamplesParams params;
auto circuitBreaker = CMemoryCircuitBreaker(resourceMonitor);
params.isInteger(true)
.isNonNegative(true)
.propagationInterval(scaledInterval)
.trendWeights(trendWeights)
.priorWeights(priorWeights)
.bucketOccupancy(model_t::includeEmptyBuckets(feature)
? this->personFrequency(pid)
: 1.0)
.firstValueTime(pid < this->firstBucketTimes().size()
? this->firstBucketTimes()[pid]
: std::numeric_limits<core_t::TTime>::min())
.annotationCallback([&](const std::string& annotation) {
annotationCallback(annotation);
})
.memoryCircuitBreaker(circuitBreaker);
if (model->addSamples(params, values) == maths::common::CModel::E_Reset) {
gatherer.resetSampleCount(pid);
}
}
}
this->sampleCorrelateModels();
m_Probabilities = TCategoryProbabilityCache(m_ProbabilityPrior);
}
}
bool CEventRateModel::computeProbability(std::size_t pid,
core_t::TTime startTime,
core_t::TTime endTime,
CPartitioningFields& partitioningFields,
std::size_t /*numberAttributeProbabilities*/,
SAnnotatedProbability& result) const {
const CDataGatherer& gatherer = this->dataGatherer();
core_t::TTime bucketLength = gatherer.bucketLength();
if (endTime != startTime + bucketLength) {
LOG_ERROR(<< "Can only compute probability for single bucket");
return false;
}
if (pid >= this->firstBucketTimes().size()) {
LOG_ERROR(<< "No first time for person = " << gatherer.personName(pid));
return false;
}
CAnnotatedProbabilityBuilder resultBuilder(result,
1, // # attribute probabilities
function_t::function(gatherer.features()));
CProbabilityAndInfluenceCalculator pJoint(this->params().s_InfluenceCutoff);
pJoint.addAggregator(maths::common::CJointProbabilityOfLessLikelySamples());
CProbabilityAndInfluenceCalculator pFeatures(this->params().s_InfluenceCutoff);
pFeatures.addAggregator(maths::common::CJointProbabilityOfLessLikelySamples());
pFeatures.addAggregator(maths::common::CProbabilityOfExtremeSample());
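// Per-feature probabilities and influences are aggregated in pFeatures and
// then folded into pJoint together with the person's probability from the
// multinomial prior.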
bool addPersonProbability{false};
bool skippedResults{false};
for (std::size_t i = 0u, n = gatherer.numberFeatures(); i < n; ++i) {
model_t::EFeature feature = gatherer.feature(i);
if (model_t::isCategorical(feature)) {
continue;
}
const TFeatureData* data = this->featureData(feature, pid, startTime);
if (!data) {
continue;
}
if (this->shouldSkipUpdate(feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID,
model_t::sampleTime(feature, startTime, bucketLength))) {
result.s_ShouldUpdateQuantiles = false;
}
if (this->shouldIgnoreResult(
feature, result.s_ResultType, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID,
model_t::sampleTime(feature, startTime, bucketLength))) {
skippedResults = true;
continue;
}
addPersonProbability = true;
LOG_TRACE(<< "value(" << this->personName(pid) << ") = " << data->print());
if (this->correlates(feature, pid, startTime)) {
CProbabilityAndInfluenceCalculator::SCorrelateParams params(partitioningFields);
TStrCRefDouble1VecDouble1VecPrPrVecVecVec influenceValues;
this->fill(feature, pid, startTime, result.isInterim(), params, influenceValues);
this->addProbabilityAndInfluences(pid, params, influenceValues,
pFeatures, resultBuilder);
} else {
CProbabilityAndInfluenceCalculator::SParams params(partitioningFields);
if (this->fill(feature, pid, startTime, result.isInterim(), params)) {
this->addProbabilityAndInfluences(pid, params, data->s_InfluenceValues,
pFeatures, resultBuilder);
}
}
}
TOptionalUInt64 count = this->currentBucketCount(pid, startTime);
pJoint.add(pFeatures);
if (addPersonProbability && count && *count != 0) {
double p;
if (m_Probabilities.lookup(pid, p)) {
LOG_TRACE(<< "P(" << gatherer.personName(pid) << ") = " << p);
pJoint.addProbability(p);
}
}
double p{1.0};
if (skippedResults && pJoint.empty()) {
// Results were skipped for all features. Leave the probability
// at 1.0 so that the quantiles are still updated accordingly.
} else if (pJoint.empty()) {
LOG_TRACE(<< "No samples in [" << startTime << "," << endTime << ")");
return false;
} else if (!pJoint.calculate(p, result.s_Influences)) {
LOG_ERROR(<< "Failed to compute probability");
return false;
}
LOG_TRACE(<< "probability(" << this->personName(pid) << ") = " << p);
resultBuilder.probability(p);
double multiBucketImpact{-1.0 * CAnomalyDetectorModelConfig::MAXIMUM_MULTI_BUCKET_IMPACT_MAGNITUDE};
if (pJoint.calculateMultiBucketImpact(multiBucketImpact)) {
resultBuilder.multiBucketImpact(multiBucketImpact);
}
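// Record in the anomaly score explanation the person's actual concentration
// in the multinomial prior, the typical (median) concentration and whether
// this is the first bucket in which the person has been seen.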
resultBuilder.anomalyScoreExplanation() = pJoint.anomalyScoreExplanation();
auto& anomalyScoreExplanation{resultBuilder.anomalyScoreExplanation()};
bool everSeenBefore = this->firstBucketTimes()[pid] != startTime;
auto typicalConcentration = m_Probabilities.medianConcentration();
double actualConcentration;
if (m_ProbabilityPrior.concentration(pid, actualConcentration) &&
typicalConcentration.has_value()) {
anomalyScoreExplanation.s_ByFieldActualConcentration = actualConcentration;
anomalyScoreExplanation.s_ByFieldTypicalConcentration =
typicalConcentration.value();
}
anomalyScoreExplanation.s_ByFieldFirstOccurrence = !everSeenBefore;
resultBuilder.build();
return true;
}
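// The checksum combines the base individual model checksum with per-person
// hashes of the multinomial prior's concentrations and, optionally, the
// current bucket counts.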
std::uint64_t CEventRateModel::checksum(bool includeCurrentBucketStats) const {
using TStrCRefUInt64Map =
std::map<TStrCRef, std::uint64_t, maths::common::COrderings::SLess>;
std::uint64_t seed = this->CIndividualModel::checksum(includeCurrentBucketStats);
TStrCRefUInt64Map hashes;
const TDoubleVec& categories = m_ProbabilityPrior.categories();
const TDoubleVec& concentrations = m_ProbabilityPrior.concentrations();
for (std::size_t i = 0; i < categories.size(); ++i) {
std::uint64_t& hash =
hashes[std::cref(this->personName(static_cast<std::size_t>(categories[i])))];
hash = maths::common::CChecksum::calculate(hash, concentrations[i]);
}
if (includeCurrentBucketStats) {
for (const auto& featureData_ : m_CurrentBucketStats.s_FeatureData) {
for (const auto& data : featureData_.second) {
std::uint64_t& hash = hashes[std::cref(this->personName(data.first))];
hash = maths::common::CChecksum::calculate(hash, data.second.s_Count);
}
}
}
LOG_TRACE(<< "seed = " << seed);
LOG_TRACE(<< "hashes = " << hashes);
return maths::common::CChecksum::calculate(seed, hashes);
}
void CEventRateModel::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CEventRateModel");
this->CIndividualModel::debugMemoryUsage(mem->addChild());
core::memory_debug::dynamicSize("m_CurrentBucketStats.s_PersonCounts",
m_CurrentBucketStats.s_PersonCounts, mem);
core::memory_debug::dynamicSize("m_CurrentBucketStats.s_FeatureData",
m_CurrentBucketStats.s_FeatureData, mem);
core::memory_debug::dynamicSize("m_CurrentBucketStats.s_InterimCorrections",
m_CurrentBucketStats.s_InterimCorrections, mem);
core::memory_debug::dynamicSize("m_CurrentBucketStats.s_Annotations",
m_CurrentBucketStats.s_Annotations, mem);
core::memory_debug::dynamicSize("s_Probabilities", m_Probabilities, mem);
core::memory_debug::dynamicSize("m_ProbabilityPrior", m_ProbabilityPrior, mem);
core::memory_debug::dynamicSize("m_InterimBucketCorrector",
m_InterimBucketCorrector, mem);
}
std::size_t CEventRateModel::memoryUsage() const {
return this->CIndividualModel::memoryUsage();
}
std::size_t CEventRateModel::staticSize() const {
return sizeof(*this);
}
std::size_t CEventRateModel::computeMemoryUsage() const {
std::size_t mem = this->CIndividualModel::computeMemoryUsage();
mem += core::memory::dynamicSize(m_CurrentBucketStats.s_PersonCounts);
mem += core::memory::dynamicSize(m_CurrentBucketStats.s_FeatureData);
mem += core::memory::dynamicSize(m_CurrentBucketStats.s_InterimCorrections);
mem += core::memory::dynamicSize(m_CurrentBucketStats.s_Annotations);
mem += core::memory::dynamicSize(m_Probabilities);
mem += core::memory::dynamicSize(m_ProbabilityPrior);
mem += core::memory::dynamicSize(m_InterimBucketCorrector);
return mem;
}
CEventRateModel::TModelDetailsViewUPtr CEventRateModel::details() const {
return TModelDetailsViewUPtr(new CEventRateModelDetailsView(*this));
}
const CEventRateModel::TFeatureData*
CEventRateModel::featureData(model_t::EFeature feature, std::size_t pid, core_t::TTime time) const {
return this->CIndividualModel::featureData(feature, pid, time,
m_CurrentBucketStats.s_FeatureData);
}
const CEventRateModel::TAnnotationVec& CEventRateModel::annotations() const {
return m_CurrentBucketStats.s_Annotations;
}
core_t::TTime CEventRateModel::currentBucketStartTime() const {
return m_CurrentBucketStats.s_StartTime;
}
void CEventRateModel::currentBucketStartTime(core_t::TTime time) {
m_CurrentBucketStats.s_StartTime = time;
}
const CEventRateModel::TSizeUInt64PrVec& CEventRateModel::currentBucketPersonCounts() const {
return m_CurrentBucketStats.s_PersonCounts;
}
CEventRateModel::TSizeUInt64PrVec& CEventRateModel::currentBucketPersonCounts() {
return m_CurrentBucketStats.s_PersonCounts;
}
CIndividualModel::TFeatureSizeSizeTrDouble1VecUMap&
CEventRateModel::currentBucketInterimCorrections() const {
return m_CurrentBucketStats.s_InterimCorrections;
}
void CEventRateModel::clearPrunedResources(const TSizeVec& people, const TSizeVec& attributes) {
CDataGatherer& gatherer = this->dataGatherer();
// Stop collecting for these people and add them to the free list.
gatherer.recyclePeople(people);
if (gatherer.dataAvailable(m_CurrentBucketStats.s_StartTime)) {
gatherer.featureData(m_CurrentBucketStats.s_StartTime, gatherer.bucketLength(),
m_CurrentBucketStats.s_FeatureData);
}
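// Remove the pruned people from the probability prior and rebuild the
// cached category probabilities.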
TDoubleVec categoriesToRemove;
categoriesToRemove.reserve(people.size());
for (std::size_t i = 0; i < people.size(); ++i) {
categoriesToRemove.push_back(static_cast<double>(people[i]));
}
m_ProbabilityPrior.removeCategories(categoriesToRemove);
m_Probabilities = TCategoryProbabilityCache(m_ProbabilityPrior);
this->CIndividualModel::clearPrunedResources(people, attributes);
}
const CInterimBucketCorrector& CEventRateModel::interimValueCorrector() const {
return *m_InterimBucketCorrector;
}
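// Returns true if the feature is univariate, correlation modelling is enabled
// and any person correlated with pid has feature data in this bucket.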
bool CEventRateModel::correlates(model_t::EFeature feature, std::size_t pid, core_t::TTime time) const {
if (model_t::dimension(feature) > 1 || !this->params().s_MultivariateByFields) {
return false;
}
const maths::common::CModel* model{this->model(feature, pid)};
for (const auto& correlate : model->correlates()) {
if (this->featureData(
feature, pid == correlate[0] ? correlate[1] : correlate[0], time)) {
return true;
}
}
return false;
}
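// Fills in the parameters needed to compute the probability of a single,
// uncorrelated feature value: the offset count, sample time, seasonal
// variance scale weight and, for interim results, the interim correction.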
bool CEventRateModel::fill(model_t::EFeature feature,
std::size_t pid,
core_t::TTime bucketTime,
bool interim,
CProbabilityAndInfluenceCalculator::SParams& params) const {
const TFeatureData* data{this->featureData(feature, pid, bucketTime)};
if (data == nullptr) {
LOG_TRACE(<< "Feature data unexpectedly null");
return false;
}
const maths::common::CModel* model{this->model(feature, pid)};
if (model == nullptr) {
LOG_TRACE(<< "Model unexpectedly null");
return false;
}
core_t::TTime time{model_t::sampleTime(feature, bucketTime, this->bucketLength())};
double value{model_t::offsetCountToZero(feature, static_cast<double>(data->s_Count))};
maths_t::TDouble2VecWeightsAry weights{[&] {
TDouble2Vec result;
model->seasonalWeight(maths::common::DEFAULT_SEASONAL_CONFIDENCE_INTERVAL,
time, result);
return maths_t::seasonalVarianceScaleWeight(result);
}()};
double initialCountWeight{this->initialCountWeight(
feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, time)};
params.s_Feature = feature;
params.s_Model = model;
params.s_ElapsedTime = bucketTime - this->firstBucketTimes()[pid];
params.s_Time.assign(1, TTime2Vec{time});
params.s_Value.assign(1, TDouble2Vec{value});
if (interim && model_t::requiresInterimResultAdjustment(feature)) {
double mode{params.s_Model->mode(time, weights)[0]};
TDouble2Vec correction{this->interimValueCorrector().corrections(mode, value)};
params.s_Value[0] += correction;
this->currentBucketInterimCorrections().emplace(
core::make_triple(feature, pid, pid), correction);
}
params.s_Count = 1.0;
params.s_ComputeProbabilityParams
.addCalculation(model_t::probabilityCalculation(feature))
.addWeights(weights)
.initialCountWeight(initialCountWeight);
return true;
}
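// Fills in the parameters needed to compute the probability of a feature
// value jointly with its correlates: for each correlated person this gathers
// both people's counts, sample times, seasonal weights and the matching
// influence values.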
void CEventRateModel::fill(model_t::EFeature feature,
std::size_t pid,
core_t::TTime bucketTime,
bool interim,
CProbabilityAndInfluenceCalculator::SCorrelateParams& params,
TStrCRefDouble1VecDouble1VecPrPrVecVecVec& influenceValues) const {
using TStrCRefDouble1VecDoublePrPr = std::pair<TStrCRef, TDouble1VecDoublePr>;
const CDataGatherer& gatherer{this->dataGatherer()};
const maths::common::CModel* model{this->model(feature, pid)};
const TSize2Vec1Vec& correlates{model->correlates()};
const TTimeVec& firstBucketTimes{this->firstBucketTimes()};
core_t::TTime time{model_t::sampleTime(feature, bucketTime, gatherer.bucketLength())};
double initialCountWeight{this->initialCountWeight(
feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, time)};
params.s_Feature = feature;
params.s_Model = model;
params.s_ElapsedTime = std::numeric_limits<core_t::TTime>::max();
params.s_Times.resize(correlates.size());
params.s_Values.resize(correlates.size());
params.s_Counts.resize(correlates.size());
params.s_Variables.resize(correlates.size());
params.s_CorrelatedLabels.resize(correlates.size());
params.s_Correlated.resize(correlates.size());
params.s_ComputeProbabilityParams
.addCalculation(model_t::probabilityCalculation(feature))
.initialCountWeight(initialCountWeight);
// These are indexed as follows:
// influenceValues["influencer name"]["correlate"]["influence value"]
// This is because we aren't guaranteed that each influence is present for
// each feature.
influenceValues.resize(
this->featureData(feature, pid, bucketTime)->s_InfluenceValues.size(),
TStrCRefDouble1VecDouble1VecPrPrVecVec(correlates.size()));
// Declared outside the loop to minimize the number of times they are created.
TDouble1VecDouble1VecPr value;
TDouble2Vec seasonalWeights[2];
for (std::size_t i = 0; i < correlates.size(); ++i) {
TSize2Vec variables = pid == correlates[i][0] ? TSize2Vec{0, 1}
: TSize2Vec{1, 0};
params.s_CorrelatedLabels[i] = gatherer.personName(correlates[i][variables[1]]);
params.s_Correlated[i] = correlates[i][variables[1]];
params.s_Variables[i] = variables;
const maths::common::CModel* models[]{
model, this->model(feature, correlates[i][variables[1]])};
models[0]->seasonalWeight(maths::common::DEFAULT_SEASONAL_CONFIDENCE_INTERVAL,
time, seasonalWeights[0]);
models[1]->seasonalWeight(maths::common::DEFAULT_SEASONAL_CONFIDENCE_INTERVAL,
time, seasonalWeights[1]);
maths_t::TDouble2Vec seasonalWeight(2);
seasonalWeight[variables[0]] = seasonalWeights[0][0];
seasonalWeight[variables[1]] = seasonalWeights[1][0];
params.s_ComputeProbabilityParams.addWeights(
maths_t::seasonalVarianceScaleWeight(seasonalWeight));
const TFeatureData* data[2];
data[0] = this->featureData(feature, correlates[i][0], bucketTime);
data[1] = this->featureData(feature, correlates[i][1], bucketTime);
if (data[0] && data[1]) {
params.s_ElapsedTime = std::min(
params.s_ElapsedTime, bucketTime - firstBucketTimes[correlates[i][0]]);
params.s_ElapsedTime = std::min(
params.s_ElapsedTime, bucketTime - firstBucketTimes[correlates[i][1]]);
params.s_Times[i] = TTime2Vec(2, time);
params.s_Values[i] = TDouble2Vec{
model_t::offsetCountToZero(feature, static_cast<double>(data[0]->s_Count)),
model_t::offsetCountToZero(feature, static_cast<double>(data[1]->s_Count))};
for (std::size_t j = 0; j < data[0]->s_InfluenceValues.size(); ++j) {
for (const auto& influenceValue : data[0]->s_InfluenceValues[j]) {
TStrCRef influence = influenceValue.first;
std::size_t match = static_cast<std::size_t>(
std::find_if(data[1]->s_InfluenceValues[j].begin(),
data[1]->s_InfluenceValues[j].end(),
[influence](const TStrCRefDouble1VecDoublePrPr& value_) {
return value_.first.get() == influence.get();
}) -
data[1]->s_InfluenceValues[j].begin());
if (match < data[1]->s_InfluenceValues[j].size()) {
const TDouble1VecDoublePr& value0 = influenceValue.second;
const TDouble1VecDoublePr& value1 =
data[1]->s_InfluenceValues[j][match].second;
value.first = TDouble1Vec{value0.first[0], value1.first[0]};
value.second = TDouble1Vec{value0.second, value1.second};
influenceValues[j][i].emplace_back(influence, value);
}
}
}
}
}
if (interim && model_t::requiresInterimResultAdjustment(feature)) {
TDouble2Vec1Vec modes = params.s_Model->correlateModes(
time, params.s_ComputeProbabilityParams.weights());
for (std::size_t i = 0; i < modes.size(); ++i) {
TDouble2Vec& value_ = params.s_Values[i];
if (!value_.empty()) {
TDouble2Vec correction(
this->interimValueCorrector().corrections(modes[i], value_));
value_ += correction;
this->currentBucketInterimCorrections().emplace(
core::make_triple(feature, pid, params.s_Correlated[i]),
TDouble1Vec{correction[params.s_Variables[i][0]]});
}
}
}
}
void CEventRateModel::addAnnotation(core_t::TTime time,
CAnnotation::EEvent type,
const std::string& annotation) {
m_CurrentBucketStats.s_Annotations.emplace_back(
time, type, annotation, this->dataGatherer().searchKey().detectorIndex(), EMPTY_STRING,
EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING);
}
////////// CEventRateModel::SBucketStats Implementation //////////
CEventRateModel::SBucketStats::SBucketStats(core_t::TTime startTime)
: s_StartTime(startTime), s_InterimCorrections(1) {
}
}
}