void CMetricModel::sample()

in lib/model/CMetricModel.cc [187:355]


//! Sample the gathered metric data for every bucket in [startTime, endTime)
//! and add the resulting samples to the per-person models.
//!
//! For each bucket of length gatherer.bucketLength() this:
//!   -# asks the gatherer to sample and extracts the feature data into
//!      m_CurrentBucketStats.s_FeatureData,
//!   -# calls CIndividualModel::sample for the bucket,
//!   -# for each feature, and each person left after applying the person
//!      filter, builds the sample values and weights and adds them to that
//!      person's model,
//!   -# calls sampleCorrelateModels().
//!
//! \param[in] startTime The start of the first bucket to sample.
//! \param[in] endTime The exclusive end of the interval to sample.
//! \param[in,out] resourceMonitor Passed to createUpdateNewModels and to
//! CIndividualModel::sample, and used to construct the memory circuit
//! breaker supplied to each model update.
//!
//! \note If gatherer.validateSampleTimes(startTime, endTime) fails this
//! returns before the current bucket statistics or the models are touched.
void CMetricModel::sample(core_t::TTime startTime,
                          core_t::TTime endTime,
                          CResourceMonitor& resourceMonitor) {
    CDataGatherer& gatherer = this->dataGatherer();
    core_t::TTime bucketLength = gatherer.bucketLength();

    // Validate first: a rejected call must not advance the bucket start
    // time, create models or discard the current bucket's results.
    if (!gatherer.validateSampleTimes(startTime, endTime)) {
        return;
    }

    m_CurrentBucketStats.s_StartTime = std::max(m_CurrentBucketStats.s_StartTime, startTime);

    this->createUpdateNewModels(startTime, resourceMonitor);
    m_CurrentBucketStats.s_InterimCorrections.clear();
    m_CurrentBucketStats.s_Annotations.clear();
    m_CurrentBucketStats.s_FeatureData.clear();
    m_CurrentBucketStats.s_PersonCounts.clear();

    for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
        LOG_TRACE(<< "Sampling [" << time << "," << time + bucketLength << ")");

        gatherer.sampleNow(time);
        gatherer.featureData(time, bucketLength, m_CurrentBucketStats.s_FeatureData);

        // Copy the last bucket time of every person with data in this bucket
        // before calling CIndividualModel::sample. The copies are what is
        // used below to compute how much time to skip.
        // NOTE(review): presumably the base class call updates the last
        // bucket times, which is why a reference alone would not do - confirm.
        const TTimeVec& preSampleLastBucketTimes = this->lastBucketTimes();
        TSizeTimeUMap lastBucketTimesMap;
        for (const auto& featureData : m_CurrentBucketStats.s_FeatureData) {
            for (const auto& data : featureData.second) {
                std::size_t pid = data.first;
                lastBucketTimesMap[pid] = preSampleLastBucketTimes[pid];
            }
        }

        this->CIndividualModel::sample(time, time + bucketLength, resourceMonitor);

        // Declared outside the feature and person loops so the same buffers
        // are reused for every model update in this bucket.
        maths::common::CModel::TTimeDouble2VecSizeTrVec values;
        maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec trendWeights;
        maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec priorWeights;

        for (auto& featureData : m_CurrentBucketStats.s_FeatureData) {
            model_t::EFeature feature = featureData.first;
            TSizeFeatureDataPrVec& data = featureData.second;
            std::size_t dimension = model_t::dimension(feature);
            LOG_TRACE(<< model_t::print(feature) << " data = " << data);
            this->applyFilter(model_t::E_XF_By, true, this->personFilter(), data);

            for (const auto& data_ : data) {
                std::size_t pid = data_.first;
                const CGathererTools::TSampleVec& samples = data_.second.s_Samples;

                maths::common::CModel* model = this->model(feature, pid);
                if (model == nullptr) {
                    LOG_ERROR(<< "Missing model for " << this->personName(pid));
                    continue;
                }

                // initialCountWeight returns a weight value as double:
                // 0.0 if checkScheduledEvents is true
                // 1.0 if both checkScheduledEvents and checkRules are false
                // A small weight - 0.005 - if checkRules is true.
                // This weight is applied to countWeight (and therefore scaledCountWeight) as multiplier.
                // This reduces the impact of the values affected by the skip_model_update rule
                // on the model while not completely ignoring them. This still allows the model to
                // learn from the affected values - addressing point 1. and 2. in
                // https://github.com/elastic/ml-cpp/issues/1272, Namely
                // 1. If you apply it from the start of the modelling it can stop the model learning anything at all.
                // 2. It can stop the model ever adapting to some change in data characteristics

                core_t::TTime sampleTime = model_t::sampleTime(feature, time, bucketLength);
                double initialCountWeight{this->initialCountWeight(
                    feature, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, sampleTime)};
                if (initialCountWeight == 0.0) {
                    // Nothing is learned from this bucket: just tell the
                    // model how much time has elapsed since the last bucket
                    // time recorded for this person before sampling.
                    model->skipTime(time - lastBucketTimesMap[pid]);
                    continue;
                }

                // The bucket value is added to the model only for sampled
                // features which have a value for this bucket.
                const TOptionalSample& bucket = data_.second.s_BucketValue;
                if (model_t::isSampled(feature) && bucket != std::nullopt) {
                    values.assign(1, core::make_triple(
                                         bucket->time(), TDouble2Vec(bucket->value(dimension)),
                                         model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID));
                    model->addBucketValue(values);
                }

                // For sparse data we reduce the impact of samples from empty buckets.
                // In effect, we smoothly transition to modeling only values from non-empty
                // buckets as the data becomes sparse.
                double emptyBucketWeight = this->emptyBucketWeight(feature, pid, time);
                if (emptyBucketWeight == 0.0) {
                    continue;
                }

                // When a maximum number of updates per bucket is configured
                // and there are samples, each sample's weight is that maximum
                // divided by the sample count; otherwise the base weight is 1.
                std::size_t n = samples.size();
                double countWeight =
                    (this->params().s_MaximumUpdatesPerBucket > 0.0 && n > 0
                         ? this->params().s_MaximumUpdatesPerBucket / static_cast<double>(n)
                         : 1.0) *
                    this->learnRate(feature) * initialCountWeight;
                double outlierWeightDerate = this->derate(pid, sampleTime);
                // Note we need to scale the amount of data we'll "age out" of the residual
                // model in one bucket by the empty bucket weight so the posterior doesn't
                // end up too flat.
                double scaledInterval = emptyBucketWeight;
                double scaledCountWeight = emptyBucketWeight * countWeight;

                LOG_TRACE(<< "Bucket = " << gatherer.printCurrentBucket()
                          << ", feature = " << model_t::print(feature) << ", samples = "
                          << samples << ", isInteger = " << data_.second.s_IsInteger
                          << ", person = " << this->personName(pid)
                          << ", dimension = " << dimension << ", count weight = " << countWeight
                          << ", scaled count weight = " << scaledCountWeight
                          << ", scaled interval = " << scaledInterval);

                // Build one value and one pair of weights per sample. The
                // weights for each sample are filled in by countWeights.
                values.resize(n);
                trendWeights.resize(n, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
                priorWeights.resize(n, maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
                for (std::size_t i = 0; i < n; ++i) {
                    core_t::TTime ithSampleTime = samples[i].time();
                    TDouble2Vec ithSampleValue(samples[i].value(dimension));
                    double countVarianceScale = samples[i].varianceScale();
                    values[i] = core::make_triple(
                        model_t::sampleTime(feature, time, bucketLength, ithSampleTime),
                        ithSampleValue, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID);
                    model->countWeights(ithSampleTime, ithSampleValue,
                                        countWeight, scaledCountWeight,
                                        outlierWeightDerate, countVarianceScale,
                                        trendWeights[i], priorWeights[i]);
                }

                // Records a model change annotation for this person against
                // the current bucket, if annotations are enabled.
                auto annotationCallback = [&](const std::string& annotation) {
                    if (this->params().s_AnnotationsEnabled) {
                        m_CurrentBucketStats.s_Annotations.emplace_back(
                            time, CAnnotation::E_ModelChange, annotation,
                            gatherer.searchKey().detectorIndex(),
                            gatherer.searchKey().partitionFieldName(),
                            gatherer.partitionFieldValue(),
                            gatherer.searchKey().overFieldName(), EMPTY_STRING,
                            gatherer.searchKey().byFieldName(), gatherer.personName(pid));
                    }
                };

                maths::common::CModelAddSamplesParams params;
                auto circuitBreaker = CMemoryCircuitBreaker(resourceMonitor);
                params.isInteger(data_.second.s_IsInteger)
                    .isNonNegative(data_.second.s_IsNonNegative)
                    .propagationInterval(scaledInterval)
                    .trendWeights(trendWeights)
                    .priorWeights(priorWeights)
                    .bucketOccupancy(model_t::includeEmptyBuckets(feature)
                                         ? this->personFrequency(pid)
                                         : 1.0)
                    // Fall back to the minimum time if there is no first
                    // bucket time recorded for this person.
                    .firstValueTime(pid < this->firstBucketTimes().size()
                                        ? this->firstBucketTimes()[pid]
                                        : std::numeric_limits<core_t::TTime>::min())
                    .annotationCallback(annotationCallback)
                    .memoryCircuitBreaker(circuitBreaker);

                // If adding the samples reset the model, the gatherer's
                // sample count for this person is reset as well.
                if (model->addSamples(params, values) == maths::common::CModel::E_Reset) {
                    gatherer.resetSampleCount(pid);
                }
            }
        }

        this->sampleCorrelateModels();
    }
}