void CEventRatePopulationModel::sample()

in lib/model/CEventRatePopulationModel.cc [348:545]


// Sample every bucket in [startTime, endTime), stepping by the gatherer's
// bucket length, and update the per-attribute models with the gathered
// person/attribute counts.
//
// \param startTime Start of the first bucket to sample.
// \param endTime End (exclusive) of the last bucket to sample.
// \param resourceMonitor Forwarded to model creation, to the base class
// sample and used to build the memory circuit breaker handed to the models.
//
// Nothing is done if the gatherer rejects the supplied sample times.
void CEventRatePopulationModel::sample(core_t::TTime startTime,
                                       core_t::TTime endTime,
                                       CResourceMonitor& resourceMonitor) {
    CDataGatherer& gatherer = this->dataGatherer();
    core_t::TTime bucketLength = gatherer.bucketLength();
    if (!gatherer.validateSampleTimes(startTime, endTime)) {
        return;
    }

    this->createUpdateNewModels(startTime, resourceMonitor);
    this->currentBucketInterimCorrections().clear();
    m_CurrentBucketStats.s_Annotations.clear();

    for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
        LOG_TRACE(<< "Sampling [" << time << "," << time + bucketLength << ")");

        gatherer.sampleNow(time);
        TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
        gatherer.featureData(time, bucketLength, featureData);

        // Snapshot the attribute last bucket times BEFORE the base class
        // sample runs. preSampleAttributeLastBucketTimes is only a reference
        // to live state, so the values must be copied into the map while they
        // are still the pre-sample ones; they are used below to work out how
        // much time a model has to skip.
        // NOTE(review): this relies on CPopulationModel::sample advancing the
        // attribute last bucket times for this bucket - its body is not
        // visible here, confirm.
        // NOTE(review): the attribute ids are taken from the feature data still
        // stored in m_CurrentBucketStats (it is only swapped for this bucket's
        // data further down), not from the freshly gathered featureData -
        // confirm this is intended.
        const TTimeVec& preSampleAttributeLastBucketTimes = this->attributeLastBucketTimes();
        TSizeTimeUMap attributeLastBucketTimesMap;
        for (const auto& featureData_ : featureData) {
            TSizeSizePrFeatureDataPrVec& data =
                m_CurrentBucketStats.s_FeatureData[featureData_.first];
            for (const auto& data_ : data) {
                std::size_t cid = CDataGatherer::extractAttributeId(data_);
                attributeLastBucketTimesMap[cid] = preSampleAttributeLastBucketTimes[cid];
            }
        }

        this->CPopulationModel::sample(time, time + bucketLength, resourceMonitor);

        // Currently, we only remember one bucket.
        m_CurrentBucketStats.s_StartTime = time;
        TSizeUInt64PrVec& personCounts = m_CurrentBucketStats.s_PersonCounts;
        gatherer.personNonZeroCounts(time, personCounts);
        this->applyFilter(model_t::E_XF_Over, true, this->personFilter(), personCounts);

        for (auto& featureData_ : featureData) {
            model_t::EFeature feature = featureData_.first;
            // Take ownership of this bucket's data for the feature; what was
            // stored before ends up in featureData_.second and is discarded
            // with featureData.
            TSizeSizePrFeatureDataPrVec& data = m_CurrentBucketStats.s_FeatureData[feature];
            data.swap(featureData_.second);
            LOG_TRACE(<< model_t::print(feature) << ": " << data);

            if (feature == model_t::E_PopulationUniquePersonCountByAttribute) {
                // Rebuild the attribute probability prior from scratch using
                // the attribute ids as categories and the counts as
                // concentrations. No model is updated for this feature.
                TDoubleVec categories;
                TDoubleVec concentrations;
                categories.reserve(data.size());
                concentrations.reserve(data.size());
                for (const auto& tuple : data) {
                    categories.push_back(static_cast<double>(
                        CDataGatherer::extractAttributeId(tuple)));
                    concentrations.push_back(static_cast<double>(
                        CDataGatherer::extractData(tuple).s_Count));
                }
                maths::common::CMultinomialConjugate prior(
                    std::numeric_limits<int>::max(), categories, concentrations);
                m_AttributeProbabilityPrior.swap(prior);
                continue;
            }
            if (model_t::isCategorical(feature)) {
                // Other categorical features have nothing to add to the models.
                continue;
            }

            this->applyFilters(true, this->personFilter(), this->attributeFilter(), data);

            core_t::TTime sampleTime = model_t::sampleTime(feature, time, bucketLength);

            // Samples and weights accumulated per attribute id for this
            // feature; they are handed to the models after the loop over data.
            TSizeValuesAndWeightsUMap attributeValuesAndWeights;
            TSizeFuzzyDeduplicateUMap duplicates;

            if (data.size() >= this->params().s_MinimumToFuzzyDeduplicate) {
                // Set up fuzzy de-duplication.
                for (const auto& data_ : data) {
                    std::size_t cid = CDataGatherer::extractAttributeId(data_);
                    std::uint64_t count = CDataGatherer::extractData(data_).s_Count;
                    duplicates[cid].add({static_cast<double>(count)});
                }
                for (auto& attribute : duplicates) {
                    attribute.second.computeEpsilons(
                        bucketLength, this->params().s_MinimumToFuzzyDeduplicate);
                }
            }

            for (const auto& data_ : data) {
                std::size_t pid = CDataGatherer::extractPersonId(data_);
                std::size_t cid = CDataGatherer::extractAttributeId(data_);

                maths::common::CModel* model{this->model(feature, cid)};
                if (model == nullptr) {
                    LOG_ERROR(<< "Missing model for " << this->attributeName(cid));
                    continue;
                }
                // initialCountWeight returns a weight value as double:
                // 0.0 if checkScheduledEvents is true
                // 1.0 if both checkScheduledEvents and checkRules are false
                // A small weight - 0.005 - if checkRules is true.
                // This weight is applied to countWeight (and therefore scaledCountWeight) as multiplier.
                // This reduces the impact of the values affected by the skip_model_update rule
                // on the model while not completely ignoring them. This still allows the model to
                // learn from the affected values - addressing point 1. and 2. in
                // https://github.com/elastic/ml-cpp/issues/1272, Namely
                // 1. If you apply it from the start of the modelling it can stop the model learning anything at all.
                // 2. It can stop the model ever adapting to some change in data characteristics
                double initialCountWeight{
                    this->initialCountWeight(feature, pid, cid, sampleTime)};
                if (initialCountWeight == 0.0) {
                    // The value is ignored entirely: just move the model
                    // forward by the time elapsed since the snapshotted last
                    // bucket time of the attribute.
                    core_t::TTime skipTime = sampleTime - attributeLastBucketTimesMap[cid];
                    if (skipTime > 0) {
                        model->skipTime(skipTime);
                        // Update the last time so we don't advance the same model
                        // multiple times (once per person)
                        attributeLastBucketTimesMap[cid] = sampleTime;
                    }
                    continue;
                }

                double count =
                    static_cast<double>(CDataGatherer::extractData(data_).s_Count);
                double value = model_t::offsetCountToZero(feature, count);
                double countWeight = initialCountWeight *
                                     this->sampleRateWeight(pid, cid) *
                                     this->learnRate(feature);
                LOG_TRACE(<< "Adding " << value
                          << " for person = " << gatherer.personName(pid)
                          << " and attribute = " << gatherer.attributeName(cid));

                SValuesAndWeights& attribute = attributeValuesAndWeights[cid];
                std::size_t duplicate = duplicates[cid].duplicate(sampleTime, {value});

                if (duplicate < attribute.s_Values.size()) {
                    // The de-duplicator pointed at a value already recorded
                    // for this attribute: fold the weight into that entry
                    // instead of adding another sample.
                    model->addCountWeights(sampleTime, countWeight, countWeight,
                                           1.0, attribute.s_TrendWeights[duplicate],
                                           attribute.s_ResidualWeights[duplicate]);
                } else {
                    // New value: record it with unit weights and let the model
                    // fill in the count weights.
                    attribute.s_Values.emplace_back(sampleTime, TDouble2Vec{value}, pid);
                    attribute.s_TrendWeights.push_back(
                        maths_t::CUnitWeights::unit<TDouble2Vec>(1));
                    attribute.s_ResidualWeights.push_back(
                        maths_t::CUnitWeights::unit<TDouble2Vec>(1));
                    model->countWeights(sampleTime, {value}, countWeight,
                                        countWeight, 1.0, // outlier weight derate
                                        1.0, // count variance scale
                                        attribute.s_TrendWeights.back(),
                                        attribute.s_ResidualWeights.back());
                }
            }

            for (auto& attribute : attributeValuesAndWeights) {
                std::size_t cid = attribute.first;
                // Records a model change annotation against the current bucket
                // when annotations are enabled; otherwise it is a no-op.
                auto annotationCallback = [&](const std::string& annotation) {
                    if (this->params().s_AnnotationsEnabled) {
                        m_CurrentBucketStats.s_Annotations.emplace_back(
                            time, CAnnotation::E_ModelChange, annotation,
                            gatherer.searchKey().detectorIndex(),
                            gatherer.searchKey().partitionFieldName(),
                            gatherer.partitionFieldValue(),
                            gatherer.searchKey().overFieldName(),
                            gatherer.attributeName(cid),
                            gatherer.searchKey().byFieldName(), EMPTY_STRING);
                    }
                };

                maths::common::CModelAddSamplesParams params;
                auto circuitBreaker = CMemoryCircuitBreaker(resourceMonitor);
                params.isInteger(true)
                    .isNonNegative(true)
                    .propagationInterval(this->propagationTime(cid, sampleTime))
                    .trendWeights(attribute.second.s_TrendWeights)
                    .priorWeights(attribute.second.s_ResidualWeights)
                    // Fall back to the minimum time if no first bucket time is
                    // held for this attribute id.
                    .firstValueTime(cid < this->attributeFirstBucketTimes().size()
                                        ? this->attributeFirstBucketTimes()[cid]
                                        : std::numeric_limits<core_t::TTime>::min())
                    .annotationCallback([&](const std::string& annotation) {
                        annotationCallback(annotation);
                    })
                    .memoryCircuitBreaker(circuitBreaker);

                maths::common::CModel* model{this->model(feature, cid)};
                if (model == nullptr) {
                    LOG_TRACE(<< "Model unexpectedly null");
                    continue;
                }
                // If the model reports it was reset, the gatherer's sample
                // count for the attribute is reset as well.
                if (model->addSamples(params, attribute.second.s_Values) ==
                    maths::common::CModel::E_Reset) {
                    gatherer.resetSampleCount(cid);
                }
            }
        }

        for (const auto& feature : m_FeatureCorrelatesModels) {
            feature.s_Models->processSamples();
        }

        // The prior may have been replaced above, so rebuild the attribute
        // probability cache from it and drop the cached probabilities.
        m_AttributeProbabilities = TCategoryProbabilityCache(m_AttributeProbabilityPrior);
        m_Probabilities.clear();
    }
}