include/api/CForecastRunner.h (155 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #ifndef INCLUDED_ml_api_CForecastRunner_h #define INCLUDED_ml_api_CForecastRunner_h #include <core/CConcurrentWrapper.h> #include <core/CJsonOutputStreamWrapper.h> #include <core/CNonCopyable.h> #include <core/Constants.h> #include <core/CoreTypes.h> #include <maths/common/CModel.h> #include <model/CAnomalyDetector.h> #include <model/CForecastDataSink.h> #include <model/CResourceMonitor.h> #include <api/ImportExport.h> #include <boost/unordered_set.hpp> #include <atomic> #include <condition_variable> #include <cstdint> #include <functional> #include <list> #include <memory> #include <mutex> #include <string> #include <thread> #include <vector> namespace CForecastRunnerTest { struct testPopulation; struct testRare; struct testInsufficientData; struct testValidateDefaultExpiry; struct testValidateNoExpiry; struct testValidateInvalidExpiry; struct testValidateBrokenMessage; struct testValidateMissingId; struct testValidateProvidedMinDiskSpace; struct testValidateProvidedMaxMemoryLimit; struct testValidateProvidedTooLargeMaxMemoryLimit; struct testSufficientDiskSpace; } namespace ml { namespace api { //! \brief //! Forecast Worker to create forecasts of timeseries/ml models. //! //! DESCRIPTION:\n //! Executes forecast jobs async to the main thread //! //! IMPLEMENTATION DECISIONS:\n //! Uses only 1 thread as worker. //! //! The forecast runs in parallel to the main thread, this has //! various consequences: //! //! (math) models are cloned for forecasting at the time the request //! is made, as models would otherwise continue changing in the main //! thread. //! For the same reason, any field values are copied as they might get //! pruned in the main thread. //! Cloning also happens beforehand as the forecast job might hang in //! the queue for a while class API_EXPORT CForecastRunner final : private core::CNonCopyable { public: //! max open forecast requests //! if you change this, also change the ERROR_TOO_MANY_JOBS message accordingly static const std::size_t MAX_FORECAST_JOBS_IN_QUEUE = 3; //! default expiry time static const std::size_t DEFAULT_EXPIRY_TIME = 14 * core::constants::DAY; //! max memory allowed to use for forecast models //! (not defined inline because we need its address) static const std::size_t DEFAULT_MAX_FORECAST_MODEL_MEMORY; //! Note: This value measures the size in memory, not the size of the persistence, //! which is likely higher and would be hard to calculate upfront //! max memory allowed to use for forecast models persisting to disk static const std::size_t MAX_FORECAST_MODEL_PERSISTANCE_MEMORY = 524288000ull; // 500MB //! Note: This value is lower than in the ML Java code to prevent side-effects. //! If you change this value also change the limit in the ML Java code. //! The purpose of this value is to guard the rest of the system against //! running out of disk space. //! minimum disk space required for disk persistence //! (not defined inline because we need its address) static const std::size_t DEFAULT_MIN_FORECAST_AVAILABLE_DISK_SPACE; //! minimum time between stat updates to prevent to many updates in a short time static const std::uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000ul; // 3s private: static const std::string ERROR_FORECAST_REQUEST_FAILED_TO_PARSE; static const std::string ERROR_NO_FORECAST_ID; static const std::string ERROR_TOO_MANY_JOBS; static const std::string ERROR_NO_MODELS; static const std::string ERROR_NO_DATA_PROCESSED; static const std::string ERROR_NO_CREATE_TIME; static const std::string ERROR_BAD_MEMORY_STATUS; static const std::string ERROR_BAD_MODEL_MEMORY_LIMIT; static const std::string ERROR_MEMORY_LIMIT_DISK; static const std::string ERROR_MEMORY_LIMIT_DISKSPACE; static const std::string ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS; static const std::string ERROR_NO_SUPPORTED_FUNCTIONS; static const std::string WARNING_INVALID_EXPIRY; static const std::string INFO_DEFAULT_DURATION; static const std::string INFO_DEFAULT_EXPIRY; static const std::string INFO_NO_MODELS_CAN_CURRENTLY_BE_FORECAST; public: using TOStreamConcurrentWrapper = core::CConcurrentWrapper<std::ostream>; using TOStreamConcurrentWrapperPtr = std::shared_ptr<TOStreamConcurrentWrapper>; using TAnomalyDetectorPtr = std::shared_ptr<model::CAnomalyDetector>; using TAnomalyDetectorPtrVec = std::vector<TAnomalyDetectorPtr>; using TForecastModelWrapper = model::CForecastDataSink::CForecastModelWrapper; using TForecastResultSeries = model::CForecastDataSink::SForecastResultSeries; using TForecastResultSeriesVec = std::vector<TForecastResultSeries>; using TMathsModelPtr = std::unique_ptr<maths::common::CModel>; using TStrUSet = boost::unordered_set<std::string>; public: //! Initialize and start the forecast runner thread //! \p jobId The job ID //! \p strmOut The output stream to write forecast results to CForecastRunner(const std::string& jobId, core::CJsonOutputStreamWrapper& strmOut, model::CResourceMonitor& resourceMonitor); //! Destructor, cancels all queued forecast requests, finishes a running forecast. //! To finish all remaining forecasts call finishForecasts() first. ~CForecastRunner(); //! Enqueue a forecast job that will execute the requested forecast //! //! Parses and verifies the controlMessage and creates an internal job object which //! contains the required detectors (reference) as well as start and end date. //! The forecast itself isn't executed but might start later depending on the workers //! load. //! //! Validation fails if the message is invalid and/or the too many jobs are in the //! queue. //! //! \param controlMessage The control message retrieved. //! \param detectors vector of detectors (shallow copy) //! \return true if the forecast request passed validation bool pushForecastJob(const std::string& controlMessage, const TAnomalyDetectorPtrVec& detectors, const core_t::TTime lastResultsTime); //! Blocks and waits until all queued forecasts are done void finishForecasts(); //! Deletes all pending forecast requests void deleteAllForecastJobs(); private: struct API_EXPORT SForecast { SForecast() = default; SForecast(SForecast&& other) = default; SForecast& operator=(SForecast&& other) = default; SForecast(const SForecast& that) = delete; SForecast& operator=(const SForecast&) = delete; //! reset the struct, important to e.g. clean up reference counts void reset(); //! get the the end time core_t::TTime forecastEnd() const; //! The forecast ID std::string s_ForecastId; //! The forecast alias std::string s_ForecastAlias; //! Vector of models/series selected for forecasting (cloned for forecasting) TForecastResultSeriesVec s_ForecastSeries; //! Forecast create time core_t::TTime s_CreateTime{0}; //! Forecast start time core_t::TTime s_StartTime{0}; //! Forecast duration core_t::TTime s_Duration{0}; //! Expiration of the forecast (for automatic deletion) core_t::TTime s_ExpiryTime{0}; //! Forecast bounds double s_BoundsPercentile{maths::common::CModel::DEFAULT_BOUNDS_PERCENTILE}; //! total number of models std::size_t s_NumberOfModels{0}; //! total number of models able to forecast std::size_t s_NumberOfForecastableModels{0}; //! total memory required for this forecasting job (only the models) std::size_t s_MemoryUsage{0}; //! maximum allowed memory (in bytes) that this forecast can use std::size_t s_MaxForecastModelMemory{DEFAULT_MAX_FORECAST_MODEL_MEMORY}; //! minimum free disk space (in bytes) for a forecast to use disk std::size_t s_MinForecastAvailableDiskSpace{DEFAULT_MIN_FORECAST_AVAILABLE_DISK_SPACE}; //! A collection storing important messages from forecasting TStrUSet s_Messages; //! A directory to persist models on disk std::string s_TemporaryFolder; }; private: using TErrorFunc = std::function<void(const SForecast& forecastJob, const std::string& message)>; private: //! The worker loop void forecastWorker(); //! Check for new jobs, blocks while waiting bool tryGetJob(SForecast& forecastJob); //! pushes new jobs into the internal 'queue' (thread boundary) bool push(SForecast& forecastJob); //! send a scheduled message void sendScheduledMessage(const SForecast& forecastJob) const; //! send an error message void sendErrorMessage(const SForecast& forecastJob, const std::string& message) const; //! send a final message void sendFinalMessage(const SForecast& forecastJob, const std::string& message) const; //! send a message using \p write template<typename WRITE> void sendMessage(WRITE write, const SForecast& forecastJob, const std::string& message) const; //! Check for sufficient disk space. static bool sufficientAvailableDiskSpace(std::size_t minForecastAvailableDiskSpace, const char* path); //! parse and validate a forecast request and turn it into a forecast job static bool parseAndValidateForecastRequest( const std::string& controlMessage, SForecast& forecastJob, const core_t::TTime lastResultsTime, std::size_t jobBytesSizeLimit = std::numeric_limits<std::size_t>::max() / 2, const TErrorFunc& errorFunction = TErrorFunc()); private: //! This job ID std::string m_JobId; //! the output stream to write results to core::CJsonOutputStreamWrapper& m_ConcurrentOutputStream; //! The resource monitor by reference (owned by CAnomalyJob) //! note: we use the resource monitor only for checks at the moment model::CResourceMonitor& m_ResourceMonitor; //! thread for the worker std::thread m_Worker; //! indicator for worker std::atomic_bool m_Shutdown; //! The 'queue' of forecast jobs to be executed std::list<SForecast> m_ForecastJobs; //! Mutex std::mutex m_Mutex; //! Condition variable for the requests queue std::condition_variable m_WorkAvailableCondition; //! Condition variable for notifications on done requests std::condition_variable m_WorkCompleteCondition; friend struct CForecastRunnerTest::testPopulation; friend struct CForecastRunnerTest::testRare; friend struct CForecastRunnerTest::testInsufficientData; friend struct CForecastRunnerTest::testValidateDefaultExpiry; friend struct CForecastRunnerTest::testValidateNoExpiry; friend struct CForecastRunnerTest::testValidateInvalidExpiry; friend struct CForecastRunnerTest::testValidateBrokenMessage; friend struct CForecastRunnerTest::testValidateMissingId; friend struct CForecastRunnerTest::testValidateProvidedMinDiskSpace; friend struct CForecastRunnerTest::testValidateProvidedMaxMemoryLimit; friend struct CForecastRunnerTest::testValidateProvidedTooLargeMaxMemoryLimit; friend struct CForecastRunnerTest::testSufficientDiskSpace; }; } } #endif // INCLUDED_ml_api_CForecastRunner_h