
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <core/CContainerPrinter.h>
#include <core/CJsonStatePersistInserter.h>
#include <core/CJsonStateRestoreTraverser.h>
#include <core/CLogger.h>
#include <core/CProgramCounters.h>
#include <core/CRegex.h>
#include <core/CStringUtils.h>

#include <maths/common/CIntegerTools.h>
#include <maths/common/CModelWeight.h>

#include <model/CAnomalyDetector.h>
#include <model/CHierarchicalResults.h>
#include <model/CHierarchicalResultsAggregator.h>
#include <model/CHierarchicalResultsPopulator.h>
#include <model/CHierarchicalResultsProbabilityFinalizer.h>
#include <model/CLimits.h>
#include <model/CSearchKey.h>
#include <model/FunctionTypes.h>

#include <test/BoostTestCloseAbsolute.h>
#include <test/CTimeSeriesTestData.h>

#include <boost/test/unit_test.hpp>

#include <algorithm>
#include <cstdint>
#include <fstream>
#include <functional>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

BOOST_AUTO_TEST_SUITE(CMetricAnomalyDetectorTest)

using namespace ml;

namespace {

using TTimeTimePr = std::pair<core_t::TTime, core_t::TTime>;
using TTimeTimePrVec = std::vector<TTimeTimePr>;
using TDoubleVec = std::vector<double>;

//! Check whether the two closed-open time intervals overlap.
bool doIntersect(const TTimeTimePr& i1, const TTimeTimePr& i2) {
    return !(i2.second <= i1.first || i1.second <= i2.first);
}

class CResultWriter : public ml::model::CHierarchicalResultsVisitor {
public:
    static const double HIGH_ANOMALY_SCORE;

public:
    CResultWriter(const model::CAnomalyDetectorModelConfig& modelConfig,
                  const model::CLimits& limits,
                  core_t::TTime bucketLength)
        : m_ModelConfig(modelConfig), m_Limits(limits), m_BucketLength(bucketLength) {}

    //! Build, aggregate, finalize and populate the hierarchical results for
    //! the bucket [\p start, \p end) and then visit them with this writer.
    void operator()(ml::model::CAnomalyDetector& detector,
                    ml::core_t::TTime start,
                    ml::core_t::TTime end) {
        ml::model::CHierarchicalResults results;
        detector.buildResults(start, end, results);
        results.buildHierarchy();
        ml::model::CHierarchicalResultsAggregator aggregator(m_ModelConfig);
        results.bottomUpBreadthFirst(aggregator);
        ml::model::CHierarchicalResultsProbabilityFinalizer finalizer;
        results.bottomUpBreadthFirst(finalizer);
        ml::model::CHierarchicalResultsPopulator populator(m_Limits);
        results.bottomUpBreadthFirst(populator);
        results.bottomUpBreadthFirst(*this);
    }
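    // The visitor below classifies each writable leaf result: raw anomaly
    // scores above HIGH_ANOMALY_SCORE are recorded as signal, together with
    // their bucket interval, while smaller non-zero scores are recorded as
    // noise, together with the bucket count, so the tests can later split
    // the noise by arrival rate.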
    //! Visit a node.
    void visit(const ml::model::CHierarchicalResults& results,
               const ml::model::CHierarchicalResults::TNode& node,
               bool pivot) override {
        if (pivot) {
            return;
        }
        if (!this->shouldWriteResult(m_Limits, results, node, pivot)) {
            return;
        }
        if (this->isSimpleCount(node)) {
            return;
        }
        if (!this->isLeaf(node)) {
            return;
        }

        core_t::TTime bucketTime = node.s_BucketStartTime;
        double anomalyFactor = node.s_RawAnomalyScore;
        if (anomalyFactor > HIGH_ANOMALY_SCORE) {
            m_HighAnomalyTimes.push_back(TTimeTimePr(bucketTime, bucketTime + m_BucketLength));
            m_HighAnomalyFactors.push_back(anomalyFactor);
        } else if (anomalyFactor > 0.0) {
            m_AnomalyFactors.push_back(anomalyFactor);
            std::uint64_t currentRate(0);
            if (node.s_AnnotatedProbability.s_CurrentBucketCount) {
                currentRate = *node.s_AnnotatedProbability.s_CurrentBucketCount;
            }
            m_AnomalyRates.push_back(static_cast<double>(currentRate));
        }
    }

    //! Log influencer results.
    bool operator()(ml::core_t::TTime time,
                    const ml::model::CHierarchicalResults::TNode& node,
                    bool isBucketInfluencer) {
        LOG_DEBUG(<< (isBucketInfluencer ? "BucketInfluencer" : "Influencer ")
                  << node.s_Spec.print() << " initial score " << node.probability()
                  << ", time: " << time);
        return true;
    }

    const TTimeTimePrVec& highAnomalyTimes() const { return m_HighAnomalyTimes; }

    const TDoubleVec& highAnomalyFactors() const { return m_HighAnomalyFactors; }

    const TDoubleVec& anomalyFactors() const { return m_AnomalyFactors; }

    const TDoubleVec& anomalyRates() const { return m_AnomalyRates; }

private:
    const model::CAnomalyDetectorModelConfig& m_ModelConfig;
    const model::CLimits& m_Limits;
    core_t::TTime m_BucketLength;
    TTimeTimePrVec m_HighAnomalyTimes;
    TDoubleVec m_HighAnomalyFactors;
    TDoubleVec m_AnomalyFactors;
    TDoubleVec m_AnomalyRates;
};

const double CResultWriter::HIGH_ANOMALY_SCORE(0.35);

void importData(core_t::TTime firstTime,
                core_t::TTime lastTime,
                core_t::TTime bucketLength,
                CResultWriter& outputResults,
                const std::string& fileName,
                model::CAnomalyDetector& detector) {
    test::CTimeSeriesTestData::TTimeDoublePrVec timeData;
    BOOST_TEST_REQUIRE(test::CTimeSeriesTestData::parse(fileName, timeData));

    core_t::TTime lastBucketTime =
        maths::common::CIntegerTools::ceil(firstTime, bucketLength);

    for (std::size_t i = 0; i < timeData.size(); ++i) {
        core_t::TTime time = timeData[i].first;

        for (/**/; lastBucketTime + bucketLength <= time; lastBucketTime += bucketLength) {
            outputResults(detector, lastBucketTime, lastBucketTime + bucketLength);
        }

        std::string value = core::CStringUtils::typeToString(timeData[i].second);
        model::CAnomalyDetector::TStrCPtrVec fieldValues;
        fieldValues.push_back(&fileName);
        fieldValues.push_back(&value);

        detector.addRecord(time, fieldValues);
    }

    for (/**/; lastBucketTime + bucketLength <= lastTime; lastBucketTime += bucketLength) {
        outputResults(detector, lastBucketTime, lastBucketTime + bucketLength);
    }
}

void importCsvData(core_t::TTime firstTime,
                   core_t::TTime bucketLength,
                   CResultWriter& outputResults,
                   const std::string& fileName,
                   model::CAnomalyDetector& detector) {
    using TifstreamPtr = std::shared_ptr<std::ifstream>;
    TifstreamPtr ifs(new std::ifstream(fileName.c_str()));
    BOOST_TEST_REQUIRE(ifs->is_open());

    core::CRegex regex;
    BOOST_TEST_REQUIRE(regex.init(","));

    std::string line;
    // read the header
    BOOST_TEST_REQUIRE(std::getline(*ifs, line).good());

    core_t::TTime lastBucketTime = firstTime;

    while (std::getline(*ifs, line)) {
        LOG_TRACE(<< "Got string: " << line);
        core::CRegex::TStrVec tokens;
        regex.split(line, tokens);

        core_t::TTime time{0};
        BOOST_TEST_REQUIRE(core::CStringUtils::stringToType(tokens[0], time));
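        // Close out and report every bucket that ends at or before this
        // record's timestamp before feeding the record to the detector.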
        for (/**/; lastBucketTime + bucketLength <= time; lastBucketTime += bucketLength) {
            outputResults(detector, lastBucketTime, lastBucketTime + bucketLength);
        }

        model::CAnomalyDetector::TStrCPtrVec fieldValues;
        fieldValues.push_back(&tokens[2]);
        fieldValues.push_back(&tokens[1]);

        detector.addRecord(time, fieldValues);
    }

    outputResults(detector, lastBucketTime, lastBucketTime + bucketLength);

    ifs.reset();
}

const std::string EMPTY_STRING;
}

BOOST_AUTO_TEST_CASE(testAnomalies) {
    // The test data has one genuine anomaly in the interval
    // [1360617335, 1360617481]. The rest of the samples are
    // Gaussian with mean 30 and standard deviation 5. The
    // arrival rate is Poisson distributed with constant mean
    // in each of the 24 hour periods. However, the rate varies
    // from hour to hour. In particular, the mean rates are:
    //
    //       Interval      |  Mean
    //   ------------------+--------
    //    [ 0hr  - 1hr  ]  |  1000
    //    [ 1hr  - 2hr  ]  |  1000
    //    [ 2hr  - 3hr  ]  |  1200
    //    [ 3hr  - 4hr  ]  |  1400
    //    [ 4hr  - 5hr  ]  |  1600
    //    [ 5hr  - 6hr  ]  |  1800
    //    [ 6hr  - 7hr  ]  |  500
    //    [ 7hr  - 8hr  ]  |  200
    //    [ 8hr  - 9hr  ]  |  100
    //    [ 9hr  - 10hr ]  |  50
    //    [ 10hr - 11hr ]  |  50
    //    [ 11hr - 12hr ]  |  50
    //    [ 12hr - 13hr ]  |  50
    //    [ 13hr - 14hr ]  |  50
    //    [ 14hr - 15hr ]  |  50
    //    [ 15hr - 16hr ]  |  50
    //    [ 16hr - 17hr ]  |  50
    //    [ 17hr - 18hr ]  |  100
    //    [ 18hr - 19hr ]  |  200
    //    [ 19hr - 20hr ]  |  500
    //    [ 20hr - 21hr ]  |  1500
    //    [ 21hr - 22hr ]  |  1400
    //    [ 22hr - 23hr ]  |  1000
    //    [ 23hr - 24hr ]  |  1000

    static const core_t::TTime FIRST_TIME(1360540800);
    static const core_t::TTime LAST_TIME(FIRST_TIME + 86400);
    static const core_t::TTime BUCKET_LENGTHS[] = {120, 150, 180, 210, 240,
                                                   300, 450, 600, 900, 1200};
    static const TTimeTimePr ANOMALOUS_INTERVALS[] = {
        TTimeTimePr(1360576852, 1360578629), TTimeTimePr(1360617335, 1360617481)};

    double highRateNoise = 0.0;
    double lowRateNoise = 0.0;

    for (auto bucketLength : BUCKET_LENGTHS) {
        model::CAnomalyDetectorModelConfig modelConfig =
            model::CAnomalyDetectorModelConfig::defaultConfig(bucketLength);
        modelConfig.useMultibucketFeatures(false);
        model::CLimits limits;
        model::CSearchKey key(1, // detectorIndex
                              model::function_t::E_IndividualMetric, false,
                              model_t::E_XF_None, "n/a", "n/a");
        model::CAnomalyDetector detector(limits, modelConfig, "", FIRST_TIME,
                                         modelConfig.factory(key));
        CResultWriter writer(modelConfig, limits, bucketLength);

        importData(FIRST_TIME, LAST_TIME, bucketLength, writer,
                   "testfiles/variable_rate_metric.data", detector);

        TTimeTimePrVec highAnomalyTimes(writer.highAnomalyTimes());
        TDoubleVec highAnomalyFactors(writer.highAnomalyFactors());
        TDoubleVec anomalyFactors(writer.anomalyFactors());
        TDoubleVec anomalyRates(writer.anomalyRates());

        LOG_DEBUG(<< "bucket length = " << bucketLength);
        LOG_DEBUG(<< "high anomalies in = " << highAnomalyTimes);
        LOG_DEBUG(<< "high anomaly factors = " << highAnomalyFactors);
        LOG_DEBUG(<< "anomaly factors = " << anomalyFactors);
        LOG_DEBUG(<< "anomaly rates = " << anomalyRates);

        // Every high anomaly must land in one of the known anomalous intervals.
        for (std::size_t j = 0; j < highAnomalyTimes.size(); ++j) {
            LOG_DEBUG(<< "Testing " << highAnomalyTimes[j] << " "
                      << highAnomalyFactors[j]);
            BOOST_TEST_REQUIRE(
                (doIntersect(highAnomalyTimes[j], ANOMALOUS_INTERVALS[0]) ||
                 doIntersect(highAnomalyTimes[j], ANOMALOUS_INTERVALS[1])));
        }

        if (!anomalyFactors.empty()) {
            double signal = std::accumulate(highAnomalyFactors.begin(),
                                            highAnomalyFactors.end(), 0.0);
            double noise = std::accumulate(anomalyFactors.begin(),
                                           anomalyFactors.end(), 0.0);
            LOG_DEBUG(<< "S/N = " << (signal / noise));
            BOOST_TEST_REQUIRE(signal / noise > 33.0);
        }
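        // The noise results are next split into a high and a low rate channel.
        // The partition rate is found by sorting the noise bucket rates and
        // taking the midpoint of the largest jump between consecutive rates,
        // i.e. a simple one-dimensional largest-gap clustering of the rates.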
        // Find the high/low rate partition point.
        TDoubleVec orderedAnomalyRates(anomalyRates);
        std::sort(orderedAnomalyRates.begin(), orderedAnomalyRates.end());
        std::size_t maxStep = 1;
        for (std::size_t j = 2; j < orderedAnomalyRates.size(); ++j) {
            if (orderedAnomalyRates[j] - orderedAnomalyRates[j - 1] >
                orderedAnomalyRates[maxStep] - orderedAnomalyRates[maxStep - 1]) {
                maxStep = j;
            }
        }
        double partitionRate = 0.0;
        if (maxStep < orderedAnomalyRates.size()) {
            partitionRate = 0.5 * (orderedAnomalyRates[maxStep] +
                                   orderedAnomalyRates[maxStep - 1]);
        }
        LOG_DEBUG(<< "partition rate = " << partitionRate);

        // Compute the ratio of noise in the two rate channels.
        for (std::size_t j = 0; j < anomalyFactors.size(); ++j) {
            (anomalyRates[j] > partitionRate ? highRateNoise : lowRateNoise) +=
                anomalyFactors[j];
        }
    }

    LOG_DEBUG(<< "high rate noise = " << highRateNoise
              << ", low rate noise = " << lowRateNoise);

    // We don't have significantly more noise in the low rate channel.
    BOOST_TEST_REQUIRE(lowRateNoise / highRateNoise < 1.5);
}

BOOST_AUTO_TEST_CASE(testPersist) {
    static const core_t::TTime FIRST_TIME(1360540800);
    static const core_t::TTime LAST_TIME(FIRST_TIME + 86400);
    static const core_t::TTime BUCKET_LENGTH(300);

    core::CProgramCounters& counters = core::CProgramCounters::instance();

    model::CAnomalyDetectorModelConfig modelConfig =
        model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_LENGTH);
    model::CLimits limits;
    model::CSearchKey key(1, // detectorIndex
                          model::function_t::E_IndividualMetric, false,
                          model_t::E_XF_None, "responsetime", "Airline");
    model::CAnomalyDetector origDetector(limits, modelConfig, EMPTY_STRING,
                                         FIRST_TIME, modelConfig.factory(key));
    CResultWriter writer(modelConfig, limits, BUCKET_LENGTH);

    importData(FIRST_TIME, LAST_TIME, BUCKET_LENGTH, writer,
               "testfiles/variable_rate_metric.data", origDetector);

    std::ostringstream origJson;
    core::CJsonStatePersistInserter::persist(
        origJson, std::bind_front(&model::CAnomalyDetector::acceptPersistInserter,
                                  &origDetector));

    std::ostringstream origStaticsJson;
    core::CJsonStatePersistInserter::persist(
        origStaticsJson, &core::CProgramCounters::staticsAcceptPersistInserter);

    LOG_TRACE(<< "Event rate detector JSON representation:\n" << origJson.str());

    std::uint64_t peakMemoryUsageBeforeRestoring =
        counters.counter(counter_t::E_TSADPeakMemoryUsage);
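    // The round trip below verifies two properties: restoring state also
    // restores the program counters (checked via the peak memory usage
    // counter), and the restored detector persists to JSON identical to
    // the original detector's.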
    // Clear the counter to verify that restoring the detector also restores
    // the counter's value.
    counters.counter(counter_t::E_TSADPeakMemoryUsage) = 0;
    BOOST_REQUIRE_EQUAL(0, counters.counter(counter_t::E_TSADPeakMemoryUsage));

    // Restore the JSON into a new detector
    model::CAnomalyDetector restoredDetector(limits, modelConfig, EMPTY_STRING,
                                             0, modelConfig.factory(key));
    {
        // The traverser expects the state JSON in an embedded document
        std::istringstream origJsonStrm("{\"topLevel\" : " + origJson.str() + "}");
        core::CJsonStateRestoreTraverser traverser(origJsonStrm);
        BOOST_TEST_REQUIRE(traverser.traverseSubLevel(
            std::bind(&model::CAnomalyDetector::acceptRestoreTraverser,
                      &restoredDetector, EMPTY_STRING, std::placeholders::_1)));
    }
    {
        std::istringstream origStaticsJsonStrm{"{\"topLevel\":" + origStaticsJson.str() + "}"};
        core::CJsonStateRestoreTraverser traverser(origStaticsJsonStrm);
        BOOST_TEST_REQUIRE(traverser.traverseSubLevel(
            &core::CProgramCounters::staticsAcceptRestoreTraverser));
    }

    std::uint64_t peakMemoryUsageAfterRestoring =
        counters.counter(counter_t::E_TSADPeakMemoryUsage);
    BOOST_REQUIRE_EQUAL(peakMemoryUsageBeforeRestoring, peakMemoryUsageAfterRestoring);

    // The JSON representation of the new detector should be the same as the original
    std::ostringstream newJson;
    core::CJsonStatePersistInserter::persist(
        newJson, std::bind_front(&model::CAnomalyDetector::acceptPersistInserter,
                                 &restoredDetector));
    BOOST_REQUIRE_EQUAL(origJson.str(), newJson.str());
}

BOOST_AUTO_TEST_CASE(testExcludeFrequent) {
    static const core_t::TTime FIRST_TIME(1406916000);
    static const core_t::TTime BUCKET_LENGTH(3600);

    {
        model::CAnomalyDetectorModelConfig modelConfig =
            model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_LENGTH);
        modelConfig.useMultibucketFeatures(false);
        model::CLimits limits;
        model::CSearchKey key(1, // detectorIndex
                              model::function_t::E_IndividualMetric, false,
                              model_t::E_XF_None, "bytes", "host");
        model::CAnomalyDetector detector(limits, modelConfig, "", FIRST_TIME,
                                         modelConfig.factory(key));
        CResultWriter writer(modelConfig, limits, BUCKET_LENGTH);

        importCsvData(FIRST_TIME, BUCKET_LENGTH, writer,
                      "testfiles/excludefrequent_two_series.txt", detector);

        TTimeTimePrVec highAnomalyTimes(writer.highAnomalyTimes());
        TDoubleVec highAnomalyFactors(writer.highAnomalyFactors());
        std::sort(highAnomalyFactors.begin(), highAnomalyFactors.end());

        LOG_DEBUG(<< "high anomalies in = " << highAnomalyTimes);
        LOG_DEBUG(<< "high anomaly factors = " << highAnomalyFactors);

        // expect there to be 2 anomalies
        BOOST_REQUIRE_EQUAL(2, highAnomalyTimes.size());
        BOOST_REQUIRE_CLOSE_ABSOLUTE(99.0, highAnomalyFactors[1], 2.0);
    }
    {
        model::CAnomalyDetectorModelConfig modelConfig =
            model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_LENGTH);
        modelConfig.useMultibucketFeatures(false);
        model::CLimits limits;
        model::CSearchKey key(1, // detectorIndex
                              model::function_t::E_IndividualMetric, false,
                              model_t::E_XF_By, "bytes", "host");
        model::CAnomalyDetector detector(limits, modelConfig, "", FIRST_TIME,
                                         modelConfig.factory(key));
        CResultWriter writer(modelConfig, limits, BUCKET_LENGTH);

        importCsvData(FIRST_TIME, BUCKET_LENGTH, writer,
                      "testfiles/excludefrequent_two_series.txt", detector);

        TTimeTimePrVec highAnomalyTimes(writer.highAnomalyTimes());
        TDoubleVec highAnomalyFactors(writer.highAnomalyFactors());

        LOG_DEBUG(<< "high anomalies in = " << highAnomalyTimes);
        LOG_DEBUG(<< "high anomaly factors = " << highAnomalyFactors);

        // expect there to be 1 anomaly
        BOOST_REQUIRE_EQUAL(1, highAnomalyTimes.size());
        BOOST_REQUIRE_CLOSE_ABSOLUTE(12.0, highAnomalyFactors[0], 2.0);
    }
}

BOOST_AUTO_TEST_SUITE_END()