// lib/model/CMetricModel.cc
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <model/CMetricModel.h>
#include <core/CHashing.h>
#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/CoreTypes.h>
#include <core/RestoreMacros.h>
#include <maths/common/CChecksum.h>
#include <maths/common/COrderings.h>
#include <maths/common/CTools.h>
#include <maths/common/ProbabilityAggregators.h>
#include <model/CAnnotatedProbabilityBuilder.h>
#include <model/CAnnotation.h>
#include <model/CAnomalyDetectorModel.h>
#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CDataGatherer.h>
#include <model/CGathererTools.h>
#include <model/CIndividualModelDetail.h>
#include <model/CInterimBucketCorrector.h>
#include <model/CModelDetailsView.h>
#include <model/CProbabilityAndInfluenceCalculator.h>
#include <model/CSearchKey.h>
#include <model/FrequencyPredicates.h>
#include <algorithm>
#include <cstdint>
#include <functional>
#include <limits>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>
namespace ml {
namespace model {
namespace {
using TTime1Vec = core::CSmallVector<core_t::TTime, 1>;
using TDouble1Vec = core::CSmallVector<double, 1>;
using TDouble2Vec = core::CSmallVector<double, 2>;
using TDouble2Vec1Vec = core::CSmallVector<TDouble2Vec, 1>;
// We use short field names to reduce the state size.
const std::string INDIVIDUAL_STATE_TAG("a");
}
CMetricModel::CMetricModel(const SModelParams& params,
const TDataGathererPtr& dataGatherer,
const TFeatureMathsModelSPtrPrVec& newFeatureModels,
const TFeatureMultivariatePriorSPtrPrVec& newFeatureCorrelateModelPriors,
TFeatureCorrelationsPtrPrVec&& featureCorrelatesModels,
const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators,
const TInterimBucketCorrectorCPtr& interimBucketCorrector)
: CIndividualModel(params,
dataGatherer,
newFeatureModels,
newFeatureCorrelateModelPriors,
std::move(featureCorrelatesModels),
influenceCalculators),
m_CurrentBucketStats(CAnomalyDetectorModel::TIME_UNSET),
m_InterimBucketCorrector(interimBucketCorrector) {
}
CMetricModel::CMetricModel(const SModelParams& params,
const TDataGathererPtr& dataGatherer,
const TFeatureMathsModelSPtrPrVec& newFeatureModels,
const TFeatureMultivariatePriorSPtrPrVec& newFeatureCorrelateModelPriors,
TFeatureCorrelationsPtrPrVec&& featureCorrelatesModels,
const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators,
const TInterimBucketCorrectorCPtr& interimBucketCorrector,
core::CStateRestoreTraverser& traverser)
: CIndividualModel(params,
dataGatherer,
newFeatureModels,
newFeatureCorrelateModelPriors,
std::move(featureCorrelatesModels),
influenceCalculators),
m_CurrentBucketStats(CAnomalyDetectorModel::TIME_UNSET),
m_InterimBucketCorrector(interimBucketCorrector) {
if (traverser.traverseSubLevel(std::bind(&CMetricModel::acceptRestoreTraverser,
this, std::placeholders::_1)) == false) {
traverser.setBadState();
}
}
CMetricModel::CMetricModel(bool isForPersistence, const CMetricModel& other)
: CIndividualModel(isForPersistence, other),
m_CurrentBucketStats(0) // Not needed for persistence so minimally constructed
{
if (!isForPersistence) {
LOG_ABORT(<< "This constructor only creates clones for persistence");
}
}
void CMetricModel::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertLevel(INDIVIDUAL_STATE_TAG, std::bind(&CMetricModel::doAcceptPersistInserter,
this, std::placeholders::_1));
}
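// Restore the model from its persisted state. The traverser visits each tag
// at this level and the individual model state is restored from the
// INDIVIDUAL_STATE_TAG sub-level.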
bool CMetricModel::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
do {
const std::string& name = traverser.name();
RESTORE(INDIVIDUAL_STATE_TAG,
traverser.traverseSubLevel(std::bind(&CMetricModel::doAcceptRestoreTraverser,
this, std::placeholders::_1)))
} while (traverser.next());
return true;
}
CAnomalyDetectorModel* CMetricModel::cloneForPersistence() const {
return new CMetricModel(true, *this);
}
model_t::EModelType CMetricModel::category() const {
return model_t::E_MetricOnline;
}
bool CMetricModel::isEventRate() const {
return false;
}
bool CMetricModel::isMetric() const {
return true;
}
void CMetricModel::currentBucketPersonIds(core_t::TTime time, TSizeVec& result) const {
this->CIndividualModel::currentBucketPersonIds(
time, m_CurrentBucketStats.s_FeatureData, result);
}
CMetricModel::TOptionalDouble CMetricModel::baselineBucketCount(const std::size_t /*pid*/) const {
return TOptionalDouble();
}
CMetricModel::TDouble1Vec CMetricModel::currentBucketValue(model_t::EFeature feature,
std::size_t pid,
std::size_t /*cid*/,
core_t::TTime time) const {
const TFeatureData* data = this->featureData(feature, pid, time);
if (data != nullptr) {
const TOptionalSample& value = data->s_BucketValue;
return value ? value->value(model_t::dimension(feature)) : TDouble1Vec();
}
return TDouble1Vec();
}
CMetricModel::TDouble1Vec CMetricModel::baselineBucketMean(model_t::EFeature feature,
std::size_t pid,
std::size_t /*cid*/,
model_t::CResultType type,
const TSizeDoublePr1Vec& correlated,
core_t::TTime time) const {
const maths::common::CModel* model{this->model(feature, pid)};
if (model == nullptr) {
return TDouble1Vec();
}
static const TSizeDoublePr1Vec NO_CORRELATED;
TDouble1Vec result(model->predict(time, type.isUnconditional() ? NO_CORRELATED : correlated));
this->correctBaselineForInterim(feature, pid, type, correlated,
this->currentBucketInterimCorrections(), result);
TDouble1VecDouble1VecPr support = model_t::support(feature);
return maths::common::CTools::truncate(result, support.first, support.second);
}
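// Compute the statistics for the buckets in [startTime, endTime) without
// updating the feature models. This is used when generating interim results.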
void CMetricModel::sampleBucketStatistics(core_t::TTime startTime,
core_t::TTime endTime,
CResourceMonitor& resourceMonitor) {
this->createUpdateNewModels(startTime, resourceMonitor);
m_CurrentBucketStats.s_InterimCorrections.clear();
this->CIndividualModel::sampleBucketStatistics(
startTime, endTime, this->personFilter(),
m_CurrentBucketStats.s_FeatureData, resourceMonitor);
}
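// Update the feature models with the data gathered in [startTime, endTime).
// For each bucket this reads the feature data from the gatherer, computes
// per-sample weights and adds the samples to each person's model.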
void CMetricModel::sample(core_t::TTime startTime,
core_t::TTime endTime,
CResourceMonitor& resourceMonitor) {
CDataGatherer& gatherer = this->dataGatherer();
core_t::TTime bucketLength = gatherer.bucketLength();
m_CurrentBucketStats.s_StartTime = std::max(m_CurrentBucketStats.s_StartTime, startTime);
this->createUpdateNewModels(startTime, resourceMonitor);
m_CurrentBucketStats.s_InterimCorrections.clear();
m_CurrentBucketStats.s_Annotations.clear();
m_CurrentBucketStats.s_FeatureData.clear();
m_CurrentBucketStats.s_PersonCounts.clear();
if (!gatherer.validateSampleTimes(startTime, endTime)) {
return;
}
for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
LOG_TRACE(<< "Sampling [" << time << "," << time + bucketLength << ")");
gatherer.sampleNow(time);
gatherer.featureData(time, bucketLength, m_CurrentBucketStats.s_FeatureData);
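// Capture each person's last bucket time before the base class sample
// updates it: it is needed to compute how much time to skip for models
// whose update is suppressed by a scheduled event.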
const TTimeVec& preSampleLastBucketTimes = this->lastBucketTimes();
TSizeTimeUMap lastBucketTimesMap;
for (const auto& featureData : m_CurrentBucketStats.s_FeatureData) {
for (const auto& data : featureData.second) {
std::size_t pid = data.first;
lastBucketTimesMap[pid] = preSampleLastBucketTimes[pid];
}
}
this->CIndividualModel::sample(time, time + bucketLength, resourceMonitor);
// Declared outside the loop to minimize the number of times they are created.
maths::common::CModel::TTimeDouble2VecSizeTrVec values;
maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec trendWeights;
maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec priorWeights;
for (auto& featureData : m_CurrentBucketStats.s_FeatureData) {
model_t::EFeature feature = featureData.first;
TSizeFeatureDataPrVec& data = featureData.second;
std::size_t dimension = model_t::dimension(feature);
LOG_TRACE(<< model_t::print(feature) << " data = " << data);
this->applyFilter(model_t::E_XF_By, true, this->personFilter(), data);
for (const auto& data_ : data) {
std::size_t pid = data_.first;
const CGathererTools::TSampleVec& samples = data_.second.s_Samples;
maths::common::CModel* model = this->model(feature, pid);
if (model == nullptr) {
LOG_ERROR(<< "Missing model for " << this->personName(pid));
continue;
}
// initialCountWeight returns a weight as a double:
//   - 0.0 if checkScheduledEvents is true
//   - 1.0 if both checkScheduledEvents and checkRules are false
//   - a small weight (0.005) if checkRules is true
// This weight is applied as a multiplier to countWeight (and therefore to
// scaledCountWeight). This reduces the impact on the model of values affected
// by the skip_model_update rule while not completely ignoring them, so the
// model can still learn from the affected values. This addresses points 1
// and 2 in https://github.com/elastic/ml-cpp/issues/1272, namely:
//   1. If you apply it from the start of modelling it can stop the model
//      learning anything at all.
//   2. It can stop the model ever adapting to some change in data
//      characteristics.
core_t::TTime sampleTime = model_t::sampleTime(feature, time, bucketLength);
double initialCountWeight{this->initialCountWeight(
feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, sampleTime)};
if (initialCountWeight == 0.0) {
model->skipTime(time - lastBucketTimesMap[pid]);
continue;
}
const TOptionalSample& bucket = data_.second.s_BucketValue;
if (model_t::isSampled(feature) && bucket != std::nullopt) {
values.assign(1, core::make_triple(
bucket->time(), TDouble2Vec(bucket->value(dimension)),
model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID));
model->addBucketValue(values);
}
// For sparse data we reduce the impact of samples from empty buckets.
// In effect, we smoothly transition to modeling only values from non-empty
// buckets as the data becomes sparse.
double emptyBucketWeight = this->emptyBucketWeight(feature, pid, time);
if (emptyBucketWeight == 0.0) {
continue;
}
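// When s_MaximumUpdatesPerBucket is set, the per-sample count weight is
// chosen so that the bucket's samples carry that much total weight; it is
// then scaled by the feature learn rate and the initial count weight.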
std::size_t n = samples.size();
double countWeight =
(this->params().s_MaximumUpdatesPerBucket > 0.0 && n > 0
? this->params().s_MaximumUpdatesPerBucket / static_cast<double>(n)
: 1.0) *
this->learnRate(feature) * initialCountWeight;
double outlierWeightDerate = this->derate(pid, sampleTime);
// Note we need to scale the amount of data we'll "age out" of the residual
// model in one bucket by the empty bucket weight so the posterior doesn't
// end up too flat.
double scaledInterval = emptyBucketWeight;
double scaledCountWeight = emptyBucketWeight * countWeight;
LOG_TRACE(<< "Bucket = " << gatherer.printCurrentBucket()
<< ", feature = " << model_t::print(feature) << ", samples = "
<< samples << ", isInteger = " << data_.second.s_IsInteger
<< ", person = " << this->personName(pid)
<< ", dimension = " << dimension << ", count weight = " << countWeight
<< ", scaled count weight = " << scaledCountWeight
<< ", scaled interval = " << scaledInterval);
values.resize(n);
trendWeights.resize(n, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
priorWeights.resize(n, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
for (std::size_t i = 0; i < n; ++i) {
core_t::TTime ithSampleTime = samples[i].time();
TDouble2Vec ithSampleValue(samples[i].value(dimension));
double countVarianceScale = samples[i].varianceScale();
values[i] = core::make_triple(
model_t::sampleTime(feature, time, bucketLength, ithSampleTime),
ithSampleValue, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID);
model->countWeights(ithSampleTime, ithSampleValue,
countWeight, scaledCountWeight,
outlierWeightDerate, countVarianceScale,
trendWeights[i], priorWeights[i]);
}
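// The maths model invokes this callback when it makes a significant
// change, for example to its trend or seasonal components; we surface
// these as model change annotations in the results.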
auto annotationCallback = [&](const std::string& annotation) {
if (this->params().s_AnnotationsEnabled) {
m_CurrentBucketStats.s_Annotations.emplace_back(
time, CAnnotation::E_ModelChange, annotation,
gatherer.searchKey().detectorIndex(),
gatherer.searchKey().partitionFieldName(),
gatherer.partitionFieldValue(),
gatherer.searchKey().overFieldName(), EMPTY_STRING,
gatherer.searchKey().byFieldName(), gatherer.personName(pid));
}
};
maths::common::CModelAddSamplesParams params;
auto circuitBreaker = CMemoryCircuitBreaker(resourceMonitor);
params.isInteger(data_.second.s_IsInteger)
.isNonNegative(data_.second.s_IsNonNegative)
.propagationInterval(scaledInterval)
.trendWeights(trendWeights)
.priorWeights(priorWeights)
.bucketOccupancy(model_t::includeEmptyBuckets(feature)
? this->personFrequency(pid)
: 1.0)
.firstValueTime(pid < this->firstBucketTimes().size()
? this->firstBucketTimes()[pid]
: std::numeric_limits<core_t::TTime>::min())
.annotationCallback([&](const std::string& annotation) {
annotationCallback(annotation);
})
.memoryCircuitBreaker(circuitBreaker);
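// If adding the samples caused the model to be reset, also reset the
// person's sample count so it is re-estimated from scratch.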
if (model->addSamples(params, values) == maths::common::CModel::E_Reset) {
gatherer.resetSampleCount(pid);
}
}
}
this->sampleCorrelateModels();
}
}
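// Compute the probability of seeing the person's bucket values in
// [startTime, endTime) together with their influences. Each feature's
// probability is aggregated using the joint probability of less likely
// samples and the probability of the extreme sample.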
bool CMetricModel::computeProbability(const std::size_t pid,
core_t::TTime startTime,
core_t::TTime endTime,
CPartitioningFields& partitioningFields,
const std::size_t /*numberAttributeProbabilities*/,
SAnnotatedProbability& result) const {
CAnnotatedProbabilityBuilder resultBuilder(result);
const CDataGatherer& gatherer = this->dataGatherer();
core_t::TTime bucketLength = gatherer.bucketLength();
if (endTime != startTime + bucketLength) {
LOG_ERROR(<< "Can only compute probability for single bucket");
return false;
}
if (pid >= this->firstBucketTimes().size()) {
LOG_ERROR(<< "No first time for person = " << gatherer.personName(pid));
return false;
}
CProbabilityAndInfluenceCalculator pJoint(this->params().s_InfluenceCutoff);
pJoint.addAggregator(maths::common::CJointProbabilityOfLessLikelySamples());
pJoint.addAggregator(maths::common::CProbabilityOfExtremeSample());
bool skippedResults{false};
for (std::size_t i = 0, n = gatherer.numberFeatures(); i < n; ++i) {
model_t::EFeature feature = gatherer.feature(i);
if (model_t::isCategorical(feature)) {
continue;
}
const TFeatureData* data = this->featureData(feature, pid, startTime);
if ((data == nullptr) || !data->s_BucketValue) {
continue;
}
const TOptionalSample& bucket = data->s_BucketValue;
if (this->shouldSkipUpdate(feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID,
model_t::sampleTime(feature, startTime, bucketLength))) {
result.s_ShouldUpdateQuantiles = false;
}
if (this->shouldIgnoreResult(
feature, result.s_ResultType, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID,
model_t::sampleTime(feature, startTime, bucketLength, bucket->time()))) {
skippedResults = true;
continue;
}
LOG_TRACE(<< "Compute probability for " << data->print());
if (this->correlates(feature, pid, startTime)) {
CProbabilityAndInfluenceCalculator::SCorrelateParams params(partitioningFields);
TStrCRefDouble1VecDouble1VecPrPrVecVecVec influenceValues;
this->fill(feature, pid, startTime, result.isInterim(), params, influenceValues);
this->addProbabilityAndInfluences(pid, params, influenceValues,
pJoint, resultBuilder);
} else {
CProbabilityAndInfluenceCalculator::SParams params(partitioningFields);
if (this->fill(feature, pid, startTime, result.isInterim(), params)) {
this->addProbabilityAndInfluences(pid, params, data->s_InfluenceValues,
pJoint, resultBuilder);
}
}
}
double p{1.0};
if (skippedResults && pJoint.empty()) {
// We have skipped results for all features so we leave the
// probability at its initial value of 1.0, which ensures the
// quantiles are still updated.
} else if (pJoint.empty()) {
LOG_TRACE(<< "No samples in [" << startTime << "," << endTime << ")");
return false;
} else if (!pJoint.calculate(p, result.s_Influences)) {
LOG_ERROR(<< "Failed to compute probability");
return false;
}
LOG_TRACE(<< "probability(" << this->personName(pid) << ") = " << p);
resultBuilder.probability(p);
resultBuilder.anomalyScoreExplanation() = pJoint.anomalyScoreExplanation();
double multiBucketImpact{-1.0 * CAnomalyDetectorModelConfig::MAXIMUM_MULTI_BUCKET_IMPACT_MAGNITUDE};
if (pJoint.calculateMultiBucketImpact(multiBucketImpact)) {
resultBuilder.multiBucketImpact(multiBucketImpact);
}
resultBuilder.build();
return true;
}
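// Compute a checksum for the model, optionally including the current bucket
// statistics. The per-person hashes are keyed by person name so the result
// does not depend on the person identifier assignment.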
std::uint64_t CMetricModel::checksum(bool includeCurrentBucketStats) const {
using TStrCRefUInt64Map =
std::map<TStrCRef, std::uint64_t, maths::common::COrderings::SLess>;
std::uint64_t seed = this->CIndividualModel::checksum(includeCurrentBucketStats);
#define KEY(pid) std::cref(this->personName(pid))
TStrCRefUInt64Map hashes;
if (includeCurrentBucketStats) {
const TFeatureSizeFeatureDataPrVecPrVec& featureData =
m_CurrentBucketStats.s_FeatureData;
for (std::size_t i = 0; i < featureData.size(); ++i) {
for (std::size_t j = 0; j < featureData[i].second.size(); ++j) {
std::uint64_t& hash = hashes[KEY(featureData[i].second[j].first)];
const TFeatureData& data = featureData[i].second[j].second;
hash = maths::common::CChecksum::calculate(hash, data.s_BucketValue);
hash = core::CHashing::hashCombine(
hash, static_cast<std::uint64_t>(data.s_IsInteger));
hash = maths::common::CChecksum::calculate(hash, data.s_Samples);
}
}
}
#undef KEY
LOG_TRACE(<< "seed = " << seed);
LOG_TRACE(<< "hashes = " << hashes);
return maths::common::CChecksum::calculate(seed, hashes);
}
void CMetricModel::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CMetricModel");
this->CIndividualModel::debugMemoryUsage(mem->addChild());
core::memory_debug::dynamicSize("m_CurrentBucketStats.s_PersonCounts",
m_CurrentBucketStats.s_PersonCounts, mem);
core::memory_debug::dynamicSize("m_CurrentBucketStats.s_FeatureData",
m_CurrentBucketStats.s_FeatureData, mem);
core::memory_debug::dynamicSize("m_CurrentBucketStats.s_InterimCorrections",
m_CurrentBucketStats.s_InterimCorrections, mem);
core::memory_debug::dynamicSize("m_CurrentBucketStats.s_Annotations",
m_CurrentBucketStats.s_Annotations, mem);
core::memory_debug::dynamicSize("m_InterimBucketCorrector",
m_InterimBucketCorrector, mem);
}
std::size_t CMetricModel::memoryUsage() const {
return this->CIndividualModel::memoryUsage();
}
std::size_t CMetricModel::computeMemoryUsage() const {
std::size_t mem = this->CIndividualModel::computeMemoryUsage();
mem += core::memory::dynamicSize(m_CurrentBucketStats.s_PersonCounts);
mem += core::memory::dynamicSize(m_CurrentBucketStats.s_FeatureData);
mem += core::memory::dynamicSize(m_CurrentBucketStats.s_InterimCorrections);
mem += core::memory::dynamicSize(m_CurrentBucketStats.s_Annotations);
mem += core::memory::dynamicSize(m_InterimBucketCorrector);
return mem;
}
std::size_t CMetricModel::staticSize() const {
return sizeof(*this);
}
CMetricModel::TModelDetailsViewUPtr CMetricModel::details() const {
return TModelDetailsViewUPtr(new CMetricModelDetailsView(*this));
}
const CMetricModel::TFeatureData*
CMetricModel::featureData(model_t::EFeature feature, std::size_t pid, core_t::TTime time) const {
return this->CIndividualModel::featureData(feature, pid, time,
m_CurrentBucketStats.s_FeatureData);
}
const CMetricModel::TAnnotationVec& CMetricModel::annotations() const {
return m_CurrentBucketStats.s_Annotations;
}
core_t::TTime CMetricModel::currentBucketStartTime() const {
return m_CurrentBucketStats.s_StartTime;
}
void CMetricModel::currentBucketStartTime(core_t::TTime time) {
m_CurrentBucketStats.s_StartTime = time;
}
CIndividualModel::TFeatureSizeSizeTrDouble1VecUMap&
CMetricModel::currentBucketInterimCorrections() const {
return m_CurrentBucketStats.s_InterimCorrections;
}
const CMetricModel::TSizeUInt64PrVec& CMetricModel::currentBucketPersonCounts() const {
return m_CurrentBucketStats.s_PersonCounts;
}
CMetricModel::TSizeUInt64PrVec& CMetricModel::currentBucketPersonCounts() {
return m_CurrentBucketStats.s_PersonCounts;
}
void CMetricModel::clearPrunedResources(const TSizeVec& people, const TSizeVec& attributes) {
CDataGatherer& gatherer = this->dataGatherer();
// Stop collecting for these people and add them to the free list.
gatherer.recyclePeople(people);
if (gatherer.dataAvailable(m_CurrentBucketStats.s_StartTime)) {
gatherer.featureData(m_CurrentBucketStats.s_StartTime, gatherer.bucketLength(),
m_CurrentBucketStats.s_FeatureData);
}
this->CIndividualModel::clearPrunedResources(people, attributes);
}
const CInterimBucketCorrector& CMetricModel::interimValueCorrector() const {
return *m_InterimBucketCorrector;
}
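// Check whether the feature for this person is modelled jointly with at
// least one correlated person which has data for the bucket containing time.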
bool CMetricModel::correlates(model_t::EFeature feature, std::size_t pid, core_t::TTime time) const {
if (model_t::dimension(feature) > 1 || !this->params().s_MultivariateByFields) {
return false;
}
const maths::common::CModel* model{this->model(feature, pid)};
for (const auto& correlate : model->correlates()) {
if (this->featureData(
feature, pid == correlate[0] ? correlate[1] : correlate[0], time)) {
return true;
}
}
return false;
}
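// Fill in the parameters, i.e. the bucket value, weights and any interim
// value corrections, needed to compute the probability of this person's
// bucket value for the feature.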
bool CMetricModel::fill(model_t::EFeature feature,
std::size_t pid,
core_t::TTime bucketTime,
bool interim,
CProbabilityAndInfluenceCalculator::SParams& params) const {
std::size_t dimension{model_t::dimension(feature)};
const TFeatureData* data{this->featureData(feature, pid, bucketTime)};
if (data == nullptr) {
LOG_TRACE(<< "data unexpectedly null");
return false;
}
const TOptionalSample& bucket{data->s_BucketValue};
const maths::common::CModel* model{this->model(feature, pid)};
if (model == nullptr) {
LOG_TRACE(<< "model unexpectedly null");
return false;
}
core_t::TTime time{model_t::sampleTime(feature, bucketTime,
this->bucketLength(), bucket->time())};
maths_t::TDouble2VecWeightsAry weights{maths_t::CUnitWeights::unit<TDouble2Vec>(dimension)};
TDouble2Vec seasonalWeight;
model->seasonalWeight(maths::common::DEFAULT_SEASONAL_CONFIDENCE_INTERVAL,
time, seasonalWeight);
maths_t::setSeasonalVarianceScale(seasonalWeight, weights);
maths_t::setCountVarianceScale(TDouble2Vec(dimension, bucket->varianceScale()), weights);
double initialCountWeight{this->initialCountWeight(
feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, time)};
params.s_Feature = feature;
params.s_Model = model;
params.s_ElapsedTime = bucketTime - this->firstBucketTimes()[pid];
params.s_Time.assign(1, TTime2Vec{time});
params.s_Value.assign(1, bucket->value());
if (interim && model_t::requiresInterimResultAdjustment(feature)) {
TDouble2Vec mode(params.s_Model->mode(time, weights));
TDouble2Vec correction(this->interimValueCorrector().corrections(
mode, bucket->value(dimension)));
params.s_Value[0] += correction;
this->currentBucketInterimCorrections().emplace(
core::make_triple(feature, pid, pid), correction);
}
params.s_Count = bucket->count();
params.s_ComputeProbabilityParams
.addCalculation(model_t::probabilityCalculation(feature))
.addWeights(weights)
.initialCountWeight(initialCountWeight);
return true;
}
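// As above, but for the correlate probability calculation: for each pair of
// correlated people this interleaves the two bucket values, counts and
// matching influence values.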
void CMetricModel::fill(model_t::EFeature feature,
std::size_t pid,
core_t::TTime bucketTime,
bool interim,
CProbabilityAndInfluenceCalculator::SCorrelateParams& params,
TStrCRefDouble1VecDouble1VecPrPrVecVecVec& influenceValues) const {
using TStrCRefDouble1VecDoublePrPr = std::pair<TStrCRef, TDouble1VecDoublePr>;
const CDataGatherer& gatherer{this->dataGatherer()};
const maths::common::CModel* model{this->model(feature, pid)};
const TSize2Vec1Vec& correlates{model->correlates()};
const TTimeVec& firstBucketTimes{this->firstBucketTimes()};
core_t::TTime bucketLength{gatherer.bucketLength()};
double initialCountWeight{this->initialCountWeight(
feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID,
model_t::sampleTime(feature, bucketTime, bucketLength))};
params.s_Feature = feature;
params.s_Model = model;
params.s_ElapsedTime = std::numeric_limits<core_t::TTime>::max();
params.s_Times.resize(correlates.size());
params.s_Values.resize(correlates.size());
params.s_Counts.resize(correlates.size());
params.s_Variables.resize(correlates.size());
params.s_CorrelatedLabels.resize(correlates.size());
params.s_Correlated.resize(correlates.size());
params.s_ComputeProbabilityParams
.addCalculation(model_t::probabilityCalculation(feature))
.initialCountWeight(initialCountWeight);
// These are indexed as follows:
// influenceValues["influencer name"]["correlate"]["influence value"]
// This is because we aren't guaranteed that each influence is present for
// each feature.
influenceValues.resize(
this->featureData(feature, pid, bucketTime)->s_InfluenceValues.size(),
TStrCRefDouble1VecDouble1VecPrPrVecVec(correlates.size()));
// Declared outside the loop to minimize the number of times they are created.
TDouble1VecDouble1VecPr value;
TDouble2Vec seasonalWeights[2];
TDouble2Vec weight(2);
for (std::size_t i = 0; i < correlates.size(); ++i) {
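// variables[0] is the index of this person's value in the correlate
// pair and variables[1] is the index of the correlated person's value.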
TSize2Vec variables(pid == correlates[i][0] ? TSize2Vec{0, 1} : TSize2Vec{1, 0});
params.s_CorrelatedLabels[i] = gatherer.personName(correlates[i][variables[1]]);
params.s_Correlated[i] = correlates[i][variables[1]];
params.s_Variables[i] = variables;
const maths::common::CModel* models[]{
model, this->model(feature, correlates[i][variables[1]])};
maths_t::TDouble2VecWeightsAry weights(maths_t::CUnitWeights::unit<TDouble2Vec>(2));
models[0]->seasonalWeight(maths::common::DEFAULT_SEASONAL_CONFIDENCE_INTERVAL,
bucketTime, seasonalWeights[0]);
models[1]->seasonalWeight(maths::common::DEFAULT_SEASONAL_CONFIDENCE_INTERVAL,
bucketTime, seasonalWeights[1]);
weight[variables[0]] = seasonalWeights[0][0];
weight[variables[1]] = seasonalWeights[1][0];
maths_t::setSeasonalVarianceScale(weight, weights);
const TFeatureData* data[2];
data[0] = this->featureData(feature, correlates[i][0], bucketTime);
data[1] = this->featureData(feature, correlates[i][1], bucketTime);
if (data[0] && data[1] && data[0]->s_BucketValue && data[1]->s_BucketValue) {
const TOptionalSample& bucket0{data[0]->s_BucketValue};
const TOptionalSample& bucket1{data[1]->s_BucketValue};
core_t::TTime times[] = {
model_t::sampleTime(feature, bucketTime, bucketLength, bucket0->time()),
model_t::sampleTime(feature, bucketTime, bucketLength, bucket1->time())};
params.s_ElapsedTime = std::min(
params.s_ElapsedTime, times[0] - firstBucketTimes[correlates[i][0]]);
params.s_ElapsedTime = std::min(
params.s_ElapsedTime, times[1] - firstBucketTimes[correlates[i][1]]);
params.s_Times[i] = TTime2Vec{times[0], times[1]};
params.s_Values[i].resize(2 * bucket0->value().size());
for (std::size_t j = 0; j < bucket0->value().size(); ++j) {
params.s_Values[i][2 * j + 0] = bucket0->value()[j];
params.s_Values[i][2 * j + 1] = bucket1->value()[j];
}
params.s_Counts[i] = TDouble2Vec{bucket0->count(), bucket1->count()};
weight[variables[0]] = bucket0->varianceScale();
weight[variables[1]] = bucket1->varianceScale();
maths_t::setCountVarianceScale(weight, weights);
for (std::size_t j = 0; j < data[0]->s_InfluenceValues.size(); ++j) {
for (const auto& influenceValue : data[0]->s_InfluenceValues[j]) {
TStrCRef influence = influenceValue.first;
std::size_t match = static_cast<std::size_t>(
std::find_if(data[1]->s_InfluenceValues[j].begin(),
data[1]->s_InfluenceValues[j].end(),
[influence](const TStrCRefDouble1VecDoublePrPr& value_) {
return value_.first.get() == influence.get();
}) -
data[1]->s_InfluenceValues[j].begin());
if (match < data[1]->s_InfluenceValues[j].size()) {
const TDouble1VecDoublePr& value0 = influenceValue.second;
const TDouble1VecDoublePr& value1 =
data[1]->s_InfluenceValues[j][match].second;
value.first.resize(2 * value0.first.size());
for (std::size_t k = 0; k < value0.first.size(); ++k) {
value.first[2 * k + 0] = value0.first[k];
value.first[2 * k + 1] = value1.first[k];
}
value.second = TDouble1Vec{value0.second, value1.second};
influenceValues[j][i].emplace_back(influence, value);
}
}
}
}
params.s_ComputeProbabilityParams.addWeights(weights);
}
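// For interim results shift the values towards the correlate model modes
// to correct for the buckets being only partly complete.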
if (interim && model_t::requiresInterimResultAdjustment(feature)) {
core_t::TTime time{bucketTime + bucketLength / 2};
TDouble2Vec1Vec modes(params.s_Model->correlateModes(
time, params.s_ComputeProbabilityParams.weights()));
for (std::size_t i = 0; i < modes.size(); ++i) {
if (!params.s_Values.empty()) {
TDouble2Vec value_{params.s_Values[i][0], params.s_Values[i][1]};
TDouble2Vec correction(
this->interimValueCorrector().corrections(modes[i], value_));
for (std::size_t j = 0; j < 2; ++j) {
params.s_Values[i][j] += correction[j];
}
this->currentBucketInterimCorrections().emplace(
core::make_triple(feature, pid, params.s_Correlated[i]),
TDouble1Vec{correction[params.s_Variables[i][0]]});
}
}
}
}
void CMetricModel::addAnnotation(core_t::TTime time,
CAnnotation::EEvent type,
const std::string& annotation) {
m_CurrentBucketStats.s_Annotations.emplace_back(
time, type, annotation, this->dataGatherer().searchKey().detectorIndex(), EMPTY_STRING,
EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING);
}
////////// CMetricModel::SBucketStats Implementation //////////
CMetricModel::SBucketStats::SBucketStats(core_t::TTime startTime)
: s_StartTime(startTime), s_InterimCorrections(1) {
}
}
}