lib/model/CDataGatherer.cc (654 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #include <model/CDataGatherer.h> #include <core/CLogger.h> #include <core/CMemoryDef.h> #include <core/CProgramCounters.h> #include <core/CStatePersistInserter.h> #include <core/CStateRestoreTraverser.h> #include <core/CStringUtils.h> #include <core/RestoreMacros.h> #include <maths/common/CChecksum.h> #include <maths/common/CMathsFuncs.h> #include <model/CEventRateBucketGatherer.h> #include <model/CMetricBucketGatherer.h> #include <model/CSampleCounts.h> #include <model/CSearchKey.h> #include <algorithm> #include <utility> namespace ml { namespace model { namespace { const std::string FEATURE_TAG("a"); const std::string PEOPLE_REGISTRY_TAG("b"); const std::string ATTRIBUTES_REGISTRY_TAG("c"); const std::string SAMPLE_COUNTS_TAG("d"); const std::string BUCKET_GATHERER_TAG("e"); const std::string DEFAULT_PERSON_NAME("-"); const std::string DEFAULT_ATTRIBUTE_NAME("-"); const std::string PERSON("person"); const std::string ATTRIBUTE("attribute"); const std::string EMPTY_STRING; namespace detail { //! Make sure \p features only includes supported features, doesn't //! contain any duplicates, etc. const CDataGatherer::TFeatureVec& sanitize(CDataGatherer::TFeatureVec& features, model_t::EAnalysisCategory gathererType) { std::size_t j = 0; for (std::size_t i = 0; i < features.size(); ++i) { switch (gathererType) { case model_t::E_EventRate: case model_t::E_PopulationEventRate: switch (features[i]) { CASE_INDIVIDUAL_COUNT: features[j] = features[i]; ++j; break; CASE_INDIVIDUAL_METRIC: LOG_ERROR(<< "Unexpected feature = " << model_t::print(features[i])); break; CASE_POPULATION_COUNT: features[j] = features[i]; ++j; break; CASE_POPULATION_METRIC: LOG_ERROR(<< "Unexpected feature = " << model_t::print(features[i])); break; } break; case model_t::E_Metric: case model_t::E_PopulationMetric: switch (features[i]) { CASE_INDIVIDUAL_COUNT: LOG_ERROR(<< "Unexpected feature = " << model_t::print(features[i])); break; CASE_INDIVIDUAL_METRIC: features[j] = features[i]; ++j; break; CASE_POPULATION_COUNT: LOG_ERROR(<< "Unexpected feature = " << model_t::print(features[i])); break; CASE_POPULATION_METRIC: features[j] = features[i]; ++j; break; } break; } } features.erase(features.begin() + j, features.end()); std::sort(features.begin(), features.end()); features.erase(std::unique(features.begin(), features.end()), features.end()); return features; } //! Wrapper which copies \p features. CDataGatherer::TFeatureVec sanitize(const CDataGatherer::TFeatureVec& features, model_t::EAnalysisCategory gathererType) { CDataGatherer::TFeatureVec result(features); return sanitize(result, gathererType); } //! Check if the gatherer is for population modelling. bool isPopulation(model_t::EAnalysisCategory gathererType) { switch (gathererType) { case model_t::E_EventRate: case model_t::E_Metric: return false; case model_t::E_PopulationEventRate: case model_t::E_PopulationMetric: return true; } return false; } } // detail:: } // unnamed:: const std::string CDataGatherer::EXPLICIT_NULL("null"); const std::size_t CDataGatherer::EXPLICIT_NULL_SUMMARY_COUNT(std::numeric_limits<std::size_t>::max()); const std::size_t CDataGatherer::ESTIMATED_MEM_USAGE_PER_BY_FIELD(20000); const std::size_t CDataGatherer::ESTIMATED_MEM_USAGE_PER_OVER_FIELD(1000); CDataGatherer::CDataGatherer(model_t::EAnalysisCategory gathererType, model_t::ESummaryMode summaryMode, const SModelParams& modelParams, std::string partitionFieldValue, const CSearchKey& key, const TFeatureVec& features, const CBucketGatherer::SBucketGathererInitData& bucketGathererInitData) : m_GathererType(gathererType), m_Features(detail::sanitize(features, gathererType)), m_SummaryMode(summaryMode), m_Params(modelParams), m_SearchKey(key), m_PartitionFieldValue(std::move(partitionFieldValue)), m_PeopleRegistry(PERSON, counter_t::E_TSADNumberNewPeople, counter_t::E_TSADNumberNewPeopleNotAllowed, counter_t::E_TSADNumberNewPeopleRecycled), m_AttributesRegistry(ATTRIBUTE, counter_t::E_TSADNumberNewAttributes, counter_t::E_TSADNumberNewAttributesNotAllowed, counter_t::E_TSADNumberNewAttributesRecycled), m_Population(detail::isPopulation(gathererType)), m_UseNull(key.useNull()) { std::sort(m_Features.begin(), m_Features.end()); this->createBucketGatherer(gathererType, bucketGathererInitData); } CDataGatherer::CDataGatherer(model_t::EAnalysisCategory gathererType, model_t::ESummaryMode summaryMode, const SModelParams& modelParams, std::string partitionFieldValue, const CSearchKey& key, const CBucketGatherer::SBucketGathererInitData& bucketGathererInitData, core::CStateRestoreTraverser& traverser) : m_GathererType(gathererType), m_SummaryMode(summaryMode), m_Params(modelParams), m_SearchKey(key), m_PartitionFieldValue(std::move(partitionFieldValue)), m_PeopleRegistry(PERSON, counter_t::E_TSADNumberNewPeople, counter_t::E_TSADNumberNewPeopleNotAllowed, counter_t::E_TSADNumberNewPeopleRecycled), m_AttributesRegistry(ATTRIBUTE, counter_t::E_TSADNumberNewAttributes, counter_t::E_TSADNumberNewAttributesNotAllowed, counter_t::E_TSADNumberNewAttributesRecycled), m_Population(detail::isPopulation(gathererType)), m_UseNull(key.useNull()) { auto func = [this, &bucketGathererInitData](core::CStateRestoreTraverser& traverser_) { return acceptRestoreTraverser(bucketGathererInitData, traverser_); }; if (traverser.traverseSubLevel(func) == false) { LOG_ERROR(<< "Failed to correctly restore data gatherer"); } } CDataGatherer::CDataGatherer(bool isForPersistence, const CDataGatherer& other) : m_GathererType(other.m_GathererType), m_Features(other.m_Features), m_SummaryMode(other.m_SummaryMode), m_Params(other.m_Params), m_SearchKey(other.m_SearchKey), m_PartitionFieldValue(other.m_PartitionFieldValue), m_PeopleRegistry(isForPersistence, other.m_PeopleRegistry), m_AttributesRegistry(isForPersistence, other.m_AttributesRegistry), m_Population(other.m_Population), m_UseNull(other.m_UseNull) { if (!isForPersistence) { LOG_ABORT(<< "This constructor only creates clones for persistence"); } m_BucketGatherer.reset(other.m_BucketGatherer->cloneForPersistence()); if (other.m_SampleCounts) { m_SampleCounts.reset(other.m_SampleCounts->cloneForPersistence()); } } CDataGatherer::~CDataGatherer() = default; CDataGatherer* CDataGatherer::cloneForPersistence() const { return new CDataGatherer(true, *this); } model_t::ESummaryMode CDataGatherer::summaryMode() const { return m_SummaryMode; } model::function_t::EFunction CDataGatherer::function() const { return function_t::function(this->features()); } bool CDataGatherer::isPopulation() const { return m_Population; } std::string CDataGatherer::description() const { return m_BucketGatherer->description(); } std::size_t CDataGatherer::maxDimension() const { return std::max(this->numberPeople(), this->numberAttributes()); } const std::string& CDataGatherer::partitionFieldName() const { return ml::core::unwrap_ref(m_SearchKey).partitionFieldName(); } const std::string& CDataGatherer::partitionFieldValue() const { return m_PartitionFieldValue; } const CSearchKey& CDataGatherer::searchKey() const { return m_SearchKey; } CDataGatherer::TStrVecCItr CDataGatherer::beginInfluencers() const { return m_BucketGatherer->beginInfluencers(); } CDataGatherer::TStrVecCItr CDataGatherer::endInfluencers() const { return m_BucketGatherer->endInfluencers(); } const std::string& CDataGatherer::personFieldName() const { return m_BucketGatherer->personFieldName(); } const std::string& CDataGatherer::attributeFieldName() const { return m_BucketGatherer->attributeFieldName(); } const std::string& CDataGatherer::valueFieldName() const { return m_BucketGatherer->valueFieldName(); } const CDataGatherer::TStrVec& CDataGatherer::fieldsOfInterest() const { return m_BucketGatherer->fieldsOfInterest(); } std::size_t CDataGatherer::numberByFieldValues() const { return this->isPopulation() ? this->numberActiveAttributes() : this->numberActivePeople(); } std::size_t CDataGatherer::numberOverFieldValues() const { return this->isPopulation() ? this->numberActivePeople() : 0; } bool CDataGatherer::processFields(const TStrCPtrVec& fieldValues, CEventData& result, CResourceMonitor& resourceMonitor) { return m_BucketGatherer->processFields(fieldValues, result, resourceMonitor); } bool CDataGatherer::addArrival(const TStrCPtrVec& fieldValues, CEventData& data, CResourceMonitor& resourceMonitor) { // We process fields even if we are in the first partial bucket so that // we add enough extra memory to the resource monitor in order to control // the number of partitions created. m_BucketGatherer->processFields(fieldValues, data, resourceMonitor); if (core_t::TTime const time = data.time(); time < m_BucketGatherer->earliestBucketStartTime()) { // Ignore records that are out of the latency window. // Records in an incomplete first bucket will end up here, // but we don't want to model these. return false; } return m_BucketGatherer->addEventData(data); } void CDataGatherer::sampleNow(core_t::TTime sampleBucketStart) { m_BucketGatherer->sampleNow(sampleBucketStart); } void CDataGatherer::skipSampleNow(core_t::TTime sampleBucketStart) { m_BucketGatherer->skipSampleNow(sampleBucketStart); } std::size_t CDataGatherer::numberFeatures() const { return m_Features.size(); } bool CDataGatherer::hasFeature(model_t::EFeature feature) const { return std::binary_search(m_Features.begin(), m_Features.end(), feature); } model_t::EFeature CDataGatherer::feature(std::size_t i) const { return m_Features[i]; } const CDataGatherer::TFeatureVec& CDataGatherer::features() const { return m_Features; } std::size_t CDataGatherer::numberActivePeople() const { return m_PeopleRegistry.numberActiveNames(); } std::size_t CDataGatherer::numberPeople() const { return m_PeopleRegistry.numberNames(); } bool CDataGatherer::personId(const std::string& person, std::size_t& result) const { return m_PeopleRegistry.id(person, result); } bool CDataGatherer::anyPersonId(std::size_t& result) const { return m_PeopleRegistry.anyId(result); } const std::string& CDataGatherer::personName(std::size_t pid) const { return this->personName(pid, DEFAULT_PERSON_NAME); } const std::string& CDataGatherer::personName(std::size_t pid, const std::string& fallback) const { return m_PeopleRegistry.name(pid, fallback); } void CDataGatherer::personNonZeroCounts(core_t::TTime time, TSizeUInt64PrVec& result) const { m_BucketGatherer->personNonZeroCounts(time, result); } void CDataGatherer::recyclePeople(const TSizeVec& peopleToRemove) { if (peopleToRemove.empty()) { return; } m_BucketGatherer->recyclePeople(peopleToRemove); if (!this->isPopulation() && m_SampleCounts) { m_SampleCounts->recycle(peopleToRemove); } m_PeopleRegistry.recycleNames(peopleToRemove, DEFAULT_PERSON_NAME); core::CProgramCounters::counter(counter_t::E_TSADNumberPrunedItems) += peopleToRemove.size(); } void CDataGatherer::removePeople(std::size_t lowestPersonToRemove) { if (lowestPersonToRemove >= this->numberPeople()) { return; } if (!this->isPopulation() && m_SampleCounts) { m_SampleCounts->remove(lowestPersonToRemove); } m_BucketGatherer->removePeople(lowestPersonToRemove); m_PeopleRegistry.removeNames(lowestPersonToRemove); } CDataGatherer::TSizeVec& CDataGatherer::recycledPersonIds() { return m_PeopleRegistry.recycledIds(); } bool CDataGatherer::isPersonActive(std::size_t pid) const { return m_PeopleRegistry.isIdActive(pid); } std::size_t CDataGatherer::addPerson(const std::string& person, CResourceMonitor& resourceMonitor, bool& addedPerson) { return m_PeopleRegistry.addName(person, m_BucketGatherer->currentBucketStartTime(), resourceMonitor, addedPerson); } std::size_t CDataGatherer::numberActiveAttributes() const { return m_AttributesRegistry.numberActiveNames(); } std::size_t CDataGatherer::numberAttributes() const { return m_AttributesRegistry.numberNames(); } bool CDataGatherer::attributeId(const std::string& attribute, std::size_t& result) const { return m_AttributesRegistry.id(attribute, result); } const std::string& CDataGatherer::attributeName(std::size_t cid) const { return this->attributeName(cid, DEFAULT_ATTRIBUTE_NAME); } const std::string& CDataGatherer::attributeName(std::size_t cid, const std::string& fallback) const { return m_AttributesRegistry.name(cid, fallback); } void CDataGatherer::recycleAttributes(const TSizeVec& attributesToRemove) { if (attributesToRemove.empty()) { return; } if (this->isPopulation() && m_SampleCounts) { m_SampleCounts->recycle(attributesToRemove); } m_BucketGatherer->recycleAttributes(attributesToRemove); m_AttributesRegistry.recycleNames(attributesToRemove, DEFAULT_ATTRIBUTE_NAME); core::CProgramCounters::counter(counter_t::E_TSADNumberPrunedItems) += attributesToRemove.size(); } void CDataGatherer::removeAttributes(std::size_t lowestAttributeToRemove) { if (lowestAttributeToRemove >= this->numberAttributes()) { return; } if (this->isPopulation() && m_SampleCounts) { m_SampleCounts->remove(lowestAttributeToRemove); } m_BucketGatherer->removeAttributes(lowestAttributeToRemove); m_AttributesRegistry.removeNames(lowestAttributeToRemove); } CDataGatherer::TSizeVec& CDataGatherer::recycledAttributeIds() { return m_AttributesRegistry.recycledIds(); } bool CDataGatherer::isAttributeActive(std::size_t cid) const { return m_AttributesRegistry.isIdActive(cid); } std::size_t CDataGatherer::addAttribute(const std::string& attribute, CResourceMonitor& resourceMonitor, bool& addedAttribute) { return m_AttributesRegistry.addName(attribute, m_BucketGatherer->currentBucketStartTime(), resourceMonitor, addedAttribute); } double CDataGatherer::sampleCount(std::size_t id) const { if (m_SampleCounts) { return static_cast<double>(m_SampleCounts->count(id)); } LOG_ERROR(<< "Sample count for non-metric gatherer"); return 0.0; } double CDataGatherer::effectiveSampleCount(std::size_t id) const { if (m_SampleCounts) { return m_SampleCounts->effectiveSampleCount(id); } LOG_ERROR(<< "Effective sample count for non-metric gatherer"); return 0.0; } void CDataGatherer::resetSampleCount(std::size_t id) { if (m_SampleCounts) { m_SampleCounts->resetSampleCount(*this, id); } } const CDataGatherer::TSampleCountsPtr& CDataGatherer::sampleCounts() const { return m_SampleCounts; } core_t::TTime CDataGatherer::currentBucketStartTime() const { return m_BucketGatherer->currentBucketStartTime(); } core_t::TTime CDataGatherer::bucketLength() const { return m_BucketGatherer->bucketLength(); } bool CDataGatherer::dataAvailable(core_t::TTime time) const { return m_BucketGatherer->dataAvailable(time); } bool CDataGatherer::validateSampleTimes(core_t::TTime& startTime, core_t::TTime endTime) const { return m_BucketGatherer->validateSampleTimes(startTime, endTime); } void CDataGatherer::timeNow(core_t::TTime time) { m_BucketGatherer->timeNow(time); } std::string CDataGatherer::printCurrentBucket() const { return m_BucketGatherer->printCurrentBucket(); } const CDataGatherer::TSizeSizePrUInt64UMap& CDataGatherer::bucketCounts(core_t::TTime time) const { return m_BucketGatherer->bucketCounts(time); } const CDataGatherer::TSizeSizePrOptionalStrPrUInt64UMapVec& CDataGatherer::influencerCounts(core_t::TTime time) const { return m_BucketGatherer->influencerCounts(time); } std::uint64_t CDataGatherer::checksum() const { std::uint64_t result = m_PeopleRegistry.checksum(); result = maths::common::CChecksum::calculate(result, m_AttributesRegistry); result = maths::common::CChecksum::calculate(result, m_SummaryMode); result = maths::common::CChecksum::calculate(result, m_Features); if (m_SampleCounts) { result = maths::common::CChecksum::calculate(result, m_SampleCounts->checksum(*this)); } result = maths::common::CChecksum::calculate(result, m_BucketGatherer); LOG_TRACE(<< "checksum = " << result); return result; } void CDataGatherer::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const { mem->setName("CDataGatherer"); core::memory_debug::dynamicSize("m_Features", m_Features, mem); core::memory_debug::dynamicSize("m_PeopleRegistry", m_PeopleRegistry, mem); core::memory_debug::dynamicSize("m_AttributesRegistry", m_AttributesRegistry, mem); core::memory_debug::dynamicSize("m_SampleCounts", m_SampleCounts, mem); core::memory_debug::dynamicSize("m_BucketGatherer", m_BucketGatherer, mem); } std::size_t CDataGatherer::memoryUsage() const { std::size_t mem = core::memory::dynamicSize(m_Features); mem += core::memory::dynamicSize(m_PartitionFieldValue); mem += core::memory::dynamicSize(m_PeopleRegistry); mem += core::memory::dynamicSize(m_AttributesRegistry); mem += core::memory::dynamicSize(m_SampleCounts); mem += core::memory::dynamicSize(m_BucketGatherer); return mem; } bool CDataGatherer::useNull() const { return m_UseNull; } void CDataGatherer::clear() { m_PeopleRegistry.clear(); m_AttributesRegistry.clear(); if (m_SampleCounts) { m_SampleCounts->clear(); } if (m_BucketGatherer) { m_BucketGatherer->clear(); } } bool CDataGatherer::resetBucket(core_t::TTime bucketStart) { return m_BucketGatherer->resetBucket(bucketStart); } void CDataGatherer::releaseMemory(core_t::TTime samplingCutoffTime) { if (this->isPopulation()) { m_BucketGatherer->releaseMemory(samplingCutoffTime); } } const SModelParams& CDataGatherer::params() const { return m_Params; } void CDataGatherer::acceptPersistInserter(core::CStatePersistInserter& inserter) const { for (auto m_Feature : m_Features) { inserter.insertValue(FEATURE_TAG, static_cast<int>(m_Feature)); } inserter.insertLevel(PEOPLE_REGISTRY_TAG, [this](core::CStatePersistInserter& inserter_) { m_PeopleRegistry.acceptPersistInserter(inserter_); }); inserter.insertLevel(ATTRIBUTES_REGISTRY_TAG, [this](core::CStatePersistInserter& inserter_) { m_AttributesRegistry.acceptPersistInserter(inserter_); }); if (m_SampleCounts) { inserter.insertLevel(SAMPLE_COUNTS_TAG, [sampleCounts = m_SampleCounts.get()]( core::CStatePersistInserter & inserter_) { sampleCounts->acceptPersistInserter(inserter_); }); } inserter.insertLevel(BUCKET_GATHERER_TAG, [this](core::CStatePersistInserter& inserter_) { persistBucketGatherers(inserter_); }); } bool CDataGatherer::determineMetricCategory(TMetricCategoryVec& fieldMetricCategories) const { if (m_Features.empty()) { LOG_WARN(<< "No features to determine metric category from"); return false; } if (m_Features.size() > 1) { LOG_WARN(<< m_Features.size() << " features to determine metric category " "from - only the first will be used"); } model_t::EMetricCategory result; if (model_t::metricCategory(m_Features.front(), result) == false) { LOG_ERROR(<< "Unable to map feature " << model_t::print(m_Features.front()) << " to a metric category"); return false; } fieldMetricCategories.push_back(result); return true; } bool CDataGatherer::extractCountFromField(const std::string& fieldName, const std::string* fieldValue, std::size_t& count) { if (fieldValue == nullptr) { // Treat not present as explicit null count = EXPLICIT_NULL_SUMMARY_COUNT; return true; } std::string fieldValueCopy(*fieldValue); core::CStringUtils::trimWhitespace(fieldValueCopy); if (fieldValueCopy.empty() || fieldValueCopy == EXPLICIT_NULL) { count = EXPLICIT_NULL_SUMMARY_COUNT; return true; } double count_; if (core::CStringUtils::stringToType(fieldValueCopy, count_) == false || count_ < 0.0) { LOG_ERROR(<< "Unable to extract count " << fieldName << " from " << fieldValueCopy); return false; } count = static_cast<std::size_t>(count_ + 0.5); // Treat count of 0 as a failure to extract. This will cause the record to be ignored. return count > 0; } bool CDataGatherer::extractMetricFromField(const std::string& fieldName, std::string fieldValue, TDouble1Vec& result) const { result.clear(); core::CStringUtils::trimWhitespace(fieldValue); if (fieldValue.empty()) { LOG_WARN(<< "Configured metric " << fieldName << " not present in event"); return false; } const std::string& delimiter = m_Params.get().s_MultivariateComponentDelimiter; // Split the string up by the delimiter and parse each token separately. std::size_t first = 0; do { std::size_t const last = fieldValue.find(delimiter, first); double value; // Avoid a string duplication in the (common) case of only one value bool const convertedOk = (first == 0 && last == std::string::npos) ? core::CStringUtils::stringToType(fieldValue, value) : core::CStringUtils::stringToType( fieldValue.substr(first, last - first), value); if (!convertedOk) { LOG_ERROR(<< "Unable to extract " << fieldName << " from " << fieldValue); result.clear(); return false; } if (maths::common::CMathsFuncs::isFinite(value) == false) { LOG_ERROR(<< "Bad value for " << fieldName << " from " << fieldValue); result.clear(); return false; } result.push_back(value); first = last + (last != std::string::npos ? delimiter.length() : 0); } while (first != std::string::npos); return true; } core_t::TTime CDataGatherer::earliestBucketStartTime() const { return m_BucketGatherer->earliestBucketStartTime(); } bool CDataGatherer::checkInvariants() const { if (m_BucketGatherer == nullptr) { LOG_ERROR(<< "No bucket gatherer"); return false; } if (m_PeopleRegistry.checkInvariants() == false) { LOG_ERROR(<< "People registry invariants violated"); return false; } if (m_AttributesRegistry.checkInvariants() == false) { LOG_ERROR(<< "Attributes registry invariants violated"); return false; } return true; } bool CDataGatherer::acceptRestoreTraverser(const CBucketGatherer::SBucketGathererInitData& bucketGathererInitData, core::CStateRestoreTraverser& traverser) { this->clear(); m_Features.clear(); do { const std::string& name = traverser.name(); if (name == FEATURE_TAG) { int feature(-1); if (core::CStringUtils::stringToType(traverser.value(), feature) == false || feature < 0) { LOG_ERROR(<< "Invalid feature in " << traverser.value()); return false; } m_Features.push_back(static_cast<model_t::EFeature>(feature)); continue; } RESTORE(PEOPLE_REGISTRY_TAG, traverser.traverseSubLevel(std::bind( &CDynamicStringIdRegistry::acceptRestoreTraverser, &m_PeopleRegistry, std::placeholders::_1))) RESTORE(ATTRIBUTES_REGISTRY_TAG, traverser.traverseSubLevel( std::bind(&CDynamicStringIdRegistry::acceptRestoreTraverser, &m_AttributesRegistry, std::placeholders::_1))) RESTORE_SETUP_TEARDOWN( SAMPLE_COUNTS_TAG, m_SampleCounts = std::make_unique<CSampleCounts>(0), traverser.traverseSubLevel(std::bind(&CSampleCounts::acceptRestoreTraverser, m_SampleCounts.get(), std::placeholders::_1)), /**/) RESTORE(BUCKET_GATHERER_TAG, traverser.traverseSubLevel(std::bind( &CDataGatherer::restoreBucketGatherer, this, std::cref(bucketGathererInitData), std::placeholders::_1))) } while (traverser.next()); return true; } bool CDataGatherer::restoreBucketGatherer(const CBucketGatherer::SBucketGathererInitData& bucketGathererInitData, core::CStateRestoreTraverser& traverser) { do { const std::string& name = traverser.name(); if (name == CBucketGatherer::EVENTRATE_BUCKET_GATHERER_TAG) { m_BucketGatherer = std::make_unique<CEventRateBucketGatherer>( *this, bucketGathererInitData, traverser); if (m_BucketGatherer == nullptr) { LOG_ERROR(<< "Failed to create event rate bucket gatherer"); return false; } } else if (name == CBucketGatherer::METRIC_BUCKET_GATHERER_TAG) { m_BucketGatherer = std::make_unique<CMetricBucketGatherer>( *this, bucketGathererInitData, traverser); if (m_BucketGatherer == nullptr) { LOG_ERROR(<< "Failed to create metric bucket gatherer"); return false; } } } while (traverser.next()); if (m_BucketGatherer == nullptr) { LOG_ERROR(<< "Failed to restore any bucket gatherer"); return false; } return true; } void CDataGatherer::persistBucketGatherers(core::CStatePersistInserter& inserter) const { inserter.insertLevel( m_BucketGatherer->persistenceTag(), [capture0 = m_BucketGatherer.get()]( core::CStatePersistInserter & inserter_) { capture0->acceptPersistInserter(inserter_); }); } void CDataGatherer::createBucketGatherer(model_t::EAnalysisCategory gathererType, const CBucketGatherer::SBucketGathererInitData& initData) { switch (gathererType) { case model_t::E_EventRate: case model_t::E_PopulationEventRate: m_BucketGatherer = std::make_unique<CEventRateBucketGatherer>(*this, initData); break; case model_t::E_Metric: case model_t::E_PopulationMetric: m_SampleCounts = std::make_unique<CSampleCounts>(initData.s_SampleOverrideCount); m_BucketGatherer = std::make_unique<CMetricBucketGatherer>(*this, initData); break; } } } }