lib/api/CDataFrameOutliersRunner.cc (169 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <api/CDataFrameOutliersRunner.h>
#include <core/CBoostJsonConcurrentLineWriter.h>
#include <core/CDataFrame.h>
#include <core/CLogger.h>
#include <core/CProgramCounters.h>
#include <maths/analytics/COutliers.h>
#include <api/CDataFrameAnalysisConfigReader.h>
#include <api/CDataFrameAnalysisSpecification.h>
#include <boost/json.hpp>
#include <algorithm>
#include <iterator>
#include <string>
namespace json = boost::json;
namespace ml {
namespace api {
namespace {
// Builds the reader used to extract the outlier detection parameters from the
// analysis JSON configuration. Every parameter is registered as optional; the
// "method" parameter additionally maps the accepted method names onto their
// COutliers::EMethod values.
CDataFrameAnalysisConfigReader makeParameterReader() {
    CDataFrameAnalysisConfigReader reader;
    reader.addParameter(CDataFrameOutliersRunner::STANDARDIZATION_ENABLED,
                        CDataFrameAnalysisConfigReader::E_OptionalParameter);
    reader.addParameter(CDataFrameOutliersRunner::N_NEIGHBORS,
                        CDataFrameAnalysisConfigReader::E_OptionalParameter);
    reader.addParameter(
        CDataFrameOutliersRunner::METHOD, CDataFrameAnalysisConfigReader::E_OptionalParameter,
        {{maths::analytics::COutliers::LOF, int{maths::analytics::COutliers::E_Lof}},
         {maths::analytics::COutliers::LDOF, int{maths::analytics::COutliers::E_Ldof}},
         {maths::analytics::COutliers::DISTANCE_KNN,
          int{maths::analytics::COutliers::E_DistancekNN}},
         {maths::analytics::COutliers::TOTAL_DISTANCE_KNN,
          int{maths::analytics::COutliers::E_TotalDistancekNN}}});
    reader.addParameter(CDataFrameOutliersRunner::COMPUTE_FEATURE_INFLUENCE,
                        CDataFrameAnalysisConfigReader::E_OptionalParameter);
    reader.addParameter(CDataFrameOutliersRunner::FEATURE_INFLUENCE_THRESHOLD,
                        CDataFrameAnalysisConfigReader::E_OptionalParameter);
    reader.addParameter(CDataFrameOutliersRunner::OUTLIER_FRACTION,
                        CDataFrameAnalysisConfigReader::E_OptionalParameter);
    return reader;
}

// Returns the shared parameter reader. It is built lazily on first use, so the
// parameter-name strings it refers to are already initialised by then.
const CDataFrameAnalysisConfigReader& parameterReader() {
    static const CDataFrameAnalysisConfigReader READER{makeParameterReader()};
    return READER;
}

// Field names written to the results document for each row.
const std::string OUTLIER_SCORE_FIELD_NAME = "outlier_score";
const std::string FEATURE_NAME_FIELD_NAME = "feature_name";
const std::string FEATURE_INFLUENCE_FIELD_NAME = "feature_influence";
const std::string INFLUENCE_FIELD_NAME = "influence";
}
// Constructs an outlier detection runner configured from the user supplied
// analysis parameters. Any parameter that was not supplied falls back to the
// default shown beside it.
//
// \param[in] spec The analysis specification (job id, memory limit, threads, ...).
// \param[in] parameters The parsed "outlier_detection" analysis parameters.
CDataFrameOutliersRunner::CDataFrameOutliersRunner(const CDataFrameAnalysisSpecification& spec,
                                                   const CDataFrameAnalysisParameters& parameters)
    : CDataFrameOutliersRunner{spec} {
    m_StandardizationEnabled = parameters[STANDARDIZATION_ENABLED].fallback(true);
    m_NumberNeighbours = parameters[N_NEIGHBORS].fallback(std::size_t{0});
    m_Method = parameters[METHOD].fallback(maths::analytics::COutliers::E_Ensemble);
    m_ComputeFeatureInfluence = parameters[COMPUTE_FEATURE_INFLUENCE].fallback(true);
    m_FeatureInfluenceThreshold = parameters[FEATURE_INFLUENCE_THRESHOLD].fallback(0.1);
    m_OutlierFraction = parameters[OUTLIER_FRACTION].fallback(0.05);
    m_Instrumentation.featureInfluenceThreshold(m_FeatureInfluenceThreshold);

    // The delegated constructor already computed the execution strategy, but it
    // did so with the default settings. Hooks in this class read the settings
    // assigned above: estimateBookkeepingMemoryUsage uses the method, neighbours,
    // standardization, feature influence and outlier fraction, and
    // numberExtraColumns depends on feature influence. The strategy is therefore
    // recomputed only now that the configured values are in place. (Assumes the
    // base class strategy computation consults these hooks; it is defined
    // outside this file.)
    this->computeAndSaveExecutionStrategy();
}
// Constructs a runner with default settings for the given specification and
// computes its execution strategy immediately.
CDataFrameOutliersRunner::CDataFrameOutliersRunner(const CDataFrameAnalysisSpecification& spec)
    : CDataFrameAnalysisRunner{spec},
      // The method defaults to the ensemble method.
      m_Method{static_cast<std::size_t>(maths::analytics::COutliers::E_Ensemble)},
      m_Instrumentation{spec.jobId(), spec.memoryLimit()} {
    this->computeAndSaveExecutionStrategy();
}
// Number of columns this analysis appends to the data frame: always one for the
// outlier score, plus one per input column when feature influence is computed.
std::size_t CDataFrameOutliersRunner::numberExtraColumns() const {
    std::size_t extraColumns{1};
    if (m_ComputeFeatureInfluence) {
        extraColumns += this->spec().numberColumns();
    }
    return extraColumns;
}
// Slice capacity for the data frame, sized for the input columns together with
// the result columns this analysis appends.
std::size_t CDataFrameOutliersRunner::dataFrameSliceCapacity() const {
    const auto totalColumns = this->spec().numberColumns() + this->numberExtraColumns();
    return core::dataFrameDefaultSliceCapacity(totalColumns);
}
// Every row receives an outlier score, so the mask selects all rows of the frame.
core::CPackedBitVector
CDataFrameOutliersRunner::rowsToWriteMask(const core::CDataFrame& frame) const {
    return core::CPackedBitVector{frame.numberRows(), true};
}
// Writes the results for one row as a JSON object: the outlier score, followed
// optionally by a "feature_influence" array of {feature_name, influence} objects.
// The array is written only when influence columns exist and the row's score
// exceeds the configured feature influence threshold.
void CDataFrameOutliersRunner::writeOneRow(const core::CDataFrame& frame,
                                           const TRowRef& row,
                                           core::CBoostJsonConcurrentLineWriter& writer) const {
    // The result columns sit at the end of the row: the score first, then one
    // influence value per feature.
    const std::size_t numberResultColumns{this->numberExtraColumns()};
    const std::size_t scoreIndex{row.numberColumns() - numberResultColumns};
    const std::size_t numberInfluences{numberResultColumns - 1};

    writer.onObjectBegin();
    writer.onKey(OUTLIER_SCORE_FIELD_NAME);
    writer.onDouble(row[scoreIndex]);

    const bool reportInfluences{row[scoreIndex] > m_FeatureInfluenceThreshold &&
                                numberInfluences > 0};
    if (reportInfluences) {
        const auto& names = frame.columnNames();
        writer.onKey(FEATURE_INFLUENCE_FIELD_NAME);
        writer.onArrayBegin();
        for (std::size_t feature = 0; feature < numberInfluences; ++feature) {
            writer.onObjectBegin();
            writer.onKey(FEATURE_NAME_FIELD_NAME);
            writer.onString(names[feature]);
            writer.onKey(INFLUENCE_FIELD_NAME);
            writer.onDouble(row[scoreIndex + 1 + feature]);
            writer.onObjectEnd();
        }
        writer.onArrayEnd();
    }

    writer.onObjectEnd();
}
// Rejects a frame with no columns, since the analysis needs at least one feature.
// Reports the problem through HANDLE_FATAL and returns false in that case.
bool CDataFrameOutliersRunner::validate(const core::CDataFrame& frame) const {
    if (frame.numberColumns() >= 1) {
        return true;
    }
    HANDLE_FATAL(<< "Input error: analysis needs at least one feature");
    return false;
}
// Read-only access to this runner's instrumentation object.
const CDataFrameAnalysisInstrumentation& CDataFrameOutliersRunner::instrumentation() const {
    return this->m_Instrumentation;
}
// Mutable access to this runner's instrumentation object.
CDataFrameAnalysisInstrumentation& CDataFrameOutliersRunner::instrumentation() {
    return this->m_Instrumentation;
}
// Runs outlier detection on the frame. The partition count and estimated peak
// memory are recorded in the program counters first, then COutliers::compute is
// called with this runner's configuration.
void CDataFrameOutliersRunner::runImpl(core::CDataFrame& frame) {
    const auto partitions = this->numberPartitions();
    core::CProgramCounters::counter(counter_t::E_DFONumberPartitions) = partitions;

    const auto rows = frame.numberRows();
    core::CProgramCounters::counter(counter_t::E_DFOEstimatedPeakMemoryUsage) =
        this->estimateMemoryUsage(rows, rows / partitions, frame.numberColumns());

    using TComputeParameters = maths::analytics::COutliers::SComputeParameters;
    const auto method = static_cast<maths::analytics::COutliers::EMethod>(m_Method);
    TComputeParameters params{this->spec().numberThreads(),
                              partitions,
                              m_StandardizationEnabled,
                              method,
                              m_NumberNeighbours,
                              m_ComputeFeatureInfluence,
                              m_OutlierFraction};
    maths::analytics::COutliers::compute(params, frame, m_Instrumentation);
}
std::size_t
CDataFrameOutliersRunner::estimateBookkeepingMemoryUsage(std::size_t numberPartitions,
std::size_t totalNumberRows,
std::size_t partitionNumberRows,
std::size_t numberColumns) const {
maths::analytics::COutliers::SComputeParameters params{
this->spec().numberThreads(),
numberPartitions,
m_StandardizationEnabled,
static_cast<maths::analytics::COutliers::EMethod>(m_Method),
m_NumberNeighbours,
m_ComputeFeatureInfluence,
m_OutlierFraction};
return maths::analytics::COutliers::estimateMemoryUsedByCompute(
params, totalNumberRows, partitionNumberRows, numberColumns);
}
// JSON names of the outlier detection analysis parameters.
const std::string CDataFrameOutliersRunner::STANDARDIZATION_ENABLED = "standardization_enabled";
const std::string CDataFrameOutliersRunner::N_NEIGHBORS = "n_neighbors";
const std::string CDataFrameOutliersRunner::METHOD = "method";
const std::string CDataFrameOutliersRunner::COMPUTE_FEATURE_INFLUENCE = "compute_feature_influence";
const std::string CDataFrameOutliersRunner::FEATURE_INFLUENCE_THRESHOLD = "feature_influence_threshold";
const std::string CDataFrameOutliersRunner::OUTLIER_FRACTION = "outlier_fraction";
// Returns the analysis name handled by this factory (the NAME constant).
const std::string& CDataFrameOutliersRunnerFactory::name() const {
    return CDataFrameOutliersRunnerFactory::NAME;
}
// Creates a runner with default settings. The data frame/temporary directory
// argument is not used by this analysis.
CDataFrameOutliersRunnerFactory::TRunnerUPtr
CDataFrameOutliersRunnerFactory::makeImpl(const CDataFrameAnalysisSpecification& spec,
                                          TDataFrameUPtrTemporaryDirectoryPtrPr*) const {
    TRunnerUPtr runner{std::make_unique<CDataFrameOutliersRunner>(spec)};
    return runner;
}
// Creates a runner configured from the supplied JSON analysis parameters, which
// are parsed with the shared parameter reader. The data frame/temporary directory
// argument is not used by this analysis.
CDataFrameOutliersRunnerFactory::TRunnerUPtr
CDataFrameOutliersRunnerFactory::makeImpl(const CDataFrameAnalysisSpecification& spec,
                                          const json::value& jsonParameters,
                                          TDataFrameUPtrTemporaryDirectoryPtrPr*) const {
    return std::make_unique<CDataFrameOutliersRunner>(
        spec, parameterReader().read(jsonParameters));
}
// Name of the analysis type this factory creates runners for.
const std::string CDataFrameOutliersRunnerFactory::NAME = "outlier_detection";
}
}