/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#ifndef INCLUDED_ml_api_CJsonOutputWriter_h
#define INCLUDED_ml_api_CJsonOutputWriter_h

#include <core/CBoostJsonConcurrentLineWriter.h>
#include <core/CSmallVector.h>
#include <core/CoreTypes.h>

#include <model/CCategoryExamplesCollector.h>
#include <model/CHierarchicalResults.h>
#include <model/CResourceMonitor.h>

#include <api/CGlobalCategoryId.h>
#include <api/CHierarchicalResultsWriter.h>
#include <api/ImportExport.h>

#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

namespace ml {
namespace core {
class CJsonOutputStreamWrapper;
}
namespace model {
class CHierarchicalResultsNormalizer;
}
namespace core {
template<typename>
class CScopedBoostJsonPoolAllocator;
}
namespace api {

//! \brief
//! Write output data in JSON format
//!
//! DESCRIPTION:\n
//! Outputs the anomaly detector results.
//!
//! The fields written for each detector type are hardcoded. If new fields
//! are passed then this class needs updating.
//!
//! Use limitNumberRecords() to limit the number of records and influencers
//! written for each bucket. If set to a non-zero value, only the top N
//! least probable records, the top N influencers and the top N bucket
//! influencers are written. This is useful in situations where a very
//! large number of field combinations are being analysed. Setting the limit
//! to zero means all results are kept.
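//!
//! For example, a minimal sketch (the writer instance and the limit
//! value are illustrative, not taken from production code):
//! \code
//! // Keep only the 100 most anomalous records and influencers per bucket.
//! writer.limitNumberRecords(100);
//! \endcode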
//!
//! IMPLEMENTATION DECISIONS:\n
//! The writer buffers results until endOutputBatch() is called. All results
//! for a given bucket must be passed to the writer between calls to
//! endOutputBatch(). Once endOutputBatch() has been called, no further
//! results can be added for any buckets for which results were previously
//! added.
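//!
//! A typical call sequence, as a minimal sketch (the job ID, stream
//! wrapper and per-bucket result objects are assumed to be supplied by
//! the caller):
//! \code
//! CJsonOutputWriter writer{"job-1", wrappedStream};
//! for (const auto& result : bucketResults) {
//!     writer.acceptResult(result);
//! }
//! writer.endOutputBatch(false /*isInterim*/, bucketProcessingTime);
//! writer.finalise();
//! \endcode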
//!
//! The simple count detector is not output. Instead, its "actual" value
//! is set as the bucket's eventCount field.
//!
//! Empty string fields are not written to the output.
//!
//! Memory for values added to the output documents is allocated from a pool (to
//! reduce allocation cost and memory fragmentation). This pool is cleared
//! between buckets to avoid excessive accumulation of memory over long periods.
//! It is crucial that the pool is only cleared when no documents reference
//! memory within it. This is achieved by only clearing the pool
//! (m_JsonPoolAllocator) when the m_BucketDataByTime vector is empty. Care
//! must be taken if the memory pool is used to allocate memory for long-lived
//! documents that are not stored within m_BucketDataByTime. (It might be
//! better to have a separate pool if this situation ever arises in the future.)
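//!
//! As an illustrative sketch (not the actual implementation), the
//! invariant amounts to a guard of this shape:
//! \code
//! if (m_BucketDataByTime.empty()) {
//!     // No buffered documents can reference pool memory any more,
//!     // so it is safe to clear m_JsonPoolAllocator here.
//! }
//! \endcode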
//!
//! Population anomalies consist of overall results and breakdown results.
//! There is an assumption that the overall result for a population anomaly
//! immediately follows the corresponding breakdown results. If this assumption
//! ever becomes invalid then the way this class nests breakdown results inside
//! overall results will need changing.
//!
//! Unusual scores and anomaly scores are generated for each output
//! bucket/record using the latest quantiles. The idea is to reduce the
//! processing load by avoiding the need for the Java process to request
//! re-normalisation of every single result as soon as it's written to
//! Elasticsearch. Quantiles are only written to the output when they
//! change significantly, when 2 hours of wall-clock time have elapsed,
//! or when the writer is finalised. (When the Java process receives
//! these updated quantiles it will request re-normalisation of previous
//! results using the normalize process, so it's best that this doesn't
//! happen too often.)
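//!
//! Illustratively (a hypothetical sketch, not the actual code), the
//! persistence condition has this shape:
//! \code
//! // bigQuantileChange, now, lastPersistTime and finalising are
//! // hypothetical names for the conditions described above.
//! if (bigQuantileChange || now - lastPersistTime >= 2 * 3600 || finalising) {
//!     writer.persistNormalizer(normalizer, persistTime);
//! }
//! \endcode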
//!
class API_EXPORT CJsonOutputWriter {
public:
using TDocumentPtr = std::shared_ptr<json::object>;
using TDocumentWeakPtr = std::weak_ptr<json::object>;
using TDocumentWeakPtrVec = std::vector<TDocumentWeakPtr>;
using TDocumentWeakPtrVecItr = TDocumentWeakPtrVec::iterator;
using TDocumentWeakPtrVecCItr = TDocumentWeakPtrVec::const_iterator;
using TDocumentWeakPtrIntPr = std::pair<TDocumentWeakPtr, int>;
using TDocumentWeakPtrIntPrVec = std::vector<TDocumentWeakPtrIntPr>;
using TDocumentWeakPtrIntPrVecItr = TDocumentWeakPtrIntPrVec::iterator;
using TStrDocumentPtrVecMap = std::map<std::string, TDocumentWeakPtrVec>;
using TStrVec = std::vector<std::string>;
using TStr1Vec = core::CSmallVector<std::string, 1>;
using TTimeVec = std::vector<core_t::TTime>;
using TGlobalCategoryIdVec = std::vector<CGlobalCategoryId>;
using TDoubleVec = std::vector<double>;
using TDoubleDoublePr = std::pair<double, double>;
using TDoubleDoublePrVec = std::vector<TDoubleDoublePr>;
using TDoubleDoubleDoublePrPr = std::pair<double, TDoubleDoublePr>;
using TDoubleDoubleDoublePrPrVec = std::vector<TDoubleDoubleDoublePrPr>;
using TStringDoublePr = std::pair<std::string, double>;
using TStringDoublePrVec = std::vector<TStringDoublePr>;
using TValuePtr = std::shared_ptr<json::value>;
using TAnomalyScoreExplanation = CHierarchicalResultsWriter::TAnomalyScoreExplanation;
//! Structure to buffer up information about each bucket that we have
//! unwritten results for
struct SBucketData {
SBucketData();
//! The max normalized anomaly score of the bucket influencers
double s_MaxBucketInfluencerNormalizedAnomalyScore;
//! Count of input events for the bucket
std::size_t s_InputEventCount;
//! Count of result records in the bucket for which results are
//! being built up
std::size_t s_RecordCount;
//! The bucketspan of this bucket
core_t::TTime s_BucketSpan;
//! The result record documents to be written, in a vector keyed on
//! detector index
TDocumentWeakPtrIntPrVec s_DocumentsToWrite;
//! Bucket Influencer documents
TDocumentWeakPtrVec s_BucketInfluencerDocuments;
//! Influencer documents
TDocumentWeakPtrVec s_InfluencerDocuments;
//! The highest probability of all the records stored
//! in the s_DocumentsToWrite vector. Used for filtering
//! out new records with a higher probability.
double s_HighestProbability;
//! Information explaining the different impact factors of
//! the anomaly score.
TAnomalyScoreExplanation s_AnomalyScoreExplanation;
//! Used for filtering new influencers
//! when the number to write is limited.
double s_LowestInfluencerScore;
//! Used for filtering new bucket influencers
//! when the number to write is limited.
double s_LowestBucketInfluencerScore;
//! Scheduled event descriptions
TStr1Vec s_ScheduledEventDescriptions;
};
using TOptionalTime = std::optional<core_t::TTime>;
using TTimeBucketDataMap = std::map<core_t::TTime, SBucketData>;
using TTimeBucketDataMapItr = TTimeBucketDataMap::iterator;
using TTimeBucketDataMapCItr = TTimeBucketDataMap::const_iterator;
using TStrFSet = model::CCategoryExamplesCollector::TStrFSet;
using TStrFSetCItr = TStrFSet::const_iterator;
public:
//! Constructor that causes output to be written to the specified wrapped stream
CJsonOutputWriter(const std::string& jobId, core::CJsonOutputStreamWrapper& strmOut);
//! Destructor flushes the stream
~CJsonOutputWriter();
//! Access to job ID
const std::string& jobId() const;
//! Limit the output to the \p count most anomalous records and
//! influencers. Each detector will write no more than \p count records
//! and influencers per bucket (i.e. a maximum of N records, N
//! influencers and N bucket influencers).
//! The bucket time influencer does not count towards this limit, but
//! only if it is added after all the other bucket influencers.
void limitNumberRecords(std::size_t count);
//! A value of 0 indicates no limit has been set
std::size_t limitNumberRecords() const;
//! Close the JSON structures and flush output.
//! This method should only be called once; subsequent invocations
//! have no effect.
void finalise();
//! Accept a result from the anomaly detector
//! Virtual for testing mocks
bool acceptResult(const CHierarchicalResultsWriter::TResults& results);
//! Accept the influencer
bool acceptInfluencer(core_t::TTime time,
const model::CHierarchicalResults::TNode& node,
bool isBucketInfluencer);
//! Create a time bucket influencer.
//! If limitNumberRecords() is set, add this influencer after all other
//! influencers have been added; otherwise it may be filtered out if its
//! anomaly score is lower than the others'.
//! Only one is expected per bucket, and it does not count towards the
//! influencer limit when limitNumberRecords() is used.
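//! For example (the influencer nodes and score values are illustrative):
//! \code
//! writer.acceptInfluencer(bucketTime, node1, true /*isBucketInfluencer*/);
//! writer.acceptInfluencer(bucketTime, node2, true /*isBucketInfluencer*/);
//! // Added last so that it cannot be filtered out by the limit.
//! writer.acceptBucketTimeInfluencer(bucketTime, probability, rawScore,
//!                                   normalizedScore);
//! \endcode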
void acceptBucketTimeInfluencer(core_t::TTime time,
double probability,
double rawAnomalyScore,
double normalizedAnomalyScore);
//! This method must be called after all the results for a given bucket
//! are available. It triggers the writing of the results.
bool endOutputBatch(bool isInterim, std::uint64_t bucketProcessingTime);
//! Report the current levels of resource usage, as given to us
//! by the CResourceMonitor via a callback
void reportMemoryUsage(const model::CResourceMonitor::SModelSizeStats& modelSizeStats);
//! Return the number of bytes currently used to output JSON documents.
std::size_t getOutputMemoryAllocatorUsage() const;
//! Write categorizer stats
void writeCategorizerStats(const std::string& partitionFieldName,
const std::string& partitionFieldValue,
const model::SCategorizerStats& categorizerStats,
const TOptionalTime& timestamp);
//! Acknowledge a flush request by echoing back the flush ID and the "refreshRequired" flag
void acknowledgeFlush(const std::string& flushId,
core_t::TTime lastFinalizedBucketEnd,
bool refreshRequired = true);
//! Write a category definition
void writeCategoryDefinition(const std::string& partitionFieldName,
const std::string& partitionFieldValue,
const CGlobalCategoryId& categoryId,
const std::string& terms,
const std::string& regex,
std::size_t maxMatchingFieldLength,
const TStrFSet& examples,
std::size_t numMatches,
const TGlobalCategoryIdVec& usurpedCategories);
//! Persist a normalizer by writing its state to the output
void persistNormalizer(const model::CHierarchicalResultsNormalizer& normalizer,
core_t::TTime& persistTime);
private:
template<typename>
friend class core::CScopedBoostJsonPoolAllocator;
// Hooks for the CScopedBoostJsonPoolAllocator interface
//! Use a new allocator for JSON output processing.
//! \param allocatorName A unique identifier for the allocator
void pushAllocator(const std::string& allocatorName);
//! Remove the allocator from the cache
void removeAllocator(const std::string& allocatorName);
//! Revert to using the previous allocator for JSON output processing
void popAllocator();
private:
//! Write out all the JSON documents that have been built up for
//! a particular bucket
void writeBucket(bool isInterim,
core_t::TTime bucketTime,
SBucketData& bucketData,
std::uint64_t bucketProcessingTime);
//! Add the fields for a metric detector
void addMetricFields(const CHierarchicalResultsWriter::TResults& results,
TDocumentWeakPtr weakDoc);
//! Write the fields for a population detector
void addPopulationFields(const CHierarchicalResultsWriter::TResults& results,
TDocumentWeakPtr weakDoc);
//! Write the fields for a population detector cause
void addPopulationCauseFields(const CHierarchicalResultsWriter::TResults& results,
TDocumentWeakPtr weakDoc);
//! Write the fields for an event rate detector
void addEventRateFields(const CHierarchicalResultsWriter::TResults& results,
TDocumentWeakPtr weakDoc);
//! Add the influencer fields to the doc
void addInfluencerFields(bool isBucketInfluencer,
const model::CHierarchicalResults::TNode& node,
TDocumentWeakPtr weakDoc);
//! Write the influence results.
void addInfluences(const CHierarchicalResultsWriter::TOptionalStrOptionalStrPrDoublePrVec& influenceResults,
TDocumentWeakPtr weakDoc);
//! Write anomaly score explanation object.
void writeAnomalyScoreExplanationObject(const CHierarchicalResultsWriter::TResults& results,
json::object& anomalyScoreExplanation);
private:
//! The job ID
std::string m_JobId;
//! JSON line writer
core::CBoostJsonConcurrentLineWriter m_Writer;
//! Time of last non-interim bucket written to output
core_t::TTime m_LastNonInterimBucketTime;
//! Has the output been finalised?
bool m_Finalised;
//! Max number of records to write for each bucket/detector
std::size_t m_RecordOutputLimit;
//! Vector for building up documents representing nested sub-results.
//! The documents in this vector will reference memory owned by
//! m_JsonPoolAllocator. (Hence this is declared after the memory pool
//! so that it's destroyed first when the destructor runs.)
TDocumentWeakPtrVec m_NestedDocs;
//! Bucket data waiting to be written. The map is keyed on bucket time.
//! The documents in this map will reference memory owned by
//! m_JsonPoolAllocator. (Hence this is declared after the memory pool
//! so that it's destroyed first when the destructor runs.)
TTimeBucketDataMap m_BucketDataByTime;
};
}
}
#endif // INCLUDED_ml_api_CJsonOutputWriter_h