in lib/model/CEventRatePopulationModel.cc [348:545]
//! Sample the event rate population model for every bucket in the
//! interval [\p startTime, \p endTime), advancing one bucket length at
//! a time.
//!
//! For each bucket this:
//!   -# pulls the bucket's feature data from the gatherer,
//!   -# updates the multinomial prior for the categorical
//!      "unique person count by attribute" feature,
//!   -# accumulates (fuzzily de-duplicated) count samples and weights
//!      per attribute, and
//!   -# feeds the accumulated samples into each attribute's time series
//!      model, emitting change annotations when enabled.
//!
//! \param[in] startTime The inclusive start of the sampling interval.
//! \param[in] endTime The exclusive end of the sampling interval.
//! \param[in] resourceMonitor Used to create/update models within budget
//! and to back the memory circuit breaker passed to addSamples.
void CEventRatePopulationModel::sample(core_t::TTime startTime,
core_t::TTime endTime,
CResourceMonitor& resourceMonitor) {
CDataGatherer& gatherer = this->dataGatherer();
core_t::TTime bucketLength = gatherer.bucketLength();
// Nothing to do if the requested interval isn't valid to sample.
if (!gatherer.validateSampleTimes(startTime, endTime)) {
return;
}
// Make sure models exist for any people/attributes first seen in this
// interval, then drop state that only pertains to the previous bucket.
this->createUpdateNewModels(startTime, resourceMonitor);
this->currentBucketInterimCorrections().clear();
m_CurrentBucketStats.s_Annotations.clear();
for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
LOG_TRACE(<< "Sampling [" << time << "," << time + bucketLength << ")");
gatherer.sampleNow(time);
TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
gatherer.featureData(time, bucketLength, featureData);
// Let the base class sample first; this advances the per-attribute
// last bucket times, so snapshot them *before* that happens below.
this->CPopulationModel::sample(time, time + bucketLength, resourceMonitor);
const TTimeVec& preSampleAttributeLastBucketTimes = this->attributeLastBucketTimes();
// Map of attribute id -> last bucket time as it was before this
// bucket was sampled. Used later to compute how far to skip a
// model's time when samples are suppressed by scheduled events.
TSizeTimeUMap attributeLastBucketTimesMap;
for (const auto& featureData_ : featureData) {
TSizeSizePrFeatureDataPrVec& data =
m_CurrentBucketStats.s_FeatureData[featureData_.first];
for (const auto& data_ : data) {
std::size_t cid = CDataGatherer::extractAttributeId(data_);
attributeLastBucketTimesMap[cid] = preSampleAttributeLastBucketTimes[cid];
}
}
// Currently, we only remember one bucket.
m_CurrentBucketStats.s_StartTime = time;
TSizeUInt64PrVec& personCounts = m_CurrentBucketStats.s_PersonCounts;
gatherer.personNonZeroCounts(time, personCounts);
// Remove people excluded by the "over" field filter.
this->applyFilter(model_t::E_XF_Over, true, this->personFilter(), personCounts);
for (auto& featureData_ : featureData) {
model_t::EFeature feature = featureData_.first;
// Take ownership of this feature's data for the current bucket;
// featureData_.second is left holding the stale previous contents.
TSizeSizePrFeatureDataPrVec& data = m_CurrentBucketStats.s_FeatureData[feature];
data.swap(featureData_.second);
LOG_TRACE(<< model_t::print(feature) << ": " << data);
if (feature == model_t::E_PopulationUniquePersonCountByAttribute) {
// This categorical feature doesn't feed a time series model;
// instead it refreshes the multinomial prior over attributes
// (category = attribute id, concentration = its count).
TDoubleVec categories;
TDoubleVec concentrations;
categories.reserve(data.size());
concentrations.reserve(data.size());
for (const auto& tuple : data) {
categories.push_back(static_cast<double>(
CDataGatherer::extractAttributeId(tuple)));
concentrations.push_back(static_cast<double>(
CDataGatherer::extractData(tuple).s_Count));
}
maths::common::CMultinomialConjugate prior(
std::numeric_limits<int>::max(), categories, concentrations);
m_AttributeProbabilityPrior.swap(prior);
continue;
}
// Other categorical features carry no numeric samples to model.
if (model_t::isCategorical(feature)) {
continue;
}
// Remove (person, attribute) pairs excluded by the detector filters.
this->applyFilters(true, this->personFilter(), this->attributeFilter(), data);
core_t::TTime sampleTime = model_t::sampleTime(feature, time, bucketLength);
TSizeValuesAndWeightsUMap attributeValuesAndWeights;
TSizeFuzzyDeduplicateUMap duplicates;
if (data.size() >= this->params().s_MinimumToFuzzyDeduplicate) {
// Set up fuzzy de-duplication.
for (const auto& data_ : data) {
std::size_t cid = CDataGatherer::extractAttributeId(data_);
std::uint64_t count = CDataGatherer::extractData(data_).s_Count;
duplicates[cid].add({static_cast<double>(count)});
}
for (auto& attribute : duplicates) {
attribute.second.computeEpsilons(
bucketLength, this->params().s_MinimumToFuzzyDeduplicate);
}
}
// First pass: accumulate the bucket's samples and weights grouped
// by attribute, merging near-duplicate values into count weights.
for (const auto& data_ : data) {
std::size_t pid = CDataGatherer::extractPersonId(data_);
std::size_t cid = CDataGatherer::extractAttributeId(data_);
maths::common::CModel* model{this->model(feature, cid)};
if (model == nullptr) {
LOG_ERROR(<< "Missing model for " << this->attributeName(cid));
continue;
}
// initialCountWeight returns a weight value as double:
// 0.0 if checkScheduledEvents is true
// 1.0 if both checkScheduledEvents and checkRules are false
// A small weight - 0.005 - if checkRules is true.
// This weight is applied to countWeight (and therefore scaledCountWeight) as multiplier.
// This reduces the impact of the values affected by the skip_model_update rule
// on the model while not completely ignoring them. This still allows the model to
// learn from the affected values - addressing point 1. and 2. in
// https://github.com/elastic/ml-cpp/issues/1272, Namely
// 1. If you apply it from the start of the modelling it can stop the model learning anything at all.
// 2. It can stop the model ever adapting to some change in data characteristics
double initialCountWeight{
this->initialCountWeight(feature, pid, cid, sampleTime)};
if (initialCountWeight == 0.0) {
// Scheduled event: don't learn from this sample, but fast
// forward the model's internal clock over the gap.
core_t::TTime skipTime = sampleTime - attributeLastBucketTimesMap[cid];
if (skipTime > 0) {
model->skipTime(skipTime);
// Update the last time so we don't advance the same model
// multiple times (once per person)
attributeLastBucketTimesMap[cid] = sampleTime;
}
continue;
}
double count =
static_cast<double>(CDataGatherer::extractData(data_).s_Count);
// Shift the count so features whose baseline isn't zero are
// modelled about zero.
double value = model_t::offsetCountToZero(feature, count);
double countWeight = initialCountWeight *
this->sampleRateWeight(pid, cid) *
this->learnRate(feature);
LOG_TRACE(<< "Adding " << value
<< " for person = " << gatherer.personName(pid)
<< " and attribute = " << gatherer.attributeName(cid));
SValuesAndWeights& attribute = attributeValuesAndWeights[cid];
std::size_t duplicate = duplicates[cid].duplicate(sampleTime, {value});
if (duplicate < attribute.s_Values.size()) {
// A fuzzy duplicate of an already-accumulated value: fold
// this sample's count weight into the existing entry
// instead of appending a new sample.
model->addCountWeights(sampleTime, countWeight, countWeight,
1.0, attribute.s_TrendWeights[duplicate],
attribute.s_ResidualWeights[duplicate]);
} else {
// New (non-duplicate) value: append it with unit weights,
// then let the model initialize the trend/residual weights.
attribute.s_Values.emplace_back(sampleTime, TDouble2Vec{value}, pid);
attribute.s_TrendWeights.push_back(
maths_t::CUnitWeights::unit<TDouble2Vec>(1));
attribute.s_ResidualWeights.push_back(
maths_t::CUnitWeights::unit<TDouble2Vec>(1));
model->countWeights(sampleTime, {value}, countWeight,
countWeight, 1.0, // outlier weight derate
1.0, // count variance scale
attribute.s_TrendWeights.back(),
attribute.s_ResidualWeights.back());
}
}
// Second pass: feed each attribute's accumulated samples into its
// time series model.
for (auto& attribute : attributeValuesAndWeights) {
std::size_t cid = attribute.first;
// Record model-change annotations (e.g. from addSamples) against
// this bucket, tagged with the detector's field names.
auto annotationCallback = [&](const std::string& annotation) {
if (this->params().s_AnnotationsEnabled) {
m_CurrentBucketStats.s_Annotations.emplace_back(
time, CAnnotation::E_ModelChange, annotation,
gatherer.searchKey().detectorIndex(),
gatherer.searchKey().partitionFieldName(),
gatherer.partitionFieldValue(),
gatherer.searchKey().overFieldName(),
gatherer.attributeName(cid),
gatherer.searchKey().byFieldName(), EMPTY_STRING);
}
};
maths::common::CModelAddSamplesParams params;
// The circuit breaker lets addSamples back off allocations when
// the resource monitor reports memory pressure.
auto circuitBreaker = CMemoryCircuitBreaker(resourceMonitor);
params.isInteger(true)
.isNonNegative(true)
.propagationInterval(this->propagationTime(cid, sampleTime))
.trendWeights(attribute.second.s_TrendWeights)
.priorWeights(attribute.second.s_ResidualWeights)
.firstValueTime(cid < this->attributeFirstBucketTimes().size()
? this->attributeFirstBucketTimes()[cid]
: std::numeric_limits<core_t::TTime>::min())
.annotationCallback([&](const std::string& annotation) {
annotationCallback(annotation);
})
.memoryCircuitBreaker(circuitBreaker);
maths::common::CModel* model{this->model(feature, cid)};
if (model == nullptr) {
// Shouldn't happen: null models were skipped in the first
// pass, so this attribute can't have accumulated samples.
LOG_TRACE(<< "Model unexpectedly null");
continue;
}
// If the model decided to reset itself, restart sample count
// estimation for this attribute from scratch.
if (model->addSamples(params, attribute.second.s_Values) ==
maths::common::CModel::E_Reset) {
gatherer.resetSampleCount(cid);
}
}
}
// Flush any samples buffered by the inter-feature correlation models.
for (const auto& feature : m_FeatureCorrelatesModels) {
feature.s_Models->processSamples();
}
// Refresh the cached attribute probabilities from the updated prior
// and invalidate per-bucket probability caches.
m_AttributeProbabilities = TCategoryProbabilityCache(m_AttributeProbabilityPrior);
m_Probabilities.clear();
}
}