include/api/CAnomalyJob.h (250 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #ifndef INCLUDED_ml_api_CAnomalyJob_h #define INCLUDED_ml_api_CAnomalyJob_h #include <core/CoreTypes.h> #include <model/CAnomalyDetector.h> #include <model/CHierarchicalResultsAggregator.h> #include <model/CHierarchicalResultsNormalizer.h> #include <model/CInterimBucketCorrector.h> #include <model/CResourceMonitor.h> #include <model/CSearchKey.h> #include <api/CDataProcessor.h> #include <api/CForecastRunner.h> #include <api/CJsonOutputWriter.h> #include <api/CModelSnapshotJsonWriter.h> #include <api/ImportExport.h> #include <boost/unordered_map.hpp> #include <cstdint> #include <functional> #include <memory> #include <string> #include <utility> #include <vector> namespace CAnomalyJobTest { struct testParsePersistControlMessageArgs; struct testConfigUpdate; struct testOutputBucketResultsUntilGivenIncompleteInitialBucket; } namespace ml { namespace core { class CDataAdder; class CDataSearcher; class CJsonOutputStreamWrapper; class CStatePersistInserter; class CStateRestoreTraverser; } namespace model { class CAnomalyDetectorModelConfig; class CHierarchicalResults; class CLimits; } namespace api { class CAnomalyJobConfig; class CPersistenceManager; //! \brief //! The Ml anomaly detector. //! //! DESCRIPTION:\n //! Take a stream of input records and read those records //! according to the given field config. //! //! IMPLEMENTATION DECISIONS:\n //! Input must be in ascending time order. //! //! The output format is so complex that this class requires its output //! handler to be a CJsonOutputWriter rather than a writer for an //! arbitrary format //! class API_EXPORT CAnomalyJob : public CDataProcessor { public: //! Discriminant for Elasticsearch IDs static const std::string STATE_TYPE; //! Input field names static const std::string EMPTY_STRING; static const std::string DEFAULT_TIME_FIELD_NAME; public: //! Enum represents the result of persisted Model restoration //! Possible states are: //! -# IncorrectVersion: The version of the stored model state //! does not match the anomaly detector version. //! -# UnexpectedTag: State is malformed or could not be parsed //! correctly //! -# MemoryLimitReached: The detector could not be allocated //! becasuse it would violate the memory usage restrictions //! -# NotRestoredToTime: The detector was not restored to the //! requested time //! -# Success: //! -# Failure: enum ERestoreStateStatus { E_IncorrectVersion, E_UnexpectedTag, E_MemoryLimitReached, E_NotRestoredToTime, E_NoDetectorsRecovered, E_Success, E_Failure, E_Uninitialised }; public: using TPersistCompleteFunc = std::function<void(const CModelSnapshotJsonWriter::SModelSnapshotReport&)>; using TAnomalyDetectorPtr = std::shared_ptr<model::CAnomalyDetector>; using TAnomalyDetectorPtrVec = std::vector<TAnomalyDetectorPtr>; using TKeyVec = std::vector<model::CSearchKey>; using TKeyAnomalyDetectorPtrUMap = boost::unordered_map<model::CSearchKey::TStrKeyPr, TAnomalyDetectorPtr, model::CStrKeyPrHash, model::CStrKeyPrEqual>; using TKeyCRefAnomalyDetectorPtrPr = std::pair<model::CSearchKey::TStrCRefKeyCRefPr, TAnomalyDetectorPtr>; using TKeyCRefAnomalyDetectorPtrPrVec = std::vector<TKeyCRefAnomalyDetectorPtrPr>; using TModelPlotDataVec = model::CAnomalyDetector::TModelPlotDataVec; using TAnnotationVec = model::CAnomalyDetector::TAnnotationVec; struct API_EXPORT SRestoredStateDetail { ERestoreStateStatus s_RestoredStateStatus{E_Uninitialised}; std::optional<std::string> s_Extra; }; struct SBackgroundPersistArgs { SBackgroundPersistArgs(core_t::TTime time, const model::CResourceMonitor::SModelSizeStats& modelSizeStats, model::CInterimBucketCorrector interimBucketCorrector, const model::CHierarchicalResultsAggregator& aggregator, core_t::TTime latestRecordTime, core_t::TTime lastResultsTime, core_t::TTime initialLastFinalisedBucketEndTime); core_t::TTime s_Time; model::CResourceMonitor::SModelSizeStats s_ModelSizeStats; model::CInterimBucketCorrector s_InterimBucketCorrector; model::CHierarchicalResultsAggregator s_Aggregator; std::string s_NormalizerState; core_t::TTime s_LatestRecordTime; core_t::TTime s_LastResultsTime; core_t::TTime s_InitialLastFinalizedBucketEndTime; TKeyCRefAnomalyDetectorPtrPrVec s_Detectors; }; using TBackgroundPersistArgsPtr = std::shared_ptr<SBackgroundPersistArgs>; public: CAnomalyJob(std::string jobId, model::CLimits& limits, CAnomalyJobConfig& jobConfig, model::CAnomalyDetectorModelConfig& modelConfig, core::CJsonOutputStreamWrapper& outputBuffer, TPersistCompleteFunc persistCompleteFunc, CPersistenceManager* persistenceManager, core_t::TTime maxQuantileInterval, const std::string& timeFieldName, const std::string& timeFieldFormat, std::size_t maxAnomalyRecords); ~CAnomalyJob() override; //! Receive a single record to be processed, and produce output //! with any required modifications bool handleRecord(const TStrStrUMap& dataRowFields, TOptionalTime time) override; //! Perform any final processing once all input data has been seen. void finalise() override; //! Restore previously saved state bool restoreState(core::CDataSearcher& restoreSearcher, core_t::TTime& completeToTime) override; //! Persist state in the foreground. As this blocks the current thread of execution //! it should only be called in special circumstances, e.g. at job close, where it won't impact job analysis. bool persistStateInForeground(core::CDataAdder& persister, const std::string& descriptionPrefix) override; //! Persist the current model state in the foreground regardless of whether //! any results have been output. bool doPersistStateInForeground(core::CDataAdder& persister, const std::string& description, const std::string& snapshotId, core_t::TTime snapshotTimestamp); //! Persist state of the residual models only. //! This method is not intended to be called in production code //! as it only persists a very small subset of model state with longer, //! human readable tags. bool persistModelsState(core::CDataAdder& persister, core_t::TTime timestamp); //! Initialise normalizer from quantiles state virtual bool initNormalizer(const std::string& quantilesStateFile); //! How many records did we handle? std::uint64_t numRecordsHandled() const override; //! Is persistence needed? bool isPersistenceNeeded(const std::string& description) const override; //! Log a list of the detectors and keys void description() const; //! Log a list of the detectors, keys and their memory usage void descriptionAndDebugMemoryUsage() const; //! Extra information on the success/failure of restoring the model state. //! In certain situations such as no data being loaded from the restorer //! or the stored state version is wrong the restoreState function will //! still return true. If interested in these kinds of errors check them //! here. const SRestoredStateDetail& restoreStateStatus() const; private: //! NULL pointer that we can take a long-lived const reference to static const TAnomalyDetectorPtr NULL_DETECTOR; private: //! Handle a control message. The first character of the control //! message indicates its type. Currently defined types are: //! ' ' => Dummy message to force all previously uploaded data through //! buffers //! 'f' => Echo a flush ID so that the attached process knows that data //! sent previously has all been processed //! 'i' => Generate interim results bool handleControlMessage(const std::string& controlMessage); //! Helper function to set the last bucket end time in the detectors. void setDetectorsLastBucketEndTime(core_t::TTime lastBucketEndTime); //! Write out the results for the bucket starting at \p bucketStartTime. void outputResults(core_t::TTime bucketStartTime); //! Write out interim results for the bucket starting at \p bucketStartTime. void outputInterimResults(core_t::TTime bucketStartTime); //! Helper function for outputResults. //! \p processingTime is the processing time of the bucket void writeOutResults(bool interim, model::CHierarchicalResults& results, core_t::TTime bucketTime, std::uint64_t processingTime); //! Reset buckets in the range specified by the control message. void resetBuckets(const std::string& controlMessage); //! Attempt to restore the detectors bool restoreState(core::CStateRestoreTraverser& traverser, core_t::TTime& completeToTime, std::size_t& numDetectors); //! Attempt to restore one detector from an already-created traverser. bool restoreSingleDetector(core::CStateRestoreTraverser& traverser); //! Restore the detector identified by \p key and \p partitionFieldValue //! from \p traverser. bool restoreDetectorState(const model::CSearchKey& key, const std::string& partitionFieldValue, core::CStateRestoreTraverser& traverser); //! Persist current state in the background bool backgroundPersistState(); //! This is the function that is called in a different thread to the //! main processing when background persistence is triggered. bool runBackgroundPersist(const TBackgroundPersistArgsPtr& args, core::CDataAdder& persister); //! This function is called from the persistence manager when foreground persistence is triggered bool runForegroundPersist(core::CDataAdder& persister); //! Persist the detectors to a stream. bool persistCopiedState(const std::string& description, const std::string& snapshotId, core_t::TTime snapshotTimestamp, core_t::TTime time, const TKeyCRefAnomalyDetectorPtrPrVec& detectors, const model::CResourceMonitor::SModelSizeStats& modelSizeStats, const model::CInterimBucketCorrector& interimBucketCorrector, const model::CHierarchicalResultsAggregator& aggregator, const std::string& normalizerState, core_t::TTime latestRecordTime, core_t::TTime lastResultsTime, core_t::TTime initialLastFinalisedBucketEndTime, core::CDataAdder& persister); //! Persist current state due to the periodic persistence being triggered. bool periodicPersistStateInBackground() override; bool periodicPersistStateInForeground() override; //! Persist state of the residual models only. //! This method is not intended to be called in production code. bool persistModelsState(const TKeyCRefAnomalyDetectorPtrPrVec& detectors, core::CDataAdder& persister, core_t::TTime timestamp); //! Acknowledge a flush request void acknowledgeFlush(const std::string& flushId); //! Advance time until \p time, if it can be parsed. //! //! This also calls outputBucketResultsUntil, so may generate results if //! a bucket boundary is crossed and updates time in *all* the detector //! models. void advanceTime(const std::string& time); //! Output any results new results which are available at \p time. void outputBucketResultsUntil(core_t::TTime time); //! Skip time to the bucket end of \p time, if it can be parsed. void skipTime(const std::string& time); //! Rolls time to \p endTime while skipping sampling the models for buckets //! within the gap. //! //! \param[in] endTime The end of the time interval to skip sampling. void skipSampling(core_t::TTime endTime); //! Roll time forward to \p time void timeNow(core_t::TTime time); //! Update configuration void updateConfig(const std::string& config); //! Generate interim results. void generateInterimResults(const std::string& controlMessage); //! Parses the time range in a control message assuming the time range follows after a //! single character code (e.g. starts with 'i10 20'). static bool parseTimeRangeInControlMessage(const std::string& controlMessage, core_t::TTime& start, core_t::TTime& end); //! Update equalizers if not interim and aggregate. void updateAggregatorAndAggregate(bool isInterim, model::CHierarchicalResults& results); //! Update quantiles if not interim and normalize. void updateNormalizerAndNormalizeResults(bool isInterim, model::CHierarchicalResults& results); //! Outputs results for the buckets that are within the specified range. //! The range includes the start but does not include the end. void outputResultsWithinRange(bool isInterim, core_t::TTime start, core_t::TTime end); //! Generate the model plot for the models of the specified detector in the //! specified time range. void generateModelPlot(core_t::TTime startTime, core_t::TTime endTime, const model::CAnomalyDetector& detector, TModelPlotDataVec& modelPlotData); //! Write the pre-generated model plot to the output stream of the user's //! choosing: either file or streamed to the API void writeOutModelPlot(const TModelPlotDataVec& modelPlotData); //! Write the annotations to the output stream. void writeOutAnnotations(const TAnnotationVec& annotations); //! Persist one detector to a stream. //! This method is static so that there is no danger of it accessing //! the member variables of an object. This makes it safer to call //! from within a persistence thread that's working off a cloned //! anomaly detector. static void persistIndividualDetector(const model::CAnomalyDetector& detector, core::CStatePersistInserter& inserter); //! Iterate over the models, refresh their memory status, and send a report //! to the API void refreshMemoryAndReport(); //! Update configuration void doForecast(const std::string& controlMessage); static TAnomalyDetectorPtr makeDetector(const model::CAnomalyDetectorModelConfig& modelConfig, model::CLimits& limits, const std::string& partitionFieldValue, core_t::TTime firstTime, const model::CAnomalyDetector::TModelFactoryCPtr& modelFactory); //! Populate detector keys from the anomaly job config. static void populateDetectorKeys(const CAnomalyJobConfig& jobConfig, TKeyVec& keys); //! Extract the field called \p fieldName from \p dataRowFields. static const std::string* fieldValue(const std::string& fieldName, const TStrStrUMap& dataRowFields); //! Extract the required fields from \p dataRowFields //! and add the new record to \p detector static void addRecord(const TAnomalyDetectorPtr& detector, core_t::TTime time, const TStrStrUMap& dataRowFields); //! Parses a control message requesting that model state be persisted. //! Extracts optional arguments to be used for persistence. static bool parsePersistControlMessageArgs(const std::string& controlMessageArgs, core_t::TTime& snapshotTimestamp, std::string& snapshotId, std::string& snapshotDescription); //! Perform foreground persistence if control message contains valid optional //! arguments else request a background persist void processPersistControlMessage(const std::string& controlMessageArgs); protected: //! Get all the detectors. void detectors(TAnomalyDetectorPtrVec& detectors) const; //! Get the detectors by partition const TKeyAnomalyDetectorPtrUMap& detectorPartitionMap() const; //! Get all sorted references to the detectors. void sortedDetectors(TKeyCRefAnomalyDetectorPtrPrVec& detectors) const; //! Get a reference to the detector for a given key const TAnomalyDetectorPtr& detectorForKey(bool isRestoring, core_t::TTime time, const model::CSearchKey& key, const std::string& partitionFieldValue, const model::CResourceMonitor& resourceMonitor); //! Prune all the models that exceed \p buckets in age //! A value of 0 for \buckets indicates that only 'obsolete' models will //! be pruned, i.e. those which are so old as to be effectively dead. void pruneAllModels(std::size_t buckets = 0) const; const model::CHierarchicalResultsNormalizer& normalizer() const; private: //! The job ID std::string m_JobId; //! Configurable limits model::CLimits& m_Limits; //! Stream used by the output writer core::CJsonOutputStreamWrapper& m_OutputStream; //! Responsible for performing forecasts CForecastRunner m_ForecastRunner; //! Object to which the output is passed CJsonOutputWriter m_JsonOutputWriter; //! Configuration settings for the analysis parsed from //! JSON configuration file. //! Note that this is a non-const reference as it needs to be capable of //! being modified by job updates (and those changes reflected wherever a //! reference is held). CAnomalyJobConfig& m_JobConfig; //! The model configuration model::CAnomalyDetectorModelConfig& m_ModelConfig; //! Keep count of how many records we've handled std::uint64_t m_NumRecordsHandled = 0; //! Detector keys. TKeyVec m_DetectorKeys; //! Map of objects to provide the inner workings TKeyAnomalyDetectorPtrUMap m_Detectors; //! The end time of the last bucket out of latency window we've seen core_t::TTime m_LastFinalisedBucketEndTime = 0; //! Optional function to be called when persistence is complete TPersistCompleteFunc m_PersistCompleteFunc; //! License restriction on the number of detectors allowed std::size_t m_MaxDetectors; //! Pointer to the persistence manager. May be nullptr if state persistence //! is not required, for example in unit tests. CPersistenceManager* m_PersistenceManager; //! If we haven't output quantiles for this long due to a big anomaly //! we'll output them to reflect decay. Non-positive values mean never. core_t::TTime m_MaxQuantileInterval; //! What was the wall clock time when we last persisted the //! normalizer? The normalizer is persisted for two reasons: //! either there was a significant change or more than a //! certain period of time has passed since last time it was persisted. core_t::TTime m_LastNormalizerPersistTime; //! Latest record time seen. core_t::TTime m_LatestRecordTime = 0; //! Last time we sent a finalised result to the API. core_t::TTime m_LastResultsTime = 0; //! When the model state was restored was it entirely successful. //! Extra information about any errors that may have occurred SRestoredStateDetail m_RestoredStateDetail; //! The hierarchical results aggregator. model::CHierarchicalResultsAggregator m_Aggregator; //! The hierarchical results normalizer. model::CHierarchicalResultsNormalizer m_Normalizer; //! Flag indicating whether or not time has been advanced. bool m_TimeAdvanced{false}; //! Flag indicating whether or not a flush control message should trigger a refresh of the datafeed bool m_RefreshRequired{true}; //! Introduced in version 8.6 //! The initial value of the end time of the last bucket //! out of latency window we've seen, i.e. this member records //! the first non-zero value of \p m_LastFinalisedBucketEndTime //! and then never changes. //! When restoring jobs that ran successfully for many buckets before //! being persisted by a version earlier than 8.6 this member will always //! have a value of 0, therefore it is crucial that this member is never //! assumed to be non-zero and should only be used for its intended purpose //! of aiding in detecting an incomplete initial bucket after state //! restoration. core_t::TTime m_InitialLastFinalisedBucketEndTime{0}; // Test case access friend struct CAnomalyJobTest::testConfigUpdate; friend struct CAnomalyJobTest::testParsePersistControlMessageArgs; friend struct CAnomalyJobTest::testOutputBucketResultsUntilGivenIncompleteInitialBucket; }; } } #endif // INCLUDED_ml_api_CAnomalyJob_h