include/api/CDataFrameAnalysisInstrumentation.h (135 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #ifndef INCLUDED_ml_api_CDataFrameAnalysisInstrumentation_h #define INCLUDED_ml_api_CDataFrameAnalysisInstrumentation_h #include <core/CBoostJsonConcurrentLineWriter.h> #include <core/CProgramCounters.h> #include <maths/analytics/CDataFrameAnalysisInstrumentationInterface.h> #include <maths/common/CBasicStatistics.h> #include <api/ApiTypes.h> #include <api/ImportExport.h> #include <boost/json.hpp> #include <atomic> #include <cstdint> #include <memory> #include <mutex> namespace ml { namespace api { //! \brief Instrumentation class for collecting data frame analysis job statistics. //! //! DESCRIPTION:\n //! Responsible for collecting data frame analysis job statistics, i.e. memory usage, //! progress, parameters, quality of results. This also implements the functionality //! to write the JSON statistics to a specified output stream in a thread safe manner. //! //! IMPLEMENTATION DECISIONS:\n //! With the exception of reading and writing progress and memory usage this class is //! *NOT* thread safe. It is expected that calls to update and write instrumentation //! data all happen on the thread running the analysis. It also performs thread safe //! writing to a shared output stream. For example, it is expected that writes for //! progress happen concurrently with writes of other instrumentation. class API_EXPORT CDataFrameAnalysisInstrumentation : virtual public maths::analytics::CDataFrameAnalysisInstrumentationInterface { public: //!\brief Memory status enum EMemoryStatus { E_Ok, E_HardLimit }; //! \brief Set the output stream for the lifetime of this object. class API_EXPORT CScopeSetOutputStream { public: CScopeSetOutputStream(CDataFrameAnalysisInstrumentation& instrumentation, core::CJsonOutputStreamWrapper& outStream); ~CScopeSetOutputStream(); CScopeSetOutputStream(const CScopeSetOutputStream&) = delete; CScopeSetOutputStream& operator=(const CScopeSetOutputStream&) = delete; private: CDataFrameAnalysisInstrumentation& m_Instrumentation; }; public: //! Constructs an instrumentation object an analytics job with a given \p jobId. CDataFrameAnalysisInstrumentation(const std::string& jobId, std::size_t memoryLimit); //! Adds \p delta to the memory usage statistics. void updateMemoryUsage(std::int64_t delta) override; //! Start progress monitoring for \p phase. //! //! \note This resets the current progress to zero. void startNewProgressMonitoredTask(const std::string& task) override; //! This adds \p fractionalProgress to the current progress. //! //! \note The caller should try to ensure that the sum of the values added //! at the end of the analysis is equal to one. //! \note This is converted to an integer - so we can atomically add - by //! scaling by 1024. Therefore, this shouldn't be called with values less //! than 0.001. In fact, it is unlikely that such high resolution is needed //! and typically this would be called significantly less frequently. void updateProgress(double fractionalProgress) override; //! Reset variables related to the job progress. void resetProgress(); //! Record that the analysis is complete. void setToFinished(); //! \return True if the running analysis has finished. bool finished() const; //! \return The progress of the analysis in the range [0,1] being an estimate //! of the proportion of total work complete for a single run. double progress() const; //! Flush then reinitialize the instrumentation data. This will trigger //! writing them to the results pipe. void flush(const std::string& tag = "") override; //! \return The peak memory usage. std::int64_t memory() const; //! \return The id of the data frame analytics job. const std::string& jobId() const; //! Start polling and writing progress updates. //! //! \note This doesn't return until instrumentation.setToFinished() is called. static void monitor(CDataFrameAnalysisInstrumentation& instrumentation, core::CBoostJsonConcurrentLineWriter& writer); protected: using TWriter = core::CBoostJsonConcurrentLineWriter; using TWriterUPtr = std::unique_ptr<TWriter>; using TOptionalInt64 = std::optional<std::int64_t>; protected: virtual counter_t::ECounterTypes memoryCounterType() = 0; TWriter* writer(); void memoryReestimate(std::int64_t memoryReestimate); void memoryStatus(EMemoryStatus status); private: static const std::string NO_TASK; private: std::string readProgressMonitoredTask() const; int percentageProgress() const; virtual void writeAnalysisStats(std::int64_t timestamp) = 0; void writeMemoryAndAnalysisStats(); void writeMemory(std::int64_t timestamp); static void writeProgress(const std::string& task, int progress, core::CBoostJsonConcurrentLineWriter* writer); private: std::string m_JobId; std::string m_ProgressMonitoredTask; std::int64_t m_MemoryLimit; std::atomic_bool m_Finished; std::atomic_size_t m_FractionalProgress; std::atomic<std::int64_t> m_Memory; mutable std::mutex m_ProgressMutex; TWriterUPtr m_Writer; EMemoryStatus m_MemoryStatus; TOptionalInt64 m_MemoryReestimate; }; //! \brief Instrumentation class for Outlier Detection jobs. class API_EXPORT CDataFrameOutliersInstrumentation final : public CDataFrameAnalysisInstrumentation, public maths::analytics::CDataFrameOutliersInstrumentationInterface { public: CDataFrameOutliersInstrumentation(const std::string& jobId, std::size_t memoryLimit) : CDataFrameAnalysisInstrumentation(jobId, memoryLimit) {} void parameters(const maths::analytics::COutliers::SComputeParameters& parameters) override; void elapsedTime(std::uint64_t time) override; void featureInfluenceThreshold(double featureInfluenceThreshold) override; protected: counter_t::ECounterTypes memoryCounterType() override; private: void writeAnalysisStats(std::int64_t timestamp) override; void writeTimingStats(json::object& parentObject); void writeParameters(json::object& parentObject); private: maths::analytics::COutliers::SComputeParameters m_Parameters; std::uint64_t m_ElapsedTime; double m_FeatureInfluenceThreshold = -1.0; bool m_AnalysisStatsInitialized = false; }; //! \brief Instrumentation class for Supervised Learning jobs. //! //! DESCRIPTION:\n //! This class extends CDataFrameAnalysisInstrumentation with setters //! for hyperparameters, validation loss results, and job timing. class API_EXPORT CDataFrameTrainBoostedTreeInstrumentation final : public CDataFrameAnalysisInstrumentation, public maths::analytics::CDataFrameTrainBoostedTreeInstrumentationInterface { public: CDataFrameTrainBoostedTreeInstrumentation(const std::string& jobId, std::size_t memoryLimit) : CDataFrameAnalysisInstrumentation(jobId, memoryLimit) {} //! Set the supervised learning job \p type, can be E_Regression or E_Classification. void type(EStatsType type) override; //! Set the current \p iteration number. void iteration(std::size_t iteration) override; //! Set the run time of the current iteration. void iterationTime(std::uint64_t delta) override; //! Set the type of the validation loss result, e.g. "mse". void lossType(const std::string& lossType) override; //! Set the validation loss values for \p fold for each forest size to \p lossValues. void lossValues(std::size_t fold, TDoubleVec&& lossValues) override; //! Set the analysis task. void task(api_t::EDataFrameTrainBoostedTreeTask task); //! \return A writable object containing the training hyperparameters. SHyperparameters& hyperparameters() override { return m_Hyperparameters; } protected: counter_t::ECounterTypes memoryCounterType() override; private: using TLossVec = std::vector<std::pair<std::size_t, TDoubleVec>>; using TRowsAccumulator = maths::common::CBasicStatistics::SSampleMean<std::uint32_t>::TAccumulator; private: void writeAnalysisStats(std::int64_t timestamp) override; void writeHyperparameters(json::object& parentObject); void writeValidationLoss(json::object& parentObject); void writeTimingStats(json::object& parentObject); void reset(); private: EStatsType m_Type{E_Regression}; std::size_t m_Iteration{0}; std::uint64_t m_IterationTime{0}; std::uint64_t m_ElapsedTime{0}; bool m_AnalysisStatsInitialized{false}; std::string m_LossType; TLossVec m_LossValues; api_t::EDataFrameTrainBoostedTreeTask m_Task{api_t::E_Train}; SHyperparameters m_Hyperparameters; }; } } #endif // INCLUDED_ml_api_CDataFrameAnalysisInstrumentation_h