include/model/CDataGatherer.h (260 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#ifndef INCLUDED_ml_model_CDataGatherer_h
#define INCLUDED_ml_model_CDataGatherer_h
#include <core/CLogger.h>
#include <core/CMemoryUsage.h>
#include <core/CoreTypes.h>
#include <model/CBucketGatherer.h>
#include <model/CBucketQueue.h>
#include <model/CDynamicStringIdRegistry.h>
#include <model/FunctionTypes.h>
#include <model/ImportExport.h>
#include <model/ModelTypes.h>
#include <model/SModelParams.h>
#include <boost/unordered_map.hpp>
#include <any>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>
namespace ml {
namespace core {
class CStatePersistInserter;
class CStateRestoreTraverser;
}
namespace model {
class CEventData;
class CMetricBucketGatherer;
class CResourceMonitor;
class CSampleCounts;
class CSearchKey;
//! \brief Time series data gathering interface and common functionality.
//!
//! DESCRIPTION:\n
//! This defines the common interface to gather the data which we model
//! in order to characterize a time series. The interface breaks down
//! in to fields, update with new event data, features, person, attribute,
//! metric and functionality to manage the passage of time.
//!
//! The features provide a way of customizing the data to model. For more
//! details see model_t::EFeature. Features are generally quantities that
//! are computed for the bucketing time intervals. There is a templated
//! accessor to retrieve all feature data which is the main interface used
//! by the model classes to retrieve data.
//!
//! The raw events can be partitioned by up to two categorical fields. These
//! map to the concept of person and attribute fields. The person interface
//! provides a mapping between people and their unique identifiers and also
//! manages retrieving people's counts and life cycle. The attribute interface
//! is only meaningful for population modeling and provides a mapping
//! between attribute values and their unique identifiers and manages their
//! life cycle.
//!
//! Finally, the metric interface provides some custom functions which apply
//! specifically to metric valued time series. (See CMetricBucketGatherer
//! for more information.)
//!
//! IMPLEMENTATION DECISIONS:\n
//! This functionality has been separated from the CModel class hierarchy,
//! which owns data gatherer objects, because we want to avoid monolithic
//! model classes.
//!
//! This represents a natural division of the tasks of modeling a time
//! series and gathering the data to model. It is anticipated that the
//! data gathering could become reasonably involved if, for example, we
//! start doing regression to estimate gradient, curvature or even
//! arbitrary basis function coefficients, to describe the shape of the
//! time series or start estimating temporal correlation in the series
//! (auto-regression).
//!
//! All accessors for quantities which vary in time in this class *must*
//! take a time. This is so that they can sanity check the input to
//! ensure that data are available at the requested time. The intention
//! is that this should automatically detect if the gatherer is being
//! misused. The passage of time is managed in objects of this class
//! by addArrival (which invokes timeNow with the event time) and timeNow
//! which refreshes the current time. Data are only available for the
//! bucketing interval containing the current time so must be sampled
//! before the time is incremented past the end of that bucketing
//! interval. For models, this is managed by the CModel::sample function
//! implementations.
//!
//! Time-based data gathering is handled by further classes derived from
//! CBucketGatherer, for Metrics and EventRates accordingly.
class MODEL_EXPORT CDataGatherer {
public:
using TDoubleVec = std::vector<double>;
using TDouble1Vec = core::CSmallVector<double, 1>;
using TSizeVec = std::vector<std::size_t>;
using TStrVec = std::vector<std::string>;
using TStrVecCItr = TStrVec::const_iterator;
using TStrCPtrVec = std::vector<const std::string*>;
using TSizeUInt64Pr = std::pair<std::size_t, std::uint64_t>;
using TSizeUInt64PrVec = std::vector<TSizeUInt64Pr>;
using TFeatureVec = model_t::TFeatureVec;
using TSizeSizePr = std::pair<std::size_t, std::size_t>;
using TSizeSizePrUInt64Pr = std::pair<TSizeSizePr, std::uint64_t>;
using TSizeSizePrUInt64PrVec = std::vector<TSizeSizePrUInt64Pr>;
//! Counts keyed by (person, attribute) identifier pair, as returned
//! by bucketCounts.
using TSizeSizePrUInt64UMap = boost::unordered_map<TSizeSizePr, std::uint64_t>;
using TSizeSizePrUInt64UMapQueue = CBucketQueue<TSizeSizePrUInt64UMap>;
using TSizeSizePrOptionalStrPrUInt64UMap = CBucketGatherer::TSizeSizePrOptionalStrPrUInt64UMap;
using TSizeSizePrOptionalStrPrUInt64UMapVec = std::vector<TSizeSizePrOptionalStrPrUInt64UMap>;
using TSizeSizePrOptionalStrPrUInt64UMapVecQueue =
CBucketQueue<TSizeSizePrOptionalStrPrUInt64UMapVec>;
using TSearchKeyCRef = std::reference_wrapper<const CSearchKey>;
using TBucketGathererPtr = std::unique_ptr<CBucketGatherer>;
//! A feature paired with its type-erased data. The templated
//! featureData accessor converts these to a concrete type.
using TFeatureAnyPr = std::pair<model_t::EFeature, std::any>;
using TFeatureAnyPrVec = std::vector<TFeatureAnyPr>;
using TMetricCategoryVec = std::vector<model_t::EMetricCategory>;
using TSampleCountsPtr = std::unique_ptr<CSampleCounts>;
using TTimeVec = std::vector<core_t::TTime>;
public:
//! The summary count indicating an explicit null record.
static const std::size_t EXPLICIT_NULL_SUMMARY_COUNT;
//! The estimated memory usage per by field.
static const std::size_t ESTIMATED_MEM_USAGE_PER_BY_FIELD;
//! The estimated memory usage per over field.
static const std::size_t ESTIMATED_MEM_USAGE_PER_OVER_FIELD;
public:
//! \name Life-cycle
//@{
//! Create a new data series gatherer.
//!
//! \param[in] gathererType Indicates what sort of bucket data to gather:
//! EventRate/Metric, Population/Individual
//! \param[in] summaryMode Indicates whether the data being gathered
//! are already summarized by an external aggregation process.
//! \param[in] modelParams The global configuration parameters.
//! \param[in] partitionFieldValue The value of the field which splits
//! the data.
//! \param[in] key The key of the search for which to gather data.
//! \param[in] features The features of the data to model.
//! \param[in] bucketGathererInitData The parameter initialization object for the bucket gatherer.
CDataGatherer(model_t::EAnalysisCategory gathererType,
model_t::ESummaryMode summaryMode,
const SModelParams& modelParams,
std::string partitionFieldValue,
const CSearchKey& key,
const TFeatureVec& features,
const CBucketGatherer::SBucketGathererInitData& bucketGathererInitData);
//! Construct from a state document.
//!
//! This takes the same leading parameters as the constructor above,
//! except that no feature collection is supplied.
//!
//! \param[in] traverser The traverser over the state document to
//! construct from.
CDataGatherer(model_t::EAnalysisCategory gathererType,
model_t::ESummaryMode summaryMode,
const SModelParams& modelParams,
std::string partitionFieldValue,
const CSearchKey& key,
const CBucketGatherer::SBucketGathererInitData& bucketGathererInitData,
core::CStateRestoreTraverser& traverser);
//! Create a copy that will result in the same persisted state as the
//! original. This is effectively a copy constructor that creates a
//! copy that's only valid for a single purpose. The boolean flag is
//! redundant except to create a signature that will not be mistaken for
//! a general purpose copy constructor.
CDataGatherer(bool isForPersistence, const CDataGatherer& other);
~CDataGatherer();
//! General purpose copying is disabled: the only supported copy is
//! the persistence-only one declared above.
CDataGatherer(const CDataGatherer&) = delete;
CDataGatherer& operator=(const CDataGatherer&) = delete;
//@}
//! \name Persistence
//@{
//! Persist state by passing information to the supplied inserter.
void acceptPersistInserter(core::CStatePersistInserter& inserter) const;
//! Create a clone of this data gatherer that will result in the same
//! persisted state. The clone may be incomplete in ways that do not
//! affect the persisted representation, and must not be used for any
//! other purpose.
//!
//! \return A pointer to the clone; ownership passes to the caller.
//! \warning The caller owns the object returned.
CDataGatherer* cloneForPersistence() const;
//@}
//! Check if the data being gathered are already summarized by an
//! external aggregation process.
model_t::ESummaryMode summaryMode() const;
//! Get the function.
model::function_t::EFunction function() const;
//! Get a description of the component searches.
std::string description() const;
//! Is this a population data gatherer?
bool isPopulation() const;
//! Get the maximum size of all the member containers.
std::size_t maxDimension() const;
//! \name Fields
//@{
//! Get the partition field name.
//!
//! The name of the partitioning field.
const std::string& partitionFieldName() const;
//! Get the partition field value.
//!
//! The value of the partitioning field.
const std::string& partitionFieldValue() const;
//! Get the person field name.
//!
//! This is the common field in all searches "along" which the
//! probabilities are aggregated, i.e. the "by" field name for
//! individual models and the "over" field name for population
//! models.
const std::string& personFieldName() const;
//! Get the attribute field name if one exists.
const std::string& attributeFieldName() const;
//! Get the name of the field containing the metric value.
const std::string& valueFieldName() const;
//! Get an iterator at the beginning of the influencing field names.
TStrVecCItr beginInfluencers() const;
//! Get an iterator at the end of the influencing field names.
TStrVecCItr endInfluencers() const;
//! Return the search key for which this is gathering data.
const CSearchKey& searchKey() const;
//! Get the fields for which to gather data.
//!
//! This defines the fields to extract from a record. These include
//! the fields which define the categories whose counts are being
//! analyzed, the fields containing metric series names and values
//! and the fields defining a population.
const TStrVec& fieldsOfInterest() const;
//! Get the number of by field values. For a population model this will
//! be equal to numberActiveAttributes(); for an individual model
//! numberActivePeople().
std::size_t numberByFieldValues() const;
//! Get the number of over field values. For a population model this
//! will be equal to numberActivePeople(); for an individual model 0.
std::size_t numberOverFieldValues() const;
//! Have we been configured to use NULL values?
bool useNull() const;
//@}
//! \name Update
//@{
//! Process the specified fields.
//!
//! This adds people and attributes as necessary and fills out the
//! event data from \p fieldValues.
bool processFields(const TStrCPtrVec& fieldValues,
CEventData& result,
CResourceMonitor& resourceMonitor);
//! Record the arrival of \p data with field values \p fieldValues.
//!
//! \note There is no explicit time parameter: as described in the
//! class documentation, this invokes timeNow with the event time.
bool addArrival(const TStrCPtrVec& fieldValues, CEventData& data, CResourceMonitor& resourceMonitor);
//! Roll time to the end of the bucket that is latency after the sampled bucket.
void sampleNow(core_t::TTime sampleBucketStart);
//! Roll time to the end of the bucket that is latency after the sampled bucket
//! without performing any updates that impact the model.
void skipSampleNow(core_t::TTime sampleBucketStart);
//@}
//! \name Features
//@{
//! Get the number of features on which this is gathering data.
std::size_t numberFeatures() const;
//! Check if this is gathering data on \p feature.
bool hasFeature(model_t::EFeature feature) const;
//! Get the feature corresponding to \p i.
//!
//! \warning \p i must be in range for the features this gatherer
//! is collecting, i.e. it must be less than numberFeatures().
model_t::EFeature feature(std::size_t i) const;
//! Get the collection of features for which data is being gathered.
const TFeatureVec& features() const;
//! Get the data for all features for the bucketing time interval
//! containing \p time.
//!
//! The type-erased feature data are fetched from the bucket gatherer
//! and every entry whose stored type is exactly \p T is moved in to
//! \p result. Entries holding any other type are logged and skipped.
//!
//! \param[in] time The time of interest.
//! \param[in] bucketLength The length of the bucketing interval.
//! \param[out] result Filled in with the feature data at \p time. Any
//! previous contents are discarded.
//! \tparam T The type of the feature data.
//! \return True if every feature held data of type \p T and false if
//! at least one entry was skipped because of a type mismatch.
template<typename T>
bool featureData(core_t::TTime time,
                 core_t::TTime bucketLength,
                 std::vector<std::pair<model_t::EFeature, T>>& result) const {
    TFeatureAnyPrVec rawFeatureData;
    m_BucketGatherer->featureData(time, bucketLength, rawFeatureData);
    bool succeeded = true;
    result.clear();
    result.reserve(rawFeatureData.size());
    for (auto& feature : rawFeatureData) {
        // The pointer overload of std::any_cast returns nullptr on a
        // type mismatch rather than throwing, so the failure path
        // stays cheap without a separate typeid comparison.
        T* value = std::any_cast<T>(&feature.second);
        if (value == nullptr) {
            LOG_ERROR(<< "Bad type for feature = " << model_t::print(feature.first)
                      << ", expected " << typeid(T).name() << " got "
                      << feature.second.type().name());
            succeeded = false;
            continue;
        }
        // rawFeatureData is a local which is discarded on return, so
        // the payload can be moved out of it. This avoids the copy if
        // T is large and does not require T to be default constructible.
        result.emplace_back(feature.first, std::move(*value));
    }
    return succeeded;
}
//@}
//! \name Person
//@{
//! Get the number of active people (not pruned).
std::size_t numberActivePeople() const;
//! Get the maximum person identifier seen so far
//! (some of which might have been pruned).
std::size_t numberPeople() const;
//! Get the unique identifier of a person if it exists.
//!
//! \param[in] person The person of interest.
//! \param[out] result Filled in with the identifier of \p person
//! if they exist otherwise max std::size_t.
//! \return True if the person exists and false otherwise.
bool personId(const std::string& person, std::size_t& result) const;
//! Get the unique identifier of an arbitrary known person.
//!
//! \param[out] result Filled in with the identifier of a person.
//! \return True if a person exists and false otherwise.
bool anyPersonId(std::size_t& result) const;
//! Get the name of the person identified by \p pid if they exist.
//!
//! \param[in] pid The unique identifier of the person of interest.
//! \return The person name if they exist and a fallback otherwise.
const std::string& personName(std::size_t pid) const;
//! Get the name of the person identified by \p pid if they exist.
//!
//! \param[in] pid The unique identifier of the person of interest.
//! \param[in] fallback The fall back name.
//! \return The person name if they exist and \p fallback otherwise.
const std::string& personName(std::size_t pid, const std::string& fallback) const;
//! Get the non-zero counts by person for the bucketing interval
//! containing \p time.
//!
//! \param[in] time The time of interest.
//! \param[out] result Filled in with the non-zero counts by person.
//! The first element is the person identifier and the second their
//! count in the bucketing interval. The result is sorted by person.
//! \note We expect the non-zero counts to be sparse on the space
//! of people so use a sparse encoding:
//! <pre class="fragment">
//! \f$ pid \leftarrow c\f$
//! </pre>
//! where,\n
//! \f$pid\f$ is the person identifier,\n
//! \f$c\f$ is the count for the person.
void personNonZeroCounts(core_t::TTime time, TSizeUInt64PrVec& result) const;
//! Stop gathering data on the people identified by \p peopleToRemove.
void recyclePeople(const TSizeVec& peopleToRemove);
//! Remove all traces of people whose identifiers are greater than
//! or equal to \p lowestPersonToRemove.
void removePeople(std::size_t lowestPersonToRemove);
//! Get unique identifiers of any people that have been recycled.
//!
//! \note This returns a non-const reference, so callers are able to
//! modify the collection.
TSizeVec& recycledPersonIds();
//! Check if the person identified by \p pid is active (not pruned).
bool isPersonActive(std::size_t pid) const;
//! Record a person called \p person.
std::size_t addPerson(const std::string& person,
CResourceMonitor& resourceMonitor,
bool& addedPerson);
//@}
//! \name Attribute
//@{
//! Get the number of active attributes (not pruned).
std::size_t numberActiveAttributes() const;
//! Get the maximum attribute identifier seen so far
//! (some of which might have been pruned).
std::size_t numberAttributes() const;
//! Get the unique identifier of an attribute if it exists.
//!
//! \param[in] attribute The attribute of interest.
//! \param[out] result Filled in with the identifier of \p attribute
//! if it exists otherwise max std::size_t.
//! \return True if the attribute exists and false otherwise.
bool attributeId(const std::string& attribute, std::size_t& result) const;
//! Get the name of the attribute identified by \p cid if it exists.
//!
//! \param[in] cid The unique identifier of the attribute of interest.
//! \return The attribute name if it exists and a fallback otherwise.
const std::string& attributeName(std::size_t cid) const;
//! Get the name of the attribute identified by \p cid if it exists.
//!
//! \param[in] cid The unique identifier of the attribute of interest.
//! \param[in] fallback The fall back name.
//! \return The attribute name if it exists and \p fallback otherwise.
const std::string& attributeName(std::size_t cid, const std::string& fallback) const;
//! Stop gathering data on the attributes identified by \p attributesToRemove.
void recycleAttributes(const TSizeVec& attributesToRemove);
//! Remove all traces of attributes whose identifiers are greater than
//! or equal to \p lowestAttributeToRemove.
void removeAttributes(std::size_t lowestAttributeToRemove);
//! Get unique identifiers of any attributes that have been recycled.
//!
//! \note This returns a non-const reference, so callers are able to
//! modify the collection.
TSizeVec& recycledAttributeIds();
//! Check if the attribute identified by \p cid is active (not pruned).
bool isAttributeActive(std::size_t cid) const;
//@}
//! \name Metric
//@{
//! Get the current number of measurements in a sample for
//! the model of the entity identified by \p id.
//!
//! If we are performing temporal analysis we have one sample
//! count per person and if we are performing population analysis
//! we have one sample count per attribute.
double sampleCount(std::size_t id) const;
//! Get the effective number of measurements in a sample for
//! the model of the entity identified by \p id.
//!
//! If we are performing temporal analysis we have one sample
//! count per person and if we are performing population analysis
//! we have one sample count per attribute.
double effectiveSampleCount(std::size_t id) const;
//! Reset the number of measurements in a sample for the entity
//! identified by \p id.
//!
//! If we are performing individual analysis we have one sample
//! count per person and if we are performing population analysis
//! we have one sample count per attribute.
void resetSampleCount(std::size_t id);
//! Get the sample counts.
//!
//! \note This returns a reference to the owning pointer; ownership
//! is not transferred.
const TSampleCountsPtr& sampleCounts() const;
//@}
//! \name Time
//@{
//! Get the start of the current bucketing time interval.
core_t::TTime currentBucketStartTime() const;
//! Get the length of the bucketing time interval.
core_t::TTime bucketLength() const;
//! Check if data is available at \p time.
bool dataAvailable(core_t::TTime time) const;
//! For each bucket in the interval [\p startTime, \p endTime],
//! validate that it can be sampled and increment \p startTime
//! to the first valid bucket or \p endTime if no valid buckets
//! exist.
//!
//! \param[in,out] startTime The start of the interval to sample.
//! \param[in] endTime The end of the interval to sample.
bool validateSampleTimes(core_t::TTime& startTime, core_t::TTime endTime) const;
//! Roll time forwards to \p time. Note this method is only supported
//! for testing purposes and should not normally be called.
void timeNow(core_t::TTime time);
//! Print the current bucket.
std::string printCurrentBucket() const;
//! Record an attribute called \p attribute.
// NOTE(review): this declaration sits in the Time group although it
// mirrors addPerson in the Person group; it probably belongs in the
// Attribute group.
std::size_t addAttribute(const std::string& attribute,
CResourceMonitor& resourceMonitor,
bool& addedAttribute);
//@}
//! \name Counts
//@{
//! Get the non-zero (person, attribute) pair counts in the
//! bucketing interval corresponding to the given time.
const TSizeSizePrUInt64UMap& bucketCounts(core_t::TTime time) const;
//! Get the non-zero (person, attribute) pair counts for each
//! value of influencing field.
const TSizeSizePrOptionalStrPrUInt64UMapVec& influencerCounts(core_t::TTime time) const;
//@}
//! Get the checksum of this gatherer.
std::uint64_t checksum() const;
//! Debug the memory used by this component.
void debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const;
//! Get the memory used by this component.
std::size_t memoryUsage() const;
//! Clear this data gatherer.
void clear();
//! Reset bucket and return true if bucket was successfully
//! reset or false otherwise.
bool resetBucket(core_t::TTime bucketStart);
//! Release memory that is no longer needed.
void releaseMemory(core_t::TTime samplingCutoffTime);
//! Get the global configuration parameters.
const SModelParams& params() const;
//! \name Tuple
//@{
//! Extract the person identifier from a tuple.
template<typename T>
static inline std::size_t extractPersonId(const std::pair<const TSizeSizePr, T>& tuple) {
return tuple.first.first;
}
//! Extract the person identifier from a tuple.
template<typename T>
static inline std::size_t extractPersonId(const std::pair<TSizeSizePr, T>& tuple) {
return tuple.first.first;
}
//! Extract the person identifier from a tuple.
static inline std::size_t extractPersonId(const TSizeSizePr& tuple) {
return tuple.first;
}
//! Function object which forwards to the matching
//! CDataGatherer::extractPersonId overload.
struct SExtractPersonId {
    template<typename TUPLE>
    std::size_t operator()(const TUPLE& t) const {
        const std::size_t pid{CDataGatherer::extractPersonId(t)};
        return pid;
    }
};
//! Extract the attribute identifier from a tuple.
template<typename T>
static inline std::size_t
extractAttributeId(const std::pair<const TSizeSizePr, T>& tuple) {
return tuple.first.second;
}
//! Extract the attribute identifier from a tuple.
template<typename T>
static inline std::size_t extractAttributeId(const std::pair<TSizeSizePr, T>& tuple) {
return tuple.first.second;
}
//! Extract the attribute identifier from a tuple.
static inline std::size_t extractAttributeId(const TSizeSizePr& tuple) {
return tuple.second;
}
//! Function object which forwards to the matching
//! CDataGatherer::extractAttributeId overload.
struct SExtractAttributeId {
    template<typename TUPLE>
    std::size_t operator()(const TUPLE& t) const {
        const std::size_t cid{CDataGatherer::extractAttributeId(t)};
        return cid;
    }
};
//! Extract the data from a tuple.
template<typename T>
static inline const T& extractData(const std::pair<const TSizeSizePr, T>& tuple) {
return tuple.second;
}
//! Extract the data from a tuple.
template<typename T>
static inline const T& extractData(const std::pair<TSizeSizePr, T>& tuple) {
return tuple.second;
}
//@}
//! In the case of manually named summarized statistics, map the first
//! feature to a metric category.
bool determineMetricCategory(TMetricCategoryVec& fieldMetricCategories) const;
//! Helper to avoid code duplication when getting a count from a
//! field. Logs different errors for missing value and invalid value.
static bool extractCountFromField(const std::string& fieldName,
const std::string* fieldValue,
std::size_t& count);
//! Helper to avoid code duplication when getting a metric value from a
//! field. Logs different errors for missing value and invalid value.
//!
//! \note Unlike extractCountFromField, \p fieldValue is taken by value
//! rather than by pointer.
bool extractMetricFromField(const std::string& fieldName,
std::string fieldValue,
TDouble1Vec& metricValue) const;
//! Returns the startTime of the earliest bucket for which data are still
//! accepted.
core_t::TTime earliestBucketStartTime() const;
//! Check the class invariants.
bool checkInvariants() const;
private:
//! The summary count field value to indicate that the record should
//! be ignored.
static const std::string EXPLICIT_NULL;
private:
//! A copyable, non-owning reference to the model parameters.
using TModelParamsCRef = std::reference_wrapper<const SModelParams>;
private:
//! Restore state from supplied traverser.
bool acceptRestoreTraverser(const CBucketGatherer::SBucketGathererInitData& bucketGathererInitData,
core::CStateRestoreTraverser& traverser);
//! Restore a bucket gatherer from the supplied traverser.
bool restoreBucketGatherer(const CBucketGatherer::SBucketGathererInitData& bucketGathererInitData,
core::CStateRestoreTraverser& traverser);
//! Persist the bucket gatherer by passing information to the supplied
//! inserter.
void persistBucketGatherers(core::CStatePersistInserter& inserter) const;
//! Create the bucket specific data gatherer.
void createBucketGatherer(model_t::EAnalysisCategory gathererType,
const CBucketGatherer::SBucketGathererInitData& initData);
private:
//! The type of the bucket gatherer(s) used.
model_t::EAnalysisCategory m_GathererType;
//! The collection of features on which to gather data.
TFeatureVec m_Features;
//! The bucket gatherer which contains the bucket-specific
//! metrics and counts.
TBucketGathererPtr m_BucketGatherer;
//! Indicates whether the data being gathered are already summarized
//! by an external aggregation process.
model_t::ESummaryMode m_SummaryMode;
//! The global configuration parameters.
//!
//! \note Held as a std::reference_wrapper, i.e. not owned: the
//! referenced SModelParams must outlive this object.
TModelParamsCRef m_Params;
//! The key of the search for which data is being gathered.
//!
//! \note Held as a std::reference_wrapper, i.e. not owned: the
//! referenced CSearchKey must outlive this object.
TSearchKeyCRef m_SearchKey;
//! The value of the partition field for this detector.
std::string m_PartitionFieldValue;
//! A registry where person names are mapped to unique IDs.
CDynamicStringIdRegistry m_PeopleRegistry;
//! A registry where attribute names are mapped to unique IDs.
CDynamicStringIdRegistry m_AttributesRegistry;
//! True if this is a population data gatherer and false otherwise.
bool m_Population;
//! If true the gatherer will process missing person field values.
bool m_UseNull;
//! The object responsible for managing sample counts.
TSampleCountsPtr m_SampleCounts;
};
}
}
#endif // INCLUDED_ml_model_CDataGatherer_h