lib/api/CForecastRunner.cc (403 lines of code) (raw):

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <api/CForecastRunner.h>

#include <core/CLogger.h>
#include <core/CStopWatch.h>
#include <core/CTimeUtils.h>

#include <model/CForecastDataSink.h>
#include <model/CForecastModelPersist.h>
#include <model/ModelTypes.h>

#include <boost/filesystem.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/system/error_code.hpp>

#include <sstream>

namespace ml {
namespace api {

namespace {
// Check whether the file system holding path has at least
// minForecastAvailableDiskSpace available, as reported by
// boost::filesystem::space.
//
// Returns false, after logging, if the space query fails or if the
// available space is below the requested minimum; true otherwise.
bool sufficientAvailableDiskSpaceForPath(std::size_t minForecastAvailableDiskSpace,
                                         const boost::filesystem::path& path) {
    boost::system::error_code errorCode;
    auto spaceInfo = boost::filesystem::space(path, errorCode);
    if (errorCode) {
        LOG_ERROR(<< "Failed to retrieve disk information for " << path
                  << " error " << errorCode.message());
        return false;
    }

    if (spaceInfo.available < minForecastAvailableDiskSpace) {
        LOG_WARN(<< "Checked disk space for " << path << " - required: " << minForecastAvailableDiskSpace
                 << ", available: " << spaceInfo.available);
        return false;
    }

    return true;
}

// Default used for string properties which are missing from a forecast request.
const std::string EMPTY_STRING;
}

const std::size_t CForecastRunner::DEFAULT_MAX_FORECAST_MODEL_MEMORY{20971520}; // 20MB
const std::size_t CForecastRunner::DEFAULT_MIN_FORECAST_AVAILABLE_DISK_SPACE{4294967296ull}; // 4GB

const std::string CForecastRunner::ERROR_FORECAST_REQUEST_FAILED_TO_PARSE("Failed to parse forecast request: ");
const std::string CForecastRunner::ERROR_NO_FORECAST_ID("forecast ID must be specified and non empty");
const std::string CForecastRunner::ERROR_TOO_MANY_JOBS("Forecast cannot be executed due to queue limit. Please wait for requests to finish and try again");
const std::string CForecastRunner::ERROR_NO_MODELS("Forecast cannot be executed as model is not yet established. Job requires more time to learn");
const std::string CForecastRunner::ERROR_NO_DATA_PROCESSED(
    "Forecast cannot be executed as job requires data to have been processed and modeled");
const std::string CForecastRunner::ERROR_NO_CREATE_TIME("Forecast create time must be specified and non zero");
const std::string CForecastRunner::ERROR_BAD_MEMORY_STATUS("Forecast cannot be executed as model memory status is not OK");
const std::string CForecastRunner::ERROR_BAD_MODEL_MEMORY_LIMIT(
    "Forecast max_model_memory must be below 500MB and must not exceed 40% of the job's configured model memory limit.");
const std::string CForecastRunner::ERROR_MEMORY_LIMIT_DISK(
    "Forecast cannot be executed as forecast memory usage is predicted to exceed 500MB");
const std::string CForecastRunner::ERROR_MEMORY_LIMIT_DISKSPACE(
    "Forecast cannot be executed as models exceed internal memory limit and available disk space is insufficient");
const std::string CForecastRunner::ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS("Forecast is not supported for population analysis");
const std::string CForecastRunner::ERROR_NO_SUPPORTED_FUNCTIONS("Forecast is not supported for the used functions");
const std::string CForecastRunner::WARNING_INVALID_EXPIRY("Forecast expires_in invalid, setting to 14 days");
const std::string CForecastRunner::INFO_DEFAULT_DURATION("Forecast duration not specified, setting to 1 day");
const std::string CForecastRunner::INFO_DEFAULT_EXPIRY("Forecast expires_in not specified, setting to 14 days");
const std::string CForecastRunner::INFO_NO_MODELS_CAN_CURRENTLY_BE_FORECAST("Insufficient history to forecast for all models");

// Store the job id, output stream and resource monitor and start the
// worker thread, which runs forecastWorker() until shutdown is requested.
CForecastRunner::CForecastRunner(const std::string& jobId,
                                 core::CJsonOutputStreamWrapper& strmOut,
                                 model::CResourceMonitor& resourceMonitor)
    : m_JobId{jobId}, m_ConcurrentOutputStream{strmOut},
      m_ResourceMonitor{resourceMonitor}, m_Shutdown{false} {
    m_Worker = std::thread([this] { this->forecastWorker(); });
}

// Request shutdown, wake the worker in case it is waiting for work and
// join it.
CForecastRunner::~CForecastRunner() {
    // shutdown
    m_Shutdown.store(true);

    // signal the worker
    {
        std::unique_lock<std::mutex> lock(m_Mutex);
        m_WorkAvailableCondition.notify_all();
    }

    m_Worker.join();
}

// Block the caller while the job queue is non-empty and shutdown has not
// been requested. The wait is released by the work-complete notification
// issued by the worker after each job.
void CForecastRunner::finishForecasts() {
    std::unique_lock<std::mutex> lock(m_Mutex);

    // note: forecast could still be active
    while (m_Shutdown.load() == false && m_ForecastJobs.empty() == false) {
        // items in the queue, wait
        m_WorkCompleteCondition.wait(lock);
    }
}

// Body of the worker thread: repeatedly take a job from the queue, run the
// forecast for each of its models, write progress and final statistics to
// the sink and remove the job's temporary folder if it has one. Runs until
// the shutdown flag is set.
void CForecastRunner::forecastWorker() {
    SForecast forecastJob;
    while (m_Shutdown.load() == false) {
        if (this->tryGetJob(forecastJob)) {
            LOG_INFO(<< "Start forecasting from "
                     << core::CTimeUtils::toIso8601(forecastJob.s_StartTime) << " to "
                     << core::CTimeUtils::toIso8601(forecastJob.forecastEnd()));

            core::CStopWatch timer(true);
            std::uint64_t lastStatsUpdate = 0;

            LOG_TRACE(<< "about to create sink");
            model::CForecastDataSink sink(
                m_JobId, forecastJob.s_ForecastId, forecastJob.s_ForecastAlias,
                forecastJob.s_CreateTime, forecastJob.s_StartTime,
                forecastJob.forecastEnd(), forecastJob.s_ExpiryTime,
                forecastJob.s_MemoryUsage, m_ConcurrentOutputStream);

            std::string message;

            // collecting the runtime messages first and sending it in 1 go
            TStrUSet messages(forecastJob.s_Messages);
            double processedModels = 0;
            double totalNumberOfForecastableModels =
                static_cast<double>(forecastJob.s_NumberOfForecastableModels);
            std::size_t failedForecasts = 0;

            sink.writeStats(0.0, 0, forecastJob.s_Messages);

            // while loops allow us to free up memory for every model right after each forecast is done
            while (!forecastJob.s_ForecastSeries.empty()) {
                TForecastResultSeries& series = forecastJob.s_ForecastSeries.back();
                std::unique_ptr<model::CForecastModelPersist::CRestore> modelRestore;

                // initialize persistence restore exactly once
                if (!series.s_ToForecastPersisted.empty()) {
                    modelRestore = std::make_unique<model::CForecastModelPersist::CRestore>(
                        series.s_ModelParams, series.s_MinimumSeasonalVarianceScale,
                        series.s_ToForecastPersisted);
                }

                while (series.s_ToForecast.empty() == false || modelRestore != nullptr) {
                    // check if we should backfill from persistence
                    if (series.s_ToForecast.empty()) {
                        TMathsModelPtr model;
                        core_t::TTime firstDataTime;
                        core_t::TTime lastDataTime;
                        model_t::EFeature feature;
                        std::string byFieldValue;

                        if (modelRestore->nextModel(model, firstDataTime, lastDataTime,
                                                    feature, byFieldValue)) {
                            series.s_ToForecast.emplace_back(
                                feature, byFieldValue, std::move(model),
                                firstDataTime, lastDataTime);
                        } else {
                            // restorer exhausted, no need for further restoring
                            modelRestore.reset();
                            break;
                        }
                    }

                    const TForecastModelWrapper& model{series.s_ToForecast.back()};
                    bool success{model.forecast(
                        series, forecastJob.s_StartTime, forecastJob.forecastEnd(),
                        forecastJob.s_BoundsPercentile, sink, message)};
                    series.s_ToForecast.pop_back();

                    if (success == false) {
                        LOG_DEBUG(<< "Detector " << series.s_DetectorIndex
                                  << " failed to forecast");
                        ++failedForecasts;
                    }

                    if (message.empty() == false) {
                        messages.insert("Detector[" + std::to_string(series.s_DetectorIndex) +
                                        "]: " + message);
                        message.clear();
                    }

                    ++processedModels;

                    // intermediate progress is only written if this was not
                    // the last model and enough time has passed since the
                    // previous update
                    if (processedModels != totalNumberOfForecastableModels) {
                        std::uint64_t elapsedTime = timer.lap();
                        if (elapsedTime - lastStatsUpdate > MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE) {
                            sink.writeStats(processedModels / totalNumberOfForecastableModels,
                                            elapsedTime, forecastJob.s_Messages);
                            lastStatsUpdate = elapsedTime;
                        }
                    }
                }

                forecastJob.s_ForecastSeries.pop_back();
            }

            // write final message
            sink.writeStats(1.0, timer.stop(), messages,
                            failedForecasts != forecastJob.s_NumberOfForecastableModels);

            // important: reset the structure to decrease shared pointer reference counts
            forecastJob.reset();
            LOG_INFO(<< "Finished forecasting, wrote " << sink.numRecordsWritten()
                     << " records");

            // signal that job is done
            m_WorkCompleteCondition.notify_all();

            // cleanup
            if (!forecastJob.s_TemporaryFolder.empty()) {
                boost::filesystem::path temporaryFolder(forecastJob.s_TemporaryFolder);
                boost::system::error_code errorCode;
                boost::filesystem::remove_all(temporaryFolder, errorCode);
                if (errorCode) {
                    // not an error: there is also cleanup code on the Java side
                    LOG_WARN(<< "Failed to cleanup temporary data from: "
                             << forecastJob.s_TemporaryFolder << " error "
                             << errorCode.message());
                }
            }
        }
    }

    // clear any queued forecast jobs (paranoia, this should not happen)
    this->deleteAllForecastJobs();
}

// Discard all queued jobs and notify the work-available condition.
void CForecastRunner::deleteAllForecastJobs() {
    std::unique_lock<std::mutex> lock(m_Mutex);
    m_ForecastJobs.clear();
    m_WorkAvailableCondition.notify_all();
}

// Try to take the job at the front of the queue.
//
// Returns true, with the job swapped into forecastJob, if the queue was
// non-empty. Otherwise returns false: immediately if shutdown has been
// requested, or else after waiting on the work-available condition so the
// caller can re-check its loop condition.
bool CForecastRunner::tryGetJob(SForecast& forecastJob) {
    std::unique_lock<std::mutex> lock(m_Mutex);

    if (!m_ForecastJobs.empty()) {
        std::swap(forecastJob, m_ForecastJobs.front());
        m_ForecastJobs.pop_front();
        return true;
    }

    // m_Shutdown might have been set meanwhile
    if (m_Shutdown.load()) {
        return false;
    }

    m_WorkAvailableCondition.wait(lock);
    return false;
}

// Parse and validate a forecast request, check the prerequisites of all
// detectors, obtain the models to forecast (persisting them to disk when
// the predicted memory usage reaches the request's memory limit) and queue
// the job for the worker thread.
//
// Returns true if the job was queued. On every failure path after the
// forecast ID is known a message is written to the output stream and false
// is returned.
bool CForecastRunner::pushForecastJob(const std::string& controlMessage,
                                      const TAnomalyDetectorPtrVec& detectors,
                                      const core_t::TTime lastResultsTime) {
    SForecast forecastJob;
    if (parseAndValidateForecastRequest(
            controlMessage, forecastJob, lastResultsTime,
            m_ResourceMonitor.getBytesMemoryLimit(),
            std::bind(&CForecastRunner::sendErrorMessage, this,
                      std::placeholders::_1, std::placeholders::_2)) == false) {
        return false;
    }

    if (m_ResourceMonitor.memoryStatus() != model_t::E_MemoryStatusOk) {
        this->sendErrorMessage(forecastJob, ERROR_BAD_MEMORY_STATUS);
        return false;
    }

    std::size_t totalNumberOfModels = 0;
    std::size_t totalNumberOfForecastModels = 0;
    bool atLeastOneNonPopulationModel = false;
    bool atLeastOneSupportedFunction = false;
    std::size_t totalMemoryUsage = 0;

    // 1st loop over the detectors to check prerequisites
    for (const auto& detector : detectors) {
        if (detector.get() == nullptr) {
            LOG_ERROR(<< "Unexpected empty detector found");
            continue;
        }

        model::CForecastDataSink::SForecastModelPrerequisites prerequisites =
            detector->getForecastPrerequisites();

        totalNumberOfModels += prerequisites.s_NumberOfModels;
        totalNumberOfForecastModels += prerequisites.s_NumberOfForecastableModels;
        atLeastOneNonPopulationModel = atLeastOneNonPopulationModel ||
                                       !prerequisites.s_IsPopulation;
        atLeastOneSupportedFunction = atLeastOneSupportedFunction ||
                                      prerequisites.s_IsSupportedFunction;
        totalMemoryUsage += prerequisites.s_MemoryUsageForDetector;

        // without a temporary folder there is nowhere to spill models to,
        // so fail as soon as the in-memory limit is reached
        if (totalMemoryUsage >= forecastJob.s_MaxForecastModelMemory &&
            forecastJob.s_TemporaryFolder.empty()) {
            this->sendErrorMessage(
                forecastJob,
                "Forecast cannot be executed as forecast memory usage is predicted to exceed " +
                    std::to_string(forecastJob.s_MaxForecastModelMemory) +
                    " bytes while disk space is exceeded");
            return false;
        }
    }

    if (totalMemoryUsage >= MAX_FORECAST_MODEL_PERSISTANCE_MEMORY) {
        this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT_DISK);
        return false;
    }

