in lib/model/CMetricModel.cc [187:355]
void CMetricModel::sample(core_t::TTime startTime,
                          core_t::TTime endTime,
                          CResourceMonitor& resourceMonitor) {
    CDataGatherer& gatherer = this->dataGatherer();
    core_t::TTime bucketLength = gatherer.bucketLength();

    m_CurrentBucketStats.s_StartTime = std::max(m_CurrentBucketStats.s_StartTime, startTime);

    this->createUpdateNewModels(startTime, resourceMonitor);
    m_CurrentBucketStats.s_InterimCorrections.clear();
    m_CurrentBucketStats.s_Annotations.clear();
    m_CurrentBucketStats.s_FeatureData.clear();
    m_CurrentBucketStats.s_PersonCounts.clear();

    if (!gatherer.validateSampleTimes(startTime, endTime)) {
        return;
    }
    for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
        LOG_TRACE(<< "Sampling [" << time << "," << time + bucketLength << ")");

        gatherer.sampleNow(time);
        gatherer.featureData(time, bucketLength, m_CurrentBucketStats.s_FeatureData);
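
        // Snapshot each person's last bucket time before the base-class
        // sample() below advances it: the pre-sample value is what skipTime()
        // further down needs in order to measure the gap back to the last
        // bucket that was actually modelled.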
        const TTimeVec& preSampleLastBucketTimes = this->lastBucketTimes();
        TSizeTimeUMap lastBucketTimesMap;
        for (const auto& featureData : m_CurrentBucketStats.s_FeatureData) {
            for (const auto& data : featureData.second) {
                std::size_t pid = data.first;
                lastBucketTimesMap[pid] = preSampleLastBucketTimes[pid];
            }
        }

        this->CIndividualModel::sample(time, time + bucketLength, resourceMonitor);
        // Declared outside the loop to minimize the number of times they are created.
        maths::common::CModel::TTimeDouble2VecSizeTrVec values;
        maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec trendWeights;
        maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec priorWeights;

        for (auto& featureData : m_CurrentBucketStats.s_FeatureData) {
            model_t::EFeature feature = featureData.first;
            TSizeFeatureDataPrVec& data = featureData.second;
            std::size_t dimension = model_t::dimension(feature);
            LOG_TRACE(<< model_t::print(feature) << " data = " << data);
            this->applyFilter(model_t::E_XF_By, true, this->personFilter(), data);

            for (const auto& data_ : data) {
                std::size_t pid = data_.first;
                const CGathererTools::TSampleVec& samples = data_.second.s_Samples;

                maths::common::CModel* model = this->model(feature, pid);
                if (model == nullptr) {
                    LOG_ERROR(<< "Missing model for " << this->personName(pid));
                    continue;
                }
                // initialCountWeight returns a weight as a double:
                //   - 0.0 if checkScheduledEvents is true,
                //   - 1.0 if both checkScheduledEvents and checkRules are false,
                //   - a small weight (0.005) if checkRules is true.
                // This weight is applied as a multiplier to countWeight (and
                // therefore to scaledCountWeight). It reduces the impact on the
                // model of values affected by the skip_model_update rule without
                // ignoring them completely, so the model can still learn from
                // the affected values. This addresses points 1 and 2 in
                // https://github.com/elastic/ml-cpp/issues/1272, namely:
                //   1. Applying the rule from the start of modelling can stop
                //      the model learning anything at all.
                //   2. It can stop the model ever adapting to a change in the
                //      data characteristics.
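                // Illustrative arithmetic (hypothetical numbers, not from this
                // file): with the 0.005 multiplier, a sample whose count weight
                // would otherwise be 1.0 contributes weight 0.005, so roughly
                // 200 rule-affected buckets carry about as much evidence as a
                // single unaffected one.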
                core_t::TTime sampleTime = model_t::sampleTime(feature, time, bucketLength);
                double initialCountWeight{this->initialCountWeight(
                    feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, sampleTime)};
                if (initialCountWeight == 0.0) {
                    model->skipTime(time - lastBucketTimesMap[pid]);
                    continue;
                }
                const TOptionalSample& bucket = data_.second.s_BucketValue;
                if (model_t::isSampled(feature) && bucket != std::nullopt) {
                    values.assign(1, core::make_triple(
                                         bucket->time(), TDouble2Vec(bucket->value(dimension)),
                                         model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID));
                    model->addBucketValue(values);
                }

                // For sparse data we reduce the impact of samples from empty
                // buckets. In effect, we smoothly transition to modeling only
                // values from non-empty buckets as the data becomes sparse.
                double emptyBucketWeight = this->emptyBucketWeight(feature, pid, time);
                if (emptyBucketWeight == 0.0) {
                    continue;
                }
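                // For intuition (an assumption about emptyBucketWeight's
                // behaviour, not verified here): for a person present in
                // nearly every bucket the weight stays close to 1.0 and
                // sampling is unchanged, while for a very sparse person it
                // decays towards 0.0, at which point the bucket is skipped
                // entirely above.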
                std::size_t n = samples.size();
                double countWeight =
                    (this->params().s_MaximumUpdatesPerBucket > 0.0 && n > 0
                         ? this->params().s_MaximumUpdatesPerBucket / static_cast<double>(n)
                         : 1.0) *
                    this->learnRate(feature) * initialCountWeight;
                double outlierWeightDerate = this->derate(pid, sampleTime);
                // Note we need to scale the amount of data we'll "age out" of
                // the residual model in one bucket by the empty bucket weight
                // so the posterior doesn't end up too flat.
                double scaledInterval = emptyBucketWeight;
                double scaledCountWeight = emptyBucketWeight * countWeight;
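                // Worked example (illustrative numbers): with
                // s_MaximumUpdatesPerBucket = 1.0, n = 4 samples, a learn rate
                // of 1.0 and initialCountWeight = 1.0, countWeight = 0.25; an
                // emptyBucketWeight of 0.8 then gives scaledCountWeight = 0.2
                // and a propagation interval that ages out 0.8 of a bucket's
                // worth of history.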
LOG_TRACE(<< "Bucket = " << gatherer.printCurrentBucket()
<< ", feature = " << model_t::print(feature) << ", samples = "
<< samples << ", isInteger = " << data_.second.s_IsInteger
<< ", person = " << this->personName(pid)
<< ", dimension = " << dimension << ", count weight = " << countWeight
<< ", scaled count weight = " << scaledCountWeight
<< ", scaled interval = " << scaledInterval);
values.resize(n);
trendWeights.resize(n, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
priorWeights.resize(n, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
for (std::size_t i = 0; i < n; ++i) {
core_t::TTime ithSampleTime = samples[i].time();
TDouble2Vec ithSampleValue(samples[i].value(dimension));
double countVarianceScale = samples[i].varianceScale();
values[i] = core::make_triple(
model_t::sampleTime(feature, time, bucketLength, ithSampleTime),
ithSampleValue, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID);
model->countWeights(ithSampleTime, ithSampleValue,
countWeight, scaledCountWeight,
outlierWeightDerate, countVarianceScale,
trendWeights[i], priorWeights[i]);
}
                auto annotationCallback = [&](const std::string& annotation) {
                    if (this->params().s_AnnotationsEnabled) {
                        m_CurrentBucketStats.s_Annotations.emplace_back(
                            time, CAnnotation::E_ModelChange, annotation,
                            gatherer.searchKey().detectorIndex(),
                            gatherer.searchKey().partitionFieldName(),
                            gatherer.partitionFieldValue(),
                            gatherer.searchKey().overFieldName(), EMPTY_STRING,
                            gatherer.searchKey().byFieldName(), gatherer.personName(pid));
                    }
                };

                maths::common::CModelAddSamplesParams params;
                auto circuitBreaker = CMemoryCircuitBreaker(resourceMonitor);
                params.isInteger(data_.second.s_IsInteger)
                    .isNonNegative(data_.second.s_IsNonNegative)
                    .propagationInterval(scaledInterval)
                    .trendWeights(trendWeights)
                    .priorWeights(priorWeights)
                    .bucketOccupancy(model_t::includeEmptyBuckets(feature)
                                         ? this->personFrequency(pid)
                                         : 1.0)
                    .firstValueTime(pid < this->firstBucketTimes().size()
                                        ? this->firstBucketTimes()[pid]
                                        : std::numeric_limits<core_t::TTime>::min())
                    .annotationCallback([&](const std::string& annotation) {
                        annotationCallback(annotation);
                    })
                    .memoryCircuitBreaker(circuitBreaker);
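
                // If addSamples() reset the underlying model it returns E_Reset
                // (the exact trigger lives in the maths library); restart the
                // sample count estimation so bucket sampling re-adapts too.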
                if (model->addSamples(params, values) == maths::common::CModel::E_Reset) {
                    gatherer.resetSampleCount(pid);
                }
            }
        }

        this->sampleCorrelateModels();
    }
}
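
// Usage sketch (illustrative only): sample() takes a half-open
// [startTime, endTime) range and internally steps it in bucketLength
// increments, so a caller can drive it one bucket at a time or over a
// span of buckets. The names below (model, monitor, startOfData,
// endOfData) are hypothetical; constructing the model and its gatherer
// is elided.
//
//   for (core_t::TTime time = startOfData; time < endOfData; time += bucketLength) {
//       model.sample(time, time + bucketLength, monitor);
//   }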