lib/model/unittest/CEventRateAnomalyDetectorTest.cc (218 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #include <core/CJsonStatePersistInserter.h> #include <core/CJsonStateRestoreTraverser.h> #include <core/CLogger.h> #include <core/CStringUtils.h> #include <maths/common/CIntegerTools.h> #include <maths/common/CModelWeight.h> #include <model/CAnomalyDetector.h> #include <model/CHierarchicalResults.h> #include <model/CHierarchicalResultsAggregator.h> #include <model/CHierarchicalResultsProbabilityFinalizer.h> #include <model/CLimits.h> #include <model/CSearchKey.h> #include <model/FunctionTypes.h> #include <boost/test/unit_test.hpp> #include <fstream> #include <map> #include <set> BOOST_AUTO_TEST_SUITE(CEventRateAnomalyDetectorTest) namespace { using TTimeVec = std::vector<ml::core_t::TTime>; using TStrVec = std::vector<std::string>; using TTimeDoubleMap = std::map<ml::core_t::TTime, double>; using TTimeDoubleMapCItr = TTimeDoubleMap::const_iterator; using TTimeStrPr = std::pair<ml::core_t::TTime, std::string>; using TTimeStrPrSet = std::set<TTimeStrPr>; const std::string EMPTY_STRING; class CResultWriter : public ml::model::CHierarchicalResultsVisitor { public: CResultWriter(const ml::model::CAnomalyDetectorModelConfig& modelConfig, const ml::model::CLimits& limits) : m_ModelConfig(modelConfig), m_Limits(limits), m_Calls(0) {} void operator()(ml::model::CAnomalyDetector& detector, ml::core_t::TTime start, ml::core_t::TTime end) { ml::model::CHierarchicalResults results; detector.buildResults(start, end, results); results.buildHierarchy(); ml::model::CHierarchicalResultsAggregator aggregator(m_ModelConfig); results.bottomUpBreadthFirst(aggregator); ml::model::CHierarchicalResultsProbabilityFinalizer finalizer; results.bottomUpBreadthFirst(finalizer); results.bottomUpBreadthFirst(*this); } void visit(const ml::model::CHierarchicalResults& results, const ml::model::CHierarchicalResults::TNode& node, bool pivot) override { if (pivot) { return; } if (!this->shouldWriteResult(m_Limits, results, node, pivot)) { return; } if (this->isSimpleCount(node)) { return; } if (!this->isLeaf(node)) { return; } const std::string& analysisFieldValue = *node.s_Spec.s_PersonFieldValue; ml::core_t::TTime bucketTime = node.s_BucketStartTime; double anomalyFactor = node.s_RawAnomalyScore; LOG_DEBUG(<< analysisFieldValue << " bucket time " << bucketTime << " anomalyFactor " << anomalyFactor); ++m_Calls; m_AllAnomalies.insert(TTimeStrPr(bucketTime, analysisFieldValue)); m_AnomalyScores[bucketTime] += anomalyFactor; } bool operator()(ml::core_t::TTime time, const ml::model::CHierarchicalResults::TNode& node, bool isBucketInfluencer) { LOG_DEBUG(<< (isBucketInfluencer ? "BucketInfluencer" : "Influencer ") << node.s_Spec.print() << " initial score " << node.probability() << ", time: " << time); return true; } size_t calls() const { return m_Calls; } size_t numDistinctTimes() const { return m_AllAnomalies.size(); } const TTimeDoubleMap& anomalyScores() const { return m_AnomalyScores; } const TTimeStrPrSet& allAnomalies() const { return m_AllAnomalies; } private: const ml::model::CAnomalyDetectorModelConfig& m_ModelConfig; ml::model::CLimits m_Limits; std::size_t m_Calls; TTimeStrPrSet m_AllAnomalies; TTimeDoubleMap m_AnomalyScores; }; void importData(ml::core_t::TTime firstTime, ml::core_t::TTime lastTime, ml::core_t::TTime bucketLength, CResultWriter& outputResults, const TStrVec& fileNames, ml::model::CAnomalyDetector& detector) { using TifstreamPtr = std::shared_ptr<std::ifstream>; using TifstreamPtrVec = std::vector<TifstreamPtr>; TifstreamPtrVec ifss; for (std::size_t i = 0; i < fileNames.size(); ++i) { TifstreamPtr ifs(new std::ifstream(fileNames[i].c_str())); BOOST_TEST_REQUIRE(ifs->is_open()); ifss.push_back(ifs); } ml::core_t::TTime lastBucketTime = ml::maths::common::CIntegerTools::ceil(firstTime, bucketLength); TTimeVec times(ifss.size()); for (std::size_t i = 0; i < ifss.size(); ++i) { std::string line; std::getline(*ifss[i], line); BOOST_TEST_REQUIRE(ml::core::CStringUtils::stringToType(line, times[i])); } ml::core_t::TTime time(0); for (;;) { std::size_t file(std::min_element(times.begin(), times.end()) - times.begin()); std::string attributeFieldValue = fileNames[file]; time = times[file]; if (time == std::numeric_limits<ml::core_t::TTime>::max()) { break; } for (/**/; lastBucketTime + bucketLength <= time; lastBucketTime += bucketLength) { outputResults(detector, lastBucketTime, lastBucketTime + bucketLength); } ml::model::CAnomalyDetector::TStrCPtrVec fieldValues; fieldValues.push_back(&attributeFieldValue); detector.addRecord(time, fieldValues); std::string line; if (!std::getline(*ifss[file], line)) { times[file] = std::numeric_limits<ml::core_t::TTime>::max(); ifss[file].reset(); } else { BOOST_TEST_REQUIRE(ml::core::CStringUtils::stringToType(line, times[file])); } } for (/**/; lastBucketTime + bucketLength <= lastTime; lastBucketTime += bucketLength) { outputResults(detector, lastBucketTime, lastBucketTime + bucketLength); } } } BOOST_AUTO_TEST_CASE(testAnomalies) { // We have 11 instances of correlated rare 503s and SQL statements. static const double HIGH_ANOMALY_SCORE(0.0014); static const size_t EXPECTED_ANOMALOUS_HOURS(11); static const ml::core_t::TTime FIRST_TIME(1346713620); static const ml::core_t::TTime LAST_TIME(1347317974); static const ml::core_t::TTime BUCKET_SIZE(600); ml::model::CAnomalyDetectorModelConfig modelConfig = ml::model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); ml::model::CLimits limits; ml::model::CSearchKey key(1, // detectorIndex ml::model::function_t::E_IndividualRare, false, ml::model_t::E_XF_None, EMPTY_STRING, "status"); ml::model::CAnomalyDetector detector(limits, modelConfig, EMPTY_STRING, FIRST_TIME, modelConfig.factory(key)); CResultWriter writer(modelConfig, limits); TStrVec files; files.push_back("testfiles/status200.txt"); files.push_back("testfiles/status503.txt"); files.push_back("testfiles/mysqlabort.txt"); importData(FIRST_TIME, LAST_TIME, BUCKET_SIZE, writer, files, detector); LOG_DEBUG(<< "visitor.calls() = " << writer.calls()); const TTimeDoubleMap& anomalyScores = writer.anomalyScores(); TTimeVec peaks; for (const auto& score : anomalyScores) { if (score.second > HIGH_ANOMALY_SCORE) { peaks.push_back(score.first); } } BOOST_REQUIRE_EQUAL(EXPECTED_ANOMALOUS_HOURS, peaks.size()); std::size_t detected503 = 0; std::size_t detectedMySQL = 0; for (std::size_t i = 0; i < peaks.size(); ++i) { LOG_DEBUG(<< "Checking for status 503 anomaly at " << peaks[i]); if (writer.allAnomalies().count(TTimeStrPr(peaks[i], "testfiles/status503.txt"))) { ++detected503; } LOG_DEBUG(<< "Checking for MySQL anomaly at " << peaks[i]); if (writer.allAnomalies().count(TTimeStrPr(peaks[i], "testfiles/mysqlabort.txt"))) { ++detectedMySQL; } } LOG_DEBUG(<< "# 503 = " << detected503 << ", # My SQL = " << detectedMySQL); BOOST_REQUIRE_EQUAL(11, detected503); BOOST_REQUIRE_EQUAL(11, detectedMySQL); } BOOST_AUTO_TEST_CASE(testPersist) { static const ml::core_t::TTime FIRST_TIME(1346713620); static const ml::core_t::TTime LAST_TIME(1347317974); static const ml::core_t::TTime BUCKET_SIZE(3600); ml::model::CAnomalyDetectorModelConfig modelConfig = ml::model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); ml::model::CLimits limits; ml::model::CSearchKey key(1, // detectorIndex ml::model::function_t::E_IndividualCount, false, ml::model_t::E_XF_None, EMPTY_STRING, "status"); ml::model::CAnomalyDetector origDetector(limits, modelConfig, EMPTY_STRING, FIRST_TIME, modelConfig.factory(key)); CResultWriter writer(modelConfig, limits); TStrVec files; files.push_back("testfiles/status503.txt"); importData(FIRST_TIME, LAST_TIME, BUCKET_SIZE, writer, files, origDetector); std::ostringstream origJson; ml::core::CJsonStatePersistInserter::persist( origJson, std::bind_front(&ml::model::CAnomalyDetector::acceptPersistInserter, &origDetector)); LOG_DEBUG(<< "Event rate detector JSON representation:\n" << origJson.str()); // Restore the JSON into a new detector ml::model::CAnomalyDetector restoredDetector(limits, modelConfig, "", 0, modelConfig.factory(key)); { // The traverser expects the state json in a embedded document std::istringstream origJsonStrm("{\"topLevel\" : " + origJson.str() + "}"); ml::core::CJsonStateRestoreTraverser traverser(origJsonStrm); BOOST_TEST_REQUIRE(traverser.traverseSubLevel( std::bind(&ml::model::CAnomalyDetector::acceptRestoreTraverser, &restoredDetector, EMPTY_STRING, std::placeholders::_1))); } // The JSON representation of the new detector should be the same as the original std::ostringstream newJson; ml::core::CJsonStatePersistInserter::persist( newJson, std::bind_front(&ml::model::CAnomalyDetector::acceptPersistInserter, &restoredDetector)); BOOST_REQUIRE_EQUAL(origJson.str(), newJson.str()); } BOOST_AUTO_TEST_SUITE_END()