    if (atLeastOneNonPopulationModel == false) {
        this->sendErrorMessage(forecastJob, ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS);
        return false;
    }

    if (atLeastOneSupportedFunction == false) {
        this->sendErrorMessage(forecastJob, ERROR_NO_SUPPORTED_FUNCTIONS);
        return false;
    }

    if (totalNumberOfForecastModels == 0) {
        this->sendFinalMessage(forecastJob, INFO_NO_MODELS_CAN_CURRENTLY_BE_FORECAST);
        return false;
    }

    forecastJob.s_NumberOfModels = totalNumberOfModels;
    forecastJob.s_NumberOfForecastableModels = totalNumberOfForecastModels;
    forecastJob.s_MemoryUsage = totalMemoryUsage;

    // send a notification that job has been scheduled
    this->sendScheduledMessage(forecastJob);

    // 2nd loop over the detectors to clone models for forecasting
    bool persistOnDisk = false;
    if (totalMemoryUsage >= forecastJob.s_MaxForecastModelMemory) {
        boost::filesystem::path temporaryFolder(forecastJob.s_TemporaryFolder);

        if (sufficientAvailableDiskSpaceForPath(forecastJob.s_MinForecastAvailableDiskSpace,
                                                temporaryFolder) == false) {
            this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT_DISKSPACE);
            return false;
        }

