void CEventRateModel::sample()

in lib/model/CEventRateModel.cc [218:380]


// Sample the gathered event rate data one bucket at a time over
// [startTime, endTime) and update the per-person models with the
// bucket counts.
//
// For each bucket this:
//   1. asks the data gatherer to sample and fetches the feature data,
//   2. copies the last bucket time of every person which has data,
//   3. runs the base class CIndividualModel::sample for the bucket,
//   4. updates the person probability prior, or the per-person feature
//      models, from the feature data,
//   5. samples the correlate models and rebuilds the cached category
//      probabilities from the probability prior.
//
// \param[in] startTime The start of the first bucket to sample.
// \param[in] endTime The (exclusive) end of the sampling interval.
// \param[in,out] resourceMonitor Passed to createUpdateNewModels, to
// CIndividualModel::sample and used to construct the memory circuit
// breaker handed to each model's addSamples call.
//
// Returns without doing anything if the gatherer rejects the time range.
void CEventRateModel::sample(core_t::TTime startTime,
                             core_t::TTime endTime,
                             CResourceMonitor& resourceMonitor) {
    CDataGatherer& gatherer = this->dataGatherer();
    core_t::TTime bucketLength = gatherer.bucketLength();

    // NOTE(review): presumably this checks the interval is aligned to whole
    // buckets; the loop below steps by bucketLength and would otherwise
    // sample a bucket which extends past endTime. Confirm against
    // CDataGatherer::validateSampleTimes.
    if (!gatherer.validateSampleTimes(startTime, endTime)) {
        return;
    }

    // Per-call state is reset once, before any bucket is processed, so the
    // annotations accumulate across all buckets sampled by this call.
    this->createUpdateNewModels(startTime, resourceMonitor);
    this->currentBucketInterimCorrections().clear();
    m_CurrentBucketStats.s_Annotations.clear();

    for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
        LOG_TRACE(<< "Sampling [" << time << "," << time + bucketLength << ")");

        gatherer.sampleNow(time);
        gatherer.featureData(time, bucketLength, m_CurrentBucketStats.s_FeatureData);

        // Copy the last bucket time of every person with data in this bucket
        // *before* calling the base class sample below. The copies are what
        // is later used to compute how much time to skip for a person.
        // NOTE(review): presumably CIndividualModel::sample updates the last
        // bucket times, which is why a copy is taken here - confirm.
        const CIndividualModel::TTimeVec& preSampleLastBucketTimes = this->lastBucketTimes();
        CIndividualModel::TSizeTimeUMap lastBucketTimesMap;
        for (const auto& featureData : m_CurrentBucketStats.s_FeatureData) {
            for (const auto& data : featureData.second) {
                std::size_t pid = data.first;
                lastBucketTimesMap[pid] = preSampleLastBucketTimes[pid];
            }
        }

        this->CIndividualModel::sample(time, time + bucketLength, resourceMonitor);

        // Declared outside the feature and person loops to minimize the number
        // of times they are created. They are overwritten for every person:
        // values is reassigned and the weights are passed to countWeights.
        maths::common::CModel::TTimeDouble2VecSizeTrVec values;
        maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec trendWeights;
        maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec priorWeights;

        for (auto& featureData : m_CurrentBucketStats.s_FeatureData) {
            model_t::EFeature feature = featureData.first;
            TSizeFeatureDataPrVec& data = featureData.second;
            std::size_t dimension = model_t::dimension(feature);
            LOG_TRACE(<< model_t::print(feature) << ": " << data);

            // This feature only feeds the prior over which person is seen:
            // each person with a non-zero count contributes its identifier
            // as a sample. No per-person model is updated for it.
            if (feature == model_t::E_IndividualTotalBucketCountByPerson) {
                for (const auto& data_ : data) {
                    if (data_.second.s_Count > 0) {
                        LOG_TRACE(<< "person = " << this->personName(data_.first));
                        m_ProbabilityPrior.addSamples({static_cast<double>(data_.first)},
                                                      maths_t::CUnitWeights::SINGLE_UNIT);
                    }
                }
                // The prior is only propagated forwards for buckets which
                // have some feature data.
                if (!data.empty()) {
                    m_ProbabilityPrior.propagateForwardsByTime(1.0);
                }
                continue;
            }
            // Categorical features are not modelled here.
            if (model_t::isCategorical(feature)) {
                continue;
            }

            // NOTE(review): presumably removes the data of persons excluded
            // by the person filter - confirm against applyFilter.
            this->applyFilter(model_t::E_XF_By, true, this->personFilter(), data);

            for (const auto& data_ : data) {
                std::size_t pid = data_.first;

                maths::common::CModel* model = this->model(feature, pid);
                if (model == nullptr) {
                    LOG_ERROR(<< "Missing model for " << this->personName(pid));
                    continue;
                }

                // initialCountWeight returns a weight value as double:
                // 0.0 if checkScheduledEvents is true
                // 1.0 if both checkScheduledEvents and checkRules are false
                // A small weight - 0.005 - if checkRules is true.
                // This weight is applied to countWeight (and therefore scaledCountWeight) as multiplier.
                // This reduces the impact of the values affected by the skip_model_update rule
                // on the model while not completely ignoring them. This still allows the model to
                // learn from the affected values - addressing point 1. and 2. in
                // https://github.com/elastic/ml-cpp/issues/1272, namely
                // 1. If you apply it from the start of the modelling it can stop the model learning anything at all.
                // 2. It can stop the model ever adapting to some change in data characteristics
                core_t::TTime sampleTime = model_t::sampleTime(feature, time, bucketLength);
                double initialCountWeight = this->initialCountWeight(
                    feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, sampleTime);
                if (initialCountWeight == 0.0) {
                    // The sample is ignored entirely: tell the model to skip
                    // the time elapsed since this person's last bucket time as
                    // it was before the base class sample call.
                    model->skipTime(sampleTime - lastBucketTimesMap[pid]);
                    continue;
                }

                // For sparse data we reduce the impact of samples from empty buckets.
                // In effect, we smoothly transition to modeling only values from non-empty
                // buckets as the data becomes sparse.
                double emptyBucketWeight = this->emptyBucketWeight(feature, pid, time);
                if (emptyBucketWeight == 0.0) {
                    continue;
                }

                double count = model_t::offsetCountToZero(
                    feature, static_cast<double>(data_.second.s_Count));
                TDouble2Vec value{count};
                double outlierWeightDerate = this->derate(pid, sampleTime);
                double countWeight = initialCountWeight * this->learnRate(feature);
                // Note we need to scale the amount of data we'll "age out" of the residual
                // model in one bucket by the empty bucket weight so the posterior doesn't
                // end up too flat.
                double scaledInterval = emptyBucketWeight;
                double scaledCountWeight = emptyBucketWeight * countWeight;

                LOG_TRACE(<< "Bucket = " << this->printCurrentBucket()
                          << ", feature = " << model_t::print(feature) << ", count = "
                          << count << ", person = " << this->personName(pid)
                          << ", empty bucket weight = " << emptyBucketWeight
                          << ", count weight = " << countWeight
                          << ", scaled count weight = " << scaledCountWeight
                          << ", scaled interval = " << scaledInterval);

                // Exactly one sample is added per person per bucket, so the
                // sample and weight containers each hold a single element.
                // Note that resize only initializes the weights with unit
                // weights when the containers are empty; afterwards the first
                // element is reused and handed to countWeights.
                // NOTE(review): this relies on countWeights overwriting every
                // weight which matters - confirm.
                values.assign(1, core::make_triple(sampleTime, value, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID));
                trendWeights.resize(1, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
                priorWeights.resize(1, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
                model->countWeights(sampleTime, value, countWeight, scaledCountWeight,
                                    outlierWeightDerate, 1.0, // count variance scale
                                    trendWeights[0], priorWeights[0]);

                // Records a model change annotation for this person against the
                // start time of the current bucket, but only if annotations are
                // enabled in the model parameters.
                auto annotationCallback = [&](const std::string& annotation) {
                    if (this->params().s_AnnotationsEnabled) {
                        m_CurrentBucketStats.s_Annotations.emplace_back(
                            time, CAnnotation::E_ModelChange, annotation,
                            gatherer.searchKey().detectorIndex(),
                            gatherer.searchKey().partitionFieldName(),
                            gatherer.partitionFieldValue(),
                            gatherer.searchKey().overFieldName(), EMPTY_STRING,
                            gatherer.searchKey().byFieldName(), gatherer.personName(pid));
                    }
                };

                // Counts are integer valued and non-negative. The bucket
                // occupancy is the person's frequency if the feature includes
                // empty buckets and 1.0 otherwise. If no first bucket time is
                // stored for the person the minimum time value is supplied.
                maths::common::CModelAddSamplesParams params;
                auto circuitBreaker = CMemoryCircuitBreaker(resourceMonitor);
                params.isInteger(true)
                    .isNonNegative(true)
                    .propagationInterval(scaledInterval)
                    .trendWeights(trendWeights)
                    .priorWeights(priorWeights)
                    .bucketOccupancy(model_t::includeEmptyBuckets(feature)
                                         ? this->personFrequency(pid)
                                         : 1.0)
                    .firstValueTime(pid < this->firstBucketTimes().size()
                                        ? this->firstBucketTimes()[pid]
                                        : std::numeric_limits<core_t::TTime>::min())
                    .annotationCallback([&](const std::string& annotation) {
                        annotationCallback(annotation);
                    })
                    .memoryCircuitBreaker(circuitBreaker);

                // If adding the sample reset the model, the gatherer's sample
                // count for this person is reset as well.
                if (model->addSamples(params, values) == maths::common::CModel::E_Reset) {
                    gatherer.resetSampleCount(pid);
                }
            }
        }

        // Once every feature has been processed for this bucket, update the
        // correlate models and refresh the cached category probabilities from
        // the updated person probability prior.
        this->sampleCorrelateModels();
        m_Probabilities = TCategoryProbabilityCache(m_ProbabilityPrior);
    }
}