include/model/CBucketGatherer.h (213 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #ifndef INCLUDED_ml_model_CBucketGatherer_h #define INCLUDED_ml_model_CBucketGatherer_h #include <core/CCompressedDictionary.h> #include <core/CHashing.h> #include <core/CLogger.h> #include <core/CMemoryUsage.h> #include <core/CoreTypes.h> #include <model/CBucketQueue.h> #include <model/CEventData.h> #include <model/ImportExport.h> #include <model/ModelTypes.h> #include <boost/unordered_map.hpp> #include <boost/unordered_set.hpp> #include <any> #include <cstdint> #include <functional> #include <map> #include <optional> #include <string> #include <vector> namespace ml { namespace core { class CStatePersistInserter; class CStateRestoreTraverser; } namespace model { class CDataGatherer; class CEventData; class CSearchKey; class CResourceMonitor; //! \brief Time series data gathering interface. //! //! DESCRIPTION:\n //! This defines the interface to gather time-specific data for one or more //! time series. //! //! This is subclassed by Metric and EventRate implementations. //! //! IMPLEMENTATION DECISIONS:\n //! This functionality has been separated from the CDataGatherer in order //! to allow the CDataGatherer to support multiple overlapping buckets and //! buckets with different time spans. However, the overlapping feature //! has been removed but this class is kept to avoid BWC issues. class MODEL_EXPORT CBucketGatherer { public: using TDoubleVec = std::vector<double>; using TDouble1Vec = core::CSmallVector<double, 1>; using TSizeVec = std::vector<std::size_t>; using TStrVec = std::vector<std::string>; using TStrVecCItr = TStrVec::const_iterator; using TStrCPtrVec = std::vector<const std::string*>; using TSizeUInt64Pr = std::pair<std::size_t, std::uint64_t>; using TSizeUInt64PrVec = std::vector<TSizeUInt64Pr>; using TFeatureVec = model_t::TFeatureVec; using TOptionalDouble = std::optional<double>; using TSizeSizePr = std::pair<std::size_t, std::size_t>; using TSizeSizePrUInt64Pr = std::pair<TSizeSizePr, std::uint64_t>; using TSizeSizePrUInt64PrVec = std::vector<TSizeSizePrUInt64Pr>; using TDictionary = core::CCompressedDictionary<2>; using TWordSizeUMap = TDictionary::TWordTUMap<std::size_t>; using TWordSizeUMapItr = TWordSizeUMap::iterator; using TWordSizeUMapCItr = TWordSizeUMap::const_iterator; using TSizeSizePrUInt64UMap = boost::unordered_map<TSizeSizePr, std::uint64_t>; using TSizeSizePrUInt64UMapItr = TSizeSizePrUInt64UMap::iterator; using TSizeSizePrUInt64UMapCItr = TSizeSizePrUInt64UMap::const_iterator; using TSizeSizePrUInt64UMapQueue = CBucketQueue<TSizeSizePrUInt64UMap>; using TTimeSizeSizePrUInt64UMapMap = std::map<core_t::TTime, TSizeSizePrUInt64UMap>; using TSizeSizePrUInt64UMapQueueItr = TSizeSizePrUInt64UMapQueue::iterator; using TSizeSizePrUInt64UMapQueueCItr = TSizeSizePrUInt64UMapQueue::const_iterator; using TSizeSizePrUInt64UMapQueueCRItr = TSizeSizePrUInt64UMapQueue::const_reverse_iterator; using TSizeSizePrUSet = boost::unordered_set<TSizeSizePr>; using TSizeSizePrUSetCItr = TSizeSizePrUSet::const_iterator; using TSizeSizePrUSetQueue = CBucketQueue<TSizeSizePrUSet>; using TTimeSizeSizePrUSetMap = std::map<core_t::TTime, TSizeSizePrUSet>; using TSizeSizePrUSetQueueCItr = TSizeSizePrUSetQueue::const_iterator; using TOptionalStr = std::optional<std::string>; using TOptionalStrVec = std::vector<TOptionalStr>; using TSizeSizePrOptionalStrPr = std::pair<TSizeSizePr, std::string>; //! \brief Hashes a ((size_t, size_t), string*) pair. struct MODEL_EXPORT SSizeSizePrOptionalStrPrHash { std::size_t operator()(const TSizeSizePrOptionalStrPr& key) const { std::uint64_t const seed = core::CHashing::hashCombine( static_cast<std::uint64_t>(key.first.first), static_cast<std::uint64_t>(key.first.second)); return core::CHashing::hashCombine(seed, s_Hasher(key.second)); } core::CHashing::CMurmurHash2String s_Hasher; }; //! \brief Checks two ((size_t, size_t), string*) pairs for equality. struct MODEL_EXPORT SSizeSizePrOptionalStrPrEqual { bool operator()(const TSizeSizePrOptionalStrPr& lhs, const TSizeSizePrOptionalStrPr& rhs) const { return lhs.first == rhs.first && lhs.second == rhs.second; } }; using TSizeSizePrOptionalStrPrUInt64UMap = boost::unordered_map<TSizeSizePrOptionalStrPr, std::uint64_t, SSizeSizePrOptionalStrPrHash, SSizeSizePrOptionalStrPrEqual>; using TSizeSizePrOptionalStrPrUInt64UMapCItr = TSizeSizePrOptionalStrPrUInt64UMap::const_iterator; using TSizeSizePrOptionalStrPrUInt64UMapItr = TSizeSizePrOptionalStrPrUInt64UMap::iterator; using TSizeSizePrOptionalStrPrUInt64UMapVec = std::vector<TSizeSizePrOptionalStrPrUInt64UMap>; using TSizeSizePrOptionalStrPrUInt64UMapVecQueue = CBucketQueue<TSizeSizePrOptionalStrPrUInt64UMapVec>; using TSizeSizePrOptionalStrPrUInt64UMapVecCItr = TSizeSizePrOptionalStrPrUInt64UMapVec::const_iterator; using TTimeSizeSizePrOptionalStrPrUInt64UMapVecMap = std::map<core_t::TTime, TSizeSizePrOptionalStrPrUInt64UMapVec>; using TSearchKeyCRef = std::reference_wrapper<const CSearchKey>; using TFeatureAnyPr = std::pair<model_t::EFeature, std::any>; using TFeatureAnyPrVec = std::vector<TFeatureAnyPr>; using TMetricCategoryVec = std::vector<model_t::EMetricCategory>; using TTimeVec = std::vector<core_t::TTime>; using TTimeVecCItr = TTimeVec::const_iterator; struct SBucketGathererInitData { // The name of the field holding the summary count. const std::string& s_SummaryCountFieldName; // The name of the field which identifies people. const std::string& s_PersonFieldName; // The name of the field which defines the person attributes. const std::string& s_AttributeFieldName; // The name of the field which contains the metric values. const std::string& s_ValueFieldName; // The field names for which we will compute influences. const TStrVec& s_InfluenceFieldNames; // The start of the time interval for which to gather data. core_t::TTime s_StartTime; // Override for the number of measurements // in a statistic. (Note that this is intended for testing only.) // A zero value means that the data gatherer class will determine // an appropriate value for the bucket length and data rate. unsigned int s_SampleOverrideCount; }; public: static const std::string EVENTRATE_BUCKET_GATHERER_TAG; static const std::string METRIC_BUCKET_GATHERER_TAG; public: //! \name Life-cycle //@{ //! Create a new data series gatherer. //! //! \param[in] dataGatherer The owning data gatherer. //! \param[in] bucketGathererInitData The parameter initialization object //! for the bucket gatherer. CBucketGatherer(CDataGatherer& dataGatherer, const SBucketGathererInitData& bucketGathererInitData); //! Create a copy that will result in the same persisted state as the //! original. This is effectively a copy constructor that creates a //! copy that's only valid for a single purpose. The boolean flag is //! redundant except to create a signature that will not be mistaken for //! a general purpose copy constructor. CBucketGatherer(bool isForPersistence, const CBucketGatherer& other); virtual ~CBucketGatherer() = default; //@} //! \name Persistence //@{ //! Persist state by passing information to the supplied inserter void baseAcceptPersistInserter(core::CStatePersistInserter& inserter) const; //! Restore the state bool baseAcceptRestoreTraverser(core::CStateRestoreTraverser& traverser); //! Create a clone of this data gatherer that will result in the same //! persisted state. The clone may be incomplete in ways that do not //! affect the persisted representation, and must not be used for any //! other purpose. //! \warning The caller owns the object returned. virtual CBucketGatherer* cloneForPersistence() const = 0; //! The persistence tag name of the subclass. virtual const std::string& persistenceTag() const = 0; //@} //! \name Fields //@{ //! This is the common field in all searches "along" which the //! probabilities are aggregated, i.e. the "by" field name for //! individual models and the "over" field name for population //! models. virtual const std::string& personFieldName() const = 0; //! Get the attribute field name if one exists. virtual const std::string& attributeFieldName() const = 0; //! Get the name of the field containing the metric value. virtual const std::string& valueFieldName() const = 0; //! Get an iterator at the beginning the influencing field names. virtual TStrVecCItr beginInfluencers() const = 0; //! Get an iterator at the end of the influencing field names. virtual TStrVecCItr endInfluencers() const = 0; //! Get the fields for which to gather data. //! //! This defines the fields to extract from a record. These include //! the fields which define the categories whose counts are being //! analyzed, the fields containing metric series names and values //! and the fields defining a population. virtual const TStrVec& fieldsOfInterest() const = 0; //@} //! Get a description of the component searches. virtual std::string description() const = 0; //! \name Update //@{ //! Process the specified fields. //! //! This adds people and attributes as necessary and fills out the //! event data from \p fieldValues. virtual bool processFields(const TStrCPtrVec& fieldValues, CEventData& result, CResourceMonitor& resourceMonitor) = 0; //! Record the arrival of \p data at \p time. bool addEventData(CEventData& data); //! Roll time forwards to \p time. void timeNow(core_t::TTime time); //! Roll time to the end of the bucket that is latency after the sampled bucket. void sampleNow(core_t::TTime sampleBucketStart); //! Roll time to the end of the bucket that is latency after the sampled bucket //! without performing any updates that impact the model. void skipSampleNow(core_t::TTime sampleBucketStart); //@} //! \name People //@{ //! Get the non-zero counts by person for the bucketing interval //! containing \p time. //! //! \param[in] time The time of interest. //! \param[out] result Filled in with the non-zero counts by person. //! The first element is the person identifier and the second their //! count in the bucketing interval. The result is sorted by person. //! \note We expect the non-zero counts to be sparse on the space //! of people so use a sparse encoding: //! <pre class="fragment"> //! \f$ pid \leftarrow c\f$ //! </pre> //! where,\n //! \f$pid\f$ is the person identifier,\n //! \f$c\f$ is the count for the person. void personNonZeroCounts(core_t::TTime time, TSizeUInt64PrVec& result) const; //! Stop gathering data on the people identified by \p peopleToRemove. virtual void recyclePeople(const TSizeVec& peopleToRemove) = 0; //! Remove all traces of people whose identifiers are greater than //! or equal to \p lowestPersonToRemove. virtual void removePeople(std::size_t lowestPersonToRemove) = 0; //@} //! \name Attribute //@{ //! Stop gathering data on the attributes identified by \p attributesToRemove. virtual void recycleAttributes(const TSizeVec& attributesToRemove) = 0; //! Remove all traces of attributes whose identifiers are greater than //! or equal to \p lowestAttributeToRemove. virtual void removeAttributes(std::size_t lowestAttributeToRemove) = 0; //@} //! \name Time //@{ //! Get the start of the current bucketing time interval. core_t::TTime currentBucketStartTime() const; //! The earliest time for which data can still arrive. core_t::TTime earliestBucketStartTime() const; //! Get the length of the bucketing time interval. core_t::TTime bucketLength() const; //! Check if data is available at \p time. bool dataAvailable(core_t::TTime time) const; //! For each bucket in the interval [\p startTime, \p endTime], //! validate that it can be sampled and increment \p startTime //! to the first valid bucket or \p endTime if no valid buckets //! exist. //! //! \param[in,out] startTime The start of the interval to sample. //! \param[in] endTime The end of the interval to sample. bool validateSampleTimes(core_t::TTime& startTime, core_t::TTime endTime) const; //! Print the current bucket. std::string printCurrentBucket() const; //@} //! \name Counts //@{ //! Get the non-zero (person, attribute) pair counts in the //! bucketing interval corresponding to the given time. const TSizeSizePrUInt64UMap& bucketCounts(core_t::TTime time) const; //! Get the non-zero (person, attribute) pair counts for each //! value of influencing field. const TSizeSizePrOptionalStrPrUInt64UMapVec& influencerCounts(core_t::TTime time) const; //@} //! Get the checksum of this gatherer. virtual std::uint64_t checksum() const = 0; //! Debug the memory used by this component. virtual void debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const = 0; //! Get the memory used by this component. virtual std::size_t memoryUsage() const = 0; //! Get the static size of this object. virtual std::size_t staticSize() const = 0; //! Clear this data gatherer. virtual void clear() = 0; //! Reset bucket and return true if bucket was successfully //! reset or false otherwise. virtual bool resetBucket(core_t::TTime bucketStart) = 0; //! Release memory that is no longer needed virtual void releaseMemory(core_t::TTime samplingCutoffTime) = 0; //! Remove the values in queue for the people or attributes //! in \p toRemove. //! //! \tparam T This must be an associative array from person //! id and/or attribute id to some corresponding value. template<typename F, typename T> static void remove(const TSizeVec& toRemove, const F& extractId, CBucketQueue<T>& queue) { for (auto bucketItr = queue.begin(); bucketItr != queue.end(); ++bucketItr) { T& bucket = *bucketItr; for (auto i = bucket.begin(); i != bucket.end(); /**/) { if (std::binary_search(toRemove.begin(), toRemove.end(), extractId(*i))) { i = bucket.erase(i); } else { ++i; } } } } //! Remove the values in queue for the people or attributes //! in \p toRemove. //! //! \tparam T This must be a vector of associative array from person //! id and/or attribute id to some corresponding value. template<typename F, typename T> static void remove(const TSizeVec& toRemove, const F& extractId, CBucketQueue<std::vector<T>>& queue) { for (auto bucketItr = queue.begin(); bucketItr != queue.end(); ++bucketItr) { for (std::size_t i = 0; i < bucketItr->size(); ++i) { T& bucket = (*bucketItr)[i]; for (auto j = bucket.begin(); j != bucket.end(); /**/) { if (std::binary_search(toRemove.begin(), toRemove.end(), extractId(j->first))) { j = bucket.erase(j); } else { ++j; } } } } } //! Get the raw data for all features for the bucketing time interval //! containing \p time. //! //! \param[in] time The time of interest. //! \param[out] result Filled in with the feature data at \p time. virtual void featureData(core_t::TTime time, core_t::TTime bucketLength, TFeatureAnyPrVec& result) const = 0; //! Get a reference to the owning data gatherer. const CDataGatherer& dataGatherer() const; //! Has this pid/cid pair had only explicit null records? bool hasExplicitNullsOnly(core_t::TTime time, std::size_t pid, std::size_t cid) const; //! Create samples if possible for the bucket pointed out by \p time. virtual void sample(core_t::TTime time) = 0; //! Persist state by passing information \p inserter. virtual void acceptPersistInserter(core::CStatePersistInserter& inserter) const = 0; private: //! Resize the necessary data structures so they can hold values //! for the person and/or attribute identified by \p pid and \p cid, //! respectively. //! //! \param[in] pid The identifier of the person to accommodate. //! \param[in] cid The identifier of the attribute to accommodate. virtual void resize(std::size_t pid, std::size_t cid) = 0; //! Record the arrival of \p values for attribute identified //! by \p cid and person identified by \p pid. //! //! \param[in] pid The identifier of the person who generated //! the value. //! \param[in] cid The identifier of the value's attribute. //! \param[in] time The time of the \p values. //! \param[in] values The metric statistic value(s). //! \param[in] count The number of measurements in the metric //! statistic. //! \param[in] stringValue The value for the function string argument //! if there is one or null. //! \param[in] influences The influencing field values which label //! the value. virtual void addValue(std::size_t pid, std::size_t cid, core_t::TTime time, const CEventData::TDouble1VecArray& values, std::size_t count, const CEventData::TOptionalStr& stringValue, const TOptionalStrVec& influences) = 0; //! Handle the start of a new bucketing interval. virtual void startNewBucket(core_t::TTime time, bool skipUpdates) = 0; //! Roll time forwards to \p time and update depending on \p skipUpdates void hiddenTimeNow(core_t::TTime time, bool skipUpdates); protected: //! Reference to the owning data gatherer CDataGatherer& m_DataGatherer; private: //! The earliest time of any record that has arrived. core_t::TTime m_EarliestTime; //! The start of the current bucketing interval. core_t::TTime m_BucketStart; //! The non-zero (person, attribute) pair counts in the current //! bucketing interval. TSizeSizePrUInt64UMapQueue m_PersonAttributeCounts; //! A set per bucket that contains a (pid,cid) pair if at least //! one explicit null record has been seen. TSizeSizePrUSetQueue m_PersonAttributeExplicitNulls; //! The influencing field value counts per person and/or attribute. TSizeSizePrOptionalStrPrUInt64UMapVecQueue m_InfluencerCounts; }; } } #endif // INCLUDED_ml_model_CBucketGatherer_h