lib/api/CDataFrameAnalysisInstrumentation.cc (461 lines of code) (raw):

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <api/CDataFrameAnalysisInstrumentation.h>

#include <core/CTimeUtils.h>
#include <core/Constants.h>

#include <maths/analytics/CBoostedTree.h>

#include <api/CDataFrameOutliersRunner.h>
#include <api/CDataFrameTrainBoostedTreeClassifierRunner.h>
#include <api/CDataFrameTrainBoostedTreeRunner.h>

#include <boost/json.hpp>

#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <iomanip>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

namespace json = boost::json;

namespace ml {
namespace api {

namespace {
using TStrVec = std::vector<std::string>;

const double MEMORY_LIMIT_INCREMENT{2.0}; // request 100% more memory
// Integer value of the progress counter which corresponds to 100% progress.
const std::size_t MAXIMUM_FRACTIONAL_PROGRESS{std::size_t{1}
                                              << ((sizeof(std::size_t) - 2) * 8)};
const std::int64_t BYTES_IN_KB{static_cast<std::int64_t>(core::constants::BYTES_IN_KILOBYTES)};

// clang-format off
const std::string CLASSIFICATION_STATS_TAG{"classification_stats"};
const std::string HYPERPARAMETERS_TAG{"hyperparameters"};
const std::string MEMORY_REESTIMATE_TAG{"memory_reestimate_bytes"};
const std::string ITERATION_TAG{"iteration"};
const std::string JOB_ID_TAG{"job_id"};
const std::string MEMORY_STATUS_HARD_LIMIT_TAG{"hard_limit"};
const std::string MEMORY_STATUS_OK_TAG{"ok"};
const std::string MEMORY_STATUS_TAG{"status"};
const std::string MEMORY_TYPE_TAG{"analytics_memory_usage"};
const std::string OUTLIER_DETECTION_STATS{"outlier_detection_stats"};
const std::string PARAMETERS_TAG{"parameters"};
const std::string PEAK_MEMORY_USAGE_TAG{"peak_usage_bytes"};
const std::string PROGRESS_TAG{"progress"};
const std::string REGRESSION_STATS_TAG{"regression_stats"};
const std::string STEP_TAG{"step"};
const std::string TIMESTAMP_TAG{"timestamp"};
const std::string TIMING_ELAPSED_TIME_TAG{"elapsed_time"};
const std::string TIMING_ITERATION_TIME_TAG{"iteration_time"};
const std::string TIMING_STATS_TAG{"timing_stats"};
const std::string TYPE_TAG{"type"};
const std::string VALIDATION_FOLD_TAG{"fold"};
const std::string VALIDATION_FOLD_VALUES_TAG{"fold_values"};
const std::string VALIDATION_LOSS_TAG{"validation_loss"};
const std::string VALIDATION_LOSS_TYPE_TAG{"loss_type"};
const std::string VALIDATION_LOSS_VALUES_TAG{"values"};

// Hyperparameters
// TODO we should expose these in the analysis config.
const std::string MAX_ATTEMPTS_TO_ADD_TREE_TAG{"max_attempts_to_add_tree"};
const std::string NUM_SPLITS_PER_FEATURE_TAG{"num_splits_per_feature"};

// Phase progress
const std::string PHASE_PROGRESS{"phase_progress"};
const std::string PHASE{"phase"};
const std::string PROGRESS_PERCENT{"progress_percent"};
// clang-format on

// Format a byte count as a whole number of "kb" or, if that would be
// BYTES_IN_KB or more, "mb". Each division rounds up for non-negative values.
std::string bytesToString(std::int64_t value) {
    std::ostringstream stream;
    stream << std::fixed;
    stream << std::setprecision(0);
    value = (value + BYTES_IN_KB - 1) / BYTES_IN_KB;
    if (value < BYTES_IN_KB) {
        stream << value;
        stream << " kb";
    } else {
        value = (value + BYTES_IN_KB - 1) / BYTES_IN_KB;
        stream << value;
        stream << " mb";
    }
    return stream.str();
}

// Overload for a floating point byte count: truncates to an integer and
// forwards to the integer overload.
std::string bytesToString(double bytes) {
    return bytesToString(static_cast<std::int64_t>(bytes));
}
}

// Create the instrumentation for job jobId with a memory limit of memoryLimit
// bytes. No task is being monitored and no writer is attached initially.
CDataFrameAnalysisInstrumentation::CDataFrameAnalysisInstrumentation(const std::string& jobId,
                                                                     std::size_t memoryLimit)
    : m_JobId{jobId}, m_ProgressMonitoredTask{NO_TASK},
      m_MemoryLimit{static_cast<std::int64_t>(memoryLimit)}, m_Finished{false},
      m_FractionalProgress{0}, m_Memory{0}, m_Writer{nullptr}, m_MemoryStatus(E_Ok) {
}

// Add delta (which may be negative) to the tracked memory usage and update the
// peak memory counter. If the new total exceeds the memory limit this records
// a re-estimate of MEMORY_LIMIT_INCREMENT times the current usage, sets the
// hard limit status, writes out the statistics and raises a fatal error.
void CDataFrameAnalysisInstrumentation::updateMemoryUsage(std::int64_t delta) {
    std::int64_t memory{m_Memory.fetch_add(delta) + delta};
    if (memory >= 0) {
        core::CProgramCounters::counter(this->memoryCounterType())
            .max(static_cast<std::uint64_t>(memory));
        if (memory > m_MemoryLimit) {
            double memoryReestimateBytes{static_cast<double>(memory) * MEMORY_LIMIT_INCREMENT};
            this->memoryReestimate(static_cast<std::int64_t>(memoryReestimateBytes));
            this->memoryStatus(E_HardLimit);
            this->flush();
            // The writer is only set while a CScopeSetOutputStream is alive,
            // so it can be null here: every other use of it is guarded too.
            if (m_Writer != nullptr) {
                m_Writer->flush();
            }
            LOG_INFO(<< "Required memory " << memory << " exceeds the memory limit "
                     << m_MemoryLimit << ". New estimated limit is "
                     << memoryReestimateBytes << ".");
            HANDLE_FATAL(<< "Input error: memory limit [" << bytesToString(m_MemoryLimit)
                         << "] has been exceeded. Please force stop the job, increase to new estimated limit ["
                         << bytesToString(memoryReestimateBytes) << "] and restart.");
        }
    } else {
        // Something has gone wrong with memory estimation. Trap this case
        // to avoid underflowing the peak memory usage statistic.
        LOG_WARN(<< "Memory estimate " << memory << " is negative!");
    }
}

// Switch progress monitoring to a new task: the progress counter is reset to
// zero and the previous task (if any) is reported as 100% complete.
void CDataFrameAnalysisInstrumentation::startNewProgressMonitoredTask(const std::string& task) {
    std::string lastTask;
    {
        std::lock_guard<std::mutex> lock{m_ProgressMutex};
        lastTask = m_ProgressMonitoredTask;
        m_ProgressMonitoredTask = task;
        m_FractionalProgress.store(0);
    }
    // Written outside the lock; writeProgress ignores NO_TASK and a null writer.
    writeProgress(lastTask, 100, m_Writer.get());
}

// Advance the progress counter by fractionalProgress, expressed as a fraction
// of the whole task. Every call advances the counter by at least one unit.
void CDataFrameAnalysisInstrumentation::updateProgress(double fractionalProgress) {
    m_FractionalProgress.fetch_add(static_cast<std::size_t>(std::max(
        static_cast<double>(MAXIMUM_FRACTIONAL_PROGRESS) * fractionalProgress + 0.5, 1.0)));
}

// Return to the initial state: no monitored task, zero progress, not finished.
void CDataFrameAnalysisInstrumentation::resetProgress() {
    std::lock_guard<std::mutex> lock{m_ProgressMutex};
    m_ProgressMonitoredTask = NO_TASK;
    m_FractionalProgress.store(0);
    m_Finished.store(false);
}

// Mark the analysis as finished and set the progress counter to its maximum.
void CDataFrameAnalysisInstrumentation::setToFinished() {
    m_Finished.store(true);
    m_FractionalProgress.store(MAXIMUM_FRACTIONAL_PROGRESS);
}

// Check whether setToFinished has been called since the last reset.
bool CDataFrameAnalysisInstrumentation::finished() const {
    return m_Finished.load();
}

// Get the progress of the current task as a fraction. This is exactly 1.0 only
// once finished; before that the counter is clamped just below its maximum.
double CDataFrameAnalysisInstrumentation::progress() const {
    return this->finished()
               ? 1.0
               : static_cast<double>(std::min(m_FractionalProgress.load(),
                                              MAXIMUM_FRACTIONAL_PROGRESS - 1)) /
                     static_cast<double>(MAXIMUM_FRACTIONAL_PROGRESS);
}

// Write the current memory and analysis statistics to the writer, if one is set.
void CDataFrameAnalysisInstrumentation::flush(const std::string& /* tag */) {
    // TODO use the tag.
    this->writeMemoryAndAnalysisStats();
}

// Get the currently tracked memory usage in bytes.
std::int64_t CDataFrameAnalysisInstrumentation::memory() const {
    return m_Memory.load();
}

// Get the identifier of the job being instrumented.
const std::string& CDataFrameAnalysisInstrumentation::jobId() const {
    return m_JobId;
}

// Poll instrumentation until it is finished, writing a progress record to
// writer whenever the monitored task changes or the integer percentage
// increases. The wait between polls starts at 1ms and doubles up to 1024ms.
// A final progress record is written after the analysis has finished.
void CDataFrameAnalysisInstrumentation::monitor(CDataFrameAnalysisInstrumentation& instrumentation,
                                                core::CBoostJsonConcurrentLineWriter& writer) {
    std::string lastTask{NO_TASK};
    int lastProgress{0};

    int wait{1};
    while (instrumentation.finished() == false) {
        std::this_thread::sleep_for(std::chrono::milliseconds(wait));
        std::string task{instrumentation.readProgressMonitoredTask()};
        int progress{instrumentation.percentageProgress()};
        if (task != lastTask || progress > lastProgress) {
            lastTask = task;
            lastProgress = progress;
            writeProgress(lastTask, lastProgress, &writer);
        }
        wait = std::min(2 * wait, 1024);
    }

    lastTask = instrumentation.readProgressMonitoredTask();
    lastProgress = instrumentation.percentageProgress();
    writeProgress(lastTask, lastProgress, &writer);
}

// Record the re-estimated memory requirement reported by writeMemory.
void CDataFrameAnalysisInstrumentation::memoryReestimate(std::int64_t memoryReestimate) {
    m_MemoryReestimate = memoryReestimate;
}

// Record the memory status reported by writeMemory.
void CDataFrameAnalysisInstrumentation::memoryStatus(EMemoryStatus status) {
    m_MemoryStatus = status;
}

// Get a copy of the name of the task being monitored, read under the
// progress mutex.
std::string CDataFrameAnalysisInstrumentation::readProgressMonitoredTask() const {
    std::lock_guard<std::mutex> lock{m_ProgressMutex};
    return m_ProgressMonitoredTask;
}

// Get the progress as an integer percentage, rounded down.
int CDataFrameAnalysisInstrumentation::percentageProgress() const {
    return static_cast<int>(std::floor(100.0 * this->progress()));
}

// Get the writer, or null if no output stream is currently set.
CDataFrameAnalysisInstrumentation::TWriter* CDataFrameAnalysisInstrumentation::writer() {
    return m_Writer.get();
}

// Write one JSON object containing the memory usage followed by the analysis
// specific statistics, both stamped with the current time in milliseconds.
// Does nothing if no writer is set.
void CDataFrameAnalysisInstrumentation::writeMemoryAndAnalysisStats() {
    if (m_Writer != nullptr) {
        std::int64_t timestamp{core::CTimeUtils::nowMs()};
        m_Writer->onObjectBegin();
        this->writeMemory(timestamp);
        this->writeAnalysisStats(timestamp);
        m_Writer->onObjectEnd();
    }
}

// Write the memory usage object: job id, timestamp, peak usage, status and,
// if one has been recorded, the memory re-estimate. Does nothing if no writer
// is set.
void CDataFrameAnalysisInstrumentation::writeMemory(std::int64_t timestamp) {
    if (m_Writer != nullptr) {
        m_Writer->onKey(MEMORY_TYPE_TAG);
        m_Writer->onObjectBegin();
        m_Writer->onKey(JOB_ID_TAG);
        m_Writer->onString(m_JobId);
        m_Writer->onKey(TIMESTAMP_TAG);
        m_Writer->onInt64(timestamp);
        m_Writer->onKey(PEAK_MEMORY_USAGE_TAG);
        m_Writer->onUint64(core::CProgramCounters::counter(this->memoryCounterType()));
        m_Writer->onKey(MEMORY_STATUS_TAG);
        switch (m_MemoryStatus) {
        case E_Ok:
            m_Writer->onString(MEMORY_STATUS_OK_TAG);
            break;
        case E_HardLimit:
            m_Writer->onString(MEMORY_STATUS_HARD_LIMIT_TAG);
            break;
        }
        if (m_MemoryReestimate) {
            m_Writer->onKey(MEMORY_REESTIMATE_TAG);
            m_Writer->onInt64(*m_MemoryReestimate);
        }
        m_Writer->onObjectEnd();
    }
}

// Write and flush a phase progress record for task. Nothing is written if
// writer is null or task is NO_TASK.
void CDataFrameAnalysisInstrumentation::writeProgress(const std::string& task,
                                                      int progress,
                                                      core::CBoostJsonConcurrentLineWriter* writer) {
    if (writer != nullptr && task != NO_TASK) {
        writer->onObjectBegin();
        writer->onKey(PHASE_PROGRESS);
        writer->onObjectBegin();
        writer->onKey(PHASE);
        writer->onString(task);
        writer->onKey(PROGRESS_PERCENT);
        writer->onInt(progress);
        writer->onObjectEnd();
        writer->onObjectEnd();
        writer->flush();
    }
}

// The empty string is used to denote that no task is being monitored.
const std::string CDataFrameAnalysisInstrumentation::NO_TASK;

// The program counter which tracks peak memory usage for outlier detection.
counter_t::ECounterTypes CDataFrameOutliersInstrumentation::memoryCounterType() {
    return counter_t::E_DFOPeakMemoryUsage;
}

// The program counter which tracks peak memory usage for boosted tree training.
counter_t::ECounterTypes CDataFrameTrainBoostedTreeInstrumentation::memoryCounterType() {
    return counter_t::E_DFTPMPeakMemoryUsage;
}

// Write the outlier detection statistics: job id, timestamp, parameters and
// timing. Nothing is written until parameters() has been called or if no
// writer is set.
void CDataFrameOutliersInstrumentation::writeAnalysisStats(std::int64_t timestamp) {
    auto* writer = this->writer();
    if (writer != nullptr && m_AnalysisStatsInitialized == true) {
        writer->onKey(OUTLIER_DETECTION_STATS);
        writer->onObjectBegin();
        writer->onKey(JOB_ID_TAG);
        writer->onString(this->jobId());
        writer->onKey(TIMESTAMP_TAG);
        writer->onInt64(timestamp);

        json::object parametersObject{writer->makeObject()};
        this->writeParameters(parametersObject);
        writer->onKey(PARAMETERS_TAG);
        writer->write(parametersObject);

        json::object timingStatsObject{writer->makeObject()};
        this->writeTimingStats(timingStatsObject);
        writer->onKey(TIMING_STATS_TAG);
        writer->write(timingStatsObject);

        writer->onObjectEnd();
    }
}

// Record the outlier detection parameters. The first call enables writing of
// the analysis statistics.
void CDataFrameOutliersInstrumentation::parameters(
    const maths::analytics::COutliers::SComputeParameters& parameters) {
    if (m_AnalysisStatsInitialized == false) {
        m_AnalysisStatsInitialized = true;
    }
    m_Parameters = parameters;
}

// Record the elapsed time reported in the timing statistics.
void CDataFrameOutliersInstrumentation::elapsedTime(std::uint64_t time) {
    m_ElapsedTime = time;
}

// Record the feature influence threshold reported with the parameters.
void CDataFrameOutliersInstrumentation::featureInfluenceThreshold(double featureInfluenceThreshold) {
    m_FeatureInfluenceThreshold = featureInfluenceThreshold;
}

// Add the elapsed time to parentObject. Does nothing if no writer is set.
void CDataFrameOutliersInstrumentation::writeTimingStats(json::object& parentObject) {
    auto* writer = this->writer();
    if (writer != nullptr) {
        writer->addMember(TIMING_ELAPSED_TIME_TAG, json::value(m_ElapsedTime), parentObject);
    }
}

// Add the recorded outlier detection parameters to parentObject. Does nothing
// if no writer is set.
void CDataFrameOutliersInstrumentation::writeParameters(json::object& parentObject) {
    auto* writer = this->writer();
    if (writer != nullptr) {
        writer->addMember(CDataFrameOutliersRunner::N_NEIGHBORS,
                          json::value(static_cast<std::uint64_t>(m_Parameters.s_NumberNeighbours)),
                          parentObject);
        writer->addMember(CDataFrameOutliersRunner::COMPUTE_FEATURE_INFLUENCE,
                          json::value(m_Parameters.s_ComputeFeatureInfluence),
                          parentObject);
        writer->addMember(CDataFrameOutliersRunner::OUTLIER_FRACTION,
                          json::value(m_Parameters.s_OutlierFraction), parentObject);
        writer->addMember(CDataFrameOutliersRunner::FEATURE_INFLUENCE_THRESHOLD,
                          json::value(m_FeatureInfluenceThreshold), parentObject);
        writer->addMember(CDataFrameOutliersRunner::STANDARDIZATION_ENABLED,
                          json::value(m_Parameters.s_StandardizeColumns), parentObject);
        writer->addMember(
            CDataFrameOutliersRunner::METHOD,
            json::value(maths::analytics::COutliers::print(m_Parameters.s_Method)),
            parentObject);
    }
}

// Set whether regression or classification statistics are written.
void CDataFrameTrainBoostedTreeInstrumentation::type(EStatsType type) {
    m_Type = type;
}

// Record the current iteration. The first call enables writing of the
// analysis statistics.
void CDataFrameTrainBoostedTreeInstrumentation::iteration(std::size_t iteration) {
    if (m_AnalysisStatsInitialized == false) {
        m_AnalysisStatsInitialized = true;
    }
    m_Iteration = iteration;
}

// Record the duration of the last iteration and add it to the elapsed time.
void CDataFrameTrainBoostedTreeInstrumentation::iterationTime(std::uint64_t delta) {
    m_IterationTime = delta;
    m_ElapsedTime += delta;
}

// Record the name of the loss function reported with the validation loss.
void CDataFrameTrainBoostedTreeInstrumentation::lossType(const std::string& lossType) {
    m_LossType = lossType;
}

// Append the validation loss values for fold, taking ownership of lossValues.
void CDataFrameTrainBoostedTreeInstrumentation::lossValues(std::size_t fold,
                                                           TDoubleVec&& lossValues) {
    m_LossValues.emplace_back(fold, std::move(lossValues));
}

// Record the task; update specific hyperparameters are only written for
// api_t::E_Update.
void CDataFrameTrainBoostedTreeInstrumentation::task(api_t::EDataFrameTrainBoostedTreeTask task) {
    m_Task = task;
}

// Write the regression or classification statistics: job id, timestamp,
// iteration, hyperparameters, validation loss and timing. Nothing is written
// until iteration() has been called or if no writer is set. The accumulated
// loss values are cleared in every case.
void CDataFrameTrainBoostedTreeInstrumentation::writeAnalysisStats(std::int64_t timestamp) {
    auto* writer = this->writer();
    if (writer != nullptr && m_AnalysisStatsInitialized == true) {
        switch (m_Type) {
        case E_Regression:
            writer->onKey(REGRESSION_STATS_TAG);
            break;
        case E_Classification:
            writer->onKey(CLASSIFICATION_STATS_TAG);
            break;
        }
        writer->onObjectBegin();
        writer->onKey(JOB_ID_TAG);
        writer->onString(this->jobId());
        writer->onKey(TIMESTAMP_TAG);
        writer->onInt64(timestamp);
        writer->onKey(ITERATION_TAG);
        writer->onUint64(m_Iteration);

        json::object hyperparametersObject{writer->makeObject()};
        this->writeHyperparameters(hyperparametersObject);
        writer->onKey(HYPERPARAMETERS_TAG);
        writer->write(hyperparametersObject);

        json::object validationLossObject{writer->makeObject()};
        this->writeValidationLoss(validationLossObject);
        writer->onKey(VALIDATION_LOSS_TAG);
        writer->write(validationLossObject);

        json::object timingStatsObject{writer->makeObject()};
        this->writeTimingStats(timingStatsObject);
        writer->onKey(TIMING_STATS_TAG);
        writer->write(timingStatsObject);

        writer->onObjectEnd();
    }
    this->reset();
}

void CDataFrameTrainBoostedTreeInstrumentation::reset() {
    // Clear the accumulated loss values before the next iteration
    m_LossValues.clear();
}

// Add the recorded hyperparameters to parentObject. The class assignment
// objective is only added for classification and the retraining specific
// values only for the update task. Does nothing if no writer is set.
void CDataFrameTrainBoostedTreeInstrumentation::writeHyperparameters(json::object& parentObject) {
    auto* writer = this->writer();
    if (writer != nullptr) {
        writer->addMember(CDataFrameTrainBoostedTreeRunner::ETA,
                          json::value(m_Hyperparameters.s_Eta), parentObject);
        if (m_Type == E_Classification) {
            auto objective = m_Hyperparameters.s_ClassAssignmentObjective;
            writer->addMember(
                CDataFrameTrainBoostedTreeClassifierRunner::CLASS_ASSIGNMENT_OBJECTIVE,
                CDataFrameTrainBoostedTreeClassifierRunner::CLASS_ASSIGNMENT_OBJECTIVE_VALUES[objective],
                parentObject);
        }
        writer->addMember(CDataFrameTrainBoostedTreeRunner::ALPHA,
                          json::value(m_Hyperparameters.s_DepthPenaltyMultiplier),
                          parentObject);
        writer->addMember(CDataFrameTrainBoostedTreeRunner::SOFT_TREE_DEPTH_LIMIT,
                          json::value(m_Hyperparameters.s_SoftTreeDepthLimit),
                          parentObject);
        writer->addMember(CDataFrameTrainBoostedTreeRunner::SOFT_TREE_DEPTH_TOLERANCE,
                          json::value(m_Hyperparameters.s_SoftTreeDepthTolerance),
                          parentObject);
        writer->addMember(CDataFrameTrainBoostedTreeRunner::GAMMA,
                          json::value(m_Hyperparameters.s_TreeSizePenaltyMultiplier),
                          parentObject);
        writer->addMember(CDataFrameTrainBoostedTreeRunner::LAMBDA,
                          json::value(m_Hyperparameters.s_LeafWeightPenaltyMultiplier),
                          parentObject);
        writer->addMember(CDataFrameTrainBoostedTreeRunner::DOWNSAMPLE_FACTOR,
                          json::value(m_Hyperparameters.s_DownsampleFactor),
                          parentObject);
        writer->addMember(
            CDataFrameTrainBoostedTreeRunner::NUM_FOLDS,
            json::value(static_cast<std::uint64_t>(m_Hyperparameters.s_NumFolds)),
            parentObject);
        writer->addMember(
            CDataFrameTrainBoostedTreeRunner::MAX_TREES,
            json::value(static_cast<std::uint64_t>(m_Hyperparameters.s_MaxTrees)),
            parentObject);
        writer->addMember(CDataFrameTrainBoostedTreeRunner::FEATURE_BAG_FRACTION,
                          json::value(m_Hyperparameters.s_FeatureBagFraction),
                          parentObject);
        writer->addMember(CDataFrameTrainBoostedTreeRunner::ETA_GROWTH_RATE_PER_TREE,
                          json::value(m_Hyperparameters.s_EtaGrowthRatePerTree),
                          parentObject);
        writer->addMember(MAX_ATTEMPTS_TO_ADD_TREE_TAG,
                          json::value(static_cast<std::uint64_t>(
                              m_Hyperparameters.s_MaxAttemptsToAddTree)),
                          parentObject);
        writer->addMember(NUM_SPLITS_PER_FEATURE_TAG,
                          json::value(static_cast<std::uint64_t>(
                              m_Hyperparameters.s_NumSplitsPerFeature)),
                          parentObject);
        writer->addMember(CDataFrameTrainBoostedTreeRunner::MAX_OPTIMIZATION_ROUNDS_PER_HYPERPARAMETER,
                          json::value(static_cast<std::uint64_t>(
                              m_Hyperparameters.s_MaxOptimizationRoundsPerHyperparameter)),
                          parentObject);
        if (m_Task == api_t::E_Update) {
            writer->addMember(CDataFrameTrainBoostedTreeRunner::TREE_TOPOLOGY_CHANGE_PENALTY,
                              json::value(m_Hyperparameters.s_TreeTopologyChangePenalty),
                              parentObject);
            writer->addMember(CDataFrameTrainBoostedTreeRunner::PREDICTION_CHANGE_COST,
                              json::value(m_Hyperparameters.s_PredictionChangeCost),
                              parentObject);
            writer->addMember(CDataFrameTrainBoostedTreeRunner::RETRAINED_TREE_ETA,
                              json::value(m_Hyperparameters.s_RetrainedTreeEta),
                              parentObject);
        }
    }
}

// Add the loss type and, for each recorded fold, the fold index together with
// its loss values to parentObject. Does nothing if no writer is set.
void CDataFrameTrainBoostedTreeInstrumentation::writeValidationLoss(json::object& parentObject) {
    auto* writer = this->writer();
    if (writer != nullptr) {
        writer->addMember(VALIDATION_LOSS_TYPE_TAG, json::value(m_LossType), parentObject);
        // NOTE: Do not use brace initialization here as that will
        // result in "lossValuesArray" being created as a nested array on linux
        json::array lossValuesArray = writer->makeArray();
        for (auto& element : m_LossValues) {
            json::object item{writer->makeObject()};
            writer->addMember(VALIDATION_FOLD_TAG,
                              json::value(static_cast<std::uint64_t>(element.first)), item);
            // NOTE: Do not use brace initialization here as that will
            // result in "array" being created as a nested array on linux
            json::array array = writer->makeArray(element.second.size());
            for (double lossValue : element.second) {
                array.push_back(json::value(lossValue));
            }
            writer->addMember(VALIDATION_LOSS_VALUES_TAG, array, item);
            lossValuesArray.push_back(item);
        }
        writer->addMember(VALIDATION_FOLD_VALUES_TAG, lossValuesArray, parentObject);
    }
}

// Add the elapsed time and the last iteration time to parentObject. Does
// nothing if no writer is set.
void CDataFrameTrainBoostedTreeInstrumentation::writeTimingStats(json::object& parentObject) {
    auto* writer = this->writer();
    if (writer != nullptr) {
        writer->addMember(TIMING_ELAPSED_TIME_TAG, json::value(m_ElapsedTime), parentObject);
        writer->addMember(TIMING_ITERATION_TIME_TAG, json::value(m_IterationTime), parentObject);
    }
}

// Install a writer on outStream for instrumentation for the lifetime of this
// object.
CDataFrameAnalysisInstrumentation::CScopeSetOutputStream::CScopeSetOutputStream(
    CDataFrameAnalysisInstrumentation& instrumentation,
    core::CJsonOutputStreamWrapper& outStream)
    : m_Instrumentation{instrumentation} {
    instrumentation.m_Writer =
        std::make_unique<core::CBoostJsonConcurrentLineWriter>(outStream);
}

// Remove the writer so the instrumentation stops writing output.
CDataFrameAnalysisInstrumentation::CScopeSetOutputStream::~CScopeSetOutputStream() {
    m_Instrumentation.m_Writer = nullptr;
}
}
}