void CMetricPopulationModel::sample()

in lib/model/CMetricPopulationModel.cc [308:527]


void CMetricPopulationModel::sample(core_t::TTime startTime,
                                    core_t::TTime endTime,
                                    CResourceMonitor& resourceMonitor) {
    CDataGatherer& gatherer = this->dataGatherer();
    core_t::TTime bucketLength = gatherer.bucketLength();
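    // There is nothing to do if [startTime, endTime) isn't a valid interval
    // for the gatherer to sample.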
    if (!gatherer.validateSampleTimes(startTime, endTime)) {
        return;
    }

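    // Create or update models for any newly seen people and attributes and
    // clear the per-bucket state (interim corrections and annotations).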
    this->createUpdateNewModels(startTime, resourceMonitor);
    this->currentBucketInterimCorrections().clear();
    m_CurrentBucketStats.s_Annotations.clear();

    for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
        LOG_TRACE(<< "Sampling [" << time << "," << time + bucketLength << ")");

        gatherer.sampleNow(time);

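        // Read back the feature data gathered for this bucket, keyed by
        // feature and (person, attribute) pair.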
        TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
        gatherer.featureData(time, bucketLength, featureData);

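        // Capture each attribute's last bucket time before the base class
        // sample updates it: these pre-sample times are used below to work
        // out how far to advance a model whose updates are being skipped.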
        const TTimeVec& preSampleAttributeLastBucketTimes = this->attributeLastBucketTimes();
        TSizeTimeUMap attributeLastBucketTimesMap;
        for (const auto& featureData_ : featureData) {
            TSizeSizePrFeatureDataPrVec& data =
                m_CurrentBucketStats.s_FeatureData[featureData_.first];
            for (const auto& data_ : data) {
                std::size_t cid = CDataGatherer::extractAttributeId(data_);
                attributeLastBucketTimesMap[cid] = preSampleAttributeLastBucketTimes[cid];
            }
        }

        this->CPopulationModel::sample(time, time + bucketLength, resourceMonitor);

        // Currently, we only remember one bucket.
        m_CurrentBucketStats.s_StartTime = time;
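        // Record the non-zero per-person counts for the bucket, restricted to
        // people matching the 'over' field filter.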
        TSizeUInt64PrVec& personCounts = m_CurrentBucketStats.s_PersonCounts;
        gatherer.personNonZeroCounts(time, personCounts);
        this->applyFilter(model_t::E_XF_Over, true, this->personFilter(), personCounts);

        const TTimeVec& attributeLastBucketTimes = this->attributeLastBucketTimes();

        for (auto& featureData_ : featureData) {
            model_t::EFeature feature = featureData_.first;
            std::size_t dimension = model_t::dimension(feature);
            TSizeSizePrFeatureDataPrVec& data = m_CurrentBucketStats.s_FeatureData[feature];
            data.swap(featureData_.second);
            LOG_TRACE(<< model_t::print(feature) << ": " << data);
            this->applyFilters(true, this->personFilter(), this->attributeFilter(), data);

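            // Per-attribute collections of the values and weights to add to
            // the attribute models, plus optional fuzzy de-duplication state.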
            TSizeValuesAndWeightsUMap attributeValuesAndWeights;
            TSizeFuzzyDeduplicateUMap duplicates;

            if (data.size() >= this->params().s_MinimumToFuzzyDeduplicate) {
                // Set up fuzzy de-duplication.
                for (const auto& data_ : data) {
                    std::size_t cid = CDataGatherer::extractAttributeId(data_);
                    const CGathererTools::TSampleVec& samples =
                        CDataGatherer::extractData(data_).s_Samples;
                    for (const auto& sample : samples) {
                        duplicates[cid].add(TDouble2Vec(sample.value(dimension)));
                    }
                }
                for (auto& attribute : duplicates) {
                    attribute.second.computeEpsilons(
                        bucketLength, this->params().s_MinimumToFuzzyDeduplicate);
                }
            }

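            // Collect the values and weights to add for each
            // (person, attribute) pair.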
            for (const auto& data_ : data) {
                std::size_t pid = CDataGatherer::extractPersonId(data_);
                std::size_t cid = CDataGatherer::extractAttributeId(data_);

                maths::common::CModel* model{this->model(feature, cid)};
                if (model == nullptr) {
                    LOG_ERROR(<< "Missing model for " << this->attributeName(cid));
                    continue;
                }
                // initialCountWeight returns a weight as a double:
                //   0.0 if checkScheduledEvents is true,
                //   1.0 if both checkScheduledEvents and checkRules are false,
                //   a small weight (0.005) if checkRules is true.
                // This weight is applied as a multiplier to countWeight (and
                // therefore to scaledCountWeight). It reduces the impact of
                // values affected by the skip_model_update rule on the model
                // without ignoring them completely, so the model can still
                // learn from the affected values. This addresses points 1 and
                // 2 in https://github.com/elastic/ml-cpp/issues/1272, namely:
                //   1. Applying the rule from the start of modelling can stop
                //      the model learning anything at all.
                //   2. It can stop the model ever adapting to some change in
                //      the data characteristics.
                core_t::TTime sampleTime = model_t::sampleTime(feature, time, bucketLength);
                double initialCountWeight{
                    this->initialCountWeight(feature, pid, cid, sampleTime)};
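                // A weight of zero means this (person, attribute) shouldn't
                // update the model for this bucket at all; instead advance
                // the model over the elapsed time since it was last updated.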
                if (initialCountWeight == 0.0) {
                    core_t::TTime skipTime = sampleTime - attributeLastBucketTimesMap[cid];
                    if (skipTime > 0) {
                        model->skipTime(skipTime);
                        // Update the last time so we don't advance the same model
                        // multiple times (once per person)
                        attributeLastBucketTimesMap[cid] = sampleTime;
                    }
                    continue;
                }

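                // Extract this pair's bucket value and samples; samples older
                // than the sampling age cutoff are ignored below.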
                const TOptionalSample& bucket =
                    CDataGatherer::extractData(data_).s_BucketValue;
                const CGathererTools::TSampleVec& samples =
                    CDataGatherer::extractData(data_).s_Samples;
                bool isInteger = CDataGatherer::extractData(data_).s_IsInteger;
                bool isNonNegative = CDataGatherer::extractData(data_).s_IsNonNegative;
                core_t::TTime cutoff = attributeLastBucketTimes[cid] -
                                       this->params().s_SamplingAgeCutoff;
                LOG_TRACE(<< "Adding " << CDataGatherer::extractData(data_)
                          << " for person = " << gatherer.personName(pid)
                          << " and attribute = " << gatherer.attributeName(cid));

                SValuesAndWeights& attribute = attributeValuesAndWeights[cid];

                attribute.s_IsInteger &= isInteger;
                attribute.s_IsNonNegative &= isNonNegative;
                if (model_t::isSampled(feature) && bucket) {
                    attribute.s_BucketValues.emplace_back(
                        bucket->time(), TDouble2Vec(bucket->value(dimension)), pid);
                }

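                // The count weight combines the rule-based initial weight,
                // the person's sample rate weight and the feature's learn
                // rate and, if a maximum number of updates per bucket is
                // configured, spreads that budget evenly over the bucket's
                // samples.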
                std::size_t n = std::count_if(samples.begin(), samples.end(),
                                              [cutoff](const CSample& sample) {
                                                  return sample.time() >= cutoff;
                                              });
                double updatesPerBucket = this->params().s_MaximumUpdatesPerBucket;
                double countWeight = initialCountWeight *
                                     this->sampleRateWeight(pid, cid) *
                                     this->learnRate(feature) *
                                     (updatesPerBucket > 0.0 && n > 0
                                          ? updatesPerBucket / static_cast<double>(n)
                                          : 1.0);
                LOG_TRACE(<< "countWeight = " << countWeight);

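                // Add each sufficiently recent sample. If a sample is a fuzzy
                // duplicate of a value already collected for this attribute,
                // fold its count weight into the existing value's weights
                // instead of adding a new value.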
                for (const auto& sample : samples) {
                    if (sample.time() < cutoff) {
                        continue;
                    }

                    double countVarianceScale = sample.varianceScale();
                    TDouble2Vec value(sample.value(dimension));
                    std::size_t duplicate = duplicates[cid].duplicate(sample.time(), value);

                    if (duplicate < attribute.s_Values.size()) {
                        model->addCountWeights(sample.time(), countWeight,
                                               countWeight, countVarianceScale,
                                               attribute.s_TrendWeights[duplicate],
                                               attribute.s_ResidualWeights[duplicate]);
                    } else {
                        attribute.s_Values.emplace_back(sample.time(), value, pid);
                        attribute.s_TrendWeights.push_back(
                            maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
                        attribute.s_ResidualWeights.push_back(
                            maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
                        auto& trendWeight = attribute.s_TrendWeights.back();
                        auto& residualWeight = attribute.s_ResidualWeights.back();
                        model->countWeights(sample.time(), value, countWeight,
                                            countWeight, 1.0, // outlier weight derate
                                            countVarianceScale, trendWeight, residualWeight);
                    }
                }
            }

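            // Update each attribute's model with the values collected for
            // this bucket.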
            for (auto& attribute : attributeValuesAndWeights) {
                std::size_t cid = attribute.first;
                core_t::TTime latest = std::numeric_limits<core_t::TTime>::lowest();
                for (const auto& value : attribute.second.s_Values) {
                    latest = std::max(latest, value.first);
                }

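                // If annotations are enabled, record any model change
                // annotations generated while adding this bucket's samples.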
                auto annotationCallback = [&](const std::string& annotation) {
                    if (this->params().s_AnnotationsEnabled) {
                        m_CurrentBucketStats.s_Annotations.emplace_back(
                            time, CAnnotation::E_ModelChange, annotation,
                            gatherer.searchKey().detectorIndex(),
                            gatherer.searchKey().partitionFieldName(),
                            gatherer.partitionFieldValue(),
                            gatherer.searchKey().overFieldName(),
                            gatherer.attributeName(cid),
                            gatherer.searchKey().byFieldName(), EMPTY_STRING);
                    }
                };

                maths::common::CModelAddSamplesParams params;
                auto circuitBreaker = CMemoryCircuitBreaker(resourceMonitor);
                params.isInteger(attribute.second.s_IsInteger)
                    .isNonNegative(attribute.second.s_IsNonNegative)
                    .propagationInterval(this->propagationTime(cid, latest))
                    .trendWeights(attribute.second.s_TrendWeights)
                    .priorWeights(attribute.second.s_ResidualWeights)
                    .firstValueTime(cid < this->attributeFirstBucketTimes().size()
                                        ? this->attributeFirstBucketTimes()[cid]
                                        : std::numeric_limits<core_t::TTime>::min())
                    .annotationCallback([&](const std::string& annotation) {
                        annotationCallback(annotation);
                    })
                    .memoryCircuitBreaker(circuitBreaker);

                maths::common::CModel* model{this->model(feature, cid)};
                if (model == nullptr) {
                    LOG_TRACE(<< "Model unexpectedly null");
                    return;
                }
                if (model->addSamples(params, attribute.second.s_Values) ==
                    maths::common::CModel::E_Reset) {
                    gatherer.resetSampleCount(cid);
                }
            }
        }

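        // Process the samples added above in the feature correlate models.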
        for (const auto& feature : m_FeatureCorrelatesModels) {
            feature.s_Models->processSamples();
        }

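        // Any cached probabilities refer to the previous bucket so are now
        // stale.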
        m_Probabilities.clear();
    }
}