in lib/model/CMetricPopulationModel.cc [308:527]
void CMetricPopulationModel::sample(core_t::TTime startTime,
core_t::TTime endTime,
CResourceMonitor& resourceMonitor) {
CDataGatherer& gatherer = this->dataGatherer();
core_t::TTime bucketLength = gatherer.bucketLength();
if (!gatherer.validateSampleTimes(startTime, endTime)) {
return;
}
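    // Create models for any people and attributes first seen in this
    // sampling interval, then reset the state we keep for the current
    // bucket.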
this->createUpdateNewModels(startTime, resourceMonitor);
this->currentBucketInterimCorrections().clear();
m_CurrentBucketStats.s_Annotations.clear();
for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
LOG_TRACE(<< "Sampling [" << time << "," << time + bucketLength << ")");
gatherer.sampleNow(time);
TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
gatherer.featureData(time, bucketLength, featureData);
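        // Snapshot the last bucket time of each attribute present in this
        // bucket before CPopulationModel::sample refreshes it below: the
        // pre-sample times are needed further down to work out how far a
        // model whose updates are being skipped should be fast-forwarded.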
const TTimeVec& preSampleAttributeLastBucketTimes = this->attributeLastBucketTimes();
TSizeTimeUMap attributeLastBucketTimesMap;
for (const auto& featureData_ : featureData) {
TSizeSizePrFeatureDataPrVec& data =
m_CurrentBucketStats.s_FeatureData[featureData_.first];
for (const auto& data_ : data) {
std::size_t cid = CDataGatherer::extractAttributeId(data_);
attributeLastBucketTimesMap[cid] = preSampleAttributeLastBucketTimes[cid];
}
}
this->CPopulationModel::sample(time, time + bucketLength, resourceMonitor);
// Currently, we only remember one bucket.
m_CurrentBucketStats.s_StartTime = time;
TSizeUInt64PrVec& personCounts = m_CurrentBucketStats.s_PersonCounts;
gatherer.personNonZeroCounts(time, personCounts);
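        // Drop people excluded by any filter on the over field.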
this->applyFilter(model_t::E_XF_Over, true, this->personFilter(), personCounts);
const TTimeVec& attributeLastBucketTimes = this->attributeLastBucketTimes();
for (auto& featureData_ : featureData) {
model_t::EFeature feature = featureData_.first;
std::size_t dimension = model_t::dimension(feature);
TSizeSizePrFeatureDataPrVec& data = m_CurrentBucketStats.s_FeatureData[feature];
data.swap(featureData_.second);
LOG_TRACE(<< model_t::print(feature) << ": " << data);
this->applyFilters(true, this->personFilter(), this->attributeFilter(), data);
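            // For large populations we fuzzily de-duplicate samples: values
            // which are near enough to one another are folded into a single
            // model update with a correspondingly larger count weight, rather
            // than each being added separately.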
TSizeValuesAndWeightsUMap attributeValuesAndWeights;
TSizeFuzzyDeduplicateUMap duplicates;
if (data.size() >= this->params().s_MinimumToFuzzyDeduplicate) {
// Set up fuzzy de-duplication.
for (const auto& data_ : data) {
std::size_t cid = CDataGatherer::extractAttributeId(data_);
const CGathererTools::TSampleVec& samples =
CDataGatherer::extractData(data_).s_Samples;
for (const auto& sample : samples) {
duplicates[cid].add(TDouble2Vec(sample.value(dimension)));
}
}
for (auto& attribute : duplicates) {
attribute.second.computeEpsilons(
bucketLength, this->params().s_MinimumToFuzzyDeduplicate);
}
}
for (const auto& data_ : data) {
std::size_t pid = CDataGatherer::extractPersonId(data_);
std::size_t cid = CDataGatherer::extractAttributeId(data_);
maths::common::CModel* model{this->model(feature, cid)};
if (model == nullptr) {
LOG_ERROR(<< "Missing model for " << this->attributeName(cid));
continue;
}
                // initialCountWeight returns a weight as a double:
                //   0.0 if checkScheduledEvents is true,
                //   1.0 if both checkScheduledEvents and checkRules are false,
                //   a small weight (0.005) if checkRules is true.
                // This weight is applied as a multiplier to countWeight (and
                // therefore to scaledCountWeight). It reduces the impact on
                // the model of values affected by the skip_model_update rule
                // without ignoring them completely: the model can still learn
                // from the affected values, which addresses points 1 and 2 in
                // https://github.com/elastic/ml-cpp/issues/1272, namely:
                //   1. Applying the rule from the start of modelling can stop
                //      the model learning anything at all.
                //   2. It can stop the model ever adapting to a change in the
                //      data characteristics.
                // A weight of exactly 0.0 means this bucket shouldn't be
                // sampled at all, in which case the model's time is simply
                // advanced below.
core_t::TTime sampleTime = model_t::sampleTime(feature, time, bucketLength);
double initialCountWeight{
this->initialCountWeight(feature, pid, cid, sampleTime)};
if (initialCountWeight == 0.0) {
core_t::TTime skipTime = sampleTime - attributeLastBucketTimesMap[cid];
if (skipTime > 0) {
model->skipTime(skipTime);
                        // Update the last bucket time so that we don't advance
                        // the same attribute's model multiple times (we visit
                        // each attribute once per person).
attributeLastBucketTimesMap[cid] = sampleTime;
}
continue;
}
const TOptionalSample& bucket =
CDataGatherer::extractData(data_).s_BucketValue;
const CGathererTools::TSampleVec& samples =
CDataGatherer::extractData(data_).s_Samples;
bool isInteger = CDataGatherer::extractData(data_).s_IsInteger;
bool isNonNegative = CDataGatherer::extractData(data_).s_IsNonNegative;
core_t::TTime cutoff = attributeLastBucketTimes[cid] -
this->params().s_SamplingAgeCutoff;
LOG_TRACE(<< "Adding " << CDataGatherer::extractData(data_)
<< " for person = " << gatherer.personName(pid)
<< " and attribute = " << gatherer.attributeName(cid));
SValuesAndWeights& attribute = attributeValuesAndWeights[cid];
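                // The integer and non-negative flags describe the attribute's
                // data as a whole, so they are AND-ed over every person which
                // contributes to this attribute.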
attribute.s_IsInteger &= isInteger;
attribute.s_IsNonNegative &= isNonNegative;
if (model_t::isSampled(feature) && bucket) {
attribute.s_BucketValues.emplace_back(
bucket->time(), TDouble2Vec(bucket->value(dimension)), pid);
}
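                // Count the samples which are recent enough to use: when the
                // number of updates per bucket is capped, the available count
                // weight is shared evenly among them.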
std::size_t n = std::count_if(samples.begin(), samples.end(),
[cutoff](const CSample& sample) {
return sample.time() >= cutoff;
});
double updatesPerBucket = this->params().s_MaximumUpdatesPerBucket;
double countWeight = initialCountWeight *
this->sampleRateWeight(pid, cid) *
this->learnRate(feature) *
(updatesPerBucket > 0.0 && n > 0
? updatesPerBucket / static_cast<double>(n)
: 1.0);
LOG_TRACE(<< "countWeight = " << countWeight);
for (const auto& sample : samples) {
if (sample.time() < cutoff) {
continue;
}
double countVarianceScale = sample.varianceScale();
TDouble2Vec value(sample.value(dimension));
std::size_t duplicate = duplicates[cid].duplicate(sample.time(), value);
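                        // If this value fuzzily matches one we've already
                        // queued for the attribute, fold it into the existing
                        // sample's weights rather than adding it again.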
if (duplicate < attribute.s_Values.size()) {
model->addCountWeights(sample.time(), countWeight,
countWeight, countVarianceScale,
attribute.s_TrendWeights[duplicate],
attribute.s_ResidualWeights[duplicate]);
} else {
attribute.s_Values.emplace_back(sample.time(), value, pid);
attribute.s_TrendWeights.push_back(
maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
attribute.s_ResidualWeights.push_back(
maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
auto& trendWeight = attribute.s_TrendWeights.back();
auto& residualWeight = attribute.s_ResidualWeights.back();
model->countWeights(sample.time(), value, countWeight,
countWeight, 1.0, // outlier weight derate
countVarianceScale, trendWeight, residualWeight);
}
}
}
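            // Update each attribute's model with the values and weights
            // accumulated above.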
for (auto& attribute : attributeValuesAndWeights) {
std::size_t cid = attribute.first;
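                // The latest sample time determines how far the model's
                // priors are propagated forwards in time.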
core_t::TTime latest = std::numeric_limits<core_t::TTime>::lowest();
for (const auto& value : attribute.second.s_Values) {
latest = std::max(latest, value.first);
}
auto annotationCallback = [&](const std::string& annotation) {
if (this->params().s_AnnotationsEnabled) {
m_CurrentBucketStats.s_Annotations.emplace_back(
time, CAnnotation::E_ModelChange, annotation,
gatherer.searchKey().detectorIndex(),
gatherer.searchKey().partitionFieldName(),
gatherer.partitionFieldValue(),
gatherer.searchKey().overFieldName(),
gatherer.attributeName(cid),
gatherer.searchKey().byFieldName(), EMPTY_STRING);
}
};
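                // Assemble the parameters for the model update. The memory
                // circuit breaker lets the update curtail allocations if the
                // resource monitor reports memory pressure.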
maths::common::CModelAddSamplesParams params;
auto circuitBreaker = CMemoryCircuitBreaker(resourceMonitor);
params.isInteger(attribute.second.s_IsInteger)
.isNonNegative(attribute.second.s_IsNonNegative)
.propagationInterval(this->propagationTime(cid, latest))
.trendWeights(attribute.second.s_TrendWeights)
.priorWeights(attribute.second.s_ResidualWeights)
.firstValueTime(cid < this->attributeFirstBucketTimes().size()
? this->attributeFirstBucketTimes()[cid]
: std::numeric_limits<core_t::TTime>::min())
.annotationCallback([&](const std::string& annotation) {
annotationCallback(annotation);
})
.memoryCircuitBreaker(circuitBreaker);
maths::common::CModel* model{this->model(feature, cid)};
if (model == nullptr) {
LOG_TRACE(<< "Model unexpectedly null");
return;
}
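                // A hard model reset invalidates the sample count estimate,
                // so restart estimating it for this attribute.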
if (model->addSamples(params, attribute.second.s_Values) ==
maths::common::CModel::E_Reset) {
gatherer.resetSampleCount(cid);
}
}
}
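        // Process any samples added to the feature correlate models as a
        // side effect of the updates above.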
for (const auto& feature : m_FeatureCorrelatesModels) {
feature.s_Models->processSamples();
}
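        // The model updates invalidate any cached probability calculations.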
m_Probabilities.clear();
}
}