in lib/model/CEventRateModel.cc [218:380]
// Samples the gatherer's feature data for every bucket in [startTime, endTime)
// and feeds the resulting per-person values into the corresponding residual
// models. Also maintains the person probability prior, correlate models and
// the cached category probabilities, and collects any model-change annotations
// produced during the update.
//
// \param startTime The start of the sampling interval (inclusive).
// \param endTime The end of the sampling interval (exclusive).
// \param resourceMonitor Used both to gate creation of new models and to back
//        the memory circuit breaker passed to the model update.
void CEventRateModel::sample(core_t::TTime startTime,
                             core_t::TTime endTime,
                             CResourceMonitor& resourceMonitor) {
    CDataGatherer& gatherer = this->dataGatherer();
    core_t::TTime bucketLength = gatherer.bucketLength();
    // Nothing to do if the requested interval isn't valid for sampling.
    if (!gatherer.validateSampleTimes(startTime, endTime)) {
        return;
    }
    this->createUpdateNewModels(startTime, resourceMonitor);
    this->currentBucketInterimCorrections().clear();
    m_CurrentBucketStats.s_Annotations.clear();
    for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
        LOG_TRACE(<< "Sampling [" << time << "," << time + bucketLength << ")");
        gatherer.sampleNow(time);
        gatherer.featureData(time, bucketLength, m_CurrentBucketStats.s_FeatureData);
        // Snapshot the last bucket time of every person present in this
        // bucket's feature data *before* CIndividualModel::sample advances
        // the stored last bucket times. The snapshot is used further down to
        // compute how much time to skip in a model when a rule suppresses
        // its update entirely (initialCountWeight == 0.0).
        const CIndividualModel::TTimeVec& preSampleLastBucketTimes = this->lastBucketTimes();
        CIndividualModel::TSizeTimeUMap lastBucketTimesMap;
        for (const auto& featureData : m_CurrentBucketStats.s_FeatureData) {
            for (const auto& data : featureData.second) {
                std::size_t pid = data.first;
                lastBucketTimesMap[pid] = preSampleLastBucketTimes[pid];
            }
        }
        this->CIndividualModel::sample(time, time + bucketLength, resourceMonitor);
        // Declared outside the loop to minimize the number of times they are created.
        maths::common::CModel::TTimeDouble2VecSizeTrVec values;
        maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec trendWeights;
        maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec priorWeights;
        for (auto& featureData : m_CurrentBucketStats.s_FeatureData) {
            model_t::EFeature feature = featureData.first;
            TSizeFeatureDataPrVec& data = featureData.second;
            std::size_t dimension = model_t::dimension(feature);
            LOG_TRACE(<< model_t::print(feature) << ": " << data);
            // The total bucket count feature doesn't update a per-person
            // residual model; instead each active person id is added as a
            // sample to the shared probability prior, which is aged once per
            // non-empty bucket.
            if (feature == model_t::E_IndividualTotalBucketCountByPerson) {
                for (const auto& data_ : data) {
                    if (data_.second.s_Count > 0) {
                        LOG_TRACE(<< "person = " << this->personName(data_.first));
                        m_ProbabilityPrior.addSamples({static_cast<double>(data_.first)},
                                                      maths_t::CUnitWeights::SINGLE_UNIT);
                    }
                }
                if (!data.empty()) {
                    m_ProbabilityPrior.propagateForwardsByTime(1.0);
                }
                continue;
            }
            // Categorical features carry no numeric value to model here.
            if (model_t::isCategorical(feature)) {
                continue;
            }
            // Drop persons excluded by the "by" field filter before modeling.
            this->applyFilter(model_t::E_XF_By, true, this->personFilter(), data);
            for (const auto& data_ : data) {
                std::size_t pid = data_.first;
                maths::common::CModel* model = this->model(feature, pid);
                if (model == nullptr) {
                    LOG_ERROR(<< "Missing model for " << this->personName(pid));
                    continue;
                }
                // initialCountWeight returns a weight value as double:
                // 0.0 if checkScheduledEvents is true
                // 1.0 if both checkScheduledEvents and checkRules are false
                // A small weight - 0.005 - if checkRules is true.
                // This weight is applied to countWeight (and therefore scaledCountWeight) as multiplier.
                // This reduces the impact of the values affected by the skip_model_update rule
                // on the model while not completely ignoring them. This still allows the model to
                // learn from the affected values - addressing point 1. and 2. in
                // https://github.com/elastic/ml-cpp/issues/1272, Namely
                // 1. If you apply it from the start of the modelling it can stop the model learning anything at all.
                // 2. It can stop the model ever adapting to some change in data characteristics
                core_t::TTime sampleTime = model_t::sampleTime(feature, time, bucketLength);
                double initialCountWeight = this->initialCountWeight(
                    feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, sampleTime);
                if (initialCountWeight == 0.0) {
                    // Fully suppressed: advance the model's clock past the
                    // skipped interval (using the pre-sample snapshot above)
                    // instead of adding a sample.
                    model->skipTime(sampleTime - lastBucketTimesMap[pid]);
                    continue;
                }
                // For sparse data we reduce the impact of samples from empty buckets.
                // In effect, we smoothly transition to modeling only values from non-empty
                // buckets as the data becomes sparse.
                double emptyBucketWeight = this->emptyBucketWeight(feature, pid, time);
                if (emptyBucketWeight == 0.0) {
                    continue;
                }
                double count = model_t::offsetCountToZero(
                    feature, static_cast<double>(data_.second.s_Count));
                TDouble2Vec value{count};
                double outlierWeightDerate = this->derate(pid, sampleTime);
                double countWeight = initialCountWeight * this->learnRate(feature);
                // Note we need to scale the amount of data we'll "age out" of the residual
                // model in one bucket by the empty bucket weight so the posterior doesn't
                // end up too flat.
                double scaledInterval = emptyBucketWeight;
                double scaledCountWeight = emptyBucketWeight * countWeight;
                LOG_TRACE(<< "Bucket = " << this->printCurrentBucket()
                          << ", feature = " << model_t::print(feature) << ", count = "
                          << count << ", person = " << this->personName(pid)
                          << ", empty bucket weight = " << emptyBucketWeight
                          << ", count weight = " << countWeight
                          << ", scaled count weight = " << scaledCountWeight
                          << ", scaled interval = " << scaledInterval);
                // One sample per person per bucket: slot 0 is (re)filled each
                // iteration. resize only grows the weight vectors on the first
                // pass; countWeights overwrites element 0 regardless.
                values.assign(1, core::make_triple(sampleTime, value, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID));
                trendWeights.resize(1, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
                priorWeights.resize(1, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
                model->countWeights(sampleTime, value, countWeight, scaledCountWeight,
                                    outlierWeightDerate, 1.0, // count variance scale
                                    trendWeights[0], priorWeights[0]);
                // Record model-change annotations (e.g. emitted during the
                // addSamples call below) against this detector's bucket, if
                // annotations are enabled.
                auto annotationCallback = [&](const std::string& annotation) {
                    if (this->params().s_AnnotationsEnabled) {
                        m_CurrentBucketStats.s_Annotations.emplace_back(
                            time, CAnnotation::E_ModelChange, annotation,
                            gatherer.searchKey().detectorIndex(),
                            gatherer.searchKey().partitionFieldName(),
                            gatherer.partitionFieldValue(),
                            gatherer.searchKey().overFieldName(), EMPTY_STRING,
                            gatherer.searchKey().byFieldName(), gatherer.personName(pid));
                    }
                };
                maths::common::CModelAddSamplesParams params;
                auto circuitBreaker = CMemoryCircuitBreaker(resourceMonitor);
                params.isInteger(true)
                    .isNonNegative(true)
                    .propagationInterval(scaledInterval)
                    .trendWeights(trendWeights)
                    .priorWeights(priorWeights)
                    .bucketOccupancy(model_t::includeEmptyBuckets(feature)
                                         ? this->personFrequency(pid)
                                         : 1.0)
                    .firstValueTime(pid < this->firstBucketTimes().size()
                                        ? this->firstBucketTimes()[pid]
                                        : std::numeric_limits<core_t::TTime>::min())
                    .annotationCallback([&](const std::string& annotation) {
                        annotationCallback(annotation);
                    })
                    .memoryCircuitBreaker(circuitBreaker);
                // E_Reset means the model discarded its state; restart the
                // gatherer's sample count for this person to match.
                if (model->addSamples(params, values) == maths::common::CModel::E_Reset) {
                    gatherer.resetSampleCount(pid);
                }
            }
        }
        this->sampleCorrelateModels();
        // Refresh the cached per-person probabilities from the (possibly
        // updated) prior once per bucket.
        m_Probabilities = TCategoryProbabilityCache(m_ProbabilityPrior);
    }
}