lib/api/unittest/CAnomalyJobTest.cc (919 lines of code) (raw):

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <core/CDataSearcher.h>
#include <core/CJsonOutputStreamWrapper.h>
#include <core/CLogger.h>
#include <core/COsFileFuncs.h>
#include <core/CRegex.h>

#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CDataGatherer.h>
#include <model/CLimits.h>

#include <api/CAnomalyJobConfig.h>
#include <api/CCsvInputParser.h>
#include <api/CHierarchicalResultsWriter.h>
#include <api/CNdJsonInputParser.h>
#include <api/CSingleStreamDataAdder.h>
#include <api/CSingleStreamSearcher.h>
#include <api/CStateRestoreStreamFilter.h>

#include "CTestAnomalyJob.h"

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/json.hpp>
#include <boost/test/unit_test.hpp>

#include <cstdio>
#include <fstream>
#include <map>
#include <random>
#include <sstream>

BOOST_TEST_DONT_PRINT_LOG_VALUE(json::array::const_iterator)
BOOST_TEST_DONT_PRINT_LOG_VALUE(json::object::const_iterator)

BOOST_AUTO_TEST_SUITE(CAnomalyJobTest)

namespace {

//! Persist-complete callback used by the tests.
//!
//! Logs the snapshot description and copies the snapshot ID and document
//! count out of the report. The report is taken by const reference so that
//! its strings are not copied on every callback; the function can still be
//! bound with std::bind and a placeholder exactly as before.
//!
//! \param[in] modelSnapshotReport The report supplied by the job.
//! \param[out] snapshotIdOut Receives the report's snapshot ID.
//! \param[out] numDocsOut Receives the report's number of documents.
void reportPersistComplete(const ml::api::CModelSnapshotJsonWriter::SModelSnapshotReport& modelSnapshotReport,
                           std::string& snapshotIdOut,
                           std::size_t& numDocsOut) {
    LOG_INFO(<< "Persist complete with description: " << modelSnapshotReport.s_Description);
    snapshotIdOut = modelSnapshotReport.s_SnapshotId;
    numDocsOut = modelSnapshotReport.s_NumDocs;
}

//! \brief
//! Mock object for state restore unit tests.
//!
//! DESCRIPTION:\n
//! CDataSearcher that returns an empty stream.
//!
class CEmptySearcher : public ml::core::CDataSearcher { public: //! Do a search that results in an empty input stream. TIStreamP search(size_t /*currentDocNum*/, size_t /*limit*/) override { return TIStreamP(new std::istringstream()); } }; //! \brief //! Mock object for unit tests //! //! DESCRIPTION:\n //! Mock object for gathering anomaly results. //! //! IMPLEMENTATION DECISIONS:\n //! Only the minimal set of required functions are implemented. //! class CSingleResultVisitor : public ml::model::CHierarchicalResultsVisitor { public: void visit(const ml::model::CHierarchicalResults& /*results*/, const TNode& node, bool /*pivot*/) override { if (!isSimpleCount(node) && isLeaf(node)) { if (node.s_AnnotatedProbability.s_AttributeProbabilities.empty()) { return; } if (node.s_Model == nullptr) { return; } const ml::model::SAttributeProbability& attribute = node.s_AnnotatedProbability.s_AttributeProbabilities[0]; m_LastResult = node.s_Model->currentBucketValue( attribute.s_Feature, 0, 0, node.s_BucketStartTime)[0]; } } double lastResults() const { return m_LastResult; } private: double m_LastResult{0.0}; }; class CMultiResultVisitor : public ml::model::CHierarchicalResultsVisitor { public: void visit(const ml::model::CHierarchicalResults& /*results*/, const TNode& node, bool /*pivot*/) override { if (!isSimpleCount(node) && isLeaf(node)) { if (node.s_AnnotatedProbability.s_AttributeProbabilities.empty()) { return; } if (node.s_Model == nullptr) { return; } std::size_t pid; const ml::model::CDataGatherer& gatherer = node.s_Model->dataGatherer(); if (!gatherer.personId(node.s_Spec.s_PersonFieldValue.value_or(""), pid)) { LOG_ERROR(<< "No identifier for '" << node.s_Spec.s_PersonFieldValue.value_or("") << "'"); return; } for (std::size_t i = 0; i < node.s_AnnotatedProbability.s_AttributeProbabilities.size(); ++i) { const ml::model::SAttributeProbability& attribute = node.s_AnnotatedProbability.s_AttributeProbabilities[i]; m_LastResult += node.s_Model->currentBucketValue( 
attribute.s_Feature, pid, attribute.s_Cid, node.s_BucketStartTime)[0]; } } } double lastResults() const { return m_LastResult; } private: double m_LastResult{0.0}; }; class CResultsScoreVisitor : public ml::model::CHierarchicalResultsVisitor { public: explicit CResultsScoreVisitor(int score) : m_Score(score) {} void visit(const ml::model::CHierarchicalResults& /*results*/, const TNode& node, bool /*pivot*/) override { if (isRoot(node)) { node.s_NormalizedAnomalyScore = m_Score; } } private: int m_Score; }; size_t countBuckets(const std::string& key, const std::string& output) { size_t count = 0; json::error_code ec; json::value results = json::parse(output, ec); BOOST_TEST_REQUIRE(ec.failed() == false); BOOST_TEST_REQUIRE(results.is_array()); const json::array& allRecords = results.as_array(); for (const auto& r : allRecords) { BOOST_TEST_REQUIRE(r.is_object()); json::object::const_iterator recordsIt = r.as_object().find(key); if (recordsIt != r.as_object().end()) { ++count; } } return count; } bool findLine(const std::string& regex, const ml::core::CRegex::TStrVec& lines) { ml::core::CRegex rx; rx.init(regex); std::size_t pos = 0; for (const auto& line : lines) { if (rx.search(line, pos)) { return true; } } return false; } const ml::core_t::TTime BUCKET_SIZE(3600); using TStrStrPr = std::pair<std::string, std::string>; using TStrStrPrVec = std::vector<TStrStrPr>; } using namespace ml; BOOST_AUTO_TEST_CASE(testBadTimes) { { // Test with no time field model::CLimits limits; api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig( "metric", "value", "", "", "greenhouse"); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); std::stringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); CTestAnomalyJob::TStrStrUMap dataRows; dataRows["wibble"] = "12345678"; dataRows["value"] = "1.0"; 
dataRows["greenhouse"] = "rhubarb"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); BOOST_REQUIRE_EQUAL(uint64_t(0), job.numRecordsHandled()); } { // Test with bad time field model::CLimits limits; api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig( "metric", "value", "", "", "greenhouse"); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); std::stringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); CTestAnomalyJob::TStrStrUMap dataRows; dataRows["time"] = "hello"; dataRows["value"] = "1.0"; dataRows["greenhouse"] = "rhubarb"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); BOOST_REQUIRE_EQUAL(uint64_t(0), job.numRecordsHandled()); } { // Test with bad time field format model::CLimits limits; api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig( "metric", "value", "", "", "greenhouse"); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); std::stringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream, CTestAnomalyJob::TPersistCompleteFunc(), nullptr, -1, "time", "%Y%m%m%H%M%S"); CTestAnomalyJob::TStrStrUMap dataRows; dataRows["time"] = "hello world"; dataRows["value"] = "1.0"; dataRows["greenhouse"] = "rhubarb"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); BOOST_REQUIRE_EQUAL(uint64_t(0), job.numRecordsHandled()); } } BOOST_AUTO_TEST_CASE(testOutOfSequence) { { // Test out of sequence record model::CLimits limits; api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig( "metric", "value", "", "", "greenhouse"); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); std::stringstream outputStrm; 
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); job.description(); job.descriptionAndDebugMemoryUsage(); // add records which create partitions CTestAnomalyJob::TStrStrUMap dataRows; dataRows["time"] = "12345678"; dataRows["value"] = "1.0"; dataRows["greenhouse"] = "rhubarb"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); BOOST_REQUIRE_EQUAL(uint64_t(1), job.numRecordsHandled()); dataRows["time"] = "1234567"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); BOOST_REQUIRE_EQUAL(uint64_t(1), job.numRecordsHandled()); job.finalise(); } } BOOST_AUTO_TEST_CASE(testOutputBucketResultsUntilGivenIncompleteInitialBucket) { const std::string inputFileName{"testfiles/incomplete_initial_bucket.txt"}; const std::string configFileName{"testfiles/pop_sum_bytes_by_status_over_clientip.json"}; const char* logFile{"test.log"}; std::remove(logFile); BOOST_TEST_REQUIRE(ml::core::CLogger::instance().reconfigureFromFile( "testfiles/testLogErrors.boost.log.ini")); // Start by creating a detector with non-trivial state static const core_t::TTime BUCKET_SIZE{900}; static const std::string JOB_ID{"pop_sum_bytes_by_status_over_clientip"}; // Open the input and output files std::ifstream inputStrm{inputFileName.c_str()}; BOOST_TEST_REQUIRE(inputStrm.is_open()); std::ofstream outputStrm{core::COsFileFuncs::NULL_FILENAME}; BOOST_TEST_REQUIRE(outputStrm.is_open()); model::CLimits limits; api::CAnomalyJobConfig jobConfig; BOOST_TEST_REQUIRE(jobConfig.initFromFile(configFileName)); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE, model_t::E_None, "", 0, false); core::CJsonOutputStreamWrapper wrappedOutputStream{outputStrm}; std::string origSnapshotId; std::size_t numOrigDocs{0}; CTestAnomalyJob origJob{JOB_ID, limits, jobConfig, modelConfig, wrappedOutputStream, std::bind(&reportPersistComplete, std::placeholders::_1, 
std::ref(origSnapshotId), std::ref(numOrigDocs)), nullptr, -1, api::CAnomalyJob::DEFAULT_TIME_FIELD_NAME, api::CAnomalyJob::EMPTY_STRING}; api::CDataProcessor* firstProcessor{&origJob}; using TInputParserUPtr = std::unique_ptr<api::CInputParser>; const TInputParserUPtr parser{[&inputStrm]() -> TInputParserUPtr { return std::make_unique<api::CNdJsonInputParser>(inputStrm); }()}; BOOST_TEST_REQUIRE(parser->readStreamIntoMaps( [firstProcessor](const api::CDataProcessor::TStrStrUMap& dataRowFields) { return firstProcessor->handleRecord( dataRowFields, api::CDataProcessor::TOptionalTime{}); })); // Persist the detector state to a stringstream std::ostringstream* strm{nullptr}; api::CSingleStreamDataAdder::TOStreamP ptr{strm = new std::ostringstream()}; api::CSingleStreamDataAdder persister{ptr}; BOOST_TEST_REQUIRE(firstProcessor->persistStateInForeground(persister, "")); const std::string origPersistedState{strm->str()}; // restore the job and start the datafeed running in realtime std::string restoredSnapshotId; std::size_t numRestoredDocs{0}; CTestAnomalyJob restoredJob{ JOB_ID, limits, jobConfig, modelConfig, wrappedOutputStream, std::bind(&reportPersistComplete, std::placeholders::_1, std::ref(restoredSnapshotId), std::ref(numRestoredDocs))}; api::CDataProcessor* restoredFirstProcessor{&restoredJob}; core_t::TTime completeToTime{0}; auto restoredStrm = std::make_shared<boost::iostreams::filtering_istream>(); restoredStrm->push(api::CStateRestoreStreamFilter()); std::istringstream inputStream{origPersistedState}; restoredStrm->push(inputStream); api::CSingleStreamSearcher retriever{restoredStrm}; BOOST_TEST_REQUIRE(restoredFirstProcessor->restoreState(retriever, completeToTime)); BOOST_TEST_REQUIRE(completeToTime > 0); restoredJob.outputBucketResultsUntil(1585701000); // Revert to the default logger settings ml::core::CLogger::instance().reset(); std::ifstream log{logFile}; // Boost.Log only creates files when the first message is logged, // and here we're asserting 
no messages logged if (log.is_open()) { char line[256]; while (log.getline(line, 256)) { LOG_DEBUG(<< "Got '" << line << "'"); BOOST_TEST_REQUIRE(false); } log.close(); std::remove(logFile); } } BOOST_AUTO_TEST_CASE(testControlMessages) { { // Test control messages model::CLimits limits; api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig( "metric", "value", "", "", "greenhouse"); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); std::stringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); CTestAnomalyJob::TStrStrUMap dataRows; dataRows["."] = " "; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); BOOST_REQUIRE_EQUAL(uint64_t(0), job.numRecordsHandled()); dataRows["."] = "."; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); BOOST_REQUIRE_EQUAL(uint64_t(0), job.numRecordsHandled()); dataRows["."] = "f"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); BOOST_REQUIRE_EQUAL(uint64_t(0), job.numRecordsHandled()); dataRows["."] = "f1"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); BOOST_REQUIRE_EQUAL(uint64_t(0), job.numRecordsHandled()); } { // Test reset bucket model::CLimits limits; api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig("count", "", "", "", "greenhouse"); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); CTestAnomalyJob::TStrStrUMap dataRows; dataRows["value"] = "2.0"; dataRows["greenhouse"] = "rhubarb"; std::stringstream outputStrm; { core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); core_t::TTime time = 12345678; for (std::size_t i = 0; i < 50; i++, time += (BUCKET_SIZE / 2)) { std::stringstream ss; ss << time; dataRows["time"] = ss.str(); if (i == 40) { for (std::size_t j = 0; j < 100; 
j++) { BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); } } BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); if (i < 2) { // We haven't processed one full bucket but it should be safe to flush. dataRows["."] = "f1"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows.erase("."); } } } json::error_code ec; json::value results = json::parse(outputStrm.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); BOOST_TEST_REQUIRE(results.is_array()); const json::array& allRecords = results.as_array(); bool foundRecord = false; for (auto& r : allRecords) { BOOST_TEST_REQUIRE(r.is_object()); json::object::const_iterator recordsIt = r.as_object().find("records"); if (recordsIt != r.as_object().end()) { auto& recordsArray = recordsIt->value().as_array().at(0); const json::value* actualIt = recordsArray.as_object().if_contains("actual"); BOOST_TEST_REQUIRE(actualIt != nullptr); const json::array& values = actualIt->as_array(); LOG_DEBUG(<< "values: " << values); BOOST_REQUIRE_EQUAL(102.0, values[0].to_number<double>()); foundRecord = true; } } BOOST_TEST_REQUIRE(foundRecord); std::stringstream outputStrm2; { core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm2); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); core_t::TTime time = 12345678; for (std::size_t i = 0; i < 50; i++, time += (BUCKET_SIZE / 2)) { std::stringstream ss; ss << time; dataRows["time"] = ss.str(); if (i == 40) { for (std::size_t j = 0; j < 100; j++) { BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); } } BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); if (i == 40) { CTestAnomalyJob::TStrStrUMap rows; rows["."] = "r" + ss.str() + " " + ss.str(); BOOST_TEST_REQUIRE(job.handleRecord(rows)); for (std::size_t j = 0; j < 100; j++) { BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); } } } } json::value doc2_ = json::parse(outputStrm2.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); BOOST_TEST_REQUIRE(doc2_.is_array()); const json::value& allRecords2 = doc2_.as_array(); 
foundRecord = false; for (auto& r : allRecords2.as_array()) { json::object::const_iterator recordsIt = r.as_object().find("records"); if (recordsIt != r.as_object().end()) { auto& recordsArray = recordsIt->value().as_array().at(0); json::object::const_iterator actualIt = recordsArray.as_object().find("actual"); BOOST_TEST_REQUIRE(actualIt != recordsArray.as_object().end()); const json::array& values = actualIt->value().as_array(); BOOST_REQUIRE_EQUAL(101.0, values[0].to_number<double>()); foundRecord = true; } } BOOST_TEST_REQUIRE(foundRecord); } } BOOST_AUTO_TEST_CASE(testSkipTimeControlMessage) { model::CLimits limits; api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig("count", "", "", "", ""); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); std::stringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); CTestAnomalyJob::TStrStrUMap dataRows; core_t::TTime time = 3600; for (std::size_t i = 0; i < 10; ++i, time += BUCKET_SIZE) { std::ostringstream ss; ss << time; dataRows["time"] = ss.str(); BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); } wrappedOutputStream.syncFlush(); BOOST_REQUIRE_EQUAL(9, countBuckets("bucket", outputStrm.str() + "]")); // Now let's skip time to Thursday, June 29, 2017 12:00:00 AM time = 1498694400; dataRows["."] = "s1498694400"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows.erase("."); // Check no new bucket results were written wrappedOutputStream.syncFlush(); BOOST_REQUIRE_EQUAL(9, countBuckets("bucket", outputStrm.str() + "]")); // Let's send a few buckets after skip time for (std::size_t i = 0; i < 3; ++i, time += BUCKET_SIZE) { std::ostringstream ss; ss << time; dataRows["time"] = ss.str(); BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); } // Assert only 2 new buckets were written wrappedOutputStream.syncFlush(); 
BOOST_REQUIRE_EQUAL(11, countBuckets("bucket", outputStrm.str() + "]")); } BOOST_AUTO_TEST_CASE(testIsPersistenceNeeded) { model::CLimits limits; api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig("count", "", "", "", ""); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); { // check that persistence is not needed if no input records have been handled // and the time has not been advanced std::stringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); BOOST_REQUIRE_EQUAL(false, job.isPersistenceNeeded("test state")); job.finalise(); wrappedOutputStream.syncFlush(); std::string output = outputStrm.str(); LOG_DEBUG(<< "Output has yielded: " << output); // check that no quantile state was persisted core::CRegex regex; regex.init("\n"); core::CRegex::TStrVec lines; regex.split(output, lines); BOOST_REQUIRE_EQUAL(false, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*", lines)); } core_t::TTime time = 3600; { // check that persistence is needed if an input record has been handled std::stringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); CTestAnomalyJob::TStrStrUMap dataRows; std::ostringstream ss; ss << time; dataRows["time"] = ss.str(); BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); BOOST_REQUIRE_EQUAL(true, job.isPersistenceNeeded("test state")); job.finalise(); wrappedOutputStream.syncFlush(); std::string output = outputStrm.str(); LOG_DEBUG(<< "Output has yielded: " << output); // check that the quantile state has actually been persisted core::CRegex regex; regex.init("\n"); core::CRegex::TStrVec lines; regex.split(output, lines); BOOST_REQUIRE_EQUAL(true, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*", lines)); } { // check 
that persistence is needed if time has been advanced (via a control message) // even if no input data has been handled std::stringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); CTestAnomalyJob::TStrStrUMap dataRows; time = 39600; dataRows["."] = "t39600"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); BOOST_TEST_REQUIRE(job.isPersistenceNeeded("test state")); BOOST_REQUIRE_EQUAL(true, job.isPersistenceNeeded("test state")); job.finalise(); wrappedOutputStream.syncFlush(); std::string output = outputStrm.str(); LOG_DEBUG(<< "Output has yielded: " << output); // check that the quantile state has actually been persisted core::CRegex regex; regex.init("\n"); core::CRegex::TStrVec lines; regex.split(output, lines); BOOST_REQUIRE_EQUAL(true, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*", lines)); } } BOOST_AUTO_TEST_CASE(testModelPlot) { core_t::TTime bucketSize = 10000; model::CLimits limits; ml::api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig("mean", "value", "animal", "", ""); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(bucketSize, model_t::E_None, "", 0, false); modelConfig.modelPlotBoundsPercentile(1.0); std::stringstream outputStrm; { core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); CTestAnomalyJob::TStrStrUMap dataRows; dataRows["time"] = "10000000"; dataRows["value"] = "2.0"; dataRows["animal"] = "baboon"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["value"] = "5.0"; dataRows["animal"] = "shark"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["time"] = "10010000"; dataRows["value"] = "2.0"; dataRows["animal"] = "baboon"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["value"] = "5.0"; dataRows["animal"] = "shark"; 
BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["time"] = "10020000"; dataRows["value"] = "2.0"; dataRows["animal"] = "baboon"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["value"] = "5.0"; dataRows["animal"] = "shark"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["time"] = "10030000"; dataRows["value"] = "2.0"; dataRows["animal"] = "baboon"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["value"] = "5.0"; dataRows["animal"] = "shark"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["time"] = "10040000"; dataRows["value"] = "3.0"; dataRows["animal"] = "baboon"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["value"] = "5.0"; dataRows["animal"] = "shark"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); job.finalise(); } std::string output = outputStrm.str(); LOG_TRACE(<< "Output has yielded: " << output); core::CRegex regex; regex.init("\n"); core::CRegex::TStrVec lines; regex.split(output, lines); BOOST_TEST_REQUIRE(findLine("model_feature.*timestamp.*10000000.*baboon", lines)); BOOST_TEST_REQUIRE(findLine("model_feature.*timestamp.*10000000.*shark", lines)); BOOST_TEST_REQUIRE(findLine("model_feature.*timestamp.*10010000.*baboon", lines)); BOOST_TEST_REQUIRE(findLine("model_feature.*timestamp.*10010000.*shark", lines)); BOOST_TEST_REQUIRE(findLine("model_feature.*timestamp.*10020000.*baboon", lines)); BOOST_TEST_REQUIRE(findLine("model_feature.*timestamp.*10020000.*shark", lines)); BOOST_TEST_REQUIRE(findLine("model_feature.*timestamp.*10030000.*baboon", lines)); BOOST_TEST_REQUIRE(findLine("model_feature.*timestamp.*10030000.*shark", lines)); } BOOST_AUTO_TEST_CASE(testInterimResultEdgeCases) { const char* logFile = "test.log"; core_t::TTime bucketSize = 3600; model::CLimits limits; api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig("count", "", "error", "", ""); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(bucketSize); 
std::stringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); std::remove(logFile); BOOST_TEST_REQUIRE(ml::core::CLogger::instance().reconfigureFromFile( "testfiles/testLogErrors.boost.log.ini")); CTestAnomalyJob::TStrStrUMap dataRows; dataRows["time"] = "3610"; dataRows["error"] = "e1"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["time"] = "3670"; dataRows["error"] = "e2"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["time"] = "6820"; dataRows["error"] = "e1"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["time"] = "6820"; dataRows["error"] = "e1"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["time"] = "7850"; dataRows["error"] = "e1"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["time"] = "9310"; dataRows["error"] = "e2"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["."] = "t7200"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); dataRows["."] = "i"; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); // Revert to the default logger settings ml::core::CLogger::instance().reset(); std::ifstream log(logFile); // Boost.Log only creates files when the first message is logged, // and here we're asserting no messages logged if (log.is_open()) { char line[256]; while (log.getline(line, 256)) { LOG_DEBUG(<< "Got '" << line << "'"); BOOST_TEST_REQUIRE(false); } log.close(); std::remove(logFile); } } BOOST_AUTO_TEST_CASE(testRestoreFailsWithEmptyStream) { model::CLimits limits; api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig("value", "", "", "", "greenhouse"); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); std::ostringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); core_t::TTime 
completeToTime(0); CEmptySearcher restoreSearcher; BOOST_TEST_REQUIRE(job.restoreState(restoreSearcher, completeToTime) == false); } BOOST_AUTO_TEST_CASE(testConfigUpdate) { // This, in part, is essentially replicating the DetectionRulesIT/testScope Java REST test. // It proves useful to have the test here too, as it provides an entrypoint for investigating // any issues related to filters, especially when updating them when already referenced by anomaly detector models. // We simply want to see the job run to completion. ml::api::CAnomalyJobConfig jobConfig; BOOST_REQUIRE_EQUAL(true, jobConfig.initFromFiles("testfiles/count_over_ip_config.json", "testfiles/filterConfig.json", "testfiles/eventConfig.json")); const ml::api::CAnomalyJobConfig::CAnalysisConfig& analysisConfig = jobConfig.analysisConfig(); model::CLimits limits; model::CAnomalyDetectorModelConfig modelConfig = analysisConfig.makeModelConfig(); std::stringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); auto generateRandomAlpha = [](int strLen) { std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution dis(0, 25); std::string str; for (int i = 0; i < strLen; ++i) { str += char('a' + dis(gen)); } return str; }; long timestamp = 1509062400000L; TStrStrPrVec data; for (int bucket = 0; bucket < 20; bucket++) { for (int i = 0; i < 5; i++) { data.emplace_back(core::CStringUtils::typeToString(timestamp), generateRandomAlpha(10)); } timestamp += 3600 * 1000; } // Now send anomalous counts for our filtered IPs plus 333.333.333.333 auto namedIps = std::vector{"111.111.111.111", "222.222.222.222", "333.333.333.333"}; for (int i = 0; i < 10; i++) { for (auto& ip : namedIps) { data.emplace_back(core::CStringUtils::typeToString(timestamp), ip); } } for (int bucket = 0; bucket < 3; bucket++) { for (int i = 0; i < 5; i++) { data.emplace_back(core::CStringUtils::typeToString(timestamp), 
generateRandomAlpha(10)); } timestamp += 3600 * 1000; } CTestAnomalyJob::TStrStrUMap dataRows; for (const auto & [ time, ip ] : data) { dataRows["time"] = time; dataRows["ip"] = ip; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); } BOOST_REQUIRE_EQUAL(145, job.numRecordsHandled()); const std::string& detectorConfig1{R"( { "filters":[{"filter_id":"safe_ips", "items":["111.111.111.111","222.222.222.222"]}], "events":[{"description":"event_1", "rules":[{"actions":["skip_result","skip_model_update"],"conditions":[{"applies_to":"time","operator":"gte","value": 1.0},{"applies_to":"time","operator":"lt","value": 2.0}]}]}], "model_plot_config":{"enabled":true,"annotations_enabled":false}, "detector_rules":{"detector_index":0,"custom_rules":[{"actions":["skip_result"],"conditions":[{"applies_to":"actual","operator":"gte","value":15.0},{"applies_to":"actual","operator":"lte","value":30.0}]}]} } )"}; job.updateConfig(detectorConfig1); BOOST_REQUIRE_EQUAL(1, jobConfig.analysisConfig().detectionRules().size()); auto itr = jobConfig.analysisConfig().detectionRules().find(0); BOOST_REQUIRE_EQUAL(1, itr->second.size()); std::string rule{itr->second[0].print()}; BOOST_REQUIRE_EQUAL( std::string("SKIP_RESULT IF ACTUAL >= 15.000000 AND ACTUAL <= 30.000000"), rule); api::CAnomalyJobConfig::CModelPlotConfig& modelPlotConfig = jobConfig.modelPlotConfig(); BOOST_REQUIRE_EQUAL(false, modelPlotConfig.annotationsEnabled()); BOOST_REQUIRE_EQUAL(true, modelPlotConfig.enabled()); auto events = jobConfig.analysisConfig().scheduledEvents(); BOOST_REQUIRE_EQUAL(1, events.size()); BOOST_REQUIRE_EQUAL(std::string("event_1"), events[0].first); BOOST_REQUIRE_EQUAL(std::string("SKIP_RESULT AND SKIP_MODEL_UPDATE IF TIME >= 1.000000 AND TIME < 2.000000"), events[0].second.print()); auto ruleFilters = jobConfig.ruleFilters(); BOOST_REQUIRE_EQUAL(1, ruleFilters.size()); BOOST_REQUIRE_EQUAL(true, ruleFilters["safe_ips"].contains("111.111.111.111")); BOOST_REQUIRE_EQUAL(true, 
ruleFilters["safe_ips"].contains("222.222.222.222")); BOOST_REQUIRE_EQUAL(false, ruleFilters["safe_ips"].contains("333.333.333.333")); const std::string& detectorConfig2{R"( { "filters":[{"filter_id":"safe_ips", "items":["333.333.333.333"]}], "events":[{"description":"event_1", "rules":[{"actions":["skip_result","skip_model_update"],"conditions":[{"applies_to":"time","operator":"gte","value": 2.0},{"applies_to":"time","operator":"lt","value": 3.0}]}]}], "model_plot_config":{"enabled":false,"annotations_enabled":true}, "detector_rules":{"detector_index":0,"custom_rules":[{"actions":["skip_result"],"conditions":[{"applies_to":"typical","operator":"gte","value":10.0},{"applies_to":"typical","operator":"lte","value":50.0}]}]} })"}; job.updateConfig(detectorConfig2); data.clear(); // Send another anomalous bucket for (int i = 0; i < 10; i++) { for (auto& ip : namedIps) { data.emplace_back(core::CStringUtils::typeToString(timestamp), ip); } } // Some more normal buckets for (int bucket = 0; bucket < 3; bucket++) { for (int i = 0; i < 5; i++) { data.emplace_back(core::CStringUtils::typeToString(timestamp), generateRandomAlpha(10)); } timestamp += 3600 * 1000; } dataRows.clear(); for (const auto & [ time, ip ] : data) { dataRows["time"] = time; dataRows["ip"] = ip; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); } BOOST_REQUIRE_EQUAL(190, job.numRecordsHandled()); BOOST_REQUIRE_EQUAL(1, jobConfig.analysisConfig().detectionRules().size()); itr = jobConfig.analysisConfig().detectionRules().find(0); BOOST_REQUIRE_EQUAL(1, itr->second.size()); rule = itr->second[0].print(); BOOST_REQUIRE_EQUAL( std::string("SKIP_RESULT IF TYPICAL >= 10.000000 AND TYPICAL <= 50.000000"), rule); modelPlotConfig = jobConfig.modelPlotConfig(); BOOST_REQUIRE_EQUAL(true, modelPlotConfig.annotationsEnabled()); BOOST_REQUIRE_EQUAL(false, modelPlotConfig.enabled()); events = jobConfig.analysisConfig().scheduledEvents(); BOOST_REQUIRE_EQUAL(1, events.size()); BOOST_REQUIRE_EQUAL(std::string("event_1"), 
events[0].first); BOOST_REQUIRE_EQUAL(std::string("SKIP_RESULT AND SKIP_MODEL_UPDATE IF TIME >= 2.000000 AND TIME < 3.000000"), events[0].second.print()); ruleFilters = jobConfig.ruleFilters(); BOOST_REQUIRE_EQUAL(1, ruleFilters.size()); BOOST_REQUIRE_EQUAL(false, ruleFilters["safe_ips"].contains("111.111.111.111")); BOOST_REQUIRE_EQUAL(false, ruleFilters["safe_ips"].contains("222.222.222.222")); BOOST_REQUIRE_EQUAL(true, ruleFilters["safe_ips"].contains("333.333.333.333")); job.finalise(); wrappedOutputStream.syncFlush(); std::string output = outputStrm.str(); LOG_TRACE(<< "Output has yielded: " << output); // check that the quantile state has actually been persisted core::CRegex regex; regex.init("\n"); core::CRegex::TStrVec lines; regex.split(output, lines); BOOST_REQUIRE_EQUAL( true, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*", lines)); } BOOST_AUTO_TEST_CASE(testParsePersistControlMessageArgs) { { const ml::core_t::TTime expectedSnapshotTimestamp{1283524206}; const std::string expectedSnapshotId{"my_special_snapshot"}; const std::string expectedSnapshotDescription{ "Supplied description for snapshot at " + ml::core::CTimeUtils::toIso8601(expectedSnapshotTimestamp)}; std::ostringstream ostrm; ostrm << expectedSnapshotTimestamp << " " << expectedSnapshotId << " " << expectedSnapshotDescription; const std::string& validPersistControlMessage{ostrm.str()}; ml::core_t::TTime snapshotTimestamp; std::string snapshotId; std::string snapshotDescription; BOOST_TEST_REQUIRE(ml::api::CAnomalyJob::parsePersistControlMessageArgs( validPersistControlMessage, snapshotTimestamp, snapshotId, snapshotDescription)); BOOST_TEST_REQUIRE(expectedSnapshotTimestamp == snapshotTimestamp); BOOST_TEST_REQUIRE(expectedSnapshotId == snapshotId); BOOST_TEST_REQUIRE(expectedSnapshotDescription == snapshotDescription); } { const std::string invalidPersistControlMessage{ "junk_timestamp snapshot_id invalid snapshot control message"}; ml::core_t::TTime snapshotTimestamp; 
std::string snapshotId; std::string snapshotDescription; BOOST_TEST_REQUIRE(ml::api::CAnomalyJob::parsePersistControlMessageArgs( invalidPersistControlMessage, snapshotTimestamp, snapshotId, snapshotDescription) == false); } { const std::string invalidPersistControlMessage{" "}; ml::core_t::TTime snapshotTimestamp; std::string snapshotId; std::string snapshotDescription; BOOST_TEST_REQUIRE(ml::api::CAnomalyJob::parsePersistControlMessageArgs( invalidPersistControlMessage, snapshotTimestamp, snapshotId, snapshotDescription) == false); } { const std::string invalidPersistControlMessage; ml::core_t::TTime snapshotTimestamp; std::string snapshotId; std::string snapshotDescription; BOOST_TEST_REQUIRE(ml::api::CAnomalyJob::parsePersistControlMessageArgs( invalidPersistControlMessage, snapshotTimestamp, snapshotId, snapshotDescription) == false); } { const ml::core_t::TTime expectedSnapshotTimestamp{1283524206}; const std::string expectedSnapshotId{"my_special_snapshot"}; // Empty description is valid. 
const std::string expectedSnapshotDescription; std::ostringstream ostrm; ostrm << expectedSnapshotTimestamp << " " << expectedSnapshotId << " " << expectedSnapshotDescription; const std::string& validPersistControlMessage{ostrm.str()}; ml::core_t::TTime snapshotTimestamp; std::string snapshotId; std::string snapshotDescription; BOOST_TEST_REQUIRE(ml::api::CAnomalyJob::parsePersistControlMessageArgs( validPersistControlMessage, snapshotTimestamp, snapshotId, snapshotDescription) == true); } { const ml::core_t::TTime expectedSnapshotTimestamp{1283524206}; const std::string expectedSnapshotId; const std::string expectedSnapshotDescription; std::ostringstream ostrm; ostrm << expectedSnapshotTimestamp << " " << expectedSnapshotId << " " << expectedSnapshotDescription; const std::string& invalidPersistControlMessage{ostrm.str()}; ml::core_t::TTime snapshotTimestamp; std::string snapshotId; std::string snapshotDescription; BOOST_TEST_REQUIRE(ml::api::CAnomalyJob::parsePersistControlMessageArgs( invalidPersistControlMessage, snapshotTimestamp, snapshotId, snapshotDescription) == false); } { ml::core_t::TTime snapshotTimestamp; std::string snapshotId; std::string snapshotDescription; const std::string invalidPersistControlMessage{"invalid_control_message"}; BOOST_TEST_REQUIRE(ml::api::CAnomalyJob::parsePersistControlMessageArgs( invalidPersistControlMessage, snapshotTimestamp, snapshotId, snapshotDescription) == false); } } BOOST_AUTO_TEST_CASE(testRestoreFromBadState) { using TStrIntMap = std::map<std::string, int>; // map of names of state files to the number of times the fatal error message // "Failed to restore time series decomposition." 
occurs in the output TStrIntMap stateFiles{{"testfiles/badState1.json", 0}, {"testfiles/badState2.json", 0}, {"testfiles/badState3.json", 2}, {"testfiles/badState4.json", 1}}; for (const auto& stateFile : stateFiles) { // Open the input state file std::ifstream inputStrm(stateFile.first.c_str()); BOOST_TEST_REQUIRE(inputStrm.is_open()); std::string persistedState(std::istreambuf_iterator<char>{inputStrm}, std::istreambuf_iterator<char>{}); model::CLimits limits; api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig( "high_sum", "responsetime", "airline", "", ""); model::CAnomalyDetectorModelConfig modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); std::stringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); ml::core_t::TTime completeToTime(0); std::stringstream* output = new std::stringstream(); ml::api::CSingleStreamSearcher::TIStreamP strm(output); boost::iostreams::filtering_ostream in; in.push(ml::api::CStateRestoreStreamFilter()); in.push(*output); in << persistedState; in.flush(); ml::api::CSingleStreamSearcher restoreSearcher(strm); BOOST_TEST_REQUIRE(job.restoreState(restoreSearcher, completeToTime) == false); } } BOOST_AUTO_TEST_CASE(testHierarchicalResultsNormalizerShouldIncreaseMemoryUsage) { model::CLimits limits; auto jobConfig = CTestAnomalyJob::makeSimpleJobConfig("metric", "value", "", "", ""); auto modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); std::stringstream outputStrm; core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); CTestAnomalyJob::TStrStrUMap const dataRows = { {"time", "12345678"}, {"value", "1.0"}, {"greenhouse", "rhubarb"}}; BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); auto resourceMonitor = limits.resourceMonitor(); resourceMonitor.forceRefreshAll(); 
BOOST_TEST_REQUIRE(job.mutableNormalizer().memoryUsage() > 0); // Unregister the normalizer and check that memory usage decreases auto memoryUsageBeforeUnregister = resourceMonitor.totalMemory(); resourceMonitor.unRegisterComponent(job.mutableNormalizer()); resourceMonitor.forceRefreshAll(); BOOST_TEST_REQUIRE(resourceMonitor.totalMemory() < memoryUsageBeforeUnregister); } BOOST_AUTO_TEST_SUITE_END()