lib/api/CJsonOutputWriter.cc (835 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <api/CJsonOutputWriter.h>
#include <core/CTimeUtils.h>
#include <model/CHierarchicalResultsNormalizer.h>
#include <api/CAnomalyJobConfig.h>
#include <api/CModelSizeStatsJsonWriter.h>
#include <api/CModelSnapshotJsonWriter.h>
#include <boost/unordered_map.hpp>
#include <algorithm>
#include <sstream>
namespace ml {
namespace api {
namespace {
// JSON field names
// -- Top-level / bucket-level fields --
const std::string JOB_ID("job_id");
const std::string TIMESTAMP("timestamp");
const std::string BUCKET("bucket");
const std::string DETECTOR_INDEX("detector_index");
const std::string RECORDS("records");
const std::string EVENT_COUNT("event_count");
const std::string IS_INTERIM("is_interim");
// -- Record (anomaly) fields --
const std::string PROBABILITY("probability");
const std::string MULTI_BUCKET_IMPACT("multi_bucket_impact");
const std::string RAW_ANOMALY_SCORE("raw_anomaly_score");
const std::string ANOMALY_SCORE("anomaly_score");
const std::string RECORD_SCORE("record_score");
const std::string INITIAL_RECORD_SCORE("initial_record_score");
const std::string INFLUENCER_SCORE("influencer_score");
const std::string INITIAL_INFLUENCER_SCORE("initial_influencer_score");
const std::string FIELD_NAME("field_name");
const std::string BY_FIELD_NAME("by_field_name");
const std::string BY_FIELD_VALUE("by_field_value");
const std::string CORRELATED_BY_FIELD_VALUE("correlated_by_field_value");
const std::string TYPICAL("typical");
const std::string ACTUAL("actual");
const std::string CAUSES("causes");
const std::string FUNCTION("function");
const std::string FUNCTION_DESCRIPTION("function_description");
const std::string OVER_FIELD_NAME("over_field_name");
const std::string OVER_FIELD_VALUE("over_field_value");
const std::string PARTITION_FIELD_NAME("partition_field_name");
const std::string PARTITION_FIELD_VALUE("partition_field_value");
// -- Influencer fields --
const std::string INITIAL_SCORE("initial_anomaly_score");
const std::string INFLUENCER_FIELD_NAME("influencer_field_name");
const std::string INFLUENCER_FIELD_VALUE("influencer_field_value");
const std::string INFLUENCER_FIELD_VALUES("influencer_field_values");
const std::string BUCKET_INFLUENCERS("bucket_influencers");
const std::string INFLUENCERS("influencers");
// -- Flush acknowledgement fields --
const std::string FLUSH("flush");
const std::string ID("id");
const std::string LAST_FINALIZED_BUCKET_END("last_finalized_bucket_end");
const std::string REFRESH_REQUIRED("refresh_required");
// -- Category definition fields --
const std::string CATEGORY_ID("category_id");
const std::string CATEGORY_DEFINITION("category_definition");
const std::string TERMS("terms");
const std::string REGEX("regex");
const std::string MAX_MATCHING_LENGTH("max_matching_length");
const std::string EXAMPLES("examples");
const std::string NUM_MATCHES("num_matches");
const std::string PREFERRED_TO_CATEGORIES("preferred_to_categories");
// -- Miscellaneous bucket / quantile fields --
const std::string BUCKET_SPAN("bucket_span");
const std::string PROCESSING_TIME("processing_time_ms");
const std::string TIME_INFLUENCER("bucket_time");
const std::string SCHEDULED_EVENTS("scheduled_events");
const std::string QUANTILES("quantiles");
// -- Geographic (lat_long function) result fields --
const std::string GEO_RESULTS("geo_results");
const std::string ACTUAL_POINT("actual_point");
const std::string TYPICAL_POINT("typical_point");
// -- Anomaly score explanation fields --
const std::string ANOMALY_SCORE_EXPLANATION("anomaly_score_explanation");
const std::string ANOMALY_TYPE("anomaly_type");
const std::string ANOMALY_TYPE_DIP("dip");
const std::string ANOMALY_TYPE_SPIKE("spike");
const std::string ANOMALY_LENGTH("anomaly_length");
const std::string SINGLE_BUCKET_IMPACT("single_bucket_impact");
const std::string ANOMALY_CHARACTERISTICS_IMPACT("anomaly_characteristics_impact");
const std::string LOWER_CONFIDENCE_BOUND("lower_confidence_bound");
const std::string TYPICAL_VALUE("typical_value");
const std::string UPPER_CONFIDENCE_BOUND("upper_confidence_bound");
const std::string HIGH_VARIANCE_PENALTY("high_variance_penalty");
const std::string INCOMPLETE_BUCKET_PENALTY("incomplete_bucket_penalty");
const std::string MULTIMODAL_DISTRIBUTION("multimodal_distribution");
const std::string BY_FIELD_FIRST_OCCURRENCE("by_field_first_occurrence");
const std::string BY_FIELD_RELATIVE_RARITY("by_field_relative_rarity");
//! Read a numeric field from a stored JSON document.
//! The caller must guarantee the field is present in the document; if it
//! is not, behaviour is undefined and a crash is likely.  If the weak
//! pointer has expired an error is logged and 0.0 is returned.
double doubleFromDocument(const CJsonOutputWriter::TDocumentWeakPtr& weakDoc,
                          const std::string& field) {
    if (auto doc = weakDoc.lock()) {
        return (*doc)[field].to_number<double>();
    }
    LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
    return 0.0;
}
//! Sort json documents by the probability lowest to highest.
//! Used with std::make_heap/std::pop_heap in acceptResult(), where a
//! "less" comparator yields a max-heap: the document with the highest
//! probability (i.e. the least anomalous) sits at the front ready for
//! eviction when the record limit is reached.
class CProbabilityLess {
public:
bool operator()(const CJsonOutputWriter::TDocumentWeakPtrIntPr& lhs,
const CJsonOutputWriter::TDocumentWeakPtrIntPr& rhs) const {
return doubleFromDocument(lhs.first, PROBABILITY) <
doubleFromDocument(rhs.first, PROBABILITY);
}
};
const CProbabilityLess PROBABILITY_LESS = CProbabilityLess();
//! Order result documents by detector identifier first and then, within
//! a detector, by ascending probability (most anomalous first).
class CDetectorThenProbabilityLess {
public:
    bool operator()(const CJsonOutputWriter::TDocumentWeakPtrIntPr& lhs,
                    const CJsonOutputWriter::TDocumentWeakPtrIntPr& rhs) const {
        // Different detectors: the detector index decides the order.
        if (lhs.second != rhs.second) {
            return lhs.second < rhs.second;
        }
        // Same detector: lower probability sorts first.
        return doubleFromDocument(lhs.first, PROBABILITY) <
               doubleFromDocument(rhs.first, PROBABILITY);
    }
};
const CDetectorThenProbabilityLess DETECTOR_PROBABILITY_LESS{};
//! Order influence (name, weight) pairs from the highest influence
//! weight down to the lowest.
class CInfluencesLess {
public:
    bool operator()(const std::pair<const char*, double>& lhs,
                    const std::pair<const char*, double>& rhs) const {
        // Descending order on the weight; the name plays no part.
        return rhs.second < lhs.second;
    }
};
const CInfluencesLess INFLUENCE_LESS{};
//! Sort influencer from highest to lowest by score.
//! Reads the score from the configured field of each document.
//! NOTE: the field name is held by reference, so the referenced string
//! must outlive the comparator.  The two instances below reference the
//! file-scope constants above, which is safe.
class CInfluencerGreater {
public:
CInfluencerGreater(const std::string& field) : m_Field(field) {}
bool operator()(const CJsonOutputWriter::TDocumentWeakPtr& lhs,
const CJsonOutputWriter::TDocumentWeakPtr& rhs) const {
return doubleFromDocument(lhs, m_Field) > doubleFromDocument(rhs, m_Field);
}
private:
const std::string& m_Field;
};
// Influencers compare on their initial score; bucket influencers use the
// "initial_anomaly_score" field instead.
const CInfluencerGreater INFLUENCER_GREATER = CInfluencerGreater(INITIAL_INFLUENCER_SCORE);
const CInfluencerGreater BUCKET_INFLUENCER_GREATER = CInfluencerGreater(INITIAL_SCORE);
}
//! Construct a writer that emits results for job \p jobId to \p strmOut.
CJsonOutputWriter::CJsonOutputWriter(const std::string& jobId,
core::CJsonOutputStreamWrapper& strmOut)
: m_JobId(jobId), m_Writer(strmOut), m_LastNonInterimBucketTime(0),
m_Finalised(false), m_RecordOutputLimit(0) {
// Don't write any output in the constructor because, the way things work at
// the moment, the output stream might be redirected after construction
}
//! Destructor flushes any remaining buffered output via finalise().
CJsonOutputWriter::~CJsonOutputWriter() {
finalise();
}
//! The ID of the job this writer was constructed for.
const std::string& CJsonOutputWriter::jobId() const {
return m_JobId;
}
//! Flush buffered output exactly once.  Safe to call repeatedly; calls
//! after the first are no-ops.
void CJsonOutputWriter::finalise() {
    if (m_Finalised == false) {
        // Flush the output.  This ensures that any buffers are written out.
        // Note: this is still asynchronous.
        m_Writer.flush();
        m_Finalised = true;
    }
}
//! Buffer one hierarchical result against its bucket for later output by
//! endOutputBatch().  Returns false only if the result is inconsistent
//! (a simple count result with no current rate).
bool CJsonOutputWriter::acceptResult(const CHierarchicalResultsWriter::TResults& results) {
SBucketData& bucketData = m_BucketDataByTime[results.s_BucketStartTime];
if (results.s_ResultType == CHierarchicalResultsWriter::E_SimpleCountResult) {
// A simple count "result" carries bucket-level metadata (event count,
// bucket span, scheduled events) rather than an anomaly record.
if (!results.s_CurrentRate) {
LOG_ERROR(<< "Simple count detector has no current rate");
return false;
}
bucketData.s_InputEventCount = *results.s_CurrentRate;
bucketData.s_BucketSpan = results.s_BucketSpan;
bucketData.s_ScheduledEventDescriptions = results.s_ScheduledEventDescriptions;
return true;
}
std::weak_ptr<json::object> newDoc;
if (!results.s_IsOverallResult) {
// Not an overall result: this is a cause of a population anomaly.
// Stash it; addPopulationFields() nests it under the parent record.
newDoc = m_Writer.makeStorableDoc();
this->addPopulationCauseFields(results, newDoc);
m_NestedDocs.push_back(newDoc);
return true;
}
++bucketData.s_RecordCount;
TDocumentWeakPtrIntPrVec& detectorDocumentsToWrite = bucketData.s_DocumentsToWrite;
bool makeHeap(false);
// If a max number of records to output has not been set or we haven't
// reached that limit yet just append the new document to the array
if (m_RecordOutputLimit == 0 || bucketData.s_RecordCount <= m_RecordOutputLimit) {
newDoc = m_Writer.makeStorableDoc();
detectorDocumentsToWrite.push_back(TDocumentWeakPtrIntPr(newDoc, results.s_Identifier));
// the document array is now full, make a max heap
makeHeap = bucketData.s_RecordCount == m_RecordOutputLimit;
} else {
// Have reached the limit of records to write so compare the new doc
// to the highest probability anomaly doc and replace if more anomalous
if (results.s_Probability >= bucketData.s_HighestProbability) {
// Discard any associated nested docs
m_NestedDocs.clear();
return true;
}
newDoc = m_Writer.makeStorableDoc();
// remove the highest prob doc and insert new one
std::pop_heap(detectorDocumentsToWrite.begin(),
detectorDocumentsToWrite.end(), PROBABILITY_LESS);
detectorDocumentsToWrite.pop_back();
detectorDocumentsToWrite.push_back(TDocumentWeakPtrIntPr(newDoc, results.s_Identifier));
makeHeap = true;
}
// The check for population results must come first because some population
// results are also metrics
if (results.s_ResultType == CHierarchicalResultsWriter::E_PopulationResult) {
this->addPopulationFields(results, newDoc);
} else if (results.s_IsMetric) {
this->addMetricFields(results, newDoc);
} else {
this->addEventRateFields(results, newDoc);
}
this->addInfluences(results.s_Influences, newDoc);
if (makeHeap) {
// Re-establish the max-heap (front = highest probability, i.e. least
// anomalous doc) and cache that probability so subsequent results can
// be compared without touching the documents.
std::make_heap(detectorDocumentsToWrite.begin(),
detectorDocumentsToWrite.end(), PROBABILITY_LESS);
bucketData.s_HighestProbability =
doubleFromDocument(detectorDocumentsToWrite.front().first, PROBABILITY);
}
return true;
}
//! Buffer an influencer (or bucket influencer when \p isBucketInfluencer
//! is true) document for the given bucket time, enforcing the record
//! output limit by evicting the lowest-scoring document when full.
//! Always returns true.
bool CJsonOutputWriter::acceptInfluencer(core_t::TTime time,
const model::CHierarchicalResults::TNode& node,
bool isBucketInfluencer) {
TDocumentWeakPtr newDoc = m_Writer.makeStorableDoc();
SBucketData& bucketData = m_BucketDataByTime[time];
TDocumentWeakPtrVec& documents = (isBucketInfluencer)
? bucketData.s_BucketInfluencerDocuments
: bucketData.s_InfluencerDocuments;
// A limit of 0 means unlimited output.
bool isLimitedWrite(m_RecordOutputLimit > 0);
if (isLimitedWrite && documents.size() == m_RecordOutputLimit) {
double& lowestScore = (isBucketInfluencer)
? bucketData.s_LowestBucketInfluencerScore
: bucketData.s_LowestInfluencerScore;
if (node.s_NormalizedAnomalyScore < lowestScore) {
// Don't write this influencer
return true;
}
// need to remove the lowest score record; once the limit has been
// reached the vector is kept sorted by descending score, so the
// lowest-scoring document is at the back
documents.pop_back();
}
this->addInfluencerFields(isBucketInfluencer, node, newDoc);
documents.push_back(newDoc);
bool sortVectorAfterWritingDoc = isLimitedWrite && documents.size() >= m_RecordOutputLimit;
if (sortVectorAfterWritingDoc) {
std::sort(documents.begin(), documents.end(),
isBucketInfluencer ? BUCKET_INFLUENCER_GREATER : INFLUENCER_GREATER);
}
if (isBucketInfluencer) {
bucketData.s_MaxBucketInfluencerNormalizedAnomalyScore =
std::max(bucketData.s_MaxBucketInfluencerNormalizedAnomalyScore,
node.s_NormalizedAnomalyScore);
// Track the lowest score held so future candidates can be rejected
// without inspecting the documents.
bucketData.s_LowestBucketInfluencerScore =
std::min(bucketData.s_LowestBucketInfluencerScore,
doubleFromDocument(documents.back(), INITIAL_SCORE));
} else {
bucketData.s_LowestInfluencerScore =
std::min(bucketData.s_LowestInfluencerScore,
doubleFromDocument(documents.back(), INITIAL_INFLUENCER_SCORE));
}
return true;
}
//! Add the special "bucket_time" bucket influencer for the given bucket.
//! Skipped when the bucket has no records, since it would carry no
//! useful information.
void CJsonOutputWriter::acceptBucketTimeInfluencer(core_t::TTime time,
double probability,
double rawAnomalyScore,
double normalizedAnomalyScore) {
SBucketData& bucketData = m_BucketDataByTime[time];
if (bucketData.s_RecordCount == 0) {
return;
}
TDocumentWeakPtr doc = m_Writer.makeStorableDoc();
TDocumentPtr newDoc = doc.lock();
if (!newDoc) {
LOG_ERROR(<< "Failed to create new JSON document");
return;
}
m_Writer.addStringFieldCopyToObj(INFLUENCER_FIELD_NAME, TIME_INFLUENCER, *newDoc);
m_Writer.addDoubleFieldToObj(PROBABILITY, probability, *newDoc);
m_Writer.addDoubleFieldToObj(RAW_ANOMALY_SCORE, rawAnomalyScore, *newDoc);
// Initial and current scores are identical at the time of writing.
m_Writer.addDoubleFieldToObj(INITIAL_SCORE, normalizedAnomalyScore, *newDoc);
m_Writer.addDoubleFieldToObj(ANOMALY_SCORE, normalizedAnomalyScore, *newDoc);
bucketData.s_MaxBucketInfluencerNormalizedAnomalyScore = std::max(
bucketData.s_MaxBucketInfluencerNormalizedAnomalyScore, normalizedAnomalyScore);
bucketData.s_BucketInfluencerDocuments.push_back(doc);
}
//! Write out every bucket accumulated since the last batch, then discard
//! the buffered per-bucket state so memory does not accumulate.  Always
//! returns true.
bool CJsonOutputWriter::endOutputBatch(bool isInterim, std::uint64_t bucketProcessingTime) {
    for (auto& timeAndBucket : m_BucketDataByTime) {
        this->writeBucket(isInterim, timeAndBucket.first, timeAndBucket.second,
                          bucketProcessingTime);
        // Only finalised (non-interim) buckets advance the last written
        // bucket time.
        if (isInterim == false) {
            m_LastNonInterimBucketTime = timeAndBucket.first;
        }
    }
    // Drop all buffered bucket data and any stray nested cause documents.
    m_BucketDataByTime.clear();
    m_NestedDocs.clear();
    return true;
}
//! Emit all documents for one bucket: the "records" array, the
//! "influencers" array and finally the "bucket" document itself.  The
//! bucket document is written last because its values (e.g. the maximum
//! bucket influencer score) are derived from the records/influencers
//! accumulated for the bucket.
void CJsonOutputWriter::writeBucket(bool isInterim,
core_t::TTime bucketTime,
SBucketData& bucketData,
std::uint64_t bucketProcessingTime) {
// Write records
if (!bucketData.s_DocumentsToWrite.empty()) {
// Sort the results so they are grouped by detector and
// ordered by probability
std::sort(bucketData.s_DocumentsToWrite.begin(),
bucketData.s_DocumentsToWrite.end(), DETECTOR_PROBABILITY_LESS);
m_Writer.onObjectBegin();
m_Writer.onKey(RECORDS);
m_Writer.onArrayBegin();
// Iterate over the different detectors that we have results for
for (TDocumentWeakPtrIntPrVecItr detectorIter =
bucketData.s_DocumentsToWrite.begin();
detectorIter != bucketData.s_DocumentsToWrite.end(); ++detectorIter) {
// Write the document, adding some extra fields as we go
int detectorIndex = detectorIter->second;
TDocumentWeakPtr weakDoc = detectorIter->first;
TDocumentPtr docPtr = weakDoc.lock();
if (!docPtr) {
LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
continue;
}
m_Writer.addIntFieldToObj(DETECTOR_INDEX, detectorIndex, *docPtr);
m_Writer.addIntFieldToObj(BUCKET_SPAN, bucketData.s_BucketSpan, *docPtr);
m_Writer.addStringFieldCopyToObj(JOB_ID, m_JobId, *docPtr);
m_Writer.addTimeFieldToObj(TIMESTAMP, bucketTime, *docPtr);
// is_interim is only emitted when true
if (isInterim) {
m_Writer.addBoolFieldToObj(IS_INTERIM, isInterim, *docPtr);
}
m_Writer.write(*docPtr);
}
m_Writer.onArrayEnd();
m_Writer.onObjectEnd();
}
// Write influencers
if (!bucketData.s_InfluencerDocuments.empty()) {
m_Writer.onObjectBegin();
m_Writer.onKey(INFLUENCERS);
m_Writer.onArrayBegin();
for (TDocumentWeakPtrVecItr influencerIter =
bucketData.s_InfluencerDocuments.begin();
influencerIter != bucketData.s_InfluencerDocuments.end(); ++influencerIter) {
TDocumentWeakPtr weakDoc = *influencerIter;
TDocumentPtr docPtr = weakDoc.lock();
if (!docPtr) {
LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
continue;
}
m_Writer.addStringFieldCopyToObj(JOB_ID, m_JobId, *docPtr);
m_Writer.addTimeFieldToObj(TIMESTAMP, bucketTime, *docPtr);
if (isInterim) {
m_Writer.addBoolFieldToObj(IS_INTERIM, isInterim, *docPtr);
}
m_Writer.addIntFieldToObj(BUCKET_SPAN, bucketData.s_BucketSpan, *docPtr);
m_Writer.write(*docPtr);
}
m_Writer.onArrayEnd();
m_Writer.onObjectEnd();
}
// Write bucket at the end, as some of its values need to iterate over records, etc.
m_Writer.onObjectBegin();
m_Writer.onKey(BUCKET);
m_Writer.onObjectBegin();
m_Writer.onKey(JOB_ID);
m_Writer.onString(m_JobId);
m_Writer.onKey(TIMESTAMP);
m_Writer.onTime(bucketTime);
// The initial and current anomaly scores are written with the same value
// here; both come from the maximum bucket influencer score.
m_Writer.onKey(ANOMALY_SCORE);
m_Writer.onDouble(bucketData.s_MaxBucketInfluencerNormalizedAnomalyScore);
m_Writer.onKey(INITIAL_SCORE);
m_Writer.onDouble(bucketData.s_MaxBucketInfluencerNormalizedAnomalyScore);
m_Writer.onKey(EVENT_COUNT);
m_Writer.onUint64(bucketData.s_InputEventCount);
if (isInterim) {
m_Writer.onKey(IS_INTERIM);
m_Writer.onBool(isInterim);
}
m_Writer.onKey(BUCKET_SPAN);
m_Writer.onInt64(bucketData.s_BucketSpan);
if (!bucketData.s_BucketInfluencerDocuments.empty()) {
// Write the array of influencers
m_Writer.onKey(BUCKET_INFLUENCERS);
m_Writer.onArrayBegin();
for (TDocumentWeakPtrVecItr influencerIter =
bucketData.s_BucketInfluencerDocuments.begin();
influencerIter != bucketData.s_BucketInfluencerDocuments.end();
++influencerIter) {
TDocumentWeakPtr weakDoc = *influencerIter;
TDocumentPtr docPtr = weakDoc.lock();
if (!docPtr) {
LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
continue;
}
m_Writer.addStringFieldCopyToObj(JOB_ID, m_JobId, *docPtr);
m_Writer.addTimeFieldToObj(TIMESTAMP, bucketTime, *docPtr);
m_Writer.addIntFieldToObj(BUCKET_SPAN, bucketData.s_BucketSpan, *docPtr);
if (isInterim) {
m_Writer.addBoolFieldToObj(IS_INTERIM, isInterim, *docPtr);
}
m_Writer.write(*docPtr);
}
m_Writer.onArrayEnd();
}
m_Writer.onKey(PROCESSING_TIME);
m_Writer.onUint64(bucketProcessingTime);
if (bucketData.s_ScheduledEventDescriptions.empty() == false) {
m_Writer.onKey(SCHEDULED_EVENTS);
m_Writer.onArrayBegin();
for (const auto& it : bucketData.s_ScheduledEventDescriptions) {
m_Writer.onString(it);
}
m_Writer.onArrayEnd();
}
m_Writer.onObjectEnd();
m_Writer.onObjectEnd();
}
//! Populate a record document for an individual (metric) anomaly:
//! scores, probability, score explanation, field names/values, function
//! and typical/actual values, plus "geo_results" for lat_long functions.
void CJsonOutputWriter::addMetricFields(const CHierarchicalResultsWriter::TResults& results,
TDocumentWeakPtr weakDoc) {
TDocumentPtr docPtr = weakDoc.lock();
if (!docPtr) {
LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
return;
}
// record_score, probability, fieldName, byFieldName, byFieldValue, partitionFieldName,
// partitionFieldValue, function, typical, actual. influences?
m_Writer.addDoubleFieldToObj(INITIAL_RECORD_SCORE,
results.s_NormalizedAnomalyScore, *docPtr);
m_Writer.addDoubleFieldToObj(RECORD_SCORE, results.s_NormalizedAnomalyScore, *docPtr);
m_Writer.addDoubleFieldToObj(PROBABILITY, results.s_Probability, *docPtr);
json::object anomalyScoreExplanation = m_Writer.makeObject();
this->writeAnomalyScoreExplanationObject(results, anomalyScoreExplanation);
m_Writer.addMember(ANOMALY_SCORE_EXPLANATION, anomalyScoreExplanation, *docPtr);
m_Writer.addDoubleFieldToObj(MULTI_BUCKET_IMPACT, results.s_MultiBucketImpact, *docPtr);
m_Writer.addStringFieldCopyToObj(FIELD_NAME, results.s_MetricValueField, *docPtr);
if (!results.s_ByFieldName.empty()) {
m_Writer.addStringFieldCopyToObj(BY_FIELD_NAME, results.s_ByFieldName, *docPtr);
// If name is present then force output of value too, even when empty
m_Writer.addStringFieldCopyToObj(BY_FIELD_VALUE, results.s_ByFieldValue,
*docPtr, true);
// But allow correlatedByFieldValue to be unset if blank
m_Writer.addStringFieldCopyToObj(CORRELATED_BY_FIELD_VALUE,
results.s_CorrelatedByFieldValue, *docPtr);
}
if (!results.s_PartitionFieldName.empty()) {
m_Writer.addStringFieldCopyToObj(PARTITION_FIELD_NAME,
results.s_PartitionFieldName, *docPtr);
// If name is present then force output of value too, even when empty
m_Writer.addStringFieldCopyToObj(
PARTITION_FIELD_VALUE, results.s_PartitionFieldValue, *docPtr, true);
}
m_Writer.addStringFieldCopyToObj(FUNCTION, results.s_FunctionName, *docPtr);
m_Writer.addStringFieldCopyToObj(FUNCTION_DESCRIPTION,
results.s_FunctionDescription, *docPtr);
m_Writer.addDoubleArrayFieldToObj(TYPICAL, results.s_BaselineMean, *docPtr);
m_Writer.addDoubleArrayFieldToObj(ACTUAL, results.s_CurrentMean, *docPtr);
if (results.s_FunctionName ==
CAnomalyJobConfig::CAnalysisConfig::CDetectorConfig::FUNCTION_LAT_LONG) {
// For geographic functions also emit the (lat,long) points as
// comma-separated strings, guarded on each vector holding exactly
// the two coordinates.
json::object geoResults = m_Writer.makeObject();
auto geoPointToString = [](const auto& point) -> std::string {
std::ostringstream result;
// We don't want scientific notation and geo points only have precision up to 12 digits
result << std::fixed << std::setprecision(12) << point[0] << "," << point[1];
return result.str();
};
if (results.s_BaselineMean.size() == 2) {
m_Writer.addStringFieldCopyToObj(
TYPICAL_POINT, geoPointToString(results.s_BaselineMean), geoResults);
}
if (results.s_CurrentMean.size() == 2) {
m_Writer.addStringFieldCopyToObj(
ACTUAL_POINT, geoPointToString(results.s_CurrentMean), geoResults);
}
m_Writer.addMember(GEO_RESULTS, geoResults, *docPtr);
}
}
//! Populate a record document for an overall population anomaly and nest
//! the buffered cause documents (m_NestedDocs) under "causes".  The
//! nested-document buffer is consumed (cleared) by this call.
void CJsonOutputWriter::addPopulationFields(const CHierarchicalResultsWriter::TResults& results,
TDocumentWeakPtr weakDoc) {
TDocumentPtr docPtr = weakDoc.lock();
if (!docPtr) {
LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
return;
}
// record_score, probability, fieldName, byFieldName,
// overFieldName, overFieldValue, partitionFieldName, partitionFieldValue,
// function, causes, influences?
m_Writer.addDoubleFieldToObj(INITIAL_RECORD_SCORE,
results.s_NormalizedAnomalyScore, *docPtr);
m_Writer.addDoubleFieldToObj(RECORD_SCORE, results.s_NormalizedAnomalyScore, *docPtr);
m_Writer.addDoubleFieldToObj(PROBABILITY, results.s_Probability, *docPtr);
m_Writer.addStringFieldCopyToObj(FIELD_NAME, results.s_MetricValueField, *docPtr);
// There are no by field values at this level for population
// results - they're in the "causes" object
m_Writer.addStringFieldCopyToObj(BY_FIELD_NAME, results.s_ByFieldName, *docPtr);
if (!results.s_OverFieldName.empty()) {
m_Writer.addStringFieldCopyToObj(OVER_FIELD_NAME, results.s_OverFieldName, *docPtr);
// If name is present then force output of value too, even when empty
m_Writer.addStringFieldCopyToObj(OVER_FIELD_VALUE,
results.s_OverFieldValue, *docPtr, true);
}
if (!results.s_PartitionFieldName.empty()) {
m_Writer.addStringFieldCopyToObj(PARTITION_FIELD_NAME,
results.s_PartitionFieldName, *docPtr);
// If name is present then force output of value too, even when empty
m_Writer.addStringFieldCopyToObj(
PARTITION_FIELD_VALUE, results.s_PartitionFieldValue, *docPtr, true);
}
m_Writer.addStringFieldCopyToObj(FUNCTION, results.s_FunctionName, *docPtr);
m_Writer.addStringFieldCopyToObj(FUNCTION_DESCRIPTION,
results.s_FunctionDescription, *docPtr);
// Add nested causes
if (m_NestedDocs.size() > 0) {
json::value causeArray = m_Writer.makeArray(m_NestedDocs.size());
for (std::size_t index = 0; index < m_NestedDocs.size(); ++index) {
TDocumentWeakPtr nwDocPtr = m_NestedDocs[index];
TDocumentPtr nDocPtr = nwDocPtr.lock();
if (!nDocPtr) {
LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
continue;
}
json::object& docAsValue = *nDocPtr;
m_Writer.pushBack(docAsValue, causeArray);
}
m_Writer.addMember(CAUSES, causeArray, *docPtr);
// The causes have been consumed; clear the buffer so they cannot be
// attached to a different record.
m_NestedDocs.clear();
} else {
LOG_WARN(<< "Expected some causes for a population anomaly but got none");
}
}
//! Populate one "cause" document for a population anomaly: probability,
//! field names/values, function and the typical/actual values (plus
//! "geo_results" for lat_long functions).  Causes are later nested under
//! the parent population record by addPopulationFields().
void CJsonOutputWriter::addPopulationCauseFields(const CHierarchicalResultsWriter::TResults& results,
                                                 TDocumentWeakPtr weakDoc) {
    TDocumentPtr docPtr = weakDoc.lock();
    if (!docPtr) {
        LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
        return;
    }
    // probability, fieldName, byFieldName, byFieldValue,
    // overFieldName, overFieldValue, partitionFieldName, partitionFieldValue,
    // function, typical, actual, influences
    m_Writer.addDoubleFieldToObj(PROBABILITY, results.s_Probability, *docPtr);
    m_Writer.addStringFieldCopyToObj(FIELD_NAME, results.s_MetricValueField, *docPtr);
    if (!results.s_ByFieldName.empty()) {
        m_Writer.addStringFieldCopyToObj(BY_FIELD_NAME, results.s_ByFieldName, *docPtr);
        // If name is present then force output of value too, even when empty
        m_Writer.addStringFieldCopyToObj(BY_FIELD_VALUE, results.s_ByFieldValue,
                                         *docPtr, true);
        // But allow correlatedByFieldValue to be unset if blank
        m_Writer.addStringFieldCopyToObj(CORRELATED_BY_FIELD_VALUE,
                                         results.s_CorrelatedByFieldValue, *docPtr);
    }
    if (!results.s_OverFieldName.empty()) {
        m_Writer.addStringFieldCopyToObj(OVER_FIELD_NAME, results.s_OverFieldName, *docPtr);
        // If name is present then force output of value too, even when empty
        m_Writer.addStringFieldCopyToObj(OVER_FIELD_VALUE,
                                         results.s_OverFieldValue, *docPtr, true);
    }
    if (!results.s_PartitionFieldName.empty()) {
        m_Writer.addStringFieldCopyToObj(PARTITION_FIELD_NAME,
                                         results.s_PartitionFieldName, *docPtr);
        // If name is present then force output of value too, even when empty
        m_Writer.addStringFieldCopyToObj(
            PARTITION_FIELD_VALUE, results.s_PartitionFieldValue, *docPtr, true);
    }
    m_Writer.addStringFieldCopyToObj(FUNCTION, results.s_FunctionName, *docPtr);
    m_Writer.addStringFieldCopyToObj(FUNCTION_DESCRIPTION,
                                     results.s_FunctionDescription, *docPtr);
    m_Writer.addDoubleArrayFieldToObj(TYPICAL, results.s_PopulationAverage, *docPtr);
    m_Writer.addDoubleArrayFieldToObj(ACTUAL, results.s_FunctionValue, *docPtr);
    if (results.s_FunctionName ==
        CAnomalyJobConfig::CAnalysisConfig::CDetectorConfig::FUNCTION_LAT_LONG) {
        json::object geoResults = m_Writer.makeObject();
        auto geoPointToString = [](const auto& point) -> std::string {
            std::ostringstream result;
            // We don't want scientific notation and geo points only have precision up to 12 digits
            result << std::fixed << std::setprecision(12) << point[0] << "," << point[1];
            return result.str();
        };
        // BUG FIX: the typical point is built from s_PopulationAverage, so the
        // guard must check that vector's size.  Previously it checked
        // s_BaselineMean, which could read out of bounds when the two vectors
        // differed in length.
        if (results.s_PopulationAverage.size() == 2) {
            m_Writer.addStringFieldCopyToObj(
                TYPICAL_POINT, geoPointToString(results.s_PopulationAverage), geoResults);
        }
        if (results.s_FunctionValue.size() == 2) {
            m_Writer.addStringFieldCopyToObj(
                ACTUAL_POINT, geoPointToString(results.s_FunctionValue), geoResults);
        }
        m_Writer.addMember(GEO_RESULTS, geoResults, *docPtr);
    }
}
//! Group the raw influence results by influencer field name and attach
//! them to the document under the "influencers" key, with each field's
//! values ordered from most to least influential.
//!
//! This function takes the raw c_str pointers of the string objects in
//! influenceResults.  These strings must exist up to the time the results
//! are written.
void CJsonOutputWriter::addInfluences(const CHierarchicalResultsWriter::TOptionalStrOptionalStrPrDoublePrVec& influenceResults,
                                      TDocumentWeakPtr weakDoc) {
    if (influenceResults.empty()) {
        return;
    }
    TDocumentPtr docPtr = weakDoc.lock();
    if (!docPtr) {
        LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
        return;
    }
    using TCharPtrDoublePr = std::pair<const char*, double>;
    using TCharPtrDoublePrVec = std::vector<TCharPtrDoublePr>;
    using TCharPtrCharPtrDoublePrVecPr = std::pair<const char*, TCharPtrDoublePrVec>;
    using TStrCharPtrCharPtrDoublePrVecPrUMap =
        boost::unordered_map<std::string, TCharPtrCharPtrDoublePrVecPr>;
    // Group the (field value, influence) pairs by influencer field name.
    TStrCharPtrCharPtrDoublePrVecPrUMap influences;
    for (const auto& influenceResult : influenceResults) {
        TCharPtrCharPtrDoublePrVecPr infResult(influenceResult.first.first->c_str(),
                                               TCharPtrDoublePrVec());
        auto insertResult = influences.emplace(*influenceResult.first.first, infResult);
        insertResult.first->second.second.emplace_back(
            influenceResult.first.second->c_str(), influenceResult.second);
    }
    // Within each field, order the values by descending influence.
    for (auto& influence : influences) {
        std::sort(influence.second.second.begin(), influence.second.second.end(),
                  INFLUENCE_LESS);
    }
    json::value influencesDoc = m_Writer.makeArray(influences.size());
    for (auto& influence : influences) {
        json::object influenceDoc;
        // Size the values array by this field's value count.  Previously it
        // was sized by the number of distinct influencer fields, which is
        // unrelated to the number of values for any one field.
        json::array values = m_Writer.makeArray(influence.second.second.size());
        for (const auto& valueAndScore : influence.second.second) {
            m_Writer.pushBack(valueAndScore.first, values);
        }
        m_Writer.addMember(INFLUENCER_FIELD_NAME,
                           json::value(influence.second.first), influenceDoc);
        m_Writer.addMember(INFLUENCER_FIELD_VALUES, values, influenceDoc);
        m_Writer.pushBack(influenceDoc, influencesDoc);
    }
    // Note influences are written using the field name "influencers"
    m_Writer.addMember(INFLUENCERS, influencesDoc, *docPtr);
}
//! Populate a record document for an event rate (count-style) anomaly:
//! scores, probability, score explanation, field names/values, function
//! and typical/actual values.
void CJsonOutputWriter::addEventRateFields(const CHierarchicalResultsWriter::TResults& results,
TDocumentWeakPtr weakDoc) {
TDocumentPtr docPtr = weakDoc.lock();
if (!docPtr) {
LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
return;
}
// record_score, probability, fieldName, byFieldName, byFieldValue, partitionFieldName,
// partitionFieldValue, functionName, typical, actual, influences?
m_Writer.addDoubleFieldToObj(INITIAL_RECORD_SCORE,
results.s_NormalizedAnomalyScore, *docPtr);
m_Writer.addDoubleFieldToObj(RECORD_SCORE, results.s_NormalizedAnomalyScore, *docPtr);
m_Writer.addDoubleFieldToObj(PROBABILITY, results.s_Probability, *docPtr);
json::object anomalyScoreExplanation = m_Writer.makeObject();
this->writeAnomalyScoreExplanationObject(results, anomalyScoreExplanation);
m_Writer.addMember(ANOMALY_SCORE_EXPLANATION, anomalyScoreExplanation, *docPtr);
m_Writer.addDoubleFieldToObj(MULTI_BUCKET_IMPACT, results.s_MultiBucketImpact, *docPtr);
m_Writer.addStringFieldCopyToObj(FIELD_NAME, results.s_MetricValueField, *docPtr);
if (!results.s_ByFieldName.empty()) {
m_Writer.addStringFieldCopyToObj(BY_FIELD_NAME, results.s_ByFieldName, *docPtr);
// If name is present then force output of value too, even when empty
m_Writer.addStringFieldCopyToObj(BY_FIELD_VALUE, results.s_ByFieldValue,
*docPtr, true);
// But allow correlatedByFieldValue to be unset if blank
m_Writer.addStringFieldCopyToObj(CORRELATED_BY_FIELD_VALUE,
results.s_CorrelatedByFieldValue, *docPtr);
}
if (!results.s_PartitionFieldName.empty()) {
m_Writer.addStringFieldCopyToObj(PARTITION_FIELD_NAME,
results.s_PartitionFieldName, *docPtr);
// If name is present then force output of value too, even when empty
m_Writer.addStringFieldCopyToObj(
PARTITION_FIELD_VALUE, results.s_PartitionFieldValue, *docPtr, true);
}
m_Writer.addStringFieldCopyToObj(FUNCTION, results.s_FunctionName, *docPtr);
m_Writer.addStringFieldCopyToObj(FUNCTION_DESCRIPTION,
results.s_FunctionDescription, *docPtr);
m_Writer.addDoubleArrayFieldToObj(TYPICAL, results.s_BaselineMean, *docPtr);
m_Writer.addDoubleArrayFieldToObj(ACTUAL, results.s_CurrentMean, *docPtr);
}
//! Populate an influencer (or bucket influencer) document from the
//! hierarchical results node: probability, initial/current scores and
//! the influencer field name/value.
void CJsonOutputWriter::addInfluencerFields(bool isBucketInfluencer,
                                            const model::CHierarchicalResults::TNode& node,
                                            TDocumentWeakPtr weakDoc) {
    TDocumentPtr docPtr = weakDoc.lock();
    if (docPtr == nullptr) {
        LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
        return;
    }
    // Bucket influencers and plain influencers use different score keys.
    const std::string& initialScoreKey =
        isBucketInfluencer ? INITIAL_SCORE : INITIAL_INFLUENCER_SCORE;
    const std::string& currentScoreKey =
        isBucketInfluencer ? ANOMALY_SCORE : INFLUENCER_SCORE;
    m_Writer.addDoubleFieldToObj(PROBABILITY, node.probability(), *docPtr);
    m_Writer.addDoubleFieldToObj(initialScoreKey, node.s_NormalizedAnomalyScore, *docPtr);
    m_Writer.addDoubleFieldToObj(currentScoreKey, node.s_NormalizedAnomalyScore, *docPtr);
    const std::string& personFieldName = *node.s_Spec.s_PersonFieldName;
    m_Writer.addStringFieldCopyToObj(INFLUENCER_FIELD_NAME, personFieldName, *docPtr);
    if (isBucketInfluencer) {
        m_Writer.addDoubleFieldToObj(RAW_ANOMALY_SCORE, node.s_RawAnomalyScore, *docPtr);
    } else if (personFieldName.empty() == false) {
        // If the name is present then force output of the value too, even
        // when that value is empty.
        m_Writer.addStringFieldCopyToObj(
            INFLUENCER_FIELD_VALUE, *node.s_Spec.s_PersonFieldValue, *docPtr, true);
    }
}
//! Set the maximum number of records/influencers written per bucket.
//! A count of 0 means unlimited.
void CJsonOutputWriter::limitNumberRecords(std::size_t count) {
m_RecordOutputLimit = count;
}
//! The current per-bucket record output limit (0 means unlimited).
std::size_t CJsonOutputWriter::limitNumberRecords() const {
return m_RecordOutputLimit;
}
//! Serialise the normalizer's state and write it as a "quantiles"
//! document stamped with the last non-interim bucket time.  On return
//! \p persistTime holds the wall-clock time of the write.
void CJsonOutputWriter::persistNormalizer(const model::CHierarchicalResultsNormalizer& normalizer,
core_t::TTime& persistTime) {
std::string quantilesState;
normalizer.toJson(m_LastNonInterimBucketTime, "api", quantilesState, true);
m_Writer.onObjectBegin();
m_Writer.onKey(QUANTILES);
// No need to copy the strings as the doc is written straight away
CModelSnapshotJsonWriter::writeQuantileState(
m_JobId, quantilesState, m_LastNonInterimBucketTime, m_Writer);
m_Writer.onObjectEnd();
persistTime = core::CTimeUtils::now();
LOG_TRACE(<< "Wrote quantiles state at " << persistTime);
}
// Push a named memory allocator onto the underlying writer's allocator
// stack; forwards straight to the writer.
void CJsonOutputWriter::pushAllocator(const std::string& allocatorName) {
    m_Writer.pushAllocator(allocatorName);
}
// Remove the named memory allocator from the underlying writer; forwards
// straight to the writer.
void CJsonOutputWriter::removeAllocator(const std::string& allocatorName) {
    m_Writer.removeAllocator(allocatorName);
}
// Pop the most recently pushed allocator from the underlying writer's
// allocator stack; forwards straight to the writer.
void CJsonOutputWriter::popAllocator() {
    m_Writer.popAllocator();
}
// Report the memory used by the underlying writer's output allocator;
// forwards straight to the writer.
std::size_t CJsonOutputWriter::getOutputMemoryAllocatorUsage() const {
    return m_Writer.getOutputMemoryAllocatorUsage();
}
// Write a model size stats document for the given resource monitor results,
// wrapped in its own top-level JSON object. Serialisation of the individual
// fields is delegated to CModelSizeStatsJsonWriter.
void CJsonOutputWriter::reportMemoryUsage(const model::CResourceMonitor::SModelSizeStats& results) {
    m_Writer.onObjectBegin();
    CModelSizeStatsJsonWriter::write(m_JobId, results, m_Writer);
    m_Writer.onObjectEnd();
    LOG_TRACE(<< "Wrote memory usage results");
}
// Write a categorizer stats document for the given partition (empty
// partition field name presumably means the job-wide categorizer — confirm
// against CModelSizeStatsJsonWriter). Serialisation is delegated to
// CModelSizeStatsJsonWriter; this method only supplies the enclosing
// top-level JSON object.
void CJsonOutputWriter::writeCategorizerStats(const std::string& partitionFieldName,
                                              const std::string& partitionFieldValue,
                                              const model::SCategorizerStats& categorizerStats,
                                              const TOptionalTime& timestamp) {
    m_Writer.onObjectBegin();
    CModelSizeStatsJsonWriter::writeCategorizerStats(m_JobId, partitionFieldName,
                                                     partitionFieldValue, categorizerStats,
                                                     timestamp, m_Writer);
    m_Writer.onObjectEnd();
}
// Write a flush acknowledgement document containing the flush ID, the end
// time of the last finalized bucket and whether the consumer needs to
// refresh, then flush the output stream so the acknowledgement reaches the
// consumer immediately. The emission order of the keys defines the document
// layout, so it must not be changed.
void CJsonOutputWriter::acknowledgeFlush(const std::string& flushId,
                                         core_t::TTime lastFinalizedBucketEnd,
                                         bool refreshRequired) {
    m_Writer.onObjectBegin();
    m_Writer.onKey(FLUSH);
    m_Writer.onObjectBegin();
    m_Writer.onKey(ID);
    m_Writer.onString(flushId);
    m_Writer.onKey(LAST_FINALIZED_BUCKET_END);
    m_Writer.onTime(lastFinalizedBucketEnd);
    m_Writer.onKey(REFRESH_REQUIRED);
    m_Writer.onBool(refreshRequired);
    m_Writer.onObjectEnd();
    m_Writer.onObjectEnd();
    // this shouldn't hang in buffers, so flush
    m_Writer.flush();
    LOG_TRACE(<< "Wrote flush with ID " << flushId);
}
// Write a category definition document describing one categorization
// result: the category's global ID, the terms and regex that define it,
// the maximum matching field length, example messages, the number of
// matches and the IDs of any categories it is preferred to. The partition
// field name/value pair is only emitted when a partition field name is
// present (per-partition categorization).
void CJsonOutputWriter::writeCategoryDefinition(const std::string& partitionFieldName,
                                                const std::string& partitionFieldValue,
                                                const CGlobalCategoryId& categoryId,
                                                const std::string& terms,
                                                const std::string& regex,
                                                std::size_t maxMatchingFieldLength,
                                                const TStrFSet& examples,
                                                std::size_t numMatches,
                                                const TGlobalCategoryIdVec& usurpedCategories) {
    m_Writer.onObjectBegin();
    m_Writer.onKey(CATEGORY_DEFINITION);
    m_Writer.onObjectBegin();
    m_Writer.onKey(JOB_ID);
    m_Writer.onString(m_JobId);
    if (partitionFieldName.empty() == false) {
        m_Writer.onKey(PARTITION_FIELD_NAME);
        m_Writer.onString(partitionFieldName);
        m_Writer.onKey(PARTITION_FIELD_VALUE);
        m_Writer.onString(partitionFieldValue);
    }
    m_Writer.onKey(CATEGORY_ID);
    m_Writer.onInt(categoryId.globalId());
    m_Writer.onKey(TERMS);
    m_Writer.onString(terms);
    m_Writer.onKey(REGEX);
    m_Writer.onString(regex);
    m_Writer.onKey(MAX_MATCHING_LENGTH);
    m_Writer.onUint64(maxMatchingFieldLength);
    m_Writer.onKey(EXAMPLES);
    m_Writer.onArrayBegin();
    // Range-for is clearer than the explicit iterator loop and matches the
    // style of the usurped categories loop below.
    for (const std::string& example : examples) {
        m_Writer.onString(example);
    }
    m_Writer.onArrayEnd();
    m_Writer.onKey(NUM_MATCHES);
    m_Writer.onUint64(numMatches);
    m_Writer.onKey(PREFERRED_TO_CATEGORIES);
    m_Writer.onArrayBegin();
    for (const auto& globalCategoryId : usurpedCategories) {
        m_Writer.onInt(globalCategoryId.globalId());
    }
    m_Writer.onArrayEnd();
    m_Writer.onObjectEnd();
    m_Writer.onObjectEnd();
}
CJsonOutputWriter::SBucketData::SBucketData()
: s_MaxBucketInfluencerNormalizedAnomalyScore(0.0), s_InputEventCount(0),
s_RecordCount(0), s_BucketSpan(0), s_HighestProbability(-1),
s_LowestInfluencerScore(101.0), s_LowestBucketInfluencerScore(101.0) {
}
// Populate the anomaly score explanation object with the factors that
// contributed to a record's score. Fields are emitted only when they carry
// information: zero impacts, NaN bounds, false flags and unknown anomaly
// types are omitted, keeping the output document compact.
void CJsonOutputWriter::writeAnomalyScoreExplanationObject(
    const CHierarchicalResultsWriter::TResults& results,
    json::object& anomalyScoreExplanation) {
    // Report the anomaly type only when it's a recognised dip or spike.
    switch (results.s_AnomalyScoreExplanation.s_AnomalyType) {
    case TAnomalyScoreExplanation::E_DIP:
        m_Writer.addStringFieldCopyToObj(ANOMALY_TYPE, ANOMALY_TYPE_DIP, anomalyScoreExplanation);
        break;
    case TAnomalyScoreExplanation::E_SPIKE:
        m_Writer.addStringFieldCopyToObj(ANOMALY_TYPE, ANOMALY_TYPE_SPIKE,
                                         anomalyScoreExplanation);
        break;
    case TAnomalyScoreExplanation::E_UNKNOWN:
        break;
    }
    if (results.s_AnomalyScoreExplanation.s_AnomalyLength > 0) {
        m_Writer.addUIntFieldToObj(ANOMALY_LENGTH,
                                   results.s_AnomalyScoreExplanation.s_AnomalyLength,
                                   anomalyScoreExplanation);
    }
    // Score impact components; skipped when zero, i.e. no contribution.
    if (results.s_AnomalyScoreExplanation.s_SingleBucketImpact != 0) {
        m_Writer.addIntFieldToObj(SINGLE_BUCKET_IMPACT,
                                  results.s_AnomalyScoreExplanation.s_SingleBucketImpact,
                                  anomalyScoreExplanation);
    }
    if (results.s_AnomalyScoreExplanation.s_MultiBucketImpact != 0) {
        m_Writer.addIntFieldToObj(MULTI_BUCKET_IMPACT,
                                  results.s_AnomalyScoreExplanation.s_MultiBucketImpact,
                                  anomalyScoreExplanation);
    }
    if (results.s_AnomalyScoreExplanation.s_AnomalyCharacteristicsImpact != 0) {
        m_Writer.addIntFieldToObj(ANOMALY_CHARACTERISTICS_IMPACT,
                                  results.s_AnomalyScoreExplanation.s_AnomalyCharacteristicsImpact,
                                  anomalyScoreExplanation);
    }
    // Confidence interval around the typical value; NaN means "not
    // computed" and is skipped rather than serialised.
    if (std::isnan(results.s_AnomalyScoreExplanation.s_LowerConfidenceBound) == false) {
        m_Writer.addDoubleFieldToObj(LOWER_CONFIDENCE_BOUND,
                                     results.s_AnomalyScoreExplanation.s_LowerConfidenceBound,
                                     anomalyScoreExplanation);
    }
    if (std::isnan(results.s_AnomalyScoreExplanation.s_TypicalValue) == false) {
        m_Writer.addDoubleFieldToObj(TYPICAL_VALUE,
                                     results.s_AnomalyScoreExplanation.s_TypicalValue,
                                     anomalyScoreExplanation);
    }
    if (std::isnan(results.s_AnomalyScoreExplanation.s_UpperConfidenceBound) == false) {
        m_Writer.addDoubleFieldToObj(UPPER_CONFIDENCE_BOUND,
                                     results.s_AnomalyScoreExplanation.s_UpperConfidenceBound,
                                     anomalyScoreExplanation);
    }
    // Boolean flags: only written when true, so absence implies false.
    if (results.s_AnomalyScoreExplanation.s_HighVariancePenalty) {
        m_Writer.addBoolFieldToObj(HIGH_VARIANCE_PENALTY,
                                   results.s_AnomalyScoreExplanation.s_HighVariancePenalty,
                                   anomalyScoreExplanation);
    }
    if (results.s_AnomalyScoreExplanation.s_IncompleteBucketPenalty) {
        m_Writer.addBoolFieldToObj(INCOMPLETE_BUCKET_PENALTY,
                                   results.s_AnomalyScoreExplanation.s_IncompleteBucketPenalty,
                                   anomalyScoreExplanation);
    }
    if (results.s_AnomalyScoreExplanation.s_MultimodalDistribution) {
        m_Writer.addBoolFieldToObj(MULTIMODAL_DISTRIBUTION,
                                   results.s_AnomalyScoreExplanation.s_MultimodalDistribution,
                                   anomalyScoreExplanation);
    }
    if (results.s_AnomalyScoreExplanation.s_ByFieldFirstOccurrence) {
        m_Writer.addBoolFieldToObj(BY_FIELD_FIRST_OCCURRENCE,
                                   results.s_AnomalyScoreExplanation.s_ByFieldFirstOccurrence,
                                   anomalyScoreExplanation);
    }
    // Relative rarity = typical/actual concentration; only meaningful (and
    // division-safe) when both concentrations are strictly positive.
    if (results.s_AnomalyScoreExplanation.s_ByFieldActualConcentration > 0.0 &&
        results.s_AnomalyScoreExplanation.s_ByFieldTypicalConcentration > 0.0) {
        double byFieldRelativeRarity{
            results.s_AnomalyScoreExplanation.s_ByFieldTypicalConcentration /
            results.s_AnomalyScoreExplanation.s_ByFieldActualConcentration};
        m_Writer.addDoubleFieldToObj(BY_FIELD_RELATIVE_RARITY, byFieldRelativeRarity,
                                     anomalyScoreExplanation);
    }
}
}
}