include/api/CJsonOutputWriter.h (149 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #ifndef INCLUDED_ml_api_CJsonOutputWriter_h #define INCLUDED_ml_api_CJsonOutputWriter_h #include <core/CBoostJsonConcurrentLineWriter.h> #include <core/CSmallVector.h> #include <core/CoreTypes.h> #include <model/CCategoryExamplesCollector.h> #include <model/CHierarchicalResults.h> #include <model/CResourceMonitor.h> #include <api/CGlobalCategoryId.h> #include <api/CHierarchicalResultsWriter.h> #include <api/ImportExport.h> #include <cstdint> #include <map> #include <memory> #include <optional> #include <string> #include <utility> #include <vector> namespace ml { namespace core { class CJsonOutputStreamWrapper; } namespace model { class CHierarchicalResultsNormalizer; } namespace core { template<typename> class CScopedBoostJsonPoolAllocator; } namespace api { //! \brief //! Write output data in JSON format //! //! DESCRIPTION:\n //! Outputs the anomaly detector results. //! //! The fields written for each detector type are hardcoded. If new fields //! are passed then this class needs updating. //! //! Use limitNumberRecords() to limit the number of records and influencers //! written for each bucket. If set to a non-zero value only the top N //! least probable records, the top N influencers and top N bucket influencers //! are written. This useful in situations where a very //! large number of field combinations are being analysed. Setting the limit //! to zero implies all results are to be kept. //! //! IMPLEMENTATION DECISIONS:\n //! The writer buffers results until endOutputBatch() is called. All results //! for a given bucket must be passed to the writer between calls to //! endOutputBatch(). Once endOutputBatch() has been called, no further //! results can be added for any buckets for which results were previously //! added. //! //! The simple count detector is not output. Instead, its "actual" value //! is set as the bucket's eventCount field. //! //! Empty string fields are not written to the output. //! //! Memory for values added to the output documents is allocated from a pool (to //! reduce allocation cost and memory fragmentation). This pool is cleared //! between buckets to avoid excessive accumulation of memory over long periods. //! It is crucial that the pool is only cleared when no documents reference //! memory within it. This is achieved by only clearing the pool //! (m_JsonPoolAllocator) when the m_BucketDataByTime vector is empty. Care //! must be taken if the memory pool is used to allocate memory for long-lived //! documents that are not stored within m_BucketDataByTime. (It might be //! better to have a separate pool if this situation ever arises in the future.) //! //! Population anomalies consist of overall results and breakdown results. //! There is an assumption that the overall result for a population anomaly //! immediately follows the corresponding breakdown results. If this assumption //! ever becomes invalid then the way this class nests breakdown results inside //! overall results will need changing. //! //! Unusual scores and anomaly scores are generated for output bucket/record //! using the latest quantiles. The idea is to reduce the processing load //! by avoiding the need for the Java process to request re-normalisation of //! every single result as soon as it's written to Elasticsearch. Quantiles //! are only written to the output when they change a lot, when 2 hours of //! wall-clock time elapses, or when the writer is finalised. (When the //! Java process receives these updated quantiles it will request //! re-normalisation of previous results using the normalize //! process, so it's best that this doesn't happen too often.) //! class API_EXPORT CJsonOutputWriter { public: using TDocumentPtr = std::shared_ptr<json::object>; using TDocumentWeakPtr = std::weak_ptr<json::object>; using TDocumentWeakPtrVec = std::vector<TDocumentWeakPtr>; using TDocumentWeakPtrVecItr = TDocumentWeakPtrVec::iterator; using TDocumentWeakPtrVecCItr = TDocumentWeakPtrVec::const_iterator; using TDocumentWeakPtrIntPr = std::pair<TDocumentWeakPtr, int>; using TDocumentWeakPtrIntPrVec = std::vector<TDocumentWeakPtrIntPr>; using TDocumentWeakPtrIntPrVecItr = TDocumentWeakPtrIntPrVec::iterator; using TStrDocumentPtrVecMap = std::map<std::string, TDocumentWeakPtrVec>; using TStrVec = std::vector<std::string>; using TStr1Vec = core::CSmallVector<std::string, 1>; using TTimeVec = std::vector<core_t::TTime>; using TGlobalCategoryIdVec = std::vector<CGlobalCategoryId>; using TDoubleVec = std::vector<double>; using TDoubleDoublePr = std::pair<double, double>; using TDoubleDoublePrVec = std::vector<TDoubleDoublePr>; using TDoubleDoubleDoublePrPr = std::pair<double, TDoubleDoublePr>; using TDoubleDoubleDoublePrPrVec = std::vector<TDoubleDoubleDoublePrPr>; using TStringDoublePr = std::pair<std::string, double>; using TStringDoublePrVec = std::vector<TStringDoublePr>; using TValuePtr = std::shared_ptr<json::value>; using TAnomalyScoreExplanation = CHierarchicalResultsWriter::TAnomalyScoreExplanation; //! Structure to buffer up information about each bucket that we have //! unwritten results for struct SBucketData { SBucketData(); //! The max normalized anomaly score of the bucket influencers double s_MaxBucketInfluencerNormalizedAnomalyScore; //! Count of input events for the bucket std::size_t s_InputEventCount; //! Count of result records in the bucket for which results are //! being built up std::size_t s_RecordCount; //! The bucketspan of this bucket core_t::TTime s_BucketSpan; //! The result record documents to be written, in a vector keyed on //! detector index TDocumentWeakPtrIntPrVec s_DocumentsToWrite; //! Bucket Influencer documents TDocumentWeakPtrVec s_BucketInfluencerDocuments; //! Influencer documents TDocumentWeakPtrVec s_InfluencerDocuments; // The highest probability of all the records stored // in the s_DocumentsToWrite array. Used for filtering // new records with a higher probability double s_HighestProbability; // Information explaining different impact factors of // the anomaly score. TAnomalyScoreExplanation s_AnomalyScoreExplanation; // Used for filtering new influencers // when the number to write is limited double s_LowestInfluencerScore; // Used for filtering new bucket influencers // when the number to write is limited double s_LowestBucketInfluencerScore; //! scheduled event descriptions TStr1Vec s_ScheduledEventDescriptions; }; using TOptionalTime = std::optional<core_t::TTime>; using TTimeBucketDataMap = std::map<core_t::TTime, SBucketData>; using TTimeBucketDataMapItr = TTimeBucketDataMap::iterator; using TTimeBucketDataMapCItr = TTimeBucketDataMap::const_iterator; using TStrFSet = model::CCategoryExamplesCollector::TStrFSet; using TStrFSetCItr = TStrFSet::const_iterator; public: //! Constructor that causes output to be written to the specified wrapped stream CJsonOutputWriter(const std::string& jobId, core::CJsonOutputStreamWrapper& strmOut); //! Destructor flushes the stream ~CJsonOutputWriter(); //! Access to job ID const std::string& jobId() const; //! Limit the output to the top count anomalous records and influencers. //! Each detector will write no more than count records and influencers //! per bucket (i.e. a max of N records, N influencers and N bucket //! influencers). //! The bucket time influencer does not add to this count but only //! if it is added after all the other bucket influencers void limitNumberRecords(std::size_t count); //! A value of 0 indicates no limit has been set std::size_t limitNumberRecords() const; //! Close the JSON structures and flush output. //! This method should only be called once and will have no affect //! on subsequent invocations void finalise(); //! Accept a result from the anomaly detector //! Virtual for testing mocks bool acceptResult(const CHierarchicalResultsWriter::TResults& results); //! Accept the influencer bool acceptInfluencer(core_t::TTime time, const model::CHierarchicalResults::TNode& node, bool isBucketInfluencer); //! Creates a time bucket influencer. //! If limitNumberRecords is set add this influencer after all other influencers //! have been added otherwise it may be filtered out if its anomaly score is lower //! than the others. //! Only one per bucket is expected, this does not add to the influencer //! count if limitNumberRecords is used void acceptBucketTimeInfluencer(core_t::TTime time, double probability, double rawAnomalyScore, double normalizedAnomalyScore); //! This method must be called after all the results for a given bucket //! are available. It triggers the writing of the results. bool endOutputBatch(bool isInterim, std::uint64_t bucketProcessingTime); //! Report the current levels of resource usage, as given to us //! from the CResourceMonitor via a callback void reportMemoryUsage(const model::CResourceMonitor::SModelSizeStats& modelSizeStats); //! Return the number of bytes currently used to output JSON documents. . std::size_t getOutputMemoryAllocatorUsage() const; //! Write categorizer stats void writeCategorizerStats(const std::string& partitionFieldName, const std::string& partitionFieldValue, const model::SCategorizerStats& categorizerStats, const TOptionalTime& timestamp); //! Acknowledge a flush request by echoing back the flush ID and the "refreshRequired" flag void acknowledgeFlush(const std::string& flushId, core_t::TTime lastFinalizedBucketEnd, bool refreshRequired = true); //! Write a category definition void writeCategoryDefinition(const std::string& partitionFieldName, const std::string& partitionFieldValue, const CGlobalCategoryId& categoryId, const std::string& terms, const std::string& regex, std::size_t maxMatchingFieldLength, const TStrFSet& examples, std::size_t numMatches, const TGlobalCategoryIdVec& usurpedCategories); //! Persist a normalizer by writing its state to the output void persistNormalizer(const model::CHierarchicalResultsNormalizer& normalizer, core_t::TTime& persistTime); private: template<typename> friend class core::CScopedBoostJsonPoolAllocator; // hooks for the CScopedBoostJsonPoolAllocator interface //! use a new allocator for JSON output processing //! \p allocatorName A unique identifier for the allocator void pushAllocator(const std::string& allocatorName); //! remove allocator from cache void removeAllocator(const std::string& allocatorName); //! revert to using the previous allocator for JSON output processing void popAllocator(); private: //! Write out all the JSON documents that have been built up for //! a particular bucket void writeBucket(bool isInterim, core_t::TTime bucketTime, SBucketData& bucketData, std::uint64_t bucketProcessingTime); //! Add the fields for a metric detector void addMetricFields(const CHierarchicalResultsWriter::TResults& results, TDocumentWeakPtr weakDoc); //! Write the fields for a population detector void addPopulationFields(const CHierarchicalResultsWriter::TResults& results, TDocumentWeakPtr weakDoc); //! Write the fields for a population detector cause void addPopulationCauseFields(const CHierarchicalResultsWriter::TResults& results, TDocumentWeakPtr weakDoc); //! Write the fields for an event rate detector void addEventRateFields(const CHierarchicalResultsWriter::TResults& results, TDocumentWeakPtr weakDoc); //! Add the influencer fields to the doc void addInfluencerFields(bool isBucketInfluencer, const model::CHierarchicalResults::TNode& node, TDocumentWeakPtr weakDoc); //! Write the influence results. void addInfluences(const CHierarchicalResultsWriter::TOptionalStrOptionalStrPrDoublePrVec& influenceResults, TDocumentWeakPtr weakDoc); //! Write anomaly score explanation object. void writeAnomalyScoreExplanationObject(const CHierarchicalResultsWriter::TResults& results, json::object& anomalyScoreExplanation); private: //! The job ID std::string m_JobId; //! JSON line writer core::CBoostJsonConcurrentLineWriter m_Writer; //! Time of last non-interim bucket written to output core_t::TTime m_LastNonInterimBucketTime; //! Has the output been finalised? bool m_Finalised; //! Max number of records to write for each bucket/detector std::size_t m_RecordOutputLimit; //! Vector for building up documents representing nested sub-results. //! The documents in this vector will reference memory owned by //! m_JsonPoolAllocator. (Hence this is declared after the memory pool //! so that it's destroyed first when the destructor runs.) TDocumentWeakPtrVec m_NestedDocs; //! Bucket data waiting to be written. The map is keyed on bucket time. //! The documents in this map will reference memory owned by //! m_JsonPoolAllocator. (Hence this is declared after the memory pool //! so that it's destroyed first when the destructor runs.) TTimeBucketDataMap m_BucketDataByTime; }; } } #endif // INCLUDED_ml_api_CJsonOutputWriter_h