        LOG_WARN(<< "Forecast [" << forecastJob.s_ForecastId
                 << "] memory usage exceeds configured byte limit ["
                 << std::to_string(forecastJob.s_MaxForecastModelMemory) << "] (requires "
                 << std::to_string(1 + (totalMemoryUsage >> 20)) << " MB), using disk.");

        // create a subdirectory using the unique forecast id
        temporaryFolder /= forecastJob.s_ForecastId;
        forecastJob.s_TemporaryFolder = temporaryFolder.string();

        boost::system::error_code errorCode;
        boost::filesystem::create_directories(temporaryFolder, errorCode);
        if (errorCode) {
            this->sendErrorMessage(
                forecastJob, "Forecast internal error, failed to create temporary folder " +
                                 temporaryFolder.string() + " error: " + errorCode.message());
            return false;
        }

        LOG_DEBUG(<< "Persisting to: " << temporaryFolder.string());
        persistOnDisk = true;
    } else {
        // models fit in memory: make sure the worker does not try to clean
        // up a folder which was never created
        forecastJob.s_TemporaryFolder.clear();
    }

    for (const auto& detector : detectors) {
        if (detector.get() == nullptr) {
            LOG_ERROR(<< "Unexpected empty detector found");
            continue;
        }

        forecastJob.s_ForecastSeries.emplace_back(detector->getForecastModels(
            persistOnDisk, forecastJob.s_TemporaryFolder));
    }

