include/api/CDataFrameAnalysisSpecification.h (110 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#ifndef INCLUDED_ml_api_CDataFrameAnalysisSpecification_h
#define INCLUDED_ml_api_CDataFrameAnalysisSpecification_h
#include <core/CDataAdder.h>
#include <core/CDataSearcher.h>
#include <core/CFastMutex.h>
#include <core/CJsonOutputStreamWrapper.h>
#include <api/CDataFrameAnalysisRunner.h>
#include <api/ImportExport.h>
#include <boost/json.hpp>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <vector>
namespace json = boost::json;
namespace ml {
namespace core {
class CDataFrame;
class CTemporaryDirectory;
}
namespace api {
//! \brief Parses a complete specification for running a core::CDataFrame analysis
//! and supports launching that analysis on a specified frame object.
//!
//! DESCRIPTION:\n
//! This manages extracting all configuration for a particular analysis from a JSON
//! header which is passed to the data_frame_analyzer command before any data. This
//! creates and owns an analysis runner object which is also configured by the header.
//! The analysis is run asynchronously via the CDataFrameAnalysisSpecification::run
//! method which returns a handle to the runner to retrieve progress, errors and other
//! performance statistics.
class API_EXPORT CDataFrameAnalysisSpecification {
public:
using TBoolVec = std::vector<bool>;
using TSizeVec = std::vector<std::size_t>;
using TStrVec = std::vector<std::string>;
using TDataFrameUPtr = std::unique_ptr<core::CDataFrame>;
using TTemporaryDirectoryPtr = std::shared_ptr<core::CTemporaryDirectory>;
using TDataAdderUPtr = std::unique_ptr<ml::core::CDataAdder>;
using TPersisterSupplier = std::function<TDataAdderUPtr()>;
using TDataSearcherUPtr = std::unique_ptr<ml::core::CDataSearcher>;
using TRestoreSearcherSupplier = std::function<TDataSearcherUPtr()>;
using TDataFrameUPtrTemporaryDirectoryPtrPr =
std::pair<TDataFrameUPtr, TTemporaryDirectoryPtr>;
using TRunnerUPtr = std::unique_ptr<CDataFrameAnalysisRunner>;
using TRunnerFactoryUPtr = std::unique_ptr<CDataFrameAnalysisRunnerFactory>;
using TRunnerFactoryUPtrVec = std::vector<TRunnerFactoryUPtr>;
public:
static const std::string JOB_ID;
static const std::string ROWS;
static const std::string COLS;
static const std::string MEMORY_LIMIT;
static const std::string THREADS;
static const std::string TEMPORARY_DIRECTORY;
static const std::string RESULTS_FIELD;
static const std::string MISSING_FIELD_VALUE;
static const std::string CATEGORICAL_FIELD_NAMES;
static const std::string DISK_USAGE_ALLOWED;
static const std::string ANALYSIS;
static const std::string NAME;
static const std::string PARAMETERS;
public:
//! Initialize from a JSON object.
//!
//! The specification has the following expected form:
//! <CODE>
//! {
//! "job_id": <string>,
//! "rows": <integer>,
//! "cols": <integer>,
//! "memory_limit": <integer>,
//! "threads": <integer>,
//! "temp_dir": <string>,
//! "results_field": <string>,
//! "categorical_fields": [<string>],
//! "disk_usage_allowed": <boolean>,
//! "analysis": {
//! "name": <string>,
//! "parameters": <object>
//! }
//! }
//! </CODE>
//!
//! \param[in] jsonSpecification The specification as a JSON object.
//! \note The analysis name must be one of the supported analysis types.
//! \note All constraints must be positive.
//! \note The parameters, if any, must be consistent for the analysis type.
//! \note If this fails the state is set to bad and the analysis will not run.
//! \note temp_dir Is a directory which can be used to store the data frame
//! out-of-core if we can't meet the memory constraint for the analysis without
//! partitioning.
//! \param persisterSupplier Shared pointer to the CDataAdder instance.
explicit CDataFrameAnalysisSpecification(
const std::string& jsonSpecification,
TDataFrameUPtrTemporaryDirectoryPtrPr* frameAndDirectory = nullptr,
TPersisterSupplier persisterSupplier = noopPersisterSupplier,
TRestoreSearcherSupplier restoreSearcherSupplier = noopRestoreSearcherSupplier);
//! This construtor provides support for custom analysis types and is mainly
//! intended for testing.
//!
//! \param[in] runnerFactories Plugins for the supported analyses.
CDataFrameAnalysisSpecification(
TRunnerFactoryUPtrVec runnerFactories,
const std::string& jsonSpecification,
TDataFrameUPtrTemporaryDirectoryPtrPr* frameAndDirectory = nullptr,
TPersisterSupplier persisterSupplier = noopPersisterSupplier,
TRestoreSearcherSupplier restoreSearcherSupplier = noopRestoreSearcherSupplier);
CDataFrameAnalysisSpecification(const CDataFrameAnalysisSpecification&) = delete;
CDataFrameAnalysisSpecification& operator=(const CDataFrameAnalysisSpecification&) = delete;
CDataFrameAnalysisSpecification(CDataFrameAnalysisSpecification&&) = delete;
CDataFrameAnalysisSpecification& operator=(CDataFrameAnalysisSpecification&&) = delete;
//! \return The number of rows in the frame.
std::size_t numberRows() const;
//! \return The number of columns in the input frame.
std::size_t numberColumns() const;
//! \return The number of columns the analysis configured to run will append
//! to the data frame.
std::size_t numberExtraColumns() const;
//! \return The memory usage limit for the process.
std::size_t memoryLimit() const;
//! \return The number of threads the analysis can use.
std::size_t numberThreads() const;
//! \return The name of the results field.
const std::string& resultsField() const;
//! \return The jobId.
const std::string& jobId() const;
//! \return The analysis name.
const std::string& analysisName() const;
//! \return The special string signifying a missing value.
const std::string& missingFieldValue() const;
//! \return The names of the categorical fields.
const TStrVec& categoricalFieldNames() const;
//! \return If it is allowed to overflow data frame to the disk if it doesn't
//! fit in memory.
bool diskUsageAllowed() const;
//! \return The temporary directory if this analysis is using disk storage.
const std::string& temporaryDirectory() const;
//! Validate if \p frame is suitable for running the analysis on.
bool validate(const core::CDataFrame& frame) const;
//! \return A handle to the object responsible for running the analysis.
CDataFrameAnalysisRunner* runner();
//! Estimates memory usage in two cases:
//! 1. disk is not used (the whole data frame fits in main memory)
//! 2. disk is used (only one partition needs to be loaded to main memory)
void estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const;
//! \return The stream to which to persist state if there is one.
TDataAdderUPtr persister() const;
//! \return The stream from which to retore state if there is one.
TDataSearcherUPtr restoreSearcher() const;
//! Persister supplier without any action.
static TDataAdderUPtr noopPersisterSupplier();
//! Restore search supplier without any action.
static TDataSearcherUPtr noopRestoreSearcherSupplier();
private:
void initializeRunner(const json::value& jsonAnalysis,
TDataFrameUPtrTemporaryDirectoryPtrPr* frameAndDirectory);
private:
std::size_t m_NumberRows = 0;
std::size_t m_NumberColumns = 0;
std::size_t m_MemoryLimit = 0;
std::size_t m_NumberThreads = 0;
std::string m_TemporaryDirectory;
std::string m_ResultsField;
std::string m_JobId;
std::string m_AnalysisName;
std::string m_MissingFieldValue;
TStrVec m_CategoricalFieldNames;
bool m_DiskUsageAllowed;
// TODO Sparse table support
// double m_TableLoadFactor = 0.0;
TRunnerFactoryUPtrVec m_RunnerFactories;
TRunnerUPtr m_Runner;
TPersisterSupplier m_PersisterSupplier;
TRestoreSearcherSupplier m_RestoreSearcherSupplier;
};
}
}
#endif // INCLUDED_ml_api_CDataFrameAnalysisSpecification_h