/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#ifndef INCLUDED_ml_api_CDataFrameAnalysisRunner_h
#define INCLUDED_ml_api_CDataFrameAnalysisRunner_h
#include <api/ImportExport.h>
#include <boost/json.hpp>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <vector>
namespace json = boost::json;
namespace ml {
namespace core {
class CDataFrame;
class CPackedBitVector;
class CBoostJsonConcurrentLineWriter;
class CStatePersistInserter;
class CTemporaryDirectory;
namespace data_frame_detail {
class CRowRef;
}
}
namespace api {
class CDataFrameAnalysisInstrumentation;
class CDataFrameAnalysisSpecification;
class CDataSummarizationJsonWriter;
class CInferenceModelDefinition;
class CInferenceModelMetadata;
class CMemoryUsageEstimationResultJsonWriter;
//! \brief Hierarchy for running specific core::CDataFrame analyses.
//!
//! DESCRIPTION:\n
//! This hierarchy manages the running of specific analyses on a core::CDataFrame
//! object. It provides a common interface for starting an analysis and for
//! reporting progress and errors back to the calling code.
//!
//! IMPLEMENTATION DECISIONS:\n
//! Particular analyses are specified by a JSON object which is passed as a header
//! to the data_frame_analyzer command before any data. It is the responsibility of
//! the CDataFrameAnalysisSpecification to parse this header, although it delegates
//! reading the analysis parameters object to implementations of this runner.
//! Therefore CDataFrameAnalysisSpecification is also responsible for creating an
//! appropriate runner object for the data_frame_analyzer command. A particular
//! analysis is launched by the CDataFrameAnalysisSpecification::run method which
//! returns a reference to the appropriate CDataFrameAnalysisRunner implementation.
//!
//! This launches the work to do the analysis in a background thread so that the
//! main thread remains responsive and can periodically report progress and errors.
//!
//! No mechanism is provided to cancel the work (yet) because it is anticipated
//! that this will probably be achieved by killing the process, and it is too
//! early to determine how to implement a good cooperative interrupt scheme.
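//!
//! A minimal sketch of how a runner is typically driven once a reference to a
//! concrete implementation has been obtained from the specification (how that
//! reference is obtained is elided here):
//! <pre>
//! auto frameAndDirectory = runner.makeDataFrame();
//! auto& frame = *frameAndDirectory.first;
//! // ... fill the data frame with the input rows ...
//! if (runner.validate(frame)) {
//!     runner.run(frame);     // launches the analysis in a background thread
//!     runner.waitToFinish(); // joins the background thread
//! }
//! </pre>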
class API_EXPORT CDataFrameAnalysisRunner {
public:
using TBoolVec = std::vector<bool>;
using TStrVec = std::vector<std::string>;
using TRowRef = core::data_frame_detail::CRowRef;
using TProgressRecorder = std::function<void(double)>;
using TStrVecVec = std::vector<TStrVec>;
using TInferenceModelDefinitionUPtr = std::unique_ptr<CInferenceModelDefinition>;
using TDataSummarizationJsonWriterUPtr = std::unique_ptr<CDataSummarizationJsonWriter>;
using TDataFrameUPtr = std::unique_ptr<core::CDataFrame>;
using TTemporaryDirectoryPtr = std::shared_ptr<core::CTemporaryDirectory>;
using TDataFrameUPtrTemporaryDirectoryPtrPr =
std::pair<TDataFrameUPtr, TTemporaryDirectoryPtr>;
public:
//! The intention is that concrete objects of this hierarchy are constructed
//! by the factory class.
explicit CDataFrameAnalysisRunner(const CDataFrameAnalysisSpecification& spec);
virtual ~CDataFrameAnalysisRunner();
CDataFrameAnalysisRunner(const CDataFrameAnalysisRunner&) = delete;
CDataFrameAnalysisRunner& operator=(const CDataFrameAnalysisRunner&) = delete;
//! Make a data frame suitable for running the analysis on.
//!
//! This chooses the storage strategy based on the analysis constraints and
//! the number of rows and columns it needs and reserves capacity as appropriate.
TDataFrameUPtrTemporaryDirectoryPtrPr makeDataFrame() const;
//! Estimates memory usage in two cases:
//! 1. disk is not used (the whole data frame fits in main memory)
//! 2. disk is used (only one partition needs to be loaded to main memory)
void estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const;
//! Check whether the data frame for this analysis should use in-core or
//! out-of-core storage.
bool storeDataFrameInMainMemory() const;
//! \return The number of partitions to use when analysing the data frame.
//! \note If this is greater than one then the data frame should be stored
//! on disk. The run method is responsible for copying the relevant pieces
//! into main memory during an analysis.
std::size_t numberPartitions() const;
//! Get the maximum permitted partition size in numbers of rows.
std::size_t maximumNumberRowsPerPartition() const;
//! \return The number of columns this analysis appends.
virtual std::size_t numberExtraColumns() const = 0;
//! \return The capacity of the data frame slice to use.
virtual std::size_t dataFrameSliceCapacity() const = 0;
//! Get a mask for the subset of the rows for which results are required.
//!
//! \param[in] frame The data frame for which to write results.
//! \return A mask of the rows of \p frame to write.
virtual core::CPackedBitVector rowsToWriteMask(const core::CDataFrame& frame) const = 0;
//! Write the extra columns of \p row added by the analysis to \p writer.
//!
//! This should create a new object of the form:
//! <pre>
//! {
//! "name of column n": "value of column n",
//! "name of column n+1": "value of column n+1",
//! ...
//! }
//! </pre>
//! with one named member for each column added.
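//!
//! For example, a hypothetical regression implementation might write the
//! following object for one row (the member names here are illustrative
//! assumptions, not the actual output schema):
//! <pre>
//! {"target_prediction": 1.3, "is_training": true}
//! </pre>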
//!
//! \param[in] frame The data frame for which to write results.
//! \param[in] row The row for which to write the columns added by this analysis.
//! \param[in,out] writer The stream to which to write the extra columns.
virtual void writeOneRow(const core::CDataFrame& frame,
const TRowRef& row,
core::CBoostJsonConcurrentLineWriter& writer) const = 0;
//! Validate that \p frame is suitable for running the analysis on.
virtual bool validate(const core::CDataFrame& frame) const = 0;
//! Checks whether the analysis is already running and, if not, launches it
//! in the background.
//!
//! \note The thread calling this is expected to be nearly always idle, i.e.
//! just progress monitoring, so this doesn't count towards the thread limit.
void run(core::CDataFrame& frame);
//! This waits until the analysis has finished and joins the background thread.
void waitToFinish();
//! \return A serialisable definition of the trained model.
virtual TInferenceModelDefinitionUPtr
inferenceModelDefinition(const TStrVec& fieldNames, const TStrVecVec& categoryNames) const;
//! \return A serialisable summarization of the training data if appropriate, otherwise a null pointer.
virtual TDataSummarizationJsonWriterUPtr dataSummarization() const;
//! \return Serialisable metadata for the trained model.
virtual const CInferenceModelMetadata* inferenceModelMetadata() const;
//! \return Reference to the analysis instrumentation.
virtual const CDataFrameAnalysisInstrumentation& instrumentation() const = 0;
//! \return Reference to the analysis instrumentation.
virtual CDataFrameAnalysisInstrumentation& instrumentation() = 0;
protected:
using TMemoryMonitor = std::function<void(std::int64_t)>;
using TStatePersister =
std::function<void(std::function<void(core::CStatePersistInserter&)>)>;
protected:
const CDataFrameAnalysisSpecification& spec() const;
//! This computes the execution strategy for the analysis, including how the
//! data frame will be stored, the number of partitions and the maximum number
//! of rows per partition.
//!
//! \warning This must be called in the constructor of any derived runner.
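//!
//! A minimal sketch, assuming a hypothetical derived runner CFooRunner, of
//! how this is expected to be called from the derived constructor:
//! <pre>
//! CFooRunner::CFooRunner(const CDataFrameAnalysisSpecification& spec)
//!     : CDataFrameAnalysisRunner{spec} {
//!     this->computeAndSaveExecutionStrategy();
//! }
//! </pre>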
virtual void computeAndSaveExecutionStrategy();
void numberPartitions(std::size_t partitions);
void maximumNumberRowsPerPartition(std::size_t rowsPerPartition);
std::size_t estimateMemoryUsage(std::size_t totalNumberRows,
std::size_t partitionNumberRows,
std::size_t numberColumns) const;
//! \return A callback function for writing state using a given persist inserter.
TStatePersister statePersister();
private:
virtual void runImpl(core::CDataFrame& frame) = 0;
virtual std::size_t estimateBookkeepingMemoryUsage(std::size_t numberPartitions,
std::size_t totalNumberRows,
std::size_t partitionNumberRows,
std::size_t numberColumns) const = 0;
private:
const CDataFrameAnalysisSpecification& m_Spec;
std::size_t m_NumberPartitions = 0;
std::size_t m_MaximumNumberRowsPerPartition = 0;
std::thread m_Runner;
};
//! \brief Makes a core::CDataFrame analysis runner.
class API_EXPORT CDataFrameAnalysisRunnerFactory {
public:
using TRunnerUPtr = std::unique_ptr<CDataFrameAnalysisRunner>;
using TDataFrameUPtrTemporaryDirectoryPtrPr =
CDataFrameAnalysisRunner::TDataFrameUPtrTemporaryDirectoryPtrPr;
public:
virtual ~CDataFrameAnalysisRunnerFactory() = default;
virtual const std::string& name() const = 0;
//! Create a new runner object from \p spec.
//!
//! \param[in] spec The analysis specification.
//! \param[out] frameAndDirectory If non-null, a data frame suitable for the
//! analysis is created, together with a handle to its temporary directory if
//! it is stored on disk, and written to this parameter.
TRunnerUPtr make(const CDataFrameAnalysisSpecification& spec,
TDataFrameUPtrTemporaryDirectoryPtrPr* frameAndDirectory = nullptr) const;
//! Create a new runner object from \p spec.
//!
//! \param[in] spec The analysis specification.
//! \param[in] jsonParameters A JSON description of the analysis parameters.
//! \param[out] frameAndDirectory If non-null, a data frame suitable for the
//! analysis is created, together with a handle to its temporary directory if
//! it is stored on disk, and written to this parameter.
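//!
//! A minimal usage sketch, assuming the caller already holds a concrete
//! factory and a specification; the parameter JSON shown is purely
//! illustrative:
//! <pre>
//! json::value params = json::parse(R"({"dependent_variable": "target"})");
//! CDataFrameAnalysisRunnerFactory::TDataFrameUPtrTemporaryDirectoryPtrPr frameAndDirectory;
//! auto runner = factory.make(spec, params, &frameAndDirectory);
//! </pre>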
TRunnerUPtr make(const CDataFrameAnalysisSpecification& spec,
const json::value& jsonParameters,
TDataFrameUPtrTemporaryDirectoryPtrPr* frameAndDirectory = nullptr) const;
private:
virtual TRunnerUPtr
makeImpl(const CDataFrameAnalysisSpecification& spec,
TDataFrameUPtrTemporaryDirectoryPtrPr* frameAndDirectory) const = 0;
virtual TRunnerUPtr
makeImpl(const CDataFrameAnalysisSpecification& spec,
const json::value& jsonParameters,
TDataFrameUPtrTemporaryDirectoryPtrPr* frameAndDirectory) const = 0;
};
}
}
#endif // INCLUDED_ml_api_CDataFrameAnalysisRunner_h