    return this->push(forecastJob);
}

// Append forecastJob to the queue and wake the worker.
//
// Returns false, after sending an error message, if the queue already
// holds MAX_FORECAST_JOBS_IN_QUEUE jobs or if the job has no models.
bool CForecastRunner::push(SForecast& forecastJob) {
    std::unique_lock<std::mutex> lock(m_Mutex);

    if (m_ForecastJobs.size() == MAX_FORECAST_JOBS_IN_QUEUE) {
        this->sendErrorMessage(forecastJob, ERROR_TOO_MANY_JOBS);
        return false;
    }

    if (forecastJob.s_NumberOfModels == 0) {
        this->sendErrorMessage(forecastJob, ERROR_NO_MODELS);
        return false;
    }

    m_ForecastJobs.push_back(std::move(forecastJob));

    // release the lock before notifying the worker
    lock.unlock();
    m_WorkAvailableCondition.notify_all();
    return true;
}

// Parse the JSON forecast request carried by controlMessage into
// forecastJob and validate it.
//
// The JSON document starts at the second character of controlMessage; the
// first character is skipped. Failures which occur before a forecast ID is
// known (empty message, malformed JSON, missing forecast ID) are only
// logged. Later validation failures are reported through errorFunction.
// Missing or invalid duration and expiry values are replaced by defaults.
//
// Returns true if forecastJob has been populated with a valid request.
bool CForecastRunner::parseAndValidateForecastRequest(const std::string& controlMessage,
                                                      SForecast& forecastJob,
                                                      const core_t::TTime lastResultsTime,
                                                      std::size_t jobBytesSizeLimit,
                                                      const TErrorFunc& errorFunction) {
    if (controlMessage.empty()) {
        // substr(1) throws std::out_of_range for an empty string and that
        // would escape this function, so treat this as a parse failure
        LOG_ERROR(<< ERROR_FORECAST_REQUEST_FAILED_TO_PARSE << "control message is empty");
        return false;
    }

    std::istringstream stringStream(controlMessage.substr(1));
    forecastJob.s_StartTime = lastResultsTime;

    core_t::TTime expiresIn = 0;
    boost::property_tree::ptree properties;
    try {
        boost::property_tree::read_json(stringStream, properties);

        forecastJob.s_ForecastId = properties.get<std::string>("forecast_id", EMPTY_STRING);
        forecastJob.s_ForecastAlias = properties.get<std::string>("forecast_alias", EMPTY_STRING);
        forecastJob.s_Duration = properties.get<core_t::TTime>("duration", 0);
        forecastJob.s_CreateTime = properties.get<core_t::TTime>("create_time", 0);
        forecastJob.s_MaxForecastModelMemory = properties.get<std::size_t>(
            "max_model_memory", DEFAULT_MAX_FORECAST_MODEL_MEMORY);
        forecastJob.s_MinForecastAvailableDiskSpace = properties.get<std::size_t>(
            "min_available_disk_space", DEFAULT_MIN_FORECAST_AVAILABLE_DISK_SPACE);

        // tmp storage if available
        forecastJob.s_TemporaryFolder = properties.get<std::string>("tmp_storage", EMPTY_STRING);

        // use -1 as default to allow 0 as 'never expires'
        expiresIn = properties.get<core_t::TTime>("expires_in", -1l);

        // note: this is not exposed on the Java side
        forecastJob.s_BoundsPercentile = properties.get<double>(
            "boundspercentile", maths::common::CModel::DEFAULT_BOUNDS_PERCENTILE);
    } catch (const std::exception& e) {
        LOG_ERROR(<< ERROR_FORECAST_REQUEST_FAILED_TO_PARSE << e.what());
        return false;
    }

    if (forecastJob.s_ForecastId.empty()) {
        LOG_ERROR(<< ERROR_NO_FORECAST_ID);
        return false;
    }

    // from now we have a forecast ID and can send error messages
    if (forecastJob.s_MaxForecastModelMemory != DEFAULT_MAX_FORECAST_MODEL_MEMORY &&
        (forecastJob.s_MaxForecastModelMemory >= MAX_FORECAST_MODEL_PERSISTANCE_MEMORY ||
         forecastJob.s_MaxForecastModelMemory >=
             static_cast<std::size_t>(jobBytesSizeLimit * 0.40))) {
        errorFunction(forecastJob, ERROR_BAD_MODEL_MEMORY_LIMIT);
        return false;
    }

    if (lastResultsTime == 0) {
        errorFunction(forecastJob, ERROR_NO_DATA_PROCESSED);
        return false;
    }

    if (forecastJob.s_CreateTime == 0) {
        errorFunction(forecastJob, ERROR_NO_CREATE_TIME);
        return false;
    }

    if (forecastJob.s_Duration == 0) {
        // only log
        forecastJob.s_Duration = core::constants::DAY;
        LOG_INFO(<< INFO_DEFAULT_DURATION);
    }

    if (expiresIn < -1) {
        // only log
        expiresIn = DEFAULT_EXPIRY_TIME;
        LOG_INFO(<< WARNING_INVALID_EXPIRY);
    } else if (expiresIn == -1) {
        // only log
        expiresIn = DEFAULT_EXPIRY_TIME;
        LOG_DEBUG(<< INFO_DEFAULT_EXPIRY);
    }

    forecastJob.s_ExpiryTime = forecastJob.s_CreateTime + expiresIn;

    return true;
}

// Write a "scheduled" message for forecastJob to the output stream.
void CForecastRunner::sendScheduledMessage(const SForecast& forecastJob) const {
    LOG_DEBUG(<< "job passed forecast validation, scheduled for forecasting");
    model::CForecastDataSink sink(
        m_JobId, forecastJob.s_ForecastId, forecastJob.s_ForecastAlias,
        forecastJob.s_CreateTime, forecastJob.s_StartTime,
        forecastJob.forecastEnd(), forecastJob.s_ExpiryTime,
        forecastJob.s_MemoryUsage, m_ConcurrentOutputStream);
    sink.writeScheduledMessage();
}

// Log message as an error and write it as an error message for forecastJob.
void CForecastRunner::sendErrorMessage(const SForecast& forecastJob,
                                       const std::string& message) const {
    LOG_ERROR(<< message);
    this->sendMessage(&model::CForecastDataSink::writeErrorMessage, forecastJob, message);
}

// Write message as a final message for forecastJob.
void CForecastRunner::sendFinalMessage(const SForecast& forecastJob,
                                       const std::string& message) const {
    this->sendMessage(&model::CForecastDataSink::writeFinalMessage, forecastJob, message);
}

// Create a sink for forecastJob and invoke the CForecastDataSink member
// function write on it with message.
template<typename WRITE>
void CForecastRunner::sendMessage(WRITE write,
                                  const SForecast& forecastJob,
                                  const std::string& message) const {
    model::CForecastDataSink sink(
        m_JobId, forecastJob.s_ForecastId, forecastJob.s_ForecastAlias,
        forecastJob.s_CreateTime, forecastJob.s_StartTime, forecastJob.forecastEnd(),
        // in an error case use the default expiry time
        forecastJob.s_CreateTime + DEFAULT_EXPIRY_TIME,
        forecastJob.s_MemoryUsage, m_ConcurrentOutputStream);

    (sink.*write)(message);
}

// Check whether at least minForecastAvailableDiskSpace is available at
// path; see sufficientAvailableDiskSpaceForPath.
bool CForecastRunner::sufficientAvailableDiskSpace(std::size_t minForecastAvailableDiskSpace,
                                                   const char* path) {
    return sufficientAvailableDiskSpaceForPath(minForecastAvailableDiskSpace, path);
}

// Release the forecast series held by this job. All other members are left
// untouched.
void CForecastRunner::SForecast::reset() {
    // clean up all non-simple types
    s_ForecastSeries.clear();
}

// The end time of the forecast: the start time plus the duration.
core_t::TTime CForecastRunner::SForecast::forecastEnd() const {
    return s_StartTime + s_Duration;
}
}
}