lib/api/CDataFrameAnalysisRunner.cc (212 lines of code) (raw):

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <api/CDataFrameAnalysisRunner.h>

#include <core/CDataFrame.h>
#include <core/CJsonStatePersistInserter.h>
#include <core/CLogger.h>
#include <core/CStateCompressor.h>
#include <core/Constants.h>

#include <api/CDataFrameAnalysisInstrumentation.h>
#include <api/CDataFrameAnalysisSpecification.h>
#include <api/CDataSummarizationJsonWriter.h>
#include <api/CInferenceModelDefinition.h>
#include <api/CMemoryUsageEstimationResultJsonWriter.h>
#include <api/CSingleStreamDataAdder.h>
#include <api/ElasticsearchStateIndex.h>

#include <boost/iterator/counting_iterator.hpp>

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>

namespace ml {
namespace api {
namespace {

std::size_t maximumNumberPartitions(const CDataFrameAnalysisSpecification& spec) {
    // We limit the maximum number of partitions to rows^(1/2) because very
    // large numbers of partitions are going to be slow and it is better to tell
    // user to allocate more resources for the job in this case.
    return static_cast<std::size_t>(
        std::sqrt(static_cast<double>(spec.numberRows())) + 0.5);
}

//! Shared post-construction checks for the factory make overloads: verify the
//! derived runner computed its execution strategy and, if the caller asked for
//! one, create the data frame matching that strategy.
void validateAndMakeDataFrame(
    const CDataFrameAnalysisRunnerFactory::TRunnerUPtr& runner,
    CDataFrameAnalysisRunner::TDataFrameUPtrTemporaryDirectoryPtrPr* frameAndDirectory) {
    if (runner->numberPartitions() == 0) {
        HANDLE_FATAL(<< "You need to call 'computeAndSaveExecutionStrategy' in the derived runner constructor.");
    }
    if (frameAndDirectory != nullptr && frameAndDirectory->first == nullptr) {
        *frameAndDirectory = runner->makeDataFrame();
    }
}
}

CDataFrameAnalysisRunner::CDataFrameAnalysisRunner(const CDataFrameAnalysisSpecification& spec)
    : m_Spec{spec} {
}

CDataFrameAnalysisRunner::~CDataFrameAnalysisRunner() {
    // Make sure any analysis thread started by run() has joined before the
    // members it captured are destroyed.
    this->waitToFinish();
}

//! Create the data frame (and, for on-disk storage, its temporary directory)
//! sized according to the execution strategy saved by
//! computeAndSaveExecutionStrategy.
CDataFrameAnalysisRunner::TDataFrameUPtrTemporaryDirectoryPtrPr
CDataFrameAnalysisRunner::makeDataFrame() const {
    auto result = this->storeDataFrameInMainMemory()
                      ? core::makeMainStorageDataFrame(m_Spec.numberColumns(),
                                                       this->dataFrameSliceCapacity())
                      : core::makeDiskStorageDataFrame(
                            m_Spec.temporaryDirectory(), m_Spec.numberColumns(),
                            m_Spec.numberRows(), this->dataFrameSliceCapacity());
    result.first->missingString(m_Spec.missingFieldValue());
    // Reserve room for the analysis' working columns up front.
    result.first->reserve(m_Spec.numberThreads(),
                          m_Spec.numberColumns() + this->numberExtraColumns());
    return result;
}

//! Write the estimated peak memory usage, with and without spilling the data
//! frame to disk, rounded up to whole megabytes.
void CDataFrameAnalysisRunner::estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const {
    std::size_t numberRows{m_Spec.numberRows()};
    std::size_t numberColumns{m_Spec.numberColumns()};
    std::size_t maxNumberPartitions{maximumNumberPartitions(m_Spec)};
    if (maxNumberPartitions == 0) {
        // No rows means nothing to analyse.
        writer.write("0mb", "0mb");
        return;
    }
    // Without disk the whole frame is one in-memory partition; with disk we
    // assume the maximum feasible number of partitions.
    std::size_t expectedMemoryWithoutDisk{
        this->estimateMemoryUsage(numberRows, numberRows, numberColumns)};
    std::size_t expectedMemoryWithDisk{this->estimateMemoryUsage(
        numberRows, numberRows / maxNumberPartitions, numberColumns)};
    auto roundUpToNearestMb = [](std::size_t bytes) {
        return std::to_string((bytes + core::constants::BYTES_IN_MEGABYTES - 1) /
                              core::constants::BYTES_IN_MEGABYTES) +
               "mb";
    };
    writer.write(roundUpToNearestMb(expectedMemoryWithoutDisk),
                 roundUpToNearestMb(expectedMemoryWithDisk));
}

bool CDataFrameAnalysisRunner::storeDataFrameInMainMemory() const {
    // A single partition means the whole frame fits in memory.
    return m_NumberPartitions == 1;
}

std::size_t CDataFrameAnalysisRunner::numberPartitions() const {
    return m_NumberPartitions;
}

std::size_t CDataFrameAnalysisRunner::maximumNumberRowsPerPartition() const {
    return m_MaximumNumberRowsPerPartition;
}

//! Start the analysis on a background thread. A no-op (with a log message) if
//! an analysis is already in progress.
void CDataFrameAnalysisRunner::run(core::CDataFrame& frame) {
    if (m_Runner.joinable()) {
        LOG_INFO(<< "Already running analysis");
    } else {
        this->instrumentation().resetProgress();
        m_Runner = std::thread([&frame, this]() {
            this->runImpl(frame);
            this->instrumentation().setToFinished();
        });
    }
}

void CDataFrameAnalysisRunner::waitToFinish() {
    if (m_Runner.joinable()) {
        m_Runner.join();
    }
}

const CDataFrameAnalysisSpecification& CDataFrameAnalysisRunner::spec() const {
    return m_Spec;
}

//! Choose and save the number of partitions and maximum rows per partition
//! which respect the specification's memory limit (falling back to disk
//! storage when allowed).
void CDataFrameAnalysisRunner::computeAndSaveExecutionStrategy() {
    std::size_t numberRows{m_Spec.numberRows()};
    std::size_t numberColumns{m_Spec.numberColumns()};
    std::size_t memoryLimit{m_Spec.memoryLimit()};
    LOG_TRACE(<< "memory limit = " << memoryLimit);

    // Find the smallest number of partitions such that the size per partition
    // is less than the memory limit.
    std::size_t maxNumberPartitions{maximumNumberPartitions(m_Spec)};
    std::size_t memoryUsage{0};
    for (m_NumberPartitions = 1; m_NumberPartitions < maxNumberPartitions;
         ++m_NumberPartitions) {
        std::size_t partitionNumberRows{numberRows / m_NumberPartitions};
        memoryUsage = this->estimateMemoryUsage(numberRows, partitionNumberRows,
                                                numberColumns);
        LOG_TRACE(<< "partition number rows = " << partitionNumberRows);
        LOG_TRACE(<< "memory usage = " << memoryUsage);
        if (memoryUsage <= memoryLimit) {
            break;
        }
        if (m_Spec.diskUsageAllowed() == false) {
            // Partitioning only helps if we can put the inactive partitions
            // on disk, so there is no point continuing.
            LOG_TRACE(<< "stop partition number computation since disk usage is disabled");
            break;
        }
    }
    LOG_TRACE(<< "number partitions = " << m_NumberPartitions);

    if (memoryUsage > memoryLimit) {
        // Round to a resolution which keeps a couple of significant figures
        // for small values.
        auto roundMb = [](std::size_t memory) {
            double scale{std::max(
                std::pow(10.0, std::ceil(std::log(static_cast<double>(core::constants::BYTES_IN_MEGABYTES) /
                                                  static_cast<double>(memory)) /
                                         std::log(10.0))),
                100.0)};
            return std::round(scale * static_cast<double>(memory) /
                              static_cast<double>(core::constants::BYTES_IN_MEGABYTES)) /
                   scale;
        };
        // Simply log the limit being configured too low. If we exceed the limit
        // during the run, we will fail and the user will have to update the
        // limit and attempt to re-run.
        LOG_INFO(<< "Memory limit " << roundMb(memoryLimit) << "MB is configured lower"
                 << " than the estimate " << roundMb(memoryUsage) << "MB."
                 << " The analytics process may fail due to hitting the memory limit.");
    }

    if (m_NumberPartitions > 1) {
        // The maximum number of rows is found by binary search in the interval
        // [numberRows / m_NumberPartitions, numberRows / (m_NumberPartitions - 1)).
        m_MaximumNumberRowsPerPartition = *std::lower_bound(
            boost::make_counting_iterator(numberRows / m_NumberPartitions),
            boost::make_counting_iterator(numberRows / (m_NumberPartitions - 1)),
            memoryLimit, [&](std::size_t partitionNumberRows, std::size_t limit) {
                return this->estimateMemoryUsage(numberRows, partitionNumberRows,
                                                 numberColumns) < limit;
            });
        LOG_TRACE(<< "maximum rows per partition = " << m_MaximumNumberRowsPerPartition);
    } else {
        m_MaximumNumberRowsPerPartition = numberRows;
    }
}

void CDataFrameAnalysisRunner::numberPartitions(std::size_t partitions) {
    m_NumberPartitions = partitions;
}

void CDataFrameAnalysisRunner::maximumNumberRowsPerPartition(std::size_t rowsPerPartition) {
    m_MaximumNumberRowsPerPartition = rowsPerPartition;
}

//! Estimate total memory usage: the data frame itself plus the analysis'
//! bookkeeping state (supplied by the derived class).
std::size_t CDataFrameAnalysisRunner::estimateMemoryUsage(std::size_t totalNumberRows,
                                                          std::size_t partitionNumberRows,
                                                          std::size_t numberColumns) const {
    return core::CDataFrame::estimateMemoryUsage(
               this->storeDataFrameInMainMemory(), totalNumberRows,
               numberColumns + this->numberExtraColumns(), core::CAlignment::E_Aligned16) +
           this->estimateBookkeepingMemoryUsage(m_NumberPartitions, totalNumberRows,
                                                partitionNumberRows, numberColumns);
}

//! Return a callback which persists state (compressed, as JSON) via the
//! specification's persister, if one is configured.
CDataFrameAnalysisRunner::TStatePersister CDataFrameAnalysisRunner::statePersister() {
    return [this](std::function<void(core::CStatePersistInserter&)> persistFunction) {
        auto persister = m_Spec.persister();
        if (persister != nullptr) {
            core::CStateCompressor compressor(*persister);
            auto persistStream = compressor.addStreamed(
                getStateId(m_Spec.jobId(), m_Spec.analysisName()));
            {
                // Scope the inserter so it flushes before the stream is closed.
                core::CJsonStatePersistInserter inserter{*persistStream};
                persistFunction(inserter);
            }
            if (compressor.streamComplete(persistStream, true) == false ||
                persistStream->bad()) {
                LOG_ERROR(<< "Failed to complete last persistence stream");
            }
        }
    };
}

// The following hooks return nothing by default; derived runners which produce
// an inference model override them.

CDataFrameAnalysisRunner::TInferenceModelDefinitionUPtr
CDataFrameAnalysisRunner::inferenceModelDefinition(const TStrVec& /*fieldNames*/,
                                                   const TStrVecVec& /*categoryNames*/) const {
    return {};
}

CDataFrameAnalysisRunner::TDataSummarizationJsonWriterUPtr
CDataFrameAnalysisRunner::dataSummarization() const {
    return {};
}

const CInferenceModelMetadata* CDataFrameAnalysisRunner::inferenceModelMetadata() const {
    return nullptr;
}

CDataFrameAnalysisRunnerFactory::TRunnerUPtr
CDataFrameAnalysisRunnerFactory::make(const CDataFrameAnalysisSpecification& spec,
                                      TDataFrameUPtrTemporaryDirectoryPtrPr* frameAndDirectory) const {
    auto result = this->makeImpl(spec, frameAndDirectory);
    validateAndMakeDataFrame(result, frameAndDirectory);
    return result;
}

CDataFrameAnalysisRunnerFactory::TRunnerUPtr
CDataFrameAnalysisRunnerFactory::make(const CDataFrameAnalysisSpecification& spec,
                                      const json::value& jsonParameters,
                                      TDataFrameUPtrTemporaryDirectoryPtrPr* frameAndDirectory) const {
    auto result = this->makeImpl(spec, jsonParameters, frameAndDirectory);
    validateAndMakeDataFrame(result, frameAndDirectory);
    return result;
}
}
}