include/model/CEventRateBucketGatherer.h (165 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#ifndef INCLUDED_ml_model_CEventRateBucketGatherer_h
#define INCLUDED_ml_model_CEventRateBucketGatherer_h
#include <core/CCompressedDictionary.h>
#include <core/CMemoryUsage.h>
#include <core/CoreTypes.h>
#include <model/CBucketGatherer.h>
#include <model/CFeatureData.h>
#include <model/ImportExport.h>
#include <model/ModelTypes.h>
#include <boost/unordered_map.hpp>
#include <any>
#include <map>
#include <string>
#include <vector>
namespace ml {
namespace model {
//! \brief A structure to handle storing unique strings per person,
//! attribute and influencer, used for the analytic functions
//! "distinct_count" and "info_content".
//!
//! DESCRIPTION:\n
//! Strings are added with insert() together with the influencing field
//! values which label them. The populate*FeatureData() methods fill in
//! an SEventRateFeatureData from what has been stored.
class MODEL_EXPORT CUniqueStringFeatureData {
public:
using TDictionary1 = core::CCompressedDictionary<1>;
using TWord = TDictionary1::CWord;
using TWordSet = TDictionary1::TWordSet;
using TWordStringUMap = TDictionary1::TWordTUMap<std::string>;
using TOptionalStr = std::optional<std::string>;
using TOptionalStrWordSetUMap = boost::unordered_map<TOptionalStr, TWordSet>;
using TOptionalStrWordSetUMapVec = std::vector<TOptionalStrWordSetUMap>;
using TStrCRef = SEventRateFeatureData::TStrCRef;
using TDouble1Vec = SEventRateFeatureData::TDouble1Vec;
using TDouble1VecDoublePr = SEventRateFeatureData::TDouble1VecDoublePr;
using TStrCRefDouble1VecDoublePrPr = SEventRateFeatureData::TStrCRefDouble1VecDoublePrPr;
using TStrCRefDouble1VecDoublePrPrVec = SEventRateFeatureData::TStrCRefDouble1VecDoublePrPrVec;
using TOptionalStrVec = CBucketGatherer::TOptionalStrVec;
public:
//! Add a string into the collection.
//!
//! \param[in] value The string to add.
//! \param[in] influences The influencing field values which label
//! \p value.
void insert(const std::string& value, const TOptionalStrVec& influences);
//! Fill in a FeatureData structure with the influence strings and counts.
void populateDistinctCountFeatureData(SEventRateFeatureData& featureData) const;
//! Fill in a FeatureData structure with the influence info_content.
void populateInfoContentFeatureData(SEventRateFeatureData& featureData) const;
//! Persist state by passing information to \p inserter.
void acceptPersistInserter(core::CStatePersistInserter& inserter) const;
//! Initialize state reading from \p traverser.
bool acceptRestoreTraverser(core::CStateRestoreTraverser& traverser);
//! Get the checksum of this object.
std::uint64_t checksum() const;
//! Get the memory usage of this object in a tree structure.
void debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const;
//! Get the memory usage of this object.
std::size_t memoryUsage() const;
//! Print the unique strings for debug.
std::string print() const;
private:
//! The compressed dictionary (presumably used to translate the
//! inserted strings to the words stored below - confirm against
//! the implementation).
TDictionary1 m_Dictionary1;
//! A map from dictionary word to string (see TWordStringUMap).
TWordStringUMap m_UniqueStrings;
//! A collection of maps from optional influencer string to a set of
//! dictionary words (see TOptionalStrWordSetUMapVec); presumably one
//! map per influencing field.
TOptionalStrWordSetUMapVec m_InfluencerUniqueStrings;
};
//! \brief Event rate data gathering class.
//!
//! DESCRIPTION:\n
//! This performs all pre-processing of the data, which we use in order
//! to model the event rate in an arbitrary time series.
//!
//! \sa CDataGatherer.
class MODEL_EXPORT CEventRateBucketGatherer final : public CBucketGatherer {
public:
//! Map from event rate category to the (type erased) data gathered
//! for that category.
using TCategoryAnyMap = std::map<model_t::EEventRateCategory, std::any>;
using TStrCRef = SEventRateFeatureData::TStrCRef;
using TDouble1Vec = SEventRateFeatureData::TDouble1Vec;
using TDouble1VecDoublePr = SEventRateFeatureData::TDouble1VecDoublePr;
using TStrCRefDouble1VecDoublePrPr = SEventRateFeatureData::TStrCRefDouble1VecDoublePrPr;
using TStrCRefDouble1VecDoublePrPrVec = SEventRateFeatureData::TStrCRefDouble1VecDoublePrPrVec;
using TStrCRefDouble1VecDoublePrPrVecVec = SEventRateFeatureData::TStrCRefDouble1VecDoublePrPrVecVec;
using TSizeFeatureDataPr = std::pair<std::size_t, SEventRateFeatureData>;
using TSizeFeatureDataPrVec = std::vector<TSizeFeatureDataPr>;
using TSizeSizePrFeatureDataPr = std::pair<TSizeSizePr, SEventRateFeatureData>;
using TSizeSizePrFeatureDataPrVec = std::vector<TSizeSizePrFeatureDataPr>;
public:
//! \name Life-cycle
//@{
//! Create an event rate bucket gatherer.
//!
//! \param[in] dataGatherer The owning data gatherer.
//! \param[in] bucketGathererInitData The parameter initialization object.
CEventRateBucketGatherer(CDataGatherer& dataGatherer,
const SBucketGathererInitData& bucketGathererInitData);
//! Construct from a state document.
//!
//! \param[in] dataGatherer The owning data gatherer.
//! \param[in] bucketGathererInitData The parameter initialization object.
//! \param[in] traverser The traverser from which the state is read.
CEventRateBucketGatherer(CDataGatherer& dataGatherer,
const SBucketGathererInitData& bucketGathererInitData,
core::CStateRestoreTraverser& traverser);
//! Create a copy that will result in the same persisted state as the
//! original. This is effectively a copy constructor that creates a
//! copy that's only valid for a single purpose. The boolean flag is
//! redundant except to create a signature that will not be mistaken
//! for a general purpose copy constructor.
CEventRateBucketGatherer(bool isForPersistence, const CEventRateBucketGatherer& other);
//@}
//! \name Persistence
//@{
//! Fill in the state from \p traverser.
bool acceptRestoreTraverser(core::CStateRestoreTraverser& traverser);
//! Persist state by passing information to the supplied inserter.
void acceptPersistInserter(core::CStatePersistInserter& inserter) const override;
//! Create a clone of this data gatherer that will result in the same
//! persisted state. The clone may be incomplete in ways that do not
//! affect the persisted representation, and must not be used for any
//! other purpose.
//!
//! \warning The caller owns the object returned.
CBucketGatherer* cloneForPersistence() const override;
//! The persistence tag name of this derived class.
const std::string& persistenceTag() const override;
//@}
//! \name Fields
//@{
//! Get the person field name.
//!
//! This is the common field in all searches "along" which the
//! probabilities are aggregated, i.e. the "over" field name for
//! population searches and the "by" field name for individual
//! searches.
const std::string& personFieldName() const override;
//! Get the attribute field name if one exists, i.e. the "by" field
//! name for population searches, and returns empty otherwise.
const std::string& attributeFieldName() const override;
//! Returns an empty string.
const std::string& valueFieldName() const override;
//! Get an iterator at the beginning of the influencing field names.
TStrVecCItr beginInfluencers() const override;
//! Get an iterator at the end of the influencing field names.
TStrVecCItr endInfluencers() const override;
//! Get the fields for which to gather data.
//!
//! For individual searches this gets the field which defines the
//! categories whose counts are being analyzed. For population
//! searches this gets the fields identifying the people and person
//! attributes which are being analyzed. An empty string acts like
//! a wild card and matches all records. This is used for analysis
//! which is attribute independent such as total count.
const TStrVec& fieldsOfInterest() const override;
//@}
//! Get a description of the search.
std::string description() const override;
//! \name Update
//@{
//! Process the specified fields.
//!
//! \note For individual searches \p fieldValues should contain one
//! field containing the by clause field value or a generic name if
//! none was specified. For population searches \p fieldValues should
//! contain two fields. The first field should contain the over clause
//! field value. The second field should contain the by clause field
//! value or a generic name if none was specified.
bool processFields(const TStrCPtrVec& fieldValues,
CEventData& result,
CResourceMonitor& resourceMonitor) override;
//@}
//! \name Person
//@{
//! Stop gathering data on the people identified by \p peopleToRemove.
void recyclePeople(const TSizeVec& peopleToRemove) override;
//! Remove all traces of people whose identifiers are greater than
//! or equal to \p lowestPersonToRemove.
void removePeople(std::size_t lowestPersonToRemove) override;
//@}
//! \name Attribute
//@{
//! Stop gathering data on the attributes identified by \p attributesToRemove.
void recycleAttributes(const TSizeVec& attributesToRemove) override;
//! Remove all traces of attributes whose identifiers are greater than
//! or equal to \p lowestAttributeToRemove.
void removeAttributes(std::size_t lowestAttributeToRemove) override;
//@}
//! Get the checksum of this gatherer.
std::uint64_t checksum() const override;
//! Get the memory used by this object in a tree structure.
void debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const override;
//! Get the memory used by this object.
std::size_t memoryUsage() const override;
//! Get the static size of this object - used for virtual hierarchies.
std::size_t staticSize() const override;
//! Clear this data gatherer.
void clear() override;
//! Reset bucket and return true if bucket was successfully reset or false otherwise.
bool resetBucket(core_t::TTime bucketStart) override;
//! Release memory that is no longer needed.
void releaseMemory(core_t::TTime samplingCutoffTime) override;
//! \name Features
//@{
//! Get the raw data for all features for the bucketing time interval
//! containing \p time.
//!
//! \param[in] time The time of interest.
//! \param[in] bucketLength Not described in this header; see the
//! CBucketGatherer declaration which this overrides (presumably the
//! length of the bucketing interval).
//! \param[out] result Filled in with the feature data at \p time.
void featureData(core_t::TTime time,
core_t::TTime bucketLength,
TFeatureAnyPrVec& result) const override;
//@}
private:
//! No-op.
void sample(core_t::TTime time) override;
//! Append the counts by person for the bucketing interval containing
//! \p time.
//!
//! \param[in] feature The feature being extracted (presumably used to
//! label the data appended to \p result - confirm against the
//! implementation).
//! \param[in] time The time of interest.
//! \param[in,out] result Append (person identifier, count) for each
//! person. The collection is sorted by person.
void personCounts(model_t::EFeature feature, core_t::TTime time, TFeatureAnyPrVec& result) const;
//! Append the non-zero counts by person for bucketing interval
//! containing \p time.
//!
//! \param[in] feature See personCounts.
//! \param[in] time The time of interest.
//! \param[in,out] result Append (person identifier, count) for each
//! person present in the bucketing interval containing \p time. The
//! collection is sorted by person.
void nonZeroPersonCounts(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result) const;
//! Append an indicator function for people present in the bucketing
//! interval containing \p time.
//!
//! \param[in] feature See personCounts.
//! \param[in] time The time of interest.
//! \param[in,out] result Append (person identifier, 1) for each person
//! present in the bucketing interval containing \p time. The collection
//! is sorted by person identifier.
void personIndicator(model_t::EFeature feature, core_t::TTime time, TFeatureAnyPrVec& result) const;
//! Append the mean arrival times for people present in the current
//! bucketing interval.
//!
//! \param[in] feature See personCounts.
//! \param[in] time The time of interest.
//! \param[in,out] result Append (person identifier, mean arrival time)
//! for each person present in the bucketing interval containing \p time.
//! The collection is sorted by person identifier.
void personArrivalTimes(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result) const;
//! Append the non-zero counts for each attribute by person for the
//! bucketing interval containing \p time.
//!
//! \param[in] feature See personCounts.
//! \param[in] time The time of interest.
//! \param[in,out] result Append the non-zero attribute counts by
//! person. The first element of the key is person and the second
//! attribute. The collection is sorted lexicographically by key.
//! \note We expect the pairs present to be sparse on the full outer
//! product space of attribute and person so use a sparse encoding.
void nonZeroAttributeCounts(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result) const;
//! Append the number of unique people hitting each attribute.
//!
//! \param[in] feature See personCounts.
//! \param[in,out] result Append the count of people per attribute.
//! The person identifier is dummied to zero so that the result
//! type matches other population features.
void peoplePerAttribute(model_t::EFeature feature, TFeatureAnyPrVec& result) const;
//! Append an indicator function for (person, attribute) pairs
//! present in the bucketing interval containing \p time.
//!
//! \param[in] feature See personCounts.
//! \param[in] time The time of interest.
//! \param[in,out] result Append one for each (person, attribute)
//! pair present in the bucketing interval containing \p time. The
//! first element of the key is person and the second attribute. The
//! collection is sorted lexicographically by key.
//! \note We expect the pairs present to be sparse on the full outer
//! product space of attribute and person so use a sparse encoding.
void attributeIndicator(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result) const;
//! Append the number of unique values for each person
//! in the bucketing interval containing \p time.
//!
//! \param[in] feature See personCounts.
//! \param[in] time The time of interest.
//! \param[out] result Filled in with the unique value counts
//! by person.
void bucketUniqueValuesPerPerson(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result) const;
//! Append the number of unique values for each person and attribute
//! in the bucketing interval containing \p time.
//!
//! \param[in] feature See personCounts.
//! \param[in] time The time of interest.
//! \param[out] result Filled in with the unique value counts
//! by person and attribute.
void bucketUniqueValuesPerPersonAttribute(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result) const;
//! Append the compressed length of the unique values for each person
//! in the bucketing interval containing \p time.
//!
//! \param[in] feature See personCounts.
//! \param[in] time The time of interest.
//! \param[out] result Filled in with the compressed length of the
//! unique values by person.
void bucketCompressedLengthPerPerson(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result) const;
//! Append the compressed length of the unique values for each person
//! and attribute in the bucketing interval containing \p time.
//!
//! \param[in] feature See personCounts.
//! \param[in] time The time of interest.
//! \param[out] result Filled in with the compressed length of the
//! unique values by person and attribute.
void bucketCompressedLengthPerPersonAttribute(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result) const;
//! Append the time-of-day/week values for each person in the
//! bucketing interval containing \p time.
//!
//! \param[in] feature See personCounts.
//! \param[in] time The time of interest.
//! \param[out] result Filled in with the arrival time values
//! by person.
void bucketMeanTimesPerPerson(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result) const;
//! Append the time-of-day/week values of each attribute and person
//! in the bucketing interval containing \p time.
//!
//! \param[in] feature See personCounts.
//! \param[in] time The time of interest.
//! \param[out] result Filled in with the arrival time values
//! by attribute and person.
void bucketMeanTimesPerPersonAttribute(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result) const;
//! Resize the necessary data structures so they can accommodate
//! the person and attribute identified by \p pid and \p cid,
//! respectively.
//!
//! \param[in] pid The identifier of the person to accommodate.
//! \param[in] cid The identifier of the attribute to accommodate.
void resize(std::size_t pid, std::size_t cid) override;
//! Record the arrival of the \p values for the person identified
//! by \p pid.
//!
//! \param[in] pid The identifier of the person who generated the
//! record(s).
//! \param[in] cid The identifier of the attribute which generated
//! the record(s).
//! \param[in] time The end time of the record(s).
//! \param[in] values Ignored.
//! \param[in] count The number of records.
//! \param[in] stringValue The value for the function string argument
//! if there is one or null.
//! \param[in] influences The influencing field values which label
//! the value.
void addValue(std::size_t pid,
std::size_t cid,
core_t::TTime time,
const CEventData::TDouble1VecArray& values,
std::size_t count,
const CEventData::TOptionalStr& stringValue,
const TOptionalStrVec& influences) override;
//! Start a new bucket.
void startNewBucket(core_t::TTime time, bool skipUpdates) override;
//! Initialize the field names collection.
void initializeFieldNames(const CBucketGatherer::SBucketGathererInitData& initData);
//! Initialize the feature data gatherers.
void initializeFeatureData();
//! Copy the influencer person counts to \p result.
//!
//! \warning This assumes that \p result is sorted by person
//! identifier.
void addInfluencerCounts(core_t::TTime time, TSizeFeatureDataPrVec& result) const;
//! Copy the influencer person and attribute counts to \p result.
//!
//! \warning This assumes that \p result is sorted by person
//! and attribute identifier.
void addInfluencerCounts(core_t::TTime time, TSizeSizePrFeatureDataPrVec& result) const;
private:
//! The name of the field value of interest for keyed functions.
std::string m_ValueFieldName;
//! The names of the fields of interest.
//!
//! This is of the form:
//! -# The name of the field which identifies people,
//! -# [The name of the field which identifies people's attributes],
//! -# [The names of the influencing fields],
//! -# [The name of the field which identifies a function to key off],
//! -# [The name of the field containing the person(/attribute) count
//! if summarized data are being gathered]
TStrVec m_FieldNames;
//! The position of the first influencer field.
std::size_t m_BeginInfluencingFields{0};
//! The position of the first count/value field.
std::size_t m_BeginValueField{0};
//! The position of the field holding the summarised count.
std::size_t m_BeginSummaryFields{0};
//! The data features we are gathering.
TCategoryAnyMap m_FeatureData;
};
}
}
namespace std {
//! Swap two (identifier, feature data) pairs.
//!
//! The feature data are exchanged with SEventRateFeatureData::swap,
//! rather than the generic pair swap, so that sorting collections of
//! these pairs is efficient.
//!
//! NOTE(review): the C++ standard does not permit adding function
//! overloads to namespace std. The declaration is kept unchanged here
//! because existing callers may depend on it being found.
inline void swap(ml::model::CEventRateBucketGatherer::TSizeFeatureDataPr& lhs,
                 ml::model::CEventRateBucketGatherer::TSizeFeatureDataPr& rhs) {
    auto& [lhsId, lhsData] = lhs;
    auto& [rhsId, rhsData] = rhs;
    // Identifier first, then the feature data, as two separate steps.
    swap(lhsId, rhsId);
    lhsData.swap(rhsData);
}
//! Swap two (key, feature data) pairs whose key is itself a pair of
//! identifiers.
//!
//! The feature data are exchanged with SEventRateFeatureData::swap,
//! rather than the generic pair swap, so that sorting collections of
//! these pairs is efficient.
//!
//! NOTE(review): the C++ standard does not permit adding function
//! overloads to namespace std. The declaration is kept unchanged here
//! because existing callers may depend on it being found.
inline void swap(ml::model::CEventRateBucketGatherer::TSizeSizePrFeatureDataPr& lhs,
                 ml::model::CEventRateBucketGatherer::TSizeSizePrFeatureDataPr& rhs) {
    auto& [lhsKey, lhsData] = lhs;
    auto& [rhsKey, rhsData] = rhs;
    // Key first, then the feature data, as two separate steps.
    swap(lhsKey, rhsKey);
    lhsData.swap(rhsData);
}
}
#endif // INCLUDED_ml_model_CEventRateBucketGatherer_h