lib/api/CDataFrameOutliersRunner.cc (169 lines of code) (raw):

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <api/CDataFrameOutliersRunner.h>

#include <core/CBoostJsonConcurrentLineWriter.h>
#include <core/CDataFrame.h>
#include <core/CLogger.h>
#include <core/CProgramCounters.h>

#include <maths/analytics/COutliers.h>

#include <api/CDataFrameAnalysisConfigReader.h>
#include <api/CDataFrameAnalysisSpecification.h>

#include <boost/json.hpp>

#include <algorithm>
#include <cstddef>
#include <iterator>
#include <memory>
#include <string>

namespace json = boost::json;

namespace ml {
namespace api {
namespace {
// Returns the reader which knows every parameter the outlier detection
// analysis accepts. All of them are optional. The reader is a function local
// static, so it is built on the first call and shared afterwards.
const CDataFrameAnalysisConfigReader& parameterReader() {
    static const CDataFrameAnalysisConfigReader PARAMETER_READER{[] {
        CDataFrameAnalysisConfigReader theReader;
        theReader.addParameter(CDataFrameOutliersRunner::STANDARDIZATION_ENABLED,
                               CDataFrameAnalysisConfigReader::E_OptionalParameter);
        theReader.addParameter(CDataFrameOutliersRunner::N_NEIGHBORS,
                               CDataFrameAnalysisConfigReader::E_OptionalParameter);
        // The method is supplied by name: each permitted name is paired with
        // the integer value of the corresponding COutliers method enumerator.
        theReader.addParameter(
            CDataFrameOutliersRunner::METHOD,
            CDataFrameAnalysisConfigReader::E_OptionalParameter,
            {{maths::analytics::COutliers::LOF,
              int{maths::analytics::COutliers::E_Lof}},
             {maths::analytics::COutliers::LDOF,
              int{maths::analytics::COutliers::E_Ldof}},
             {maths::analytics::COutliers::DISTANCE_KNN,
              int{maths::analytics::COutliers::E_DistancekNN}},
             {maths::analytics::COutliers::TOTAL_DISTANCE_KNN,
              int{maths::analytics::COutliers::E_TotalDistancekNN}}});
        theReader.addParameter(CDataFrameOutliersRunner::COMPUTE_FEATURE_INFLUENCE,
                               CDataFrameAnalysisConfigReader::E_OptionalParameter);
        theReader.addParameter(CDataFrameOutliersRunner::FEATURE_INFLUENCE_THRESHOLD,
                               CDataFrameAnalysisConfigReader::E_OptionalParameter);
        theReader.addParameter(CDataFrameOutliersRunner::OUTLIER_FRACTION,
                               CDataFrameAnalysisConfigReader::E_OptionalParameter);
        return theReader;
    }()};
    return PARAMETER_READER;
}

// Output
const std::string OUTLIER_SCORE_FIELD_NAME{"outlier_score"};
const std::string FEATURE_NAME_FIELD_NAME{"feature_name"};
const std::string FEATURE_INFLUENCE_FIELD_NAME{"feature_influence"};
const std::string INFLUENCE_FIELD_NAME{"influence"};
}

// Constructs a runner configured from the supplied analysis parameters. Any
// parameter which was not supplied falls back to the default given below.
//
// The base class and members are initialised exactly as in the spec only
// constructor, but we deliberately do not delegate to it: that constructor
// computes the execution strategy immediately, i.e. before the configured
// parameters have been applied. The strategy is instead computed once, at the
// end, so that the virtual hooks this class supplies (numberExtraColumns and
// estimateBookkeepingMemoryUsage) see the configured values rather than the
// member defaults.
CDataFrameOutliersRunner::CDataFrameOutliersRunner(const CDataFrameAnalysisSpecification& spec,
                                                   const CDataFrameAnalysisParameters& parameters)
    : CDataFrameAnalysisRunner{spec},
      m_Method{static_cast<std::size_t>(maths::analytics::COutliers::E_Ensemble)},
      m_Instrumentation{spec.jobId(), spec.memoryLimit()} {

    m_StandardizationEnabled = parameters[STANDARDIZATION_ENABLED].fallback(true);
    m_NumberNeighbours = parameters[N_NEIGHBORS].fallback(std::size_t{0});
    m_Method = parameters[METHOD].fallback(maths::analytics::COutliers::E_Ensemble);
    m_ComputeFeatureInfluence = parameters[COMPUTE_FEATURE_INFLUENCE].fallback(true);
    m_FeatureInfluenceThreshold = parameters[FEATURE_INFLUENCE_THRESHOLD].fallback(0.1);
    m_OutlierFraction = parameters[OUTLIER_FRACTION].fallback(0.05);
    m_Instrumentation.featureInfluenceThreshold(m_FeatureInfluenceThreshold);

    // This must come after every parameter has been assigned.
    this->computeAndSaveExecutionStrategy();
}

// Constructs a runner which uses the ensemble method and leaves every other
// setting at its member default, then computes the execution strategy.
CDataFrameOutliersRunner::CDataFrameOutliersRunner(const CDataFrameAnalysisSpecification& spec)
    : CDataFrameAnalysisRunner{spec},
      m_Method{static_cast<std::size_t>(maths::analytics::COutliers::E_Ensemble)},
      m_Instrumentation{spec.jobId(), spec.memoryLimit()} {
    this->computeAndSaveExecutionStrategy();
}

// The number of columns this analysis appends to the data frame: one for the
// outlier score plus, if feature influence is being computed, one for each
// column in the specification.
std::size_t CDataFrameOutliersRunner::numberExtraColumns() const {
    return m_ComputeFeatureInfluence ? this->spec().numberColumns() + 1 : 1;
}

// The slice capacity to use for the data frame given its total column count,
// i.e. the specification's columns together with the extra columns.
std::size_t CDataFrameOutliersRunner::dataFrameSliceCapacity() const {
    return core::dataFrameDefaultSliceCapacity(this->spec().numberColumns() +
                                               this->numberExtraColumns());
}

// The mask is constructed from the frame's row count and true.
// NOTE(review): presumably this selects every row for writing - confirm
// against core::CPackedBitVector.
core::CPackedBitVector
CDataFrameOutliersRunner::rowsToWriteMask(const core::CDataFrame& frame) const {
    return {frame.numberRows(), true};
}

// Writes the results for one row as a JSON object. This always contains the
// outlier score. If the score exceeds the feature influence threshold, and
// there are feature influence columns, it also contains an array with one
// {feature name, influence} object per feature.
void CDataFrameOutliersRunner::writeOneRow(const core::CDataFrame& frame,
                                           const TRowRef& row,
                                           core::CBoostJsonConcurrentLineWriter& writer) const {
    // The extra columns are the last ones in the row: first the outlier score
    // and then the feature influences, if there are any.
    std::size_t scoreColumn{row.numberColumns() - this->numberExtraColumns()};
    std::size_t beginFeatureScoreColumns{scoreColumn + 1};
    std::size_t numberFeatureScoreColumns{this->numberExtraColumns() - 1};

    writer.onObjectBegin();
    writer.onKey(OUTLIER_SCORE_FIELD_NAME);
    writer.onDouble(row[scoreColumn]);
    if (row[scoreColumn] > m_FeatureInfluenceThreshold && numberFeatureScoreColumns > 0) {
        // The i'th influence column is labelled with the i'th column name.
        const auto& featureNames = frame.columnNames();
        writer.onKey(FEATURE_INFLUENCE_FIELD_NAME);
        writer.onArrayBegin();
        for (std::size_t i = 0; i < numberFeatureScoreColumns; ++i) {
            writer.onObjectBegin();
            writer.onKey(FEATURE_NAME_FIELD_NAME);
            writer.onString(featureNames[i]);
            writer.onKey(INFLUENCE_FIELD_NAME);
            writer.onDouble(row[beginFeatureScoreColumns + i]);
            writer.onObjectEnd();
        }
        writer.onArrayEnd();
    }
    writer.onObjectEnd();
}

// Checks the frame has at least one column. If it does not this reports a
// fatal input error and returns false.
bool CDataFrameOutliersRunner::validate(const core::CDataFrame& frame) const {
    if (frame.numberColumns() < 1) {
        HANDLE_FATAL(<< "Input error: analysis needs at least one feature");
        return false;
    }
    return true;
}

// Read only access to the instrumentation owned by this runner.
const CDataFrameAnalysisInstrumentation& CDataFrameOutliersRunner::instrumentation() const {
    return m_Instrumentation;
}

// Mutable access to the instrumentation owned by this runner.
CDataFrameAnalysisInstrumentation& CDataFrameOutliersRunner::instrumentation() {
    return m_Instrumentation;
}

// Records the number of partitions and the estimated peak memory usage in the
// program counters and then runs outlier detection on the frame using this
// runner's settings.
void CDataFrameOutliersRunner::runImpl(core::CDataFrame& frame) {

    core::CProgramCounters::counter(counter_t::E_DFONumberPartitions) =
        this->numberPartitions();
    core::CProgramCounters::counter(counter_t::E_DFOEstimatedPeakMemoryUsage) =
        this->estimateMemoryUsage(frame.numberRows(),
                                  frame.numberRows() / this->numberPartitions(),
                                  frame.numberColumns());

    maths::analytics::COutliers::SComputeParameters params{
        this->spec().numberThreads(),
        this->numberPartitions(),
        m_StandardizationEnabled,
        static_cast<maths::analytics::COutliers::EMethod>(m_Method),
        m_NumberNeighbours,
        m_ComputeFeatureInfluence,
        m_OutlierFraction};
    maths::analytics::COutliers::compute(params, frame, m_Instrumentation);
}

// Estimates the memory used by the outlier computation, for this runner's
// settings, if the data were split into numberPartitions partitions.
std::size_t
CDataFrameOutliersRunner::estimateBookkeepingMemoryUsage(std::size_t numberPartitions,
                                                         std::size_t totalNumberRows,
                                                         std::size_t partitionNumberRows,
                                                         std::size_t numberColumns) const {
    maths::analytics::COutliers::SComputeParameters params{
        this->spec().numberThreads(),
        numberPartitions,
        m_StandardizationEnabled,
        static_cast<maths::analytics::COutliers::EMethod>(m_Method),
        m_NumberNeighbours,
        m_ComputeFeatureInfluence,
        m_OutlierFraction};
    return maths::analytics::COutliers::estimateMemoryUsedByCompute(
        params, totalNumberRows, partitionNumberRows, numberColumns);
}

// The names of the analysis parameters this runner reads.
const std::string CDataFrameOutliersRunner::STANDARDIZATION_ENABLED{"standardization_enabled"};
const std::string CDataFrameOutliersRunner::N_NEIGHBORS{"n_neighbors"};
const std::string CDataFrameOutliersRunner::METHOD{"method"};
const std::string CDataFrameOutliersRunner::COMPUTE_FEATURE_INFLUENCE{"compute_feature_influence"};
const std::string CDataFrameOutliersRunner::FEATURE_INFLUENCE_THRESHOLD{"feature_influence_threshold"};
const std::string CDataFrameOutliersRunner::OUTLIER_FRACTION{"outlier_fraction"};

// The name of the analysis this factory creates runners for.
const std::string& CDataFrameOutliersRunnerFactory::name() const {
    return NAME;
}

// Makes a runner with default settings. The data frame pointer is unused.
CDataFrameOutliersRunnerFactory::TRunnerUPtr
CDataFrameOutliersRunnerFactory::makeImpl(const CDataFrameAnalysisSpecification& spec,
                                          TDataFrameUPtrTemporaryDirectoryPtrPr*) const {
    return std::make_unique<CDataFrameOutliersRunner>(spec);
}

// Makes a runner configured from the JSON parameters, which are read using
// the outlier detection parameter reader. The data frame pointer is unused.
CDataFrameOutliersRunnerFactory::TRunnerUPtr
CDataFrameOutliersRunnerFactory::makeImpl(const CDataFrameAnalysisSpecification& spec,
                                          const json::value& jsonParameters,
                                          TDataFrameUPtrTemporaryDirectoryPtrPr*) const {
    auto parameters = parameterReader().read(jsonParameters);
    return std::make_unique<CDataFrameOutliersRunner>(spec, parameters);
}

const std::string CDataFrameOutliersRunnerFactory::NAME{"outlier_detection"};
}
}