// lib/api/dump_state/Main.cc

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

//! \brief
//! Creates model state files for Anomaly Detectors, Categorizers and the
//! Normalizer quantiles. The resulting files can then be used in backwards
//! compatibility tests.
//!
//! DESCRIPTION:\n
//! The state files produced by this program are written to the
//! ../unittest/testfiles/state/$VERSION directory and can be used
//! by the CRestorePreviousStateTest. Some detectors are configured
//! with non-zero latency buckets; the same latency buckets value
//! should be used in CRestorePreviousStateTest.
//!
//! IMPLEMENTATION DECISIONS:\n
//!
#include <core/CJsonOutputStreamWrapper.h>
#include <core/CLogger.h>
#include <core/COsFileFuncs.h>
#include <core/CRegex.h>
#include <core/CoreTypes.h>

#include <ver/CBuildInfo.h>

#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CLimits.h>

#include <api/CAnomalyJob.h>
#include <api/CAnomalyJobConfig.h>
#include <api/CCsvInputParser.h>
#include <api/CFieldDataCategorizer.h>
#include <api/CJsonOutputWriter.h>
#include <api/CModelSnapshotJsonWriter.h>
#include <api/CNdJsonInputParser.h>
#include <api/CSingleStreamDataAdder.h>
#include <api/CSingleStreamSearcher.h>

#include <boost/filesystem.hpp>
#include <boost/program_options.hpp>

#include <cstdlib>
#include <fstream>
// <functional> is needed for the std::bind used when constructing the
// anomaly job; previously it was only included transitively.
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

//! Normalizer state captured by the persist-complete callback; written to
//! file once the first anomaly detector job has persisted.
static std::string persistedNormalizerState;
//! Paths of every state file successfully written, reported at exit.
static std::vector<std::string> persistedStateFiles;
static const std::string TEST_FILES_PATH{"../unittest/testfiles/"};

//! Parse the command line options.
//!
//! \param[in] argc Argument count as passed to main().
//! \param[in] argv Argument vector as passed to main().
//! \param[out] outputDir Set to the value of the optional --outputDir
//! argument, normalised to end with a trailing '/'. Left untouched if the
//! option is absent.
//! \return True if the program should continue; false if --help was
//! requested or the command line could not be parsed.
bool parseOptions(int argc, const char* const* argv, std::string& outputDir) {
    try {
        boost::program_options::options_description desc(
            "Utility for creating ML model state files for use in BWC tests");
        desc.add_options()("help", "Display this information and exit")(
            "outputDir", boost::program_options::value<std::string>(),
            "Optional directory to write state files to");

        boost::program_options::variables_map vm;
        boost::program_options::parsed_options parsed =
            boost::program_options::command_line_parser(argc, argv)
                .options(desc)
                .run();
        boost::program_options::store(parsed, vm);

        if (vm.count("help") > 0) {
            std::cerr << desc << std::endl;
            return false;
        }
        if (vm.count("outputDir") > 0) {
            outputDir = vm["outputDir"].as<std::string>();
            if (outputDir.empty()) {
                std::cerr << "Error processing command line: outputDir is an empty string"
                          << std::endl;
                return false;
            }
            // Guarantee the directory can be used as a path prefix.
            if (outputDir.back() != '/') {
                outputDir.push_back('/');
            }
        }

        return true;
    } catch (const std::exception& e) {
        // Catch by const reference (idiomatic; the original caught by
        // non-const reference).
        std::cerr << "Error processing command line: " << e.what() << std::endl;
        return false;
    }
}
std::string versionNumber() { ml::core::CRegex regex; regex.init("\\d\\.\\d\\.\\d"); std::string longVersion = ml::ver::CBuildInfo::versionNumber(); std::size_t pos; std::string version; if (regex.search(longVersion, pos)) { version = longVersion.substr(pos, 5); } return version; } void reportPersistComplete(ml::api::CModelSnapshotJsonWriter::SModelSnapshotReport modelSnapshotReport) { LOG_INFO(<< "Persist complete with description: " << modelSnapshotReport.s_Description); persistedNormalizerState = modelSnapshotReport.s_NormalizerState; } bool writeNormalizerState(const std::string& outputFileName) { std::ofstream out(outputFileName); if (!out.is_open()) { LOG_ERROR(<< "Failed to open normalizer state output file " << outputFileName); return false; } out << persistedNormalizerState; out.close(); persistedStateFiles.push_back(outputFileName); return true; } bool persistCategorizerStateToFile(const std::string& outputFileName, const std::string& timeFormat = std::string()) { ml::model::CLimits limits(true); const std::string jsonConfig{ "{\"job_id\":\"new_ml_fields\",\"analysis_config\":{\"detectors\":[{\"function\":\"count\",\"by_field_name\":\"mlcategory\"}]}}"}; ml::api::CAnomalyJobConfig jobConfig; if (jobConfig.parse(jsonConfig) == false) { LOG_ERROR(<< "Failed to parse json: \"" << jsonConfig << "\""); return false; } std::ofstream outStream(ml::core::COsFileFuncs::NULL_FILENAME); ml::core::CJsonOutputStreamWrapper wrappedOutStream(outStream); ml::api::CFieldDataCategorizer categorizer{ "job", jobConfig.analysisConfig(), limits, "time", timeFormat, nullptr, wrappedOutStream, nullptr, false}; ml::api::CFieldDataCategorizer::TStrStrUMap dataRowFields; dataRowFields["_raw"] = "thing"; dataRowFields["two"] = "other"; categorizer.handleRecord(dataRowFields, ml::api::CFieldDataCategorizer::TOptionalTime{}); // Persist the categorizer state to file { std::ofstream* out = nullptr; ml::api::CSingleStreamDataAdder::TOStreamP ptr(out = new std::ofstream(outputFileName)); 
if (!out->is_open()) { LOG_ERROR(<< "Failed to open categorizer state output file " << outputFileName); return false; } ml::api::CSingleStreamDataAdder persister(ptr); if (!categorizer.persistStateInForeground(persister, "State persisted due to job close at ")) { LOG_ERROR(<< "Error persisting state to " << outputFileName); return false; } } persistedStateFiles.push_back(outputFileName); return true; } bool persistAnomalyDetectorStateToFile(const std::string& configFileName, const std::string& inputFilename, const std::string& outputFileName, int latencyBuckets, const std::string& timeFormat = std::string()) { // Open the input and output files std::ifstream inputStrm(inputFilename); if (!inputStrm.is_open()) { LOG_ERROR(<< "Cannot open input file " << inputFilename); return false; } std::ofstream outputStrm(ml::core::COsFileFuncs::NULL_FILENAME); ml::core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); ml::model::CLimits limits(true); ml::api::CAnomalyJobConfig jobConfig; if (!jobConfig.initFromFile(configFileName)) { LOG_ERROR(<< "Failed to init field config from " << configFileName); return false; } ml::core_t::TTime bucketSize(3600); std::string jobId("foo"); ml::model::CAnomalyDetectorModelConfig modelConfig = ml::model::CAnomalyDetectorModelConfig::defaultConfig( bucketSize, ml::model_t::E_None, "", bucketSize * latencyBuckets, false); ml::api::CAnomalyJob origJob(jobId, limits, jobConfig, modelConfig, wrappedOutputStream, std::bind(&reportPersistComplete, std::placeholders::_1), nullptr, -1, "time", timeFormat, 0); using TInputParserUPtr = std::unique_ptr<ml::api::CInputParser>; const TInputParserUPtr parser{[&inputFilename, &inputStrm]() -> TInputParserUPtr { if (inputFilename.rfind(".csv") == inputFilename.length() - 4) { return std::make_unique<ml::api::CCsvInputParser>(inputStrm); } return std::make_unique<ml::api::CNdJsonInputParser>(inputStrm); }()}; if (parser->readStreamIntoMaps([&origJob](const ml::api::CAnomalyJob::TStrStrUMap& 
dataRowFields) { return origJob.handleRecord(dataRowFields, ml::api::CAnomalyJob::TOptionalTime{}); }) == false) { LOG_ERROR(<< "Failed to processs input"); return false; } // Persist the job state to file { std::ofstream* out = nullptr; ml::api::CSingleStreamDataAdder::TOStreamP ptr(out = new std::ofstream(outputFileName)); if (!out->is_open()) { LOG_ERROR(<< "Failed to open state output file " << outputFileName); return false; } ml::api::CSingleStreamDataAdder persister(ptr); if (!origJob.persistStateInForeground(persister, "State persisted due to job close at ")) { LOG_ERROR(<< "Error persisting state to " << outputFileName); return false; } } persistedStateFiles.push_back(outputFileName); return true; } bool persistByDetector(const std::string& stateFilesPath) { return persistAnomalyDetectorStateToFile( TEST_FILES_PATH + "new_mlfields.json", TEST_FILES_PATH + "big_ascending.txt", stateFilesPath + "by_detector_state.json", 0, "%d/%b/%Y:%T %z"); } bool persistOverDetector(const std::string& stateFilesPath) { return persistAnomalyDetectorStateToFile( TEST_FILES_PATH + "new_mlfields_over.json", TEST_FILES_PATH + "big_ascending.txt", stateFilesPath + "over_detector_state.json", 0, "%d/%b/%Y:%T %z"); } bool persistPartitionDetector(const std::string& stateFilesPath) { return persistAnomalyDetectorStateToFile( TEST_FILES_PATH + "new_mlfields_partition.json", TEST_FILES_PATH + "big_ascending.txt", stateFilesPath + "partition_detector_state.json", 0, "%d/%b/%Y:%T %z"); } bool persistDcDetector(const std::string& stateFilesPath) { return persistAnomalyDetectorStateToFile( TEST_FILES_PATH + "new_persist_dc.json", TEST_FILES_PATH + "files_users_programs.csv", stateFilesPath + "dc_detector_state.json", 5); } bool persistCountDetector(const std::string& stateFilesPath) { return persistAnomalyDetectorStateToFile( TEST_FILES_PATH + "new_persist_count.json", TEST_FILES_PATH + "files_users_programs.csv", stateFilesPath + "count_detector_state.json", 5); } int main(int argc, 
char** argv) { std::string stateFilesPath; if (parseOptions(argc, argv, stateFilesPath) == false) { return EXIT_FAILURE; } ml::core::CLogger::instance().setLoggingLevel(ml::core::CLogger::E_Info); std::string version = versionNumber(); if (version.empty()) { LOG_ERROR(<< "Cannot get version number"); return EXIT_FAILURE; } if (stateFilesPath.empty()) { // The outputDir argument wasn't set, use the default path stateFilesPath = TEST_FILES_PATH + "state/" + version + "/"; } boost::system::error_code errorCode; boost::filesystem::create_directories(stateFilesPath, errorCode); if (errorCode) { LOG_ERROR(<< "Failed to create directory " << stateFilesPath << ", error: " << errorCode.message()); return EXIT_FAILURE; } LOG_INFO(<< "Saving model state for version: " << version << " to directory: " << stateFilesPath); bool persisted = persistByDetector(stateFilesPath); if (!persisted) { LOG_ERROR(<< "Failed to persist state for by detector"); return EXIT_FAILURE; } if (persistedNormalizerState.empty()) { LOG_ERROR(<< "Normalizer state not persisted"); return EXIT_FAILURE; } if (!writeNormalizerState(stateFilesPath + "normalizer_state.json")) { LOG_ERROR(<< "Error writing normalizer state file"); return EXIT_FAILURE; } persisted = persistOverDetector(stateFilesPath); if (!persisted) { LOG_ERROR(<< "Failed to persist state for over detector"); return EXIT_FAILURE; } persisted = persistPartitionDetector(stateFilesPath); if (!persisted) { LOG_ERROR(<< "Failed to persist state for partition detector"); return EXIT_FAILURE; } persisted = persistDcDetector(stateFilesPath); if (!persisted) { LOG_ERROR(<< "Failed to persist state for DC detector"); return EXIT_FAILURE; } persisted = persistCountDetector(stateFilesPath); if (!persisted) { LOG_ERROR(<< "Failed to persist state for count detector"); return EXIT_FAILURE; } persisted = persistCategorizerStateToFile(stateFilesPath + "categorizer_state.json"); if (!persisted) { LOG_ERROR(<< "Failed to persist categorizer state"); return 
EXIT_FAILURE; } LOG_INFO(<< "Written state files:"); for (const auto& stateFile : persistedStateFiles) { LOG_INFO(<< "\t" << stateFile); } return EXIT_SUCCESS; }