lib/model/CMetricBucketGatherer.cc (1,378 lines of code) (raw):

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <model/CMetricBucketGatherer.h>

#include <core/CLogger.h>
#include <core/CMemoryDefStd.h>
#include <core/CProgramCounters.h>

#include <maths/common/CBasicStatistics.h>
#include <maths/common/CBasicStatisticsPersist.h>
#include <maths/common/CChecksum.h>
#include <maths/common/COrderings.h>

#include <model/CDataGatherer.h>
#include <model/CGathererTools.h>
#include <model/CResourceMonitor.h>
#include <model/CSampleCounts.h>
#include <model/CSampleGatherer.h>
#include <model/CSearchKey.h>

#include <boost/tuple/tuple.hpp>
#include <boost/unordered_map.hpp>

// Standard library headers for every std facility used directly in this file
// (std::sort, std::bind/std::cref/std::ref/placeholders, std::string,
// std::uint64_t, std::forward_as_tuple, std::exception, ...). Previously
// several of these were only reachable through transitive includes.
#include <algorithm>
#include <any>
#include <atomic>
#include <cstdint>
#include <exception>
#include <functional>
#include <map>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

namespace ml {
namespace model {

namespace {

using TSizeSizePr = std::pair<std::size_t, std::size_t>;
using TDoubleVec = std::vector<double>;
using TSizeVec = std::vector<std::size_t>;
using TStrVec = std::vector<std::string>;
using TStrCRef = std::reference_wrapper<const std::string>;
using TStrCRefStrCRefPr = std::pair<TStrCRef, TStrCRef>;
using TStrCRefStrCRefPrUInt64Map =
    std::map<TStrCRefStrCRefPr, std::uint64_t, maths::common::COrderings::SLess>;
using TSampleVec = std::vector<CSample>;

// Per-category gatherer maps: attribute id -> (person id -> gatherer).
using TSizeMeanGathererUMap = boost::unordered_map<std::size_t, CGathererTools::TMeanGatherer>;
using TSizeSizeMeanGathererUMapUMap = boost::unordered_map<std::size_t, TSizeMeanGathererUMap>;
using TSizeMedianGathererUMap = boost::unordered_map<std::size_t, CGathererTools::TMedianGatherer>;
using TSizeSizeMedianGathererUMapUMap = boost::unordered_map<std::size_t, TSizeMedianGathererUMap>;
using TSizeMinGathererUMap = boost::unordered_map<std::size_t, CGathererTools::TMinGatherer>;
using TSizeSizeMinGathererUMapUMap = boost::unordered_map<std::size_t, TSizeMinGathererUMap>;
using TSizeMaxGathererUMap = boost::unordered_map<std::size_t, CGathererTools::TMaxGatherer>;
using TSizeSizeMaxGathererUMapUMap = boost::unordered_map<std::size_t, TSizeMaxGathererUMap>;
using TSizeVarianceGathererUMap =
    boost::unordered_map<std::size_t, CGathererTools::TVarianceGatherer>;
using TSizeSizeVarianceGathererUMapUMap =
    boost::unordered_map<std::size_t, TSizeVarianceGathererUMap>;
using TSizeSumGathererUMap = boost::unordered_map<std::size_t, CGathererTools::CSumGatherer>;
using TSizeSizeSumGathererUMapUMap = boost::unordered_map<std::size_t, TSizeSumGathererUMap>;
using TSizeMultivariateMeanGathererUMap =
    boost::unordered_map<std::size_t, CGathererTools::TMultivariateMeanGatherer>;
using TSizeSizeMultivariateMeanGathererUMapUMap =
    boost::unordered_map<std::size_t, TSizeMultivariateMeanGathererUMap>;
using TSizeMultivariateMinGathererUMap =
    boost::unordered_map<std::size_t, CGathererTools::TMultivariateMinGatherer>;
using TSizeSizeMultivariateMinGathererUMapUMap =
    boost::unordered_map<std::size_t, TSizeMultivariateMinGathererUMap>;
using TSizeMultivariateMaxGathererUMap =
    boost::unordered_map<std::size_t, CGathererTools::TMultivariateMaxGatherer>;
using TSizeSizeMultivariateMaxGathererUMapUMap =
    boost::unordered_map<std::size_t, TSizeMultivariateMaxGathererUMap>;

using TSizeFeatureDataPr = std::pair<std::size_t, SMetricFeatureData>;
using TSizeFeatureDataPrVec = std::vector<TSizeFeatureDataPr>;
using TSizeSizePrFeatureDataPr = std::pair<TSizeSizePr, SMetricFeatureData>;
using TSizeSizePrFeatureDataPrVec = std::vector<TSizeSizePrFeatureDataPr>;
using TSizeSizePrUInt64UMap = CMetricBucketGatherer::TSizeSizePrUInt64UMap;
using TCategorySizePr = CMetricBucketGatherer::TCategorySizePr;
using TCategorySizePrAnyMap = CMetricBucketGatherer::TCategorySizePrAnyMap;
using TCategorySizePrAnyMapItr = CMetricBucketGatherer::TCategorySizePrAnyMapItr;
using TCategorySizePrAnyMapCItr = CMetricBucketGatherer::TCategorySizePrAnyMapCItr;
using TOptionalStrVec = CBucketGatherer::TOptionalStrVec;
template<typename T>
using TSizeTUMap = boost::unordered_map<std::size_t, T>;
template<typename T>
using TSizeSizeTUMapUMap = boost::unordered_map<std::size_t, TSizeTUMap<T>>;

const std::string CURRENT_VERSION("1");

// We use short field names to reduce the state size
const std::string BASE_TAG("a");
const std::string VERSION_TAG("b");
const std::string MEAN_TAG("e");
const std::string MIN_TAG("f");
const std::string MAX_TAG("g");
const std::string SUM_TAG("h");
const std::string MULTIVARIATE_MEAN_TAG("i");
const std::string MULTIVARIATE_MIN_TAG("j");
const std::string MULTIVARIATE_MAX_TAG("k");
const std::string MEDIAN_TAG("l");
const std::string VARIANCE_TAG("m");
const std::string EMPTY_STRING;
const TDoubleVec EMPTY_DOUBLE_VEC;

// Nested tags.
const std::string ATTRIBUTE_TAG("a");
const std::string DATA_TAG("b");
const std::string PERSON_TAG("c");

//! Get the by field name.
//!
//! For population analysis this is \p fieldNames[1], otherwise it is
//! \p fieldNames[0]; the caller must supply enough names for the mode.
const std::string& byField(bool population, const TStrVec& fieldNames) {
    return population ? fieldNames[1] : fieldNames[0];
}

//! Get the over field name.
//!
//! For population analysis this is \p fieldNames[0]; individual analysis has
//! no over field so the empty string is returned.
const std::string& overField(bool population, const TStrVec& fieldNames) {
    return population ? fieldNames[0] : EMPTY_STRING;
}

//! Maps a metric category to the concrete type stored (inside a std::any) for
//! that category: a nested attribute id -> person id -> gatherer map.
template<model_t::EMetricCategory>
struct SDataType {};
template<>
struct SDataType<model_t::E_Mean> {
    using Type = TSizeSizeMeanGathererUMapUMap;
};
template<>
struct SDataType<model_t::E_Median> {
    using Type = TSizeSizeMedianGathererUMapUMap;
};
template<>
struct SDataType<model_t::E_Min> {
    using Type = TSizeSizeMinGathererUMapUMap;
};
template<>
struct SDataType<model_t::E_Max> {
    using Type = TSizeSizeMaxGathererUMapUMap;
};
template<>
struct SDataType<model_t::E_Sum> {
    using Type = TSizeSizeSumGathererUMapUMap;
};
template<>
struct SDataType<model_t::E_Variance> {
    using Type = TSizeSizeVarianceGathererUMapUMap;
};
template<>
struct SDataType<model_t::E_MultivariateMean> {
    using Type = TSizeSizeMultivariateMeanGathererUMapUMap;
};
template<>
struct SDataType<model_t::E_MultivariateMin> {
    using Type = TSizeSizeMultivariateMinGathererUMapUMap;
};
template<>
struct SDataType<model_t::E_MultivariateMax> {
    using Type = TSizeSizeMultivariateMaxGathererUMapUMap;
};

//! Yields \p T for a mutable feature-map iterator and const \p T for a const
//! one, so gatherers can be any_cast to a reference with matching constness.
template<typename ITR, typename T>
struct SMaybeConst {};
template<typename T>
struct SMaybeConst<TCategorySizePrAnyMapItr, T> {
    using Type = T;
};
template<typename T>
struct SMaybeConst<TCategorySizePrAnyMapCItr, T> {
    using Type = const T;
};

//! Register the callbacks for computing the size of feature data gatherers
//! with \p visitor.
template<typename VISITOR> void registerMemoryCallbacks(VISITOR& visitor) { visitor.template registerCallback<TSizeSizeMeanGathererUMapUMap>(); visitor.template registerCallback<TSizeSizeMedianGathererUMapUMap>(); visitor.template registerCallback<TSizeSizeMinGathererUMapUMap>(); visitor.template registerCallback<TSizeSizeMaxGathererUMapUMap>(); visitor.template registerCallback<TSizeSizeVarianceGathererUMapUMap>(); visitor.template registerCallback<TSizeSizeSumGathererUMapUMap>(); visitor.template registerCallback<TSizeSizeMultivariateMeanGathererUMapUMap>(); visitor.template registerCallback<TSizeSizeMultivariateMinGathererUMapUMap>(); visitor.template registerCallback<TSizeSizeMultivariateMaxGathererUMapUMap>(); } //! Register the callbacks for computing the size of feature data gatherers. void registerMemoryCallbacks() { static std::atomic_flag once = ATOMIC_FLAG_INIT; if (once.test_and_set() == false) { registerMemoryCallbacks(core::memory::anyVisitor()); registerMemoryCallbacks(core::memory_debug::anyVisitor()); } } //! Apply a function \p f to a gatherer held as a value by map entry \p i //! of an explicit metric category template<model_t::EMetricCategory CATEGORY, typename ITR, typename F> void applyFunc(ITR i, const F& f) { using TDataType = typename SDataType<CATEGORY>::Type; f(i->first, std::any_cast<typename SMaybeConst<ITR, TDataType>::Type&>(i->second)); } //! Apply a function \p f to all the gatherers held in [\p begin, \p end). 
template<typename ITR, typename F> bool applyFunc(ITR begin, ITR end, const F& f) { for (ITR i = begin; i != end; ++i) { model_t::EMetricCategory category = i->first.first; try { switch (category) { case model_t::E_Mean: applyFunc<model_t::E_Mean>(i, f); break; case model_t::E_Median: applyFunc<model_t::E_Median>(i, f); break; case model_t::E_Min: applyFunc<model_t::E_Min>(i, f); break; case model_t::E_Max: applyFunc<model_t::E_Max>(i, f); break; case model_t::E_Variance: applyFunc<model_t::E_Variance>(i, f); break; case model_t::E_Sum: applyFunc<model_t::E_Sum>(i, f); break; case model_t::E_MultivariateMean: applyFunc<model_t::E_MultivariateMean>(i, f); break; case model_t::E_MultivariateMin: applyFunc<model_t::E_MultivariateMin>(i, f); break; case model_t::E_MultivariateMax: applyFunc<model_t::E_MultivariateMax>(i, f); break; } } catch (const std::exception& e) { LOG_ERROR(<< "Apply failed for " << category << ": " << e.what()); return false; } } return true; } //! Apply a function \p f to all the gatherers held in \p data. template<typename T, typename F> bool applyFunc(T& data, const F& f) { return applyFunc(data.begin(), data.end(), f); } //! Initialize feature data for a specific category template<model_t::EMetricCategory CATEGORY> void initializeFeatureDataInstance(std::size_t dimension, TCategorySizePrAnyMap& featureData) { using Type = typename SDataType<CATEGORY>::Type; featureData[{CATEGORY, dimension}] = Type(); } //! Persists the data gatherers (for individual metric categories). 
class CPersistFeatureData { public: template<typename T> void operator()(const TCategorySizePr& category, const TSizeSizeTUMapUMap<T>& data, core::CStatePersistInserter& inserter) const { if (data.empty()) { inserter.insertValue(this->tagName(category), EMPTY_STRING); return; } inserter.insertLevel( this->tagName(category), std::bind<void>(SDoPersist(), std::cref(data), std::placeholders::_1)); } private: std::string tagName(const TCategorySizePr& category) const { switch (category.first) { case model_t::E_Mean: return MEAN_TAG; case model_t::E_Median: return MEDIAN_TAG; case model_t::E_Min: return MIN_TAG; case model_t::E_Max: return MAX_TAG; case model_t::E_Variance: return VARIANCE_TAG; case model_t::E_Sum: return SUM_TAG; case model_t::E_MultivariateMean: return MULTIVARIATE_MEAN_TAG + core::CStringUtils::typeToString(category.second); case model_t::E_MultivariateMin: return MULTIVARIATE_MIN_TAG + core::CStringUtils::typeToString(category.second); case model_t::E_MultivariateMax: return MULTIVARIATE_MAX_TAG + core::CStringUtils::typeToString(category.second); } return EMPTY_STRING; } struct SDoPersist { template<typename T> void operator()(const TSizeSizeTUMapUMap<T>& data, core::CStatePersistInserter& inserter) const { using TSizeSizeTUMapUMapCItr = typename TSizeSizeTUMapUMap<T>::const_iterator; std::vector<TSizeSizeTUMapUMapCItr> dataItrs; dataItrs.reserve(data.size()); for (auto i = data.cbegin(); i != data.cend(); ++i) { dataItrs.push_back(i); } std::sort(dataItrs.begin(), dataItrs.end(), [](TSizeSizeTUMapUMapCItr lhs, TSizeSizeTUMapUMapCItr rhs) { return lhs->first < rhs->first; }); for (auto itr : dataItrs) { inserter.insertLevel(ATTRIBUTE_TAG, std::bind<void>(SDoPersist(), itr->first, std::cref(itr->second), std::placeholders::_1)); } } template<typename T> void operator()(std::size_t cid, const TSizeTUMap<T>& pidMap, core::CStatePersistInserter& inserter) const { inserter.insertValue(ATTRIBUTE_TAG, cid); using TSizeTUMapCItr = typename 
TSizeTUMap<T>::const_iterator; std::vector<TSizeTUMapCItr> pidItrs; pidItrs.reserve(pidMap.size()); for (auto i = pidMap.cbegin(); i != pidMap.cend(); ++i) { pidItrs.push_back(i); } std::sort(pidItrs.begin(), pidItrs.end(), [](TSizeTUMapCItr lhs, TSizeTUMapCItr rhs) { return lhs->first < rhs->first; }); for (auto itr : pidItrs) { inserter.insertLevel(PERSON_TAG, std::bind<void>(SDoPersist(), itr->first, std::cref(itr->second), std::placeholders::_1)); } } template<typename T> void operator()(std::size_t pid, const T& data, core::CStatePersistInserter& inserter) const { inserter.insertValue(PERSON_TAG, pid); inserter.insertLevel(DATA_TAG, std::bind<void>(&T::acceptPersistInserter, &data, std::placeholders::_1)); } }; }; //! Restores the data gatherers (for individual metric categories). template<model_t::EMetricCategory CATEGORY> class CRestoreFeatureData { public: bool operator()(core::CStateRestoreTraverser& traverser, std::size_t dimension, bool isNewVersion, const CMetricBucketGatherer& gatherer, TCategorySizePrAnyMap& result) const { std::any& data = result[{CATEGORY, dimension}]; return this->restore(traverser, dimension, isNewVersion, gatherer, data); } private: //! Add a restored data gatherer to \p result. bool restore(core::CStateRestoreTraverser& traverser, std::size_t dimension, bool isNewVersion, const CMetricBucketGatherer& gatherer, std::any& result) const { using Type = typename SDataType<CATEGORY>::Type; if (!result.has_value()) { result = Type(); } Type& data = *std::any_cast<Type>(&result); // An empty sub-level implies a person with 100% invalid data. if (!traverser.hasSubLevel()) { return true; } if (isNewVersion) { return traverser.traverseSubLevel( std::bind<bool>(CDoNewRestore(dimension), std::placeholders::_1, std::cref(gatherer), std::ref(data))); } else { return traverser.traverseSubLevel( std::bind<bool>(CDoOldRestore(dimension), std::placeholders::_1, std::cref(gatherer), std::ref(data))); } } //! 
\brief Responsible for restoring individual gatherers. class CDoNewRestore { public: CDoNewRestore(std::size_t dimension) : m_Dimension(dimension) {} template<typename T> bool operator()(core::CStateRestoreTraverser& traverser, const CMetricBucketGatherer& gatherer, TSizeSizeTUMapUMap<T>& result) const { do { const std::string& name = traverser.name(); if (name == ATTRIBUTE_TAG) { if (traverser.traverseSubLevel(std::bind<bool>( &CDoNewRestore::restoreAttributes<T>, this, std::placeholders::_1, std::cref(gatherer), std::ref(result))) == false) { LOG_ERROR(<< "Invalid data in " << traverser.value()); return false; } } } while (traverser.next()); return true; } template<typename T> bool restoreAttributes(core::CStateRestoreTraverser& traverser, const CMetricBucketGatherer& gatherer, TSizeSizeTUMapUMap<T>& result) const { std::size_t lastCid(0); bool seenCid(false); do { const std::string& name = traverser.name(); if (name == ATTRIBUTE_TAG) { if (core::CStringUtils::stringToType(traverser.value(), lastCid) == false) { LOG_ERROR(<< "Invalid attribute ID in " << traverser.value()); return false; } seenCid = true; result[lastCid] = TSizeTUMap<T>(1); } else if (name == PERSON_TAG) { if (!seenCid) { LOG_ERROR(<< "Incorrect format - person before attribute ID in " << traverser.value()); return false; } if (traverser.traverseSubLevel(std::bind<bool>( &CDoNewRestore::restorePeople<T>, this, std::placeholders::_1, std::cref(gatherer), std::ref(result[lastCid]))) == false) { LOG_ERROR(<< "Invalid data in " << traverser.value()); return false; } } } while (traverser.next()); return true; } template<typename T> bool restorePeople(core::CStateRestoreTraverser& traverser, const CMetricBucketGatherer& gatherer, TSizeTUMap<T>& result) const { std::size_t lastPid(0); bool seenPid(false); do { const std::string& name = traverser.name(); if (name == PERSON_TAG) { if (core::CStringUtils::stringToType(traverser.value(), lastPid) == false) { LOG_ERROR(<< "Invalid person ID in " << 
traverser.value()); return false; } seenPid = true; } else if (name == DATA_TAG) { if (!seenPid) { LOG_ERROR(<< "Incorrect format - data before person ID in " << traverser.value()); return false; } T initial(gatherer.dataGatherer().params(), m_Dimension, gatherer.currentBucketStartTime(), gatherer.bucketLength(), gatherer.beginInfluencers(), gatherer.endInfluencers()); if (traverser.traverseSubLevel( std::bind<bool>(&T::acceptRestoreTraverser, &initial, std::placeholders::_1)) == false) { LOG_ERROR(<< "Invalid data in " << traverser.value()); return false; } result.emplace(lastPid, initial); } } while (traverser.next()); return true; } private: std::size_t m_Dimension; }; //! \brief Responsible for restoring individual gatherers. class CDoOldRestore { public: CDoOldRestore(std::size_t dimension) : m_Dimension(dimension) {} template<typename T> bool operator()(core::CStateRestoreTraverser& traverser, const CMetricBucketGatherer& gatherer, TSizeSizeTUMapUMap<T>& result) const { bool isPopulation = gatherer.dataGatherer().isPopulation(); if (isPopulation) { this->restorePopulation(traverser, gatherer, result); } else { this->restoreIndividual(traverser, gatherer, result); } return true; } template<typename T> bool restoreIndividual(core::CStateRestoreTraverser& traverser, const CMetricBucketGatherer& gatherer, TSizeSizeTUMapUMap<T>& result) const { std::size_t pid(0); do { const std::string& name = traverser.name(); if (name == DATA_TAG) { T initial(gatherer.dataGatherer().params(), m_Dimension, gatherer.currentBucketStartTime(), gatherer.bucketLength(), gatherer.beginInfluencers(), gatherer.endInfluencers()); if (traverser.traverseSubLevel( std::bind(&T::acceptRestoreTraverser, &initial, std::placeholders::_1)) == false) { LOG_ERROR(<< "Invalid data in " << traverser.value()); return false; } result[model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID].emplace(pid, initial); pid++; } } while (traverser.next()); return true; } template<typename T> bool 
restorePopulation(core::CStateRestoreTraverser& traverser, const CMetricBucketGatherer& gatherer, TSizeSizeTUMapUMap<T>& result) const { std::size_t pid; std::size_t lastCid(0); bool seenCid(false); do { const std::string& name = traverser.name(); if (name == ATTRIBUTE_TAG) { if (core::CStringUtils::stringToType(traverser.value(), lastCid) == false) { LOG_ERROR(<< "Invalid attribute ID in " << traverser.value()); return false; } seenCid = true; } else if (name == DATA_TAG) { if (!seenCid) { LOG_ERROR(<< "Incorrect format - data before attribute ID in " << traverser.value()); return false; } T initial(gatherer.dataGatherer().params(), m_Dimension, gatherer.currentBucketStartTime(), gatherer.bucketLength(), gatherer.beginInfluencers(), gatherer.endInfluencers()); if (traverser.traverseSubLevel( std::bind(&T::acceptRestoreTraverser, &initial, std::placeholders::_1)) == false) { LOG_ERROR(<< "Invalid data in " << traverser.value()); return false; } auto& pidMap = result[lastCid]; pid = pidMap.size(); pidMap.emplace(pid, initial); } } while (traverser.next()); return true; } private: std::size_t m_Dimension; }; }; //! Removes the people from the data gatherers. struct SRemovePeople { public: template<typename T> void operator()(const TCategorySizePr& /*category*/, TSizeSizeTUMapUMap<T>& data, std::size_t begin, std::size_t end) const { for (auto& cidEntry : data) { for (std::size_t pid = begin; pid < end; ++pid) { cidEntry.second.erase(pid); } } } template<typename T> void operator()(const TCategorySizePr& /*category*/, TSizeSizeTUMapUMap<T>& data, const TSizeVec& peopleToRemove) const { for (auto& cidEntry : data) { for (auto pid : peopleToRemove) { cidEntry.second.erase(pid); } } } }; //! Removes attributes from the data gatherers. 
struct SRemoveAttributes { template<typename T> void operator()(const TCategorySizePr& /*category*/, TSizeSizeTUMapUMap<T>& data, const TSizeVec& attributesToRemove) const { for (auto cid : attributesToRemove) { data.erase(cid); } } template<typename T> void operator()(const TCategorySizePr& /*category*/, TSizeSizeTUMapUMap<T>& data, std::size_t begin, std::size_t end) const { for (std::size_t cid = begin; cid < end; ++cid) { data.erase(cid); } } }; //! Sample the metric statistics. struct SDoSample { public: template<typename T> void operator()(const TCategorySizePr& /*category*/, TSizeSizeTUMapUMap<T>& data, core_t::TTime time, const CMetricBucketGatherer& gatherer, CSampleCounts& sampleCounts) const { for (const auto& count : gatherer.bucketCounts(time)) { std::size_t pid = CDataGatherer::extractPersonId(count); std::size_t cid = CDataGatherer::extractAttributeId(count); std::size_t activeId = gatherer.dataGatherer().isPopulation() ? cid : pid; auto cidEntry = data.find(cid); if (cidEntry == data.end()) { LOG_ERROR(<< "No gatherer for attribute " << gatherer.dataGatherer().attributeName(cid) << " of person " << gatherer.dataGatherer().personName(pid)); } else { auto pidEntry = cidEntry->second.find(pid); if (pidEntry == cidEntry->second.end()) { LOG_ERROR(<< "No gatherer for attribute " << gatherer.dataGatherer().attributeName(cid) << " of person " << gatherer.dataGatherer().personName(pid)); } else if (pidEntry->second.sample(time, sampleCounts.count(activeId))) { sampleCounts.updateSampleVariance(activeId); } } } } }; //! Stably hashes the collection of data gatherers. 
struct SHash { public: template<typename T> void operator()(const TCategorySizePr& /*category*/, const TSizeSizeTUMapUMap<T>& data, const CMetricBucketGatherer& gatherer, TStrCRefStrCRefPrUInt64Map& hashes) const { for (const auto& cidEntry : data) { std::size_t cid = cidEntry.first; if (gatherer.dataGatherer().isAttributeActive(cid)) { auto cidName = TStrCRef(gatherer.dataGatherer().attributeName(cid)); for (const auto& pidEntry : cidEntry.second) { std::size_t pid = pidEntry.first; if (gatherer.dataGatherer().isPersonActive(pid)) { auto pidName = TStrCRef(gatherer.dataGatherer().personName(pid)); hashes.emplace(std::piecewise_construct, std::forward_as_tuple(cidName, pidName), std::forward_as_tuple(pidEntry.second.checksum())); } } } } } }; //! Extracts feature data from a collection of gatherers. struct SExtractFeatureData { public: using TFeatureAnyPr = std::pair<model_t::EFeature, std::any>; using TFeatureAnyPrVec = std::vector<TFeatureAnyPr>; public: template<typename T> void operator()(const TCategorySizePr& /*category*/, const TSizeSizeTUMapUMap<T>& data, const CMetricBucketGatherer& gatherer, model_t::EFeature feature, core_t::TTime time, core_t::TTime bucketLength, TFeatureAnyPrVec& result) const { if (gatherer.dataGatherer().isPopulation()) { result.emplace_back(feature, TSizeSizePrFeatureDataPrVec()); this->featureData( data, gatherer, time, bucketLength, this->isSum(feature), *std::any_cast<TSizeSizePrFeatureDataPrVec>(&result.back().second)); } else { result.emplace_back(feature, TSizeFeatureDataPrVec()); this->featureData( data, gatherer, time, bucketLength, this->isSum(feature), *std::any_cast<TSizeFeatureDataPrVec>(&result.back().second)); } } private: static const TSampleVec ZERO_SAMPLE; private: bool isSum(model_t::EFeature feature) const { return feature == model_t::E_IndividualSumByBucketAndPerson || feature == model_t::E_IndividualLowSumByBucketAndPerson || feature == model_t::E_IndividualHighSumByBucketAndPerson; } template<typename T, 
typename U> void featureData(const TSizeSizeTUMapUMap<T>& data, const CMetricBucketGatherer& gatherer, core_t::TTime time, core_t::TTime bucketLength, bool isSum, U& result) const { result.clear(); if (isSum) { if (data.empty() == false) { auto& pidMap = data.begin()->second; result.reserve(pidMap.size()); for (auto& pidEntry : pidMap) { std::size_t pid = pidEntry.first; if (gatherer.hasExplicitNullsOnly( time, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID) == false) { this->featureData(pidEntry.second, gatherer, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, time, bucketLength, result); } } } } else { const TSizeSizePrUInt64UMap& counts = gatherer.bucketCounts(time); result.reserve(counts.size()); for (const auto& count : counts) { std::size_t cid = CDataGatherer::extractAttributeId(count); auto cidEntry = data.find(cid); if (cidEntry == data.end()) { LOG_ERROR(<< "No gatherers for attribute " << gatherer.dataGatherer().attributeName(cid)); continue; } std::size_t pid = CDataGatherer::extractPersonId(count); auto pidEntry = cidEntry->second.find(pid); if (pidEntry == cidEntry->second.end()) { LOG_ERROR(<< "No gatherers for person " << gatherer.dataGatherer().personName(pid)); continue; } this->featureData(pidEntry->second, gatherer, pid, cid, time, bucketLength, result); } } std::sort(result.begin(), result.end(), maths::common::COrderings::SFirstLess()); } //! Individual model specialization template<typename T> void featureData(const T& data, const CMetricBucketGatherer& gatherer, std::size_t pid, std::size_t /*cid*/, core_t::TTime time, core_t::TTime bucketLength, TSizeFeatureDataPrVec& result) const { result.emplace_back( pid, this->featureData(data, time, bucketLength, gatherer.dataGatherer().effectiveSampleCount(pid))); } //! 
Population model specialization template<typename T> void featureData(const T& data, const CMetricBucketGatherer& gatherer, std::size_t pid, std::size_t cid, core_t::TTime time, core_t::TTime bucketLength, TSizeSizePrFeatureDataPrVec& result) const { result.emplace_back( TSizeSizePr(pid, cid), this->featureData(data, time, bucketLength, gatherer.dataGatherer().effectiveSampleCount(cid))); } SMetricFeatureData featureData(const CGathererTools::CSumGatherer& data, core_t::TTime time, core_t::TTime bucketLength, double /*effectiveSampleCount*/) const { return data.featureData(time, bucketLength, ZERO_SAMPLE); } template<typename T> inline SMetricFeatureData featureData(const T& data, core_t::TTime time, core_t::TTime bucketLength, double effectiveSampleCount) const { return data.featureData(time, bucketLength, effectiveSampleCount); } }; const TSampleVec SExtractFeatureData::ZERO_SAMPLE(1, CSample(0, TDoubleVec(1, 0.0), 1.0, 1.0)); //! Adds a value to the specified data gatherers. struct SAddValue { struct SStatistic { core_t::TTime s_Time; const CEventData::TDouble1VecArray* s_Values; unsigned int s_Count; unsigned int s_SampleCount; const TOptionalStrVec* s_Influences; }; template<typename T> inline void operator()(const TCategorySizePr& category, TSizeSizeTUMapUMap<T>& data, std::size_t pid, std::size_t cid, const CMetricBucketGatherer& gatherer, const SStatistic& stat) const { auto& entry = data[cid] .emplace(boost::unordered::piecewise_construct, boost::make_tuple(pid), boost::make_tuple( std::cref(gatherer.dataGatherer().params()), category.second, gatherer.currentBucketStartTime(), gatherer.bucketLength(), gatherer.beginInfluencers(), gatherer.endInfluencers())) .first->second; entry.add(stat.s_Time, (*stat.s_Values)[category.first], stat.s_Count, stat.s_SampleCount, *stat.s_Influences); } }; //! Updates gatherers with the start of a new bucket. 
struct SStartNewBucket { public: template<typename T> void operator()(const TCategorySizePr& /*category*/, TSizeSizeTUMapUMap<T>& data, core_t::TTime time) const { for (auto& cidEntry : data) { for (auto& pidEntry : cidEntry.second) { pidEntry.second.startNewBucket(time); } } } }; //! Resets data stored for buckets containing a specified time. struct SResetBucket { public: template<typename T> void operator()(const TCategorySizePr& /*category*/, TSizeSizeTUMapUMap<T>& data, core_t::TTime bucketStart) const { for (auto& cidEntry : data) { for (auto& pidEntry : cidEntry.second) { pidEntry.second.resetBucket(bucketStart); } } } }; //! Releases memory that is no longer needed. struct SReleaseMemory { public: template<typename T> void operator()(const TCategorySizePr& /*category*/, TSizeSizeTUMapUMap<T>& data, core_t::TTime samplingCutoffTime) const { for (auto& cidEntry : data) { auto& pidMap = cidEntry.second; for (auto i = pidMap.begin(); i != pidMap.end(); /**/) { if (i->second.isRedundant(samplingCutoffTime)) { i = pidMap.erase(i); } else { ++i; } } } } }; } // unnamed:: CMetricBucketGatherer::CMetricBucketGatherer(CDataGatherer& dataGatherer, const SBucketGathererInitData& initData) : CBucketGatherer(dataGatherer, initData), m_ValueFieldName(initData.s_ValueFieldName) { this->initializeFieldNamesPart1(initData); this->initializeFieldNamesPart2(initData); this->initializeFeatureData(); } CMetricBucketGatherer::CMetricBucketGatherer(CDataGatherer& dataGatherer, const SBucketGathererInitData& initData, core::CStateRestoreTraverser& traverser) : CBucketGatherer(dataGatherer, initData), m_ValueFieldName(initData.s_ValueFieldName) { this->initializeFieldNamesPart1(initData); if (traverser.traverseSubLevel(std::bind(&CMetricBucketGatherer::acceptRestoreTraverser, this, std::placeholders::_1)) == false) { traverser.setBadState(); } else { this->initializeFieldNamesPart2(initData); } } CMetricBucketGatherer::CMetricBucketGatherer(bool isForPersistence, const 
CMetricBucketGatherer& other) : CBucketGatherer(isForPersistence, other), m_ValueFieldName(other.m_ValueFieldName), m_FieldNames(other.m_FieldNames), m_BeginInfluencingFields(0), m_BeginValueFields(0), m_FeatureData(other.m_FeatureData) { if (!isForPersistence) { LOG_ABORT(<< "This constructor only creates clones for persistence"); } } void CMetricBucketGatherer::acceptPersistInserter(core::CStatePersistInserter& inserter) const { inserter.insertLevel(BASE_TAG, std::bind(&CBucketGatherer::baseAcceptPersistInserter, this, std::placeholders::_1)); inserter.insertValue(VERSION_TAG, CURRENT_VERSION); applyFunc(m_FeatureData, std::bind<void>(CPersistFeatureData(), std::placeholders::_1, std::placeholders::_2, std::ref(inserter))); } bool CMetricBucketGatherer::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) { std::string version; bool isCurrentVersion(false); do { const std::string& name = traverser.name(); if (name == BASE_TAG) { if (traverser.traverseSubLevel(std::bind(&CBucketGatherer::baseAcceptRestoreTraverser, this, std::placeholders::_1)) == false) { LOG_ERROR(<< "Invalid data gatherer in " << traverser.value()); return false; } } else if (name == VERSION_TAG) { if (core::CStringUtils::stringToType(traverser.value(), version) == false) { LOG_ERROR(<< "Invalid version in " << traverser.value()); return false; } isCurrentVersion = (version == CURRENT_VERSION); } else if (this->acceptRestoreTraverserInternal(traverser, isCurrentVersion) == false) { // Soldier on or we'll get a core dump later. 
} } while (traverser.next()); return true; } bool CMetricBucketGatherer::acceptRestoreTraverserInternal(core::CStateRestoreTraverser& traverser, bool isCurrentVersion) { const std::string& name = traverser.name(); if (name == MEAN_TAG) { CRestoreFeatureData<model_t::E_Mean> restore; if (restore(traverser, 1, isCurrentVersion, *this, m_FeatureData) == false) { LOG_ERROR(<< "Invalid mean data in " << traverser.value()); return false; } } else if (name == MIN_TAG) { CRestoreFeatureData<model_t::E_Min> restore; if (restore(traverser, 1, isCurrentVersion, *this, m_FeatureData) == false) { LOG_ERROR(<< "Invalid min data in " << traverser.value()); return false; } } else if (name == MAX_TAG) { CRestoreFeatureData<model_t::E_Max> restore; if (restore(traverser, 1, isCurrentVersion, *this, m_FeatureData) == false) { LOG_ERROR(<< "Invalid max data in " << traverser.value()); return false; } } else if (name == SUM_TAG) { CRestoreFeatureData<model_t::E_Sum> restore; if (restore(traverser, 1, isCurrentVersion, *this, m_FeatureData) == false) { LOG_ERROR(<< "Invalid sum data in " << traverser.value()); return false; } } else if (name == MEDIAN_TAG) { CRestoreFeatureData<model_t::E_Median> restore; if (restore(traverser, 1, isCurrentVersion, *this, m_FeatureData) == false) { LOG_ERROR(<< "Invalid median data in " << traverser.value()); return false; } } else if (name == VARIANCE_TAG) { CRestoreFeatureData<model_t::E_Variance> restore; if (restore(traverser, 1, isCurrentVersion, *this, m_FeatureData) == false) { LOG_ERROR(<< "Invalid variance data in " << traverser.value()); return false; } } else if (name.find(MULTIVARIATE_MEAN_TAG) != std::string::npos) { std::size_t dimension; if (core::CStringUtils::stringToType( name.substr(MULTIVARIATE_MEAN_TAG.length()), dimension) == false) { LOG_ERROR(<< "Invalid dimension in " << name); return false; } CRestoreFeatureData<model_t::E_MultivariateMean> restore; if (restore(traverser, dimension, isCurrentVersion, *this, m_FeatureData) == 
false) { LOG_ERROR(<< "Invalid multivariate mean data in " << traverser.value()); return false; } } else if (name.find(MULTIVARIATE_MIN_TAG) != std::string::npos) { std::size_t dimension; if (core::CStringUtils::stringToType( name.substr(MULTIVARIATE_MIN_TAG.length()), dimension) == false) { LOG_ERROR(<< "Invalid dimension in " << name); return false; } CRestoreFeatureData<model_t::E_MultivariateMin> restore; if (restore(traverser, dimension, isCurrentVersion, *this, m_FeatureData) == false) { LOG_ERROR(<< "Invalid multivariate min data in " << traverser.value()); return false; } } else if (name.find(MULTIVARIATE_MAX_TAG) != std::string::npos) { std::size_t dimension; if (core::CStringUtils::stringToType( name.substr(MULTIVARIATE_MAX_TAG.length()), dimension) == false) { LOG_ERROR(<< "Invalid dimension in " << name); return false; } CRestoreFeatureData<model_t::E_MultivariateMax> restore; if (restore(traverser, dimension, isCurrentVersion, *this, m_FeatureData) == false) { LOG_ERROR(<< "Invalid multivariate max data in " << traverser.value()); return false; } } return true; } CBucketGatherer* CMetricBucketGatherer::cloneForPersistence() const { return new CMetricBucketGatherer(true, *this); } const std::string& CMetricBucketGatherer::persistenceTag() const { return CBucketGatherer::METRIC_BUCKET_GATHERER_TAG; } const std::string& CMetricBucketGatherer::personFieldName() const { return m_FieldNames[0]; } const std::string& CMetricBucketGatherer::attributeFieldName() const { return m_DataGatherer.isPopulation() ? 
m_FieldNames[1] : EMPTY_STRING; } const std::string& CMetricBucketGatherer::valueFieldName() const { return m_ValueFieldName; } CMetricBucketGatherer::TStrVecCItr CMetricBucketGatherer::beginInfluencers() const { return m_FieldNames.begin() + m_BeginInfluencingFields; } CMetricBucketGatherer::TStrVecCItr CMetricBucketGatherer::endInfluencers() const { return m_FieldNames.begin() + m_BeginValueFields; } const TStrVec& CMetricBucketGatherer::fieldsOfInterest() const { return m_FieldNames; } std::string CMetricBucketGatherer::description() const { return function_t::name(function_t::function(m_DataGatherer.features())) + (m_ValueFieldName.empty() ? "" : " ") + m_ValueFieldName + +(byField(m_DataGatherer.isPopulation(), m_FieldNames).empty() ? "" : " by ") + byField(m_DataGatherer.isPopulation(), m_FieldNames) + (overField(m_DataGatherer.isPopulation(), m_FieldNames).empty() ? "" : " over ") + overField(m_DataGatherer.isPopulation(), m_FieldNames) + (m_DataGatherer.partitionFieldName().empty() ? "" : " partition=") + m_DataGatherer.partitionFieldName(); } bool CMetricBucketGatherer::processFields(const TStrCPtrVec& fieldValues, CEventData& result, CResourceMonitor& resourceMonitor) { using TOptionalStr = std::optional<std::string>; if (fieldValues.size() != m_FieldNames.size()) { LOG_ERROR(<< "Unexpected field values: " << fieldValues << ", for field names: " << m_FieldNames); return false; } const std::string* person = (fieldValues[0] == nullptr && m_DataGatherer.useNull()) ? &EMPTY_STRING : fieldValues[0]; if (person == nullptr) { // Just ignore: the "person" field wasn't present in the // record. Since all models in an aggregate share this // field we can't process this record further. Note that // we don't warn here since we'll permit a small fraction // of records to having missing field values. return false; } // The code below just ignores missing/invalid values. 
This // doesn't necessarily stop us processing the record by other // models, so we don't return false. std::size_t i = m_BeginInfluencingFields; for (/**/; i < m_BeginValueFields; ++i) { result.addInfluence(fieldValues[i] ? TOptionalStr(*fieldValues[i]) : std::nullopt); } if (m_DataGatherer.summaryMode() != model_t::E_None) { CEventData::TDouble1VecArraySizePr statistics; statistics.first.fill(TDouble1Vec(1, 0.0)); if (m_DataGatherer.extractCountFromField(m_FieldNames[i], fieldValues[i], statistics.second) == false) { result.addValue(); return true; } ++i; bool allOk = true; if (m_FieldNames.size() > statistics.first.size() + i) { LOG_ERROR(<< "Inconsistency - more statistic field names than allowed " << m_FieldNames.size() - i << " > " << statistics.first.size()); allOk = false; } if (m_FieldNames.size() > m_FieldMetricCategories.size() + i) { LOG_ERROR(<< "Inconsistency - more statistic field names than metric categories " << m_FieldNames.size() - i << " > " << m_FieldMetricCategories.size()); allOk = false; } for (std::size_t j = 0; allOk && i < m_FieldNames.size(); ++i, ++j) { model_t::EMetricCategory category = m_FieldMetricCategories[j]; if (fieldValues[i] == nullptr || m_DataGatherer.extractMetricFromField( m_FieldNames[i], *fieldValues[i], statistics.first[category]) == false) { allOk = false; } } if (allOk) { if (statistics.second == CDataGatherer::EXPLICIT_NULL_SUMMARY_COUNT) { result.setExplicitNull(); } else { result.addStatistics(statistics); } } else { result.addValue(); } } else { TDouble1Vec value; if (fieldValues[i] != nullptr && m_DataGatherer.extractMetricFromField(m_FieldNames[i], *fieldValues[i], value) == true) { result.addValue(value); } else { result.addValue(); } } bool addedPerson = false; std::size_t pid = CDynamicStringIdRegistry::INVALID_ID; if (result.isExplicitNull()) { m_DataGatherer.personId(*person, pid); } else { pid = m_DataGatherer.addPerson(*person, resourceMonitor, addedPerson); } if (pid == 
CDynamicStringIdRegistry::INVALID_ID) { if (!result.isExplicitNull()) { LOG_TRACE(<< "Couldn't create a person, over memory limit"); } return false; } if (addedPerson) { resourceMonitor.addExtraMemory(m_DataGatherer.isPopulation() ? CDataGatherer::ESTIMATED_MEM_USAGE_PER_OVER_FIELD : CDataGatherer::ESTIMATED_MEM_USAGE_PER_BY_FIELD); ++(m_DataGatherer.isPopulation() ? core::CProgramCounters::counter(counter_t::E_TSADNumberOverFields) : core::CProgramCounters::counter(counter_t::E_TSADNumberByFields)); } if (!result.person(pid)) { LOG_ERROR(<< "Bad by field value: " << *person); return false; } const std::string* attribute = (fieldValues[1] == nullptr && m_DataGatherer.useNull()) ? &EMPTY_STRING : fieldValues[1]; if (m_DataGatherer.isPopulation()) { if (attribute == nullptr) { // Just ignore: the "by" field wasn't present in the // record. This doesn't necessarily stop us processing // the record by other models so we don't return false. // Note that we don't warn here since we'll permit a // small fraction of records to having missing field // values. result.addAttribute(); result.addValue(); return true; } bool addedAttribute = false; std::size_t cid = CDynamicStringIdRegistry::INVALID_ID; if (result.isExplicitNull()) { m_DataGatherer.attributeId(*attribute, cid); } else { cid = m_DataGatherer.addAttribute(*attribute, resourceMonitor, addedAttribute); } result.addAttribute(cid); if (addedAttribute) { resourceMonitor.addExtraMemory(CDataGatherer::ESTIMATED_MEM_USAGE_PER_BY_FIELD); ++core::CProgramCounters::counter(counter_t::E_TSADNumberByFields); } } else { // Add the unique attribute. 
result.addAttribute(std::size_t(0)); } return true; } void CMetricBucketGatherer::recyclePeople(const TSizeVec& peopleToRemove) { if (peopleToRemove.empty()) { return; } applyFunc(m_FeatureData, [&, remove = SRemovePeople{} ](const auto& category, auto& data) { remove(category, data, peopleToRemove); }); this->CBucketGatherer::recyclePeople(peopleToRemove); } void CMetricBucketGatherer::removePeople(std::size_t lowestPersonToRemove) { applyFunc(m_FeatureData, [&, remove = SRemovePeople{} ](const auto& category, auto& data) { remove(category, data, lowestPersonToRemove, m_DataGatherer.numberPeople()); }); this->CBucketGatherer::removePeople(lowestPersonToRemove); } void CMetricBucketGatherer::recycleAttributes(const TSizeVec& attributesToRemove) { if (attributesToRemove.empty()) { return; } if (m_DataGatherer.isPopulation()) { applyFunc(m_FeatureData, [&, remove = SRemoveAttributes{} ](const auto& category, auto& data) { remove(category, data, attributesToRemove); }); } this->CBucketGatherer::recycleAttributes(attributesToRemove); } void CMetricBucketGatherer::removeAttributes(std::size_t lowestAttributeToRemove) { if (m_DataGatherer.isPopulation()) { applyFunc(m_FeatureData, [&, remove = SRemoveAttributes{} ](const auto& category, auto& data) { remove(category, data, lowestAttributeToRemove, m_DataGatherer.numberAttributes()); }); } this->CBucketGatherer::removeAttributes(lowestAttributeToRemove); } std::uint64_t CMetricBucketGatherer::checksum() const { std::uint64_t seed = this->CBucketGatherer::checksum(); seed = maths::common::CChecksum::calculate(seed, m_DataGatherer.params().s_DecayRate); TStrCRefStrCRefPrUInt64Map hashes; applyFunc(m_FeatureData, [&, hash = SHash{} ](const auto& category, const auto& data) { hash(category, data, *this, hashes); }); LOG_TRACE(<< "seed = " << seed); LOG_TRACE(<< "hashes = " << hashes); return maths::common::CChecksum::calculate(seed, hashes); } void CMetricBucketGatherer::debugMemoryUsage(const 
core::CMemoryUsage::TMemoryUsagePtr& mem) const { registerMemoryCallbacks(); mem->setName("CMetricBucketGatherer"); this->CBucketGatherer::debugMemoryUsage(mem->addChild()); core::memory_debug::dynamicSize("m_ValueFieldName", m_ValueFieldName, mem); core::memory_debug::dynamicSize("m_FieldNames", m_FieldNames, mem); core::memory_debug::dynamicSize("m_FieldMetricCategories", m_FieldMetricCategories, mem); core::memory_debug::dynamicSize("m_FeatureData", m_FeatureData, mem); } std::size_t CMetricBucketGatherer::memoryUsage() const { registerMemoryCallbacks(); std::size_t mem = this->CBucketGatherer::memoryUsage(); mem += core::memory::dynamicSize(m_ValueFieldName); mem += core::memory::dynamicSize(m_FieldNames); mem += core::memory::dynamicSize(m_FieldMetricCategories); mem += core::memory::dynamicSize(m_FeatureData); return mem; } std::size_t CMetricBucketGatherer::staticSize() const { return sizeof(*this); } void CMetricBucketGatherer::clear() { this->CBucketGatherer::clear(); m_FeatureData.clear(); this->initializeFeatureData(); } bool CMetricBucketGatherer::resetBucket(core_t::TTime bucketStart) { if (this->CBucketGatherer::resetBucket(bucketStart) == false) { return false; } applyFunc(m_FeatureData, [&, reset = SResetBucket{} ](const auto& category, auto& data) { reset(category, data, bucketStart); }); return true; } void CMetricBucketGatherer::releaseMemory(core_t::TTime samplingCutoffTime) { applyFunc(m_FeatureData, [&, releaseMemory = SReleaseMemory{} ](const auto& category, auto& data) { releaseMemory(category, data, samplingCutoffTime); }); } void CMetricBucketGatherer::sample(core_t::TTime time) { if (m_DataGatherer.sampleCounts()) { applyFunc(m_FeatureData, [&, sample = SDoSample{} ](const auto& category, auto& data) { sample(category, data, time, *this, *m_DataGatherer.sampleCounts()); }); } } void CMetricBucketGatherer::featureData(core_t::TTime time, core_t::TTime bucketLength, TFeatureAnyPrVec& result) const { result.clear(); if 
(!this->dataAvailable(time) || time >= this->currentBucketStartTime() + this->bucketLength()) { LOG_DEBUG(<< "No data available at " << time << ", current bucket = " << this->printCurrentBucket() << ", bucket length = " << this->bucketLength()); return; } for (std::size_t i = 0, n = m_DataGatherer.numberFeatures(); i < n; ++i) { model_t::EFeature feature = m_DataGatherer.feature(i); model_t::EMetricCategory category; if (model_t::metricCategory(feature, category)) { std::size_t dimension = model_t::dimension(feature); auto begin = m_FeatureData.find({category, dimension}); if (begin != m_FeatureData.end()) { auto end = begin; ++end; applyFunc(begin, end, [&, extractFeatureData = SExtractFeatureData{} ]( const auto& category_, const auto& data) { extractFeatureData(category_, data, *this, feature, time, bucketLength, result); }); } else { LOG_ERROR(<< "No data for category " << model_t::print(category)); } } else { LOG_ERROR(<< "Unexpected feature " << model_t::print(feature)); } } } void CMetricBucketGatherer::resize(std::size_t pid, std::size_t cid) { if (m_DataGatherer.sampleCounts()) { m_DataGatherer.sampleCounts()->resize(m_DataGatherer.isPopulation() ? cid : pid); } else { LOG_ERROR(<< "Invalid sample counts for gatherer"); } } void CMetricBucketGatherer::addValue(std::size_t pid, std::size_t cid, core_t::TTime time, const CEventData::TDouble1VecArray& values, std::size_t count, const CEventData::TOptionalStr& /*stringValue*/, const TOptionalStrVec& influences) { // Check that we are correctly sized - a person/attribute might have been added this->resize(pid, cid); SAddValue::SStatistic stat; stat.s_Time = time; stat.s_Values = &values; stat.s_Count = static_cast<unsigned int>(count); if (m_DataGatherer.sampleCounts()) { stat.s_SampleCount = m_DataGatherer.sampleCounts()->count( m_DataGatherer.isPopulation() ? 
cid : pid); } else { LOG_ERROR(<< "Invalid sample counts for gatherer"); stat.s_SampleCount = 0.0; } stat.s_Influences = &influences; applyFunc(m_FeatureData, [&, addValue = SAddValue{} ](const auto& category, auto& data) { addValue(category, data, pid, cid, *this, stat); }); } void CMetricBucketGatherer::startNewBucket(core_t::TTime time, bool skipUpdates) { LOG_TRACE(<< "StartNewBucket, " << time << " @ " << this); using TUInt64Vec = std::vector<std::uint64_t>; using TSizeUInt64VecUMap = boost::unordered_map<std::size_t, TUInt64Vec>; // Only update the sampleCounts if we are the primary bucket gatherer. // This is the only place where the bucket gatherer needs to know about its // status within the celestial plain, which is a bit ugly... if (!skipUpdates && time % this->bucketLength() == 0) { core_t::TTime earliestAvailableBucketStartTime = this->earliestBucketStartTime(); if (this->dataAvailable(earliestAvailableBucketStartTime)) { TSizeUInt64VecUMap counts; const TSizeSizePrUInt64UMap& counts_ = this->bucketCounts(earliestAvailableBucketStartTime); for (const auto& count : counts_) { if (m_DataGatherer.isPopulation()) { counts[CDataGatherer::extractAttributeId(count)].push_back( CDataGatherer::extractData(count)); } else { counts .emplace(CDataGatherer::extractPersonId(count), TUInt64Vec{0}) .first->second[0] += CDataGatherer::extractData(count); } } double alpha = std::exp(-m_DataGatherer.params().s_DecayRate); for (auto& count : counts) { std::sort(count.second.begin(), count.second.end()); std::size_t n = count.second.size() / 2; double median = count.second.size() % 2 == 0 ? 
static_cast<double>(count.second[n - 1] + count.second[n]) / 2.0 : static_cast<double>(count.second[n]); m_DataGatherer.sampleCounts()->updateMeanNonZeroBucketCount( count.first, median, alpha); } m_DataGatherer.sampleCounts()->refresh(m_DataGatherer); } } applyFunc(m_FeatureData, [&, startNewBucket = SStartNewBucket{} ](const auto& category, auto& data) { startNewBucket(category, data, time); }); } void CMetricBucketGatherer::initializeFieldNamesPart1(const SBucketGathererInitData& initData) { switch (m_DataGatherer.summaryMode()) { case model_t::E_None: m_FieldNames.reserve(2 + static_cast<std::size_t>(m_DataGatherer.isPopulation()) + initData.s_InfluenceFieldNames.size()); m_FieldNames.push_back(initData.s_PersonFieldName); if (m_DataGatherer.isPopulation()) m_FieldNames.push_back(initData.s_AttributeFieldName); m_BeginInfluencingFields = m_FieldNames.size(); m_FieldNames.insert(m_FieldNames.end(), initData.s_InfluenceFieldNames.begin(), initData.s_InfluenceFieldNames.end()); m_BeginValueFields = m_FieldNames.size(); break; case model_t::E_Manual: m_FieldNames.reserve(3 + static_cast<std::size_t>(m_DataGatherer.isPopulation()) + initData.s_InfluenceFieldNames.size()); m_FieldNames.push_back(initData.s_PersonFieldName); if (m_DataGatherer.isPopulation()) m_FieldNames.push_back(initData.s_AttributeFieldName); m_BeginInfluencingFields = m_FieldNames.size(); m_FieldNames.insert(m_FieldNames.end(), initData.s_InfluenceFieldNames.begin(), initData.s_InfluenceFieldNames.end()); m_BeginValueFields = m_FieldNames.size(); break; } } void CMetricBucketGatherer::initializeFieldNamesPart2(const SBucketGathererInitData& initData) { switch (m_DataGatherer.summaryMode()) { case model_t::E_None: m_FieldNames.push_back(initData.s_ValueFieldName); break; case model_t::E_Manual: m_FieldNames.push_back(initData.s_SummaryCountFieldName); m_FieldNames.push_back(initData.s_ValueFieldName); m_DataGatherer.determineMetricCategory(m_FieldMetricCategories); break; }; } void 
CMetricBucketGatherer::initializeFeatureData() { for (std::size_t i = 0, n = m_DataGatherer.numberFeatures(); i < n; ++i) { const model_t::EFeature feature = m_DataGatherer.feature(i); model_t::EMetricCategory category; if (model_t::metricCategory(feature, category)) { std::size_t dimension = model_t::dimension(feature); switch (category) { case model_t::E_Mean: initializeFeatureDataInstance<model_t::E_Mean>(dimension, m_FeatureData); break; case model_t::E_Median: initializeFeatureDataInstance<model_t::E_Median>(dimension, m_FeatureData); break; case model_t::E_Min: initializeFeatureDataInstance<model_t::E_Min>(dimension, m_FeatureData); break; case model_t::E_Max: initializeFeatureDataInstance<model_t::E_Max>(dimension, m_FeatureData); break; case model_t::E_Variance: initializeFeatureDataInstance<model_t::E_Variance>(dimension, m_FeatureData); break; case model_t::E_Sum: initializeFeatureDataInstance<model_t::E_Sum>(dimension, m_FeatureData); break; case model_t::E_MultivariateMean: initializeFeatureDataInstance<model_t::E_MultivariateMean>(dimension, m_FeatureData); break; case model_t::E_MultivariateMin: initializeFeatureDataInstance<model_t::E_MultivariateMin>(dimension, m_FeatureData); break; case model_t::E_MultivariateMax: initializeFeatureDataInstance<model_t::E_MultivariateMax>(dimension, m_FeatureData); break; } } else { LOG_ERROR(<< "Unexpected feature = " << model_t::print(m_DataGatherer.feature(i))); } } } } }