lib/api/unittest/CMultiFileDataAdderTest.cc

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <core/CDataAdder.h>
#include <core/CJsonOutputStreamWrapper.h>
#include <core/CLogger.h>
#include <core/COsFileFuncs.h>
#include <core/CoreTypes.h>

#include <maths/common/CModelWeight.h>

#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CLimits.h>

#include <api/CAnomalyJobConfig.h>
#include <api/CCsvInputParser.h>
#include <api/CJsonOutputWriter.h>
#include <api/CNdJsonInputParser.h>

#include <test/CMultiFileDataAdder.h>
#include <test/CMultiFileSearcher.h>
#include <test/CTestTmpDir.h>

#include "CTestAnomalyJob.h"

#include <boost/filesystem.hpp>
#include <boost/json.hpp>
#include <boost/test/unit_test.hpp>

#include <cstdio>
#include <fstream>
#include <ios>
#include <iterator>
#include <memory>
#include <string>
#include <vector>

namespace json = boost::json;

BOOST_AUTO_TEST_SUITE(CMultiFileDataAdderTest)

namespace {

using TStrVec = std::vector<std::string>;

void reportPersistComplete(ml::api::CModelSnapshotJsonWriter::SModelSnapshotReport modelSnapshotReport,
                           std::string& snapshotIdOut,
                           size_t& numDocsOut) {
    LOG_INFO(<< "Persist complete with description: " << modelSnapshotReport.s_Description);
    snapshotIdOut = modelSnapshotReport.s_SnapshotId;
    numDocsOut = modelSnapshotReport.s_NumDocs;
}

void detectorPersistHelper(const std::string& configFileName,
                           const std::string& inputFilename,
                           int latencyBuckets,
                           const std::string& timeFormat = std::string()) {
    // Start by creating a detector with non-trivial state
    static const ml::core_t::TTime BUCKET_SIZE(3600);
    static const std::string JOB_ID("job");

    // Open the input and output files
    std::ifstream inputStrm(inputFilename);
    BOOST_TEST_REQUIRE(inputStrm.is_open());

    std::ofstream outputStrm(ml::core::COsFileFuncs::NULL_FILENAME);
    BOOST_TEST_REQUIRE(outputStrm.is_open());

    ml::core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);

    ml::model::CLimits limits;
    ml::api::CAnomalyJobConfig jobConfig;
    BOOST_TEST_REQUIRE(jobConfig.initFromFile(configFileName));

    ml::model::CAnomalyDetectorModelConfig modelConfig =
        ml::model::CAnomalyDetectorModelConfig::defaultConfig(
            BUCKET_SIZE, ml::model_t::E_None, "", BUCKET_SIZE * latencyBuckets, false);

    std::string origSnapshotId;
    std::size_t numOrigDocs(0);
    CTestAnomalyJob origJob(JOB_ID, limits, jobConfig, modelConfig, wrappedOutputStream,
                            std::bind(&reportPersistComplete, std::placeholders::_1,
                                      std::ref(origSnapshotId), std::ref(numOrigDocs)),
                            nullptr, -1, "time", timeFormat);

    using TInputParserUPtr = std::unique_ptr<ml::api::CInputParser>;
    const TInputParserUPtr parser{[&inputFilename, &inputStrm]() -> TInputParserUPtr {
        if (inputFilename.rfind(".csv") == inputFilename.length() - 4) {
            return std::make_unique<ml::api::CCsvInputParser>(inputStrm);
        }
        return std::make_unique<ml::api::CNdJsonInputParser>(inputStrm);
    }()};

    BOOST_TEST_REQUIRE(parser->readStreamIntoMaps(
        [&origJob](const CTestAnomalyJob::TStrStrUMap& dataRowFields) {
            return origJob.handleRecord(dataRowFields);
        }));

    // Persist the detector state to file(s)
    std::string baseOrigOutputFilename(ml::test::CTestTmpDir::tmpDir() + "/orig");
    {
        // Clean up any leftovers of previous failures
        boost::filesystem::path origDir(baseOrigOutputFilename);
        BOOST_REQUIRE_NO_THROW(boost::filesystem::remove_all(origDir));

        ml::test::CMultiFileDataAdder persister(baseOrigOutputFilename);
        BOOST_TEST_REQUIRE(origJob.persistStateInForeground(persister, ""));
    }

    std::string origBaseDocId(JOB_ID + '_' + CTestAnomalyJob::STATE_TYPE + '_' + origSnapshotId);

    std::string temp;
    TStrVec origFileContents(numOrigDocs);
    for (size_t index = 0; index < numOrigDocs; ++index) {
        std::string expectedOrigFilename(baseOrigOutputFilename);
        expectedOrigFilename += "/_index/";
        expectedOrigFilename +=
            ml::core::CDataAdder::makeCurrentDocId(origBaseDocId, 1 + index);
        expectedOrigFilename += ml::test::CMultiFileDataAdder::JSON_FILE_EXT;
        LOG_DEBUG(<< "Trying to open file: " << expectedOrigFilename);
        std::ifstream origFile(expectedOrigFilename);
        BOOST_TEST_REQUIRE(origFile.is_open());
        std::string json((std::istreambuf_iterator<char>(origFile)),
                         std::istreambuf_iterator<char>());
        origFileContents[index] = json;

        // Ensure that the JSON is valid, by parsing the string using boost::json
        json::error_code ec;
        json::value document = json::parse(origFileContents[index].c_str(), ec);
        BOOST_TEST_REQUIRE(ec.failed() == false);
        BOOST_TEST_REQUIRE(document.is_object());
    }

    // Now restore the state into a different detector
    std::string restoredSnapshotId;
    std::size_t numRestoredDocs(0);
    CTestAnomalyJob restoredJob(
        JOB_ID, limits, jobConfig, modelConfig, wrappedOutputStream,
        std::bind(&reportPersistComplete, std::placeholders::_1,
                  std::ref(restoredSnapshotId), std::ref(numRestoredDocs)));

    {
        ml::core_t::TTime completeToTime(0);

        ml::test::CMultiFileSearcher retriever(baseOrigOutputFilename, origBaseDocId);
        BOOST_TEST_REQUIRE(restoredJob.restoreState(retriever, completeToTime));
        BOOST_TEST_REQUIRE(completeToTime > 0);
    }

    // Finally, persist the new detector state to a file
    std::string baseRestoredOutputFilename(ml::test::CTestTmpDir::tmpDir() + "/restored");
    {
        // Clean up any leftovers of previous failures
        boost::filesystem::path restoredDir(baseRestoredOutputFilename);
        BOOST_REQUIRE_NO_THROW(boost::filesystem::remove_all(restoredDir));

        ml::test::CMultiFileDataAdder persister(baseRestoredOutputFilename);
        BOOST_TEST_REQUIRE(restoredJob.persistStateInForeground(persister, ""));
    }

    std::string restoredBaseDocId(JOB_ID + '_' + CTestAnomalyJob::STATE_TYPE +
                                  '_' + restoredSnapshotId);

    for (size_t index = 0; index < numRestoredDocs; ++index) {
        std::string expectedRestoredFilename(baseRestoredOutputFilename);
        expectedRestoredFilename += "/_index/";
        expectedRestoredFilename +=
            ml::core::CDataAdder::makeCurrentDocId(restoredBaseDocId, 1 + index);
        expectedRestoredFilename += ml::test::CMultiFileDataAdder::JSON_FILE_EXT;
        std::ifstream restoredFile(expectedRestoredFilename);
        BOOST_TEST_REQUIRE(restoredFile.is_open());
        std::string json((std::istreambuf_iterator<char>(restoredFile)),
                         std::istreambuf_iterator<char>());

        BOOST_REQUIRE_EQUAL(origFileContents[index], json);
    }

    // Clean up
    boost::filesystem::path origDir(baseOrigOutputFilename);
    BOOST_REQUIRE_NO_THROW(boost::filesystem::remove_all(origDir));

    boost::filesystem::path restoredDir(baseRestoredOutputFilename);
    BOOST_REQUIRE_NO_THROW(boost::filesystem::remove_all(restoredDir));
}
}

BOOST_AUTO_TEST_CASE(testSimpleWrite) {
    static const std::string EVENT("Hello Event");
    static const std::string SUMMARY_EVENT("Hello Summary Event");

    static const std::string EXTENSION(".txt");

    std::string baseOutputFilename(ml::test::CTestTmpDir::tmpDir() + "/filepersister");

    std::string expectedFilename(baseOutputFilename);
    expectedFilename += "/_index/1";
    expectedFilename += EXTENSION;

    {
        // Clean up any leftovers of previous failures
        boost::filesystem::path workDir(baseOutputFilename);
        BOOST_REQUIRE_NO_THROW(boost::filesystem::remove_all(workDir));

        ml::test::CMultiFileDataAdder persister(baseOutputFilename, EXTENSION);
        ml::core::CDataAdder::TOStreamP strm = persister.addStreamed("1");
        BOOST_TEST_REQUIRE(strm);
        (*strm) << EVENT;
        BOOST_TEST_REQUIRE(persister.streamComplete(strm, true));
    }

    {
        std::ifstream persistedFile(expectedFilename);

        BOOST_TEST_REQUIRE(persistedFile.is_open());
        std::string line;
        std::getline(persistedFile, line);
        BOOST_REQUIRE_EQUAL(EVENT, line);
    }

    BOOST_REQUIRE_EQUAL(0, ::remove(expectedFilename.c_str()));

    expectedFilename = baseOutputFilename;
    expectedFilename += "/_index/2";
    expectedFilename += EXTENSION;

    {
        ml::test::CMultiFileDataAdder persister(baseOutputFilename, EXTENSION);
        ml::core::CDataAdder::TOStreamP strm = persister.addStreamed("2");
        BOOST_TEST_REQUIRE(strm);
        (*strm) << SUMMARY_EVENT;
        BOOST_TEST_REQUIRE(persister.streamComplete(strm, true));
    }

    {
        std::ifstream persistedFile(expectedFilename);

        BOOST_TEST_REQUIRE(persistedFile.is_open());
        std::string line;
        std::getline(persistedFile, line);
        BOOST_REQUIRE_EQUAL(SUMMARY_EVENT, line);
    }

    // Clean up
    boost::filesystem::path workDir(baseOutputFilename);
    BOOST_REQUIRE_NO_THROW(boost::filesystem::remove_all(workDir));
}

BOOST_AUTO_TEST_CASE(testDetectorPersistBy) {
    detectorPersistHelper("testfiles/new_mlfields.json",
                          "testfiles/big_ascending.txt", 0, "%d/%b/%Y:%T %z");
}

BOOST_AUTO_TEST_CASE(testDetectorPersistOver) {
    detectorPersistHelper("testfiles/new_mlfields_over.json",
                          "testfiles/big_ascending.txt", 0, "%d/%b/%Y:%T %z");
}

BOOST_AUTO_TEST_CASE(testDetectorPersistPartition) {
    detectorPersistHelper("testfiles/new_mlfields_partition.json",
                          "testfiles/big_ascending.txt", 0, "%d/%b/%Y:%T %z");
}

BOOST_AUTO_TEST_CASE(testDetectorPersistDc) {
    detectorPersistHelper("testfiles/new_persist_dc.json",
                          "testfiles/files_users_programs.csv", 5);
}

BOOST_AUTO_TEST_CASE(testDetectorPersistCount) {
    detectorPersistHelper("testfiles/new_persist_count.json",
                          "testfiles/files_users_programs.csv", 5);
}

BOOST_AUTO_TEST_SUITE_END()