lib/maths/analytics/CDataFrameUtils.cc (1,227 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #include <maths/analytics/CDataFrameUtils.h> #include <core/CLogger.h> #include <core/CPackedBitVector.h> #include <core/Concurrency.h> #include <maths/analytics/CDataFrameCategoryEncoder.h> #include <maths/analytics/CMic.h> #include <maths/common/CBasicStatistics.h> #include <maths/common/CLbfgs.h> #include <maths/common/CLinearAlgebraEigen.h> #include <maths/common/CMathsFuncs.h> #include <maths/common/COrderings.h> #include <maths/common/CPRNG.h> #include <maths/common/CQuantileSketch.h> #include <maths/common/CSampling.h> #include <maths/common/CSolvers.h> #include <maths/common/CTools.h> #include <maths/common/CToolsDetail.h> #include <boost/unordered_map.hpp> #include <cmath> #include <limits> #include <memory> #include <numeric> #include <vector> namespace ml { namespace maths { namespace analytics { namespace { using TDoubleVec = std::vector<double>; using TSizeVec = std::vector<std::size_t>; using TSizeVecVec = std::vector<TSizeVec>; using TFloatVec = std::vector<common::CFloatStorage>; using TFloatVecVec = std::vector<TFloatVec>; using TRowItr = core::CDataFrame::TRowItr; using TRowRef = core::CDataFrame::TRowRef; using TRowSampler = common::CSampling::CReservoirSampler<TRowRef>; using TRowSamplerVec = std::vector<TRowSampler>; using TSizeEncoderPtrUMap = boost::unordered_map<std::size_t, std::unique_ptr<CDataFrameUtils::CColumnValue>>; using TPackedBitVectorVec = CDataFrameUtils::TPackedBitVectorVec; using TDoubleVector = CDataFrameUtils::TDoubleVector; //! Reduce the results of a call to core::CDataFrame::readRows using \p reduceFirst //! for the first and \p reduce for the rest and writing the result \p reduction. //! //! \tparam REDUCER Must be a binary operator whose logical type is the function //! void (typeof(READER.s_FunctionState[0]), REDUCTION&). template<typename READER, typename REDUCER, typename FIRST_REDUCER, typename REDUCTION> bool doReduce(std::pair<std::vector<READER>, bool> readResults, FIRST_REDUCER reduceFirst, REDUCER reduce, REDUCTION& reduction) { if (readResults.second == false) { return false; } reduceFirst(std::move(readResults.first[0].s_FunctionState), reduction); for (std::size_t i = 1; i < readResults.first.size(); ++i) { reduce(std::move(readResults.first[i].s_FunctionState), reduction); } return true; } //! \brief Manages stratified sampling. class CStratifiedSampler { public: using TSamplerSelector = std::function<std::size_t(const TRowRef&)>; public: explicit CStratifiedSampler(std::size_t size) : m_SampledRowIndices(size) { m_DesiredCounts.reserve(size); m_Samplers.reserve(size); } void sample(const TRowRef& row) { m_Samplers[m_Selector(row)].sample(row); } //! Add one of the strata samplers. void addSampler(std::size_t count, common::CPRNG::CXorOShiro128Plus rng) { TSizeVec& samples{m_SampledRowIndices[m_Samplers.size()]}; samples.reserve(count); auto sampler = [&](std::size_t slot, const TRowRef& row) { if (slot >= samples.size()) { samples.resize(slot + 1); } samples[slot] = row.index(); }; m_DesiredCounts.push_back(count); m_Samplers.emplace_back(count, sampler, rng); } //! Define the callback to select the sampler. void samplerSelector(TSamplerSelector selector) { m_Selector = std::move(selector); } //! This selects the final samples, writing to \p result, and resets the sampling //! state so this is ready to sample again. void finishSampling(common::CPRNG::CXorOShiro128Plus& rng, TSizeVec& result) { result.clear(); for (std::size_t i = 0; i < m_SampledRowIndices.size(); ++i) { std::size_t sampleSize{m_Samplers[i].sampleSize()}; std::size_t desiredCount{std::min(m_DesiredCounts[i], sampleSize)}; common::CSampling::random_shuffle(rng, m_SampledRowIndices[i].begin(), m_SampledRowIndices[i].begin() + sampleSize); result.insert(result.end(), m_SampledRowIndices[i].begin(), m_SampledRowIndices[i].begin() + desiredCount); m_SampledRowIndices[i].clear(); m_Samplers[i].reset(); } } private: TSizeVec m_DesiredCounts; TSizeVecVec m_SampledRowIndices; TRowSamplerVec m_Samplers; TSamplerSelector m_Selector; }; using TStratifiedSamplerUPtr = std::unique_ptr<CStratifiedSampler>; //! Get a classifier stratified row sampler for cross fold validation. std::pair<TStratifiedSamplerUPtr, TDoubleVec> classifierStratifiedCrossValidationRowSampler(std::size_t numberThreads, const core::CDataFrame& frame, std::size_t targetColumn, common::CPRNG::CXorOShiro128Plus rng, std::size_t desiredCount, const core::CPackedBitVector& rowMask) { TDoubleVec categoryFrequencies{CDataFrameUtils::categoryFrequencies( numberThreads, frame, rowMask, {targetColumn})[targetColumn]}; LOG_TRACE(<< "category frequencies = " << categoryFrequencies); TSizeVec categoryCounts; common::CSampling::weightedSample(desiredCount, categoryFrequencies, categoryCounts); LOG_TRACE(<< "desired category counts per test fold = " << categoryCounts); auto sampler = std::make_unique<CStratifiedSampler>(categoryCounts.size()); for (auto categoryCount : categoryCounts) { sampler->addSampler(categoryCount, rng); } sampler->samplerSelector([targetColumn](const TRowRef& row) mutable { return static_cast<std::size_t>(row[targetColumn]); }); return {std::move(sampler), std::move(categoryFrequencies)}; } //! Get a regression stratified row sampler for cross fold validation. TStratifiedSamplerUPtr regressionStratifiedCrossValidationRowSampler(std::size_t numberThreads, const core::CDataFrame& frame, std::size_t targetColumn, common::CPRNG::CXorOShiro128Plus rng, std::size_t desiredCount, std::size_t numberBuckets, const core::CPackedBitVector& rowMask) { auto quantiles = CDataFrameUtils::columnQuantiles( numberThreads, frame, rowMask, {targetColumn}, common::CFastQuantileSketch{75, common::CPRNG::CXorOShiro128Plus{}, 0.9}) .first; TDoubleVec buckets; for (double step = 100.0 / static_cast<double>(numberBuckets), percentile = step; percentile < 100.0; percentile += step) { double xQuantile; quantiles[0].quantile(percentile, xQuantile); buckets.push_back(xQuantile); } buckets.erase(std::unique(buckets.begin(), buckets.end()), buckets.end()); buckets.push_back(std::numeric_limits<double>::max()); LOG_TRACE(<< "buckets = " << buckets); auto bucketSelector = [buckets, targetColumn](const TRowRef& row) mutable { return static_cast<std::size_t>( std::upper_bound(buckets.begin(), buckets.end(), row[targetColumn]) - buckets.begin()); }; auto countBucketRows = core::bindRetrievableState( [&](TDoubleVec& bucketCounts, const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { bucketCounts[bucketSelector(*row)] += 1.0; } }, TDoubleVec(buckets.size(), 0.0)); auto copyBucketRowCounts = [](TDoubleVec counts_, TDoubleVec& counts) { counts = std::move(counts_); }; auto reduceBucketRowCounts = [](TDoubleVec counts_, TDoubleVec& counts) { for (std::size_t i = 0; i < counts.size(); ++i) { counts[i] += counts_[i]; } }; TDoubleVec bucketFrequencies; doReduce(frame.readRows(numberThreads, 0, frame.numberRows(), countBucketRows, &rowMask), copyBucketRowCounts, reduceBucketRowCounts, bucketFrequencies); double totalCount{std::accumulate(bucketFrequencies.begin(), bucketFrequencies.end(), 0.0)}; for (auto& frequency : bucketFrequencies) { frequency /= totalCount; } TSizeVec bucketCounts; common::CSampling::weightedSample(desiredCount, bucketFrequencies, bucketCounts); LOG_TRACE(<< "desired bucket counts per fold = " << bucketCounts); auto sampler = std::make_unique<CStratifiedSampler>(buckets.size()); for (std::size_t i = 0; i < buckets.size(); ++i) { sampler->addSampler(bucketCounts[i], rng); } sampler->samplerSelector(bucketSelector); return sampler; } TStratifiedSamplerUPtr classifierDistributionPreservingRowSampler(std::size_t numberThreads, const core::CDataFrame& frame, std::size_t targetColumn, common::CPRNG::CXorOShiro128Plus rng, const core::CPackedBitVector& rowMask) { TDoubleVec categoryCounts{CDataFrameUtils::categoryCounts( numberThreads, frame, rowMask, {targetColumn})[targetColumn]}; auto sampler = std::make_unique<CStratifiedSampler>(categoryCounts.size()); for (auto categoryCount : categoryCounts) { sampler->addSampler(static_cast<std::size_t>(categoryCount), rng); } sampler->samplerSelector([targetColumn](const TRowRef& row) { return static_cast<std::size_t>(row[targetColumn]); }); return sampler; } //! Get the test row masks corresponding to \p foldRowMasks. TPackedBitVectorVec complementRowMasks(const TPackedBitVectorVec& foldRowMasks, core::CPackedBitVector allRowsMask) { TPackedBitVectorVec complementFoldRowMasks(foldRowMasks.size(), std::move(allRowsMask)); for (std::size_t fold = 0; fold < foldRowMasks.size(); ++fold) { complementFoldRowMasks[fold] ^= foldRowMasks[fold]; } return complementFoldRowMasks; } //! Get a row feature sampler. template<typename TARGET> auto rowFeatureSampler(std::size_t i, const TARGET& target, TFloatVecVec& samples) { return [i, &target, &samples](std::size_t slot, const TRowRef& row) { if (slot >= samples.size()) { samples.resize(slot + 1, {0.0, 0.0}); } samples[slot][0] = row[i]; samples[slot][1] = target(row); }; } //! Get a row sampler. auto rowSampler(TFloatVecVec& samples) { return [&samples](std::size_t slot, const TRowRef& row) { if (slot >= samples.size()) { samples.resize(slot + 1, TFloatVec(row.numberColumns())); } row.copyTo(samples[slot].begin()); }; } template<typename TARGET> auto computeEncodedCategory(CMic& mic, const TARGET& target, TSizeEncoderPtrUMap& encoders, TFloatVecVec& samples) { CDataFrameUtils::TSizeDoublePrVec encodedMics; encodedMics.reserve(encoders.size()); for (const auto& encoder : encoders) { std::size_t category{encoder.first}; const auto& encode = *encoder.second; mic.clear(); for (const auto& sample : samples) { mic.add(encode(sample), target(sample)); } encodedMics.emplace_back(category, mic.compute()); } return encodedMics; } //! Check that all components are neither infinite nor NaN. bool allFinite(TDoubleVector x) { return std::all_of(x.begin(), x.end(), [](auto xi) { return common::CMathsFuncs::isFinite(xi); }); } const std::size_t NUMBER_SAMPLES_TO_COMPUTE_MIC{10000}; } std::string CDataFrameUtils::SDataType::toDelimited() const { // clang-format off return core::CStringUtils::typeToString(static_cast<int>(s_IsInteger)) + INTERNAL_DELIMITER + core::CStringUtils::typeToStringPrecise(s_Min, core::CIEEE754::E_DoublePrecision) + INTERNAL_DELIMITER + core::CStringUtils::typeToStringPrecise(s_Max, core::CIEEE754::E_DoublePrecision) + INTERNAL_DELIMITER; // clang-format on } bool CDataFrameUtils::SDataType::fromDelimited(const std::string& delimited) { TDoubleVec state(3); std::size_t pos{0}, i{0}; for (auto delimiter = delimited.find(INTERNAL_DELIMITER); delimiter != std::string::npos && i < state.size(); delimiter = delimited.find(INTERNAL_DELIMITER, pos)) { if (core::CStringUtils::stringToType(delimited.substr(pos, delimiter - pos), state[i++]) == false) { return false; } pos = delimiter + 1; } std::tie(s_IsInteger, s_Min, s_Max) = std::make_tuple(state[0] == 1.0, state[1], state[2]); return true; } const char CDataFrameUtils::SDataType::INTERNAL_DELIMITER{':'}; const char CDataFrameUtils::SDataType::EXTERNAL_DELIMITER{';'}; bool CDataFrameUtils::standardizeColumns(std::size_t numberThreads, core::CDataFrame& frame) { using TMeanVarAccumulatorVec = std::vector<common::CBasicStatistics::SSampleMeanVar<double>::TAccumulator>; if (frame.numberRows() == 0 || frame.numberColumns() == 0) { return true; } auto readColumnMoments = core::bindRetrievableState( [](TMeanVarAccumulatorVec& moments_, const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { for (std::size_t i = 0; i < row->numberColumns(); ++i) { if (isMissing((*row)[i]) == false) { moments_[i].add((*row)[i]); } } } }, TMeanVarAccumulatorVec(frame.numberColumns())); auto copyColumnMoments = [](TMeanVarAccumulatorVec moments_, TMeanVarAccumulatorVec& moments) { moments = std::move(moments_); }; auto reduceColumnMoments = [](TMeanVarAccumulatorVec moments_, TMeanVarAccumulatorVec& moments) { for (std::size_t i = 0; i < moments.size(); ++i) { moments[i] += moments_[i]; } }; TMeanVarAccumulatorVec moments; if (doReduce(frame.readRows(numberThreads, readColumnMoments), copyColumnMoments, reduceColumnMoments, moments) == false) { LOG_ERROR(<< "Failed to standardise columns"); return false; } TDoubleVec mean(moments.size()); TDoubleVec scale(moments.size()); for (std::size_t i = 0; i < moments.size(); ++i) { double variance{common::CBasicStatistics::variance(moments[i])}; mean[i] = common::CBasicStatistics::mean(moments[i]); scale[i] = variance == 0.0 ? 1.0 : 1.0 / std::sqrt(variance); } LOG_TRACE(<< "means = " << mean); LOG_TRACE(<< "scales = " << scale); auto standardiseColumns = [&mean, &scale](const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { for (std::size_t i = 0; i < row->numberColumns(); ++i) { row->writeColumn(i, scale[i] * ((*row)[i] - mean[i])); } } }; return frame.writeColumns(numberThreads, standardiseColumns).second; } CDataFrameUtils::TDataTypeVec CDataFrameUtils::columnDataTypes(std::size_t numberThreads, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, const TSizeVec& columnMask, const CDataFrameCategoryEncoder* encoder) { if (frame.numberRows() == 0) { return {}; } using TMinMax = common::CBasicStatistics::CMinMax<double>; using TMinMaxBoolPrVec = std::vector<std::pair<TMinMax, bool>>; auto readDataTypes = core::bindRetrievableState( [&](TMinMaxBoolPrVec& types, const TRowItr& beginRows, const TRowItr& endRows) { double integerPart; if (encoder != nullptr) { for (auto row = beginRows; row != endRows; ++row) { CEncodedDataFrameRowRef encodedRow{encoder->encode(*row)}; for (auto i : columnMask) { double value{encodedRow[i]}; if (isMissing(value) == false) { types[i].first.add(value); types[i].second = types[i].second && (std::modf(value, &integerPart) == 0.0); } } } } else { for (auto row = beginRows; row != endRows; ++row) { for (auto i : columnMask) { double value{(*row)[i]}; if (isMissing(value) == false) { types[i].first.add(value); types[i].second = types[i].second && (std::modf(value, &integerPart) == 0.0); } } } } }, TMinMaxBoolPrVec(encoder != nullptr ? encoder->numberEncodedColumns() : frame.numberColumns(), {TMinMax{}, true})); auto copyDataTypes = [](TMinMaxBoolPrVec types, TMinMaxBoolPrVec& result) { result = std::move(types); }; auto reduceDataTypes = [&](TMinMaxBoolPrVec types, TMinMaxBoolPrVec& result) { for (auto i : columnMask) { result[i].first += types[i].first; result[i].second = result[i].second && types[i].second; } }; TMinMaxBoolPrVec types; doReduce(frame.readRows(numberThreads, 0, frame.numberRows(), readDataTypes, &rowMask), copyDataTypes, reduceDataTypes, types); TDataTypeVec result(types.size()); for (auto i : columnMask) { result[i] = SDataType{types[i].second, types[i].first.min(), types[i].first.max()}; } return result; } std::pair<CDataFrameUtils::TFastQuantileSketchVec, bool> CDataFrameUtils::columnQuantiles(std::size_t numberThreads, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, const TSizeVec& columnMask, common::CFastQuantileSketch quantileEstimator, const CDataFrameCategoryEncoder* encoder, const TWeightFunc& weight) { auto readQuantiles = core::bindRetrievableState( [&](TFastQuantileSketchVec& quantiles, const TRowItr& beginRows, const TRowItr& endRows) { if (encoder != nullptr) { for (auto row_ = beginRows; row_ != endRows; ++row_) { auto row = *row_; auto encodedRow = encoder->encode(row); auto rowWeight = weight(row); for (std::size_t i = 0; i < columnMask.size(); ++i) { double feature{encodedRow[columnMask[i]]}; if (isMissing(feature) == false) { quantiles[i].add(feature, rowWeight); } } } } else { for (auto row_ = beginRows; row_ != endRows; ++row_) { auto row = *row_; auto rowWeight = weight(row); for (std::size_t i = 0; i < columnMask.size(); ++i) { double feature{row[columnMask[i]]}; if (isMissing(feature) == false) { quantiles[i].add(feature, rowWeight); } } } } }, TFastQuantileSketchVec(columnMask.size(), quantileEstimator)); auto copyQuantiles = [](TFastQuantileSketchVec quantiles, TFastQuantileSketchVec& result) { result = std::move(quantiles); }; auto reduceQuantiles = [&](TFastQuantileSketchVec quantiles, TFastQuantileSketchVec& result) { for (std::size_t i = 0; i < columnMask.size(); ++i) { result[i] += quantiles[i]; } }; TFastQuantileSketchVec result; if (doReduce(frame.readRows(numberThreads, 0, frame.numberRows(), readQuantiles, &rowMask), copyQuantiles, reduceQuantiles, result) == false) { LOG_ERROR(<< "Failed to compute column quantiles"); return {std::move(result), false}; } return {std::move(result), true}; } std::tuple<TPackedBitVectorVec, TPackedBitVectorVec, TDoubleVec> CDataFrameUtils::stratifiedCrossValidationRowMasks(std::size_t numberThreads, const core::CDataFrame& frame, std::size_t targetColumn, common::CPRNG::CXorOShiro128Plus rng, std::size_t numberFolds, double trainFractionPerFold, std::size_t numberBuckets, const core::CPackedBitVector& allTrainingRowMask) { double numberTrainingRows{allTrainingRowMask.manhattan()}; if (static_cast<std::size_t>(numberTrainingRows) < numberFolds) { HANDLE_FATAL(<< "Input error: insufficient training data provided."); return {{}, {}, {}}; } double sampleFraction{std::min(trainFractionPerFold, 1.0 - trainFractionPerFold)}; double excessSampleFraction{ std::max(sampleFraction - 1.0 / static_cast<double>(numberFolds), 0.0)}; // We sample the smaller of the test or train set in the loop. std::size_t excessSampleSize{static_cast<std::size_t>( std::ceil(excessSampleFraction * numberTrainingRows))}; std::size_t sampleSize{static_cast<std::size_t>(std::max( (1.0 + 1e-8) * (sampleFraction - excessSampleFraction) * numberTrainingRows, 1.0))}; LOG_TRACE(<< "excess sample size = " << excessSampleSize << ", sample size = " << sampleSize); TDoubleVec frequencies; auto makeSampler = [&](std::size_t size, const core::CPackedBitVector& rowMask) { TStratifiedSamplerUPtr result; if (size > 0) { if (frame.columnIsCategorical()[targetColumn]) { std::tie(result, frequencies) = classifierStratifiedCrossValidationRowSampler( numberThreads, frame, targetColumn, rng, size, rowMask); } else { result = regressionStratifiedCrossValidationRowSampler( numberThreads, frame, targetColumn, rng, size, numberBuckets, rowMask); } } return result; }; auto excessSampler = makeSampler(excessSampleSize, allTrainingRowMask); LOG_TRACE(<< "number training rows = " << allTrainingRowMask.manhattan()); TPackedBitVectorVec testingRowMasks(numberFolds); TSizeVec rowIndices; auto sample = [&](const TStratifiedSamplerUPtr& sampler_, const core::CPackedBitVector& rowMask) { frame.readRows(1, 0, frame.numberRows(), [&](const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { sampler_->sample(*row); } }, &rowMask); sampler_->finishSampling(rng, rowIndices); std::sort(rowIndices.begin(), rowIndices.end()); LOG_TRACE(<< "# row indices = " << rowIndices.size()); core::CPackedBitVector result; for (auto row : rowIndices) { result.extend(false, row - result.size()); result.extend(true); } result.extend(false, rowMask.size() - result.size()); return result; }; core::CPackedBitVector candidateTestingRowMask{allTrainingRowMask}; for (auto& testingRowMask : testingRowMasks) { if (static_cast<std::size_t>(candidateTestingRowMask.manhattan()) <= sampleSize) { testingRowMask = std::move(candidateTestingRowMask); candidateTestingRowMask = core::CPackedBitVector{testingRowMask.size(), false}; } else { auto sampler = makeSampler(sampleSize, candidateTestingRowMask); if (sampler == nullptr) { HANDLE_FATAL(<< "Internal error: failed to create train/test splits."); return {{}, {}, {}}; } testingRowMask = sample(sampler, candidateTestingRowMask); candidateTestingRowMask ^= testingRowMask; } if (excessSampler != nullptr) { testingRowMask |= sample(excessSampler, allTrainingRowMask ^ testingRowMask); } } TPackedBitVectorVec trainingRowMasks{complementRowMasks(testingRowMasks, allTrainingRowMask)}; if (trainFractionPerFold < 0.5) { std::swap(trainingRowMasks, testingRowMasks); } return {std::move(trainingRowMasks), std::move(testingRowMasks), std::move(frequencies)}; } core::CPackedBitVector CDataFrameUtils::stratifiedSamplingRowMask(std::size_t numberThreads, const core::CDataFrame& frame, std::size_t targetColumn, common::CPRNG::CXorOShiro128Plus rng, std::size_t desiredNumberSamples, std::size_t numberBuckets, const core::CPackedBitVector& allTrainingRowMask) { TDoubleVec frequencies; TStratifiedSamplerUPtr sampler; core::CPackedBitVector samplesRowMask; double numberTrainingRows{allTrainingRowMask.manhattan()}; if (numberTrainingRows < 2.0) { HANDLE_FATAL(<< "Input error: insufficient training data provided."); return {}; } if (frame.columnIsCategorical()[targetColumn]) { std::tie(sampler, frequencies) = classifierStratifiedCrossValidationRowSampler( numberThreads, frame, targetColumn, rng, desiredNumberSamples, allTrainingRowMask); } else { sampler = regressionStratifiedCrossValidationRowSampler( numberThreads, frame, targetColumn, rng, desiredNumberSamples, numberBuckets, allTrainingRowMask); } LOG_TRACE(<< "number training rows = " << allTrainingRowMask.manhattan()); TSizeVec rowIndices; core::CPackedBitVector candidateSamplesRowMask{allTrainingRowMask}; frame.readRows(1, 0, frame.numberRows(), [&](const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { sampler->sample(*row); } }, &candidateSamplesRowMask); sampler->finishSampling(rng, rowIndices); std::sort(rowIndices.begin(), rowIndices.end()); LOG_TRACE(<< "# row indices = " << rowIndices.size()); for (auto row : rowIndices) { samplesRowMask.extend(false, row - samplesRowMask.size()); samplesRowMask.extend(true); } samplesRowMask.extend(false, allTrainingRowMask.size() - samplesRowMask.size()); // We exclusive or here to remove the rows we've selected for the current //test fold. This is equivalent to samplng without replacement candidateSamplesRowMask ^= samplesRowMask; LOG_TRACE(<< "# selected rows = " << samplesRowMask.manhattan()); return samplesRowMask; } core::CPackedBitVector CDataFrameUtils::distributionPreservingSamplingRowMask( std::size_t numberThreads, const core::CDataFrame& frame, std::size_t targetColumn, common::CPRNG::CXorOShiro128Plus rng, std::size_t desiredNumberSamples, std::size_t numberBuckets, const core::CPackedBitVector& distributionSourceRowMask, const core::CPackedBitVector& allTrainingRowMask) { TStratifiedSamplerUPtr sampler; core::CPackedBitVector samplesRowMask; double numberTrainingRows{allTrainingRowMask.manhattan()}; if (numberTrainingRows < 2.0) { HANDLE_FATAL(<< "Input error: insufficient training data provided."); return {}; } if (frame.columnIsCategorical()[targetColumn]) { sampler = classifierDistributionPreservingRowSampler( numberThreads, frame, targetColumn, rng, distributionSourceRowMask); } else { sampler = regressionStratifiedCrossValidationRowSampler( numberThreads, frame, targetColumn, rng, desiredNumberSamples, numberBuckets, distributionSourceRowMask); } LOG_TRACE(<< "number training rows = " << allTrainingRowMask.manhattan()); TSizeVec rowIndices; frame.readRows(1, 0, frame.numberRows(), [&](const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { sampler->sample(*row); } }, &allTrainingRowMask); sampler->finishSampling(rng, rowIndices); std::sort(rowIndices.begin(), rowIndices.end()); LOG_TRACE(<< "# row indices = " << rowIndices.size()); for (auto row : rowIndices) { samplesRowMask.extend(false, row - samplesRowMask.size()); samplesRowMask.extend(true); } samplesRowMask.extend(false, allTrainingRowMask.size() - samplesRowMask.size()); LOG_TRACE(<< "# selected rows = " << samplesRowMask.manhattan()); return samplesRowMask; } CDataFrameUtils::TDoubleVecVec CDataFrameUtils::categoryFrequencies(std::size_t numberThreads, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, TSizeVec columnMask) { TDoubleVecVec result{CDataFrameUtils::categoryCounts(numberThreads, frame, rowMask, std::move(columnMask))}; double Z{rowMask.manhattan()}; for (auto& i : result) { for (double& j : i) { j /= Z; } } return result; } CDataFrameUtils::TDoubleVecVec CDataFrameUtils::categoryCounts(std::size_t numberThreads, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, TSizeVec columnMask) { removeMetricColumns(frame, columnMask); if (frame.numberRows() == 0 || columnMask.empty()) { return TDoubleVecVec(frame.numberColumns()); } // Note this can throw a length_error in resize hence the try block around read. auto readCategoryCounts = core::bindRetrievableState( [&](TDoubleVecVec& counts, const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { for (std::size_t i : columnMask) { if (isMissing((*row)[i]) == false) { std::size_t category{static_cast<std::size_t>((*row)[i])}; counts[i].resize(std::max(counts[i].size(), category + 1), 0.0); counts[i][category] += 1.0; } } } }, TDoubleVecVec(frame.numberColumns())); auto copyCategoryCounts = [](TDoubleVecVec counts, TDoubleVecVec& result) { result = std::move(counts); }; auto reduceCategoryCounts = [](TDoubleVecVec counts, TDoubleVecVec& result) { for (std::size_t i = 0; i < counts.size(); ++i) { result[i].resize(std::max(result[i].size(), counts[i].size()), 0.0); for (std::size_t j = 0; j < counts[i].size(); ++j) { result[i][j] += counts[i][j]; } } }; TDoubleVecVec result; try { doReduce(frame.readRows(numberThreads, 0, frame.numberRows(), readCategoryCounts, &rowMask), copyCategoryCounts, reduceCategoryCounts, result); } catch (const std::exception& e) { HANDLE_FATAL(<< "Internal error: '" << e.what() << "' exception calculating" << " category frequencies. Please report this problem."); } return result; } CDataFrameUtils::TDoubleVecVec CDataFrameUtils::meanValueOfTargetForCategories(const CColumnValue& target, std::size_t numberThreads, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, TSizeVec columnMask) { TDoubleVecVec result(frame.numberColumns()); removeMetricColumns(frame, columnMask); if (frame.numberRows() == 0 || columnMask.empty()) { return result; } using TMeanAccumulatorVec = std::vector<common::CBasicStatistics::SSampleMean<double>::TAccumulator>; using TMeanAccumulatorVecVec = std::vector<TMeanAccumulatorVec>; // Note this can throw a length_error in resize hence the try block around read. auto readColumnMeans = core::bindRetrievableState( [&](TMeanAccumulatorVecVec& means_, const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { for (std::size_t i : columnMask) { if (isMissing((*row)[i]) == false && isMissing(target(*row)) == false) { std::size_t category{static_cast<std::size_t>((*row)[i])}; means_[i].resize(std::max(means_[i].size(), category + 1)); means_[i][category].add(target(*row)); } } } }, TMeanAccumulatorVecVec(frame.numberColumns())); auto copyColumnMeans = [](TMeanAccumulatorVecVec means_, TMeanAccumulatorVecVec& means) { means = std::move(means_); }; auto reduceColumnMeans = [](TMeanAccumulatorVecVec means_, TMeanAccumulatorVecVec& means) { for (std::size_t i = 0; i < means_.size(); ++i) { means[i].resize(std::max(means[i].size(), means_[i].size())); for (std::size_t j = 0; j < means_[i].size(); ++j) { means[i][j] += means_[i][j]; } } }; TMeanAccumulatorVecVec means; try { doReduce(frame.readRows(numberThreads, 0, frame.numberRows(), readColumnMeans, &rowMask), copyColumnMeans, reduceColumnMeans, means); } catch (const std::exception& e) { HANDLE_FATAL(<< "Internal error: '" << e.what() << "' exception calculating" << " mean target values for categories. Please report this problem."); return result; } for (std::size_t i = 0; i < result.size(); ++i) { result[i].resize(means[i].size()); for (std::size_t j = 0; j < means[i].size(); ++j) { result[i][j] = common::CBasicStatistics::mean(means[i][j]); } } return result; } CDataFrameUtils::TSizeDoublePrVecVecVec CDataFrameUtils::categoricalMicWithColumn(const CColumnValue& target, std::size_t numberThreads, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, TSizeVec columnMask, const TEncoderFactoryVec& encoderFactories) { TSizeDoublePrVecVecVec none(encoderFactories.size(), TSizeDoublePrVecVec(frame.numberColumns())); removeMetricColumns(frame, columnMask); if (frame.numberRows() == 0 || columnMask.empty()) { return none; } auto method = frame.inMainMemory() ? categoricalMicWithColumnDataFrameInMemory : categoricalMicWithColumnDataFrameOnDisk; TDoubleVecVec frequencies(categoryFrequencies(numberThreads, frame, rowMask, columnMask)); LOG_TRACE(<< "frequencies = " << frequencies); TSizeDoublePrVecVecVec mics( method(target, frame, rowMask, columnMask, encoderFactories, frequencies, std::min(NUMBER_SAMPLES_TO_COMPUTE_MIC, frame.numberRows()))); for (auto& encoderMics : mics) { for (auto& categoryMics : encoderMics) { std::sort(categoryMics.begin(), categoryMics.end(), [](const TSizeDoublePr& lhs, const TSizeDoublePr& rhs) { return common::COrderings::lexicographicalCompare( -lhs.second, lhs.first, -rhs.second, rhs.first); }); } } return mics; } CDataFrameUtils::TDoubleVec CDataFrameUtils::metricMicWithColumn(const CColumnValue& target, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, TSizeVec columnMask) { TDoubleVec zeros(frame.numberColumns(), 0.0); removeCategoricalColumns(frame, columnMask); if (frame.numberRows() == 0 || columnMask.empty()) { return zeros; } auto method = frame.inMainMemory() ? metricMicWithColumnDataFrameInMemory : metricMicWithColumnDataFrameOnDisk; return method(target, frame, rowMask, columnMask, std::min(NUMBER_SAMPLES_TO_COMPUTE_MIC, frame.numberRows())); } CDataFrameUtils::TDoubleVector CDataFrameUtils::maximumMinimumRecallClassWeights(std::size_t numberThreads, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, std::size_t numberClasses, std::size_t targetColumn, const TReadPredictionFunc& readPrediction) { return numberClasses == 2 ? maximizeMinimumRecallForBinary(numberThreads, frame, rowMask, targetColumn, readPrediction) : maximizeMinimumRecallForMulticlass(numberThreads, frame, rowMask, numberClasses, targetColumn, readPrediction); } bool CDataFrameUtils::isMissing(double value) { return core::CDataFrame::isMissing(value); } CDataFrameUtils::TSizeDoublePrVecVecVec CDataFrameUtils::categoricalMicWithColumnDataFrameInMemory( const CColumnValue& target, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, const TSizeVec& columnMask, const TEncoderFactoryVec& encoderFactories, const TDoubleVecVec& frequencies, std::size_t numberSamples) { TSizeDoublePrVecVecVec encoderMics; encoderMics.reserve(encoderFactories.size()); TFloatVecVec samples; TSizeEncoderPtrUMap encoders; CMic mic; samples.reserve(numberSamples); mic.reserve(numberSamples); for (const auto& encoderFactory : encoderFactories) { TEncoderFactory makeEncoder; double minimumFrequency; std::tie(makeEncoder, minimumFrequency) = encoderFactory; TSizeDoublePrVecVec mics(frame.numberColumns()); for (auto i : columnMask) { // Sample samples.clear(); TRowSampler sampler{numberSamples, rowFeatureSampler(i, target, samples)}; frame.readRows( 1, 0, frame.numberRows(), [&](const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { if (isMissing((*row)[i]) || isMissing(target(*row))) { continue; } std::size_t category{static_cast<std::size_t>((*row)[i])}; if (frequencies[i][category] >= minimumFrequency) { sampler.sample(*row); } } }, &rowMask); LOG_TRACE(<< "# samples = " << samples.size()); // Setup encoders encoders.clear(); for (const auto& sample : samples) { std::size_t category{static_cast<std::size_t>(sample[0])}; auto encoder = makeEncoder(i, 0, category); std::size_t hash{encoder->hash()}; encoders.emplace(hash, std::move(encoder)); } auto target_ = [](const TFloatVec& sample) { return sample[1]; }; mics[i] = computeEncodedCategory(mic, target_, encoders, samples); } encoderMics.push_back(std::move(mics)); } return encoderMics; } CDataFrameUtils::TSizeDoublePrVecVecVec CDataFrameUtils::categoricalMicWithColumnDataFrameOnDisk( const CColumnValue& target, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, const TSizeVec& columnMask, const TEncoderFactoryVec& encoderFactories, const TDoubleVecVec& frequencies, std::size_t numberSamples) { TSizeDoublePrVecVecVec encoderMics; encoderMics.reserve(encoderFactories.size()); TFloatVecVec samples; TSizeEncoderPtrUMap encoders; CMic mic; samples.reserve(numberSamples); mic.reserve(numberSamples); for (const auto& encoderFactory : encoderFactories) { TEncoderFactory makeEncoder; double minimumFrequency; std::tie(makeEncoder, minimumFrequency) = encoderFactory; TSizeDoublePrVecVec mics(frame.numberColumns()); // Sample // // The law of large numbers means we have a high probability of sampling // each category provided minimumFrequency * NUMBER_SAMPLES_TO_COMPUTE_MIC // is large (which we ensure it is). samples.clear(); TRowSampler sampler{numberSamples, rowSampler(samples)}; frame.readRows(1, 0, frame.numberRows(), [&](const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { if (isMissing(target(*row)) == false) { sampler.sample(*row); } } }, &rowMask); LOG_TRACE(<< "# samples = " << samples.size()); for (auto i : columnMask) { // Setup encoders encoders.clear(); for (const auto& sample : samples) { if (isMissing(sample[i])) { continue; } std::size_t category{static_cast<std::size_t>(sample[i])}; if (frequencies[i][category] >= minimumFrequency) { auto encoder = makeEncoder(i, i, category); std::size_t hash{encoder->hash()}; encoders.emplace(hash, std::move(encoder)); } } mics[i] = computeEncodedCategory(mic, target, encoders, samples); } encoderMics.push_back(std::move(mics)); } return encoderMics; } CDataFrameUtils::TDoubleVec CDataFrameUtils::metricMicWithColumnDataFrameInMemory(const CColumnValue& target, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, const TSizeVec& columnMask, std::size_t numberSamples) { TDoubleVec mics(frame.numberColumns(), 0.0); TFloatVecVec samples; samples.reserve(numberSamples); double numberMaskedRows{rowMask.manhattan()}; for (auto i : columnMask) { // Do sampling TRowSampler sampler{numberSamples, rowFeatureSampler(i, target, samples)}; auto missingCount = frame.readRows( 1, 0, frame.numberRows(), core::bindRetrievableState( [&](std::size_t& missing, const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { if (isMissing((*row)[i])) { ++missing; } else if (isMissing(target(*row)) == false) { sampler.sample(*row); } } }, std::size_t{0}), &rowMask); LOG_TRACE(<< "# samples = " << samples.size()); double fractionMissing{static_cast<double>(missingCount.first[0].s_FunctionState) / numberMaskedRows}; LOG_TRACE(<< "feature = " << i << " fraction missing = " << fractionMissing); // Compute MICe CMic mic; mic.reserve(samples.size()); for (const auto& sample : samples) { mic.add(sample[0], sample[1]); } mics[i] = (1.0 - fractionMissing) * mic.compute(); samples.clear(); } return mics; } CDataFrameUtils::TDoubleVec CDataFrameUtils::metricMicWithColumnDataFrameOnDisk(const CColumnValue& target, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, const TSizeVec& columnMask, std::size_t numberSamples) { TDoubleVec mics(frame.numberColumns(), 0.0); TFloatVecVec samples; samples.reserve(numberSamples); double numberMaskedRows{rowMask.manhattan()}; // Do sampling TRowSampler sampler{numberSamples, rowSampler(samples)}; auto missingCounts = frame.readRows( 1, 0, frame.numberRows(), core::bindRetrievableState( [&](TSizeVec& missing, const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { for (std::size_t i = 0; i < row->numberColumns(); ++i) { missing[i] += isMissing((*row)[i]) ? 1 : 0; } if (isMissing(target(*row)) == false) { sampler.sample(*row); } } }, TSizeVec(frame.numberColumns(), 0)), &rowMask); LOG_TRACE(<< "# samples = " << samples.size()); TDoubleVec fractionMissing(frame.numberColumns()); for (std::size_t i = 0; i < fractionMissing.size(); ++i) { for (const auto& missingCount : missingCounts.first) { fractionMissing[i] += static_cast<double>(missingCount.s_FunctionState[i]) / numberMaskedRows; } } LOG_TRACE(<< "Fraction missing = " << fractionMissing); // Compute MICe for (auto i : columnMask) { CMic mic; mic.reserve(samples.size()); for (const auto& sample : samples) { if (isMissing(sample[i]) == false) { mic.add(sample[i], target(sample)); } } mics[i] = (1.0 - fractionMissing[i]) * mic.compute(); } return mics; } CDataFrameUtils::TDoubleVector CDataFrameUtils::maximizeMinimumRecallForBinary(std::size_t numberThreads, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, std::size_t targetColumn, const TReadPredictionFunc& readPrediction) { auto readQuantiles = core::bindRetrievableState( [&](TQuantileSketchVec& quantiles, const TRowItr& beginRows, const TRowItr& endRows) { TDoubleVector probabilities; for (auto row = beginRows; row != endRows; ++row) { if (isMissing((*row)[targetColumn]) == false) { std::size_t actualClass{static_cast<std::size_t>((*row)[targetColumn])}; if (actualClass < quantiles.size()) { probabilities = readPrediction(*row); if (allFinite(probabilities)) { common::CTools::inplaceSoftmax(probabilities); quantiles[actualClass].add(probabilities(1)); } else { LOG_WARN(<< "Ignoring unexpected probabilities " << probabilities); } } else { LOG_WARN(<< "Ignoring class " << actualClass << " which is out-of-range. " << "Should be less than " << quantiles.size() << ". Classes " << frame.categoricalColumnValues()[targetColumn] << "."); } } } }, TQuantileSketchVec(2, common::CQuantileSketch{100})); auto copyQuantiles = [](TQuantileSketchVec quantiles, TQuantileSketchVec& result) { result = std::move(quantiles); }; auto reduceQuantiles = [&](TQuantileSketchVec quantiles, TQuantileSketchVec& result) { for (std::size_t i = 0; i < 2; ++i) { result[i] += quantiles[i]; } }; TQuantileSketchVec classProbabilityClassOneQuantiles; if (doReduce(frame.readRows(numberThreads, 0, frame.numberRows(), readQuantiles, &rowMask), copyQuantiles, reduceQuantiles, classProbabilityClassOneQuantiles) == false) { HANDLE_FATAL(<< "Failed to compute category quantiles"); return TDoubleVector::Ones(2); } auto minRecall = [&](double threshold) -> double { double cdf[2]; classProbabilityClassOneQuantiles[0].cdf(threshold, cdf[0]); classProbabilityClassOneQuantiles[1].cdf(threshold, cdf[1]); return std::min(cdf[0], 1.0 - cdf[1]); }; double threshold; double minRecallAtThreshold; std::size_t maxIterations{20}; common::CSolvers::maximize(0.01, 0.99, minRecall(0.01), minRecall(0.99), minRecall, 1e-3, maxIterations, threshold, minRecallAtThreshold); LOG_TRACE(<< "threshold = " << threshold << ", min recall at threshold = " << minRecallAtThreshold); TDoubleVector result{2}; result(0) = threshold < 0.5 ? threshold / (1.0 - threshold) : 1.0; result(1) = threshold < 0.5 ? 1.0 : (1.0 - threshold) / threshold; return result; } CDataFrameUtils::TDoubleVector CDataFrameUtils::maximizeMinimumRecallForMulticlass(std::size_t numberThreads, const core::CDataFrame& frame, const core::CPackedBitVector& rowMask, std::size_t numberClasses, std::size_t targetColumn, const TReadPredictionFunc& readPrediction) { // Use a large random sample of the data frame and compute the expected // optimisation objective for the whole data set from this. using TDoubleMatrix = common::CDenseMatrix<double>; using TMinAccumulator = common::CBasicStatistics::SMin<double>::TAccumulator; common::CPRNG::CXorOShiro128Plus rng; std::size_t numberSamples{ static_cast<std::size_t>(std::min(1000.0, rowMask.manhattan()))}; core::CPackedBitVector sampleMask; // No need to sample if were going to use every row we've been given. if (numberSamples < static_cast<std::size_t>(rowMask.manhattan())) { TStratifiedSamplerUPtr sampler; std::tie(sampler, std::ignore) = classifierStratifiedCrossValidationRowSampler( numberThreads, frame, targetColumn, rng, numberSamples, rowMask); TSizeVec rowIndices; frame.readRows(1, 0, frame.numberRows(), [&](const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { sampler->sample(*row); } }, &rowMask); sampler->finishSampling(rng, rowIndices); std::sort(rowIndices.begin(), rowIndices.end()); LOG_TRACE(<< "# row indices = " << rowIndices.size()); for (auto row : rowIndices) { sampleMask.extend(false, row - sampleMask.size()); sampleMask.extend(true); } sampleMask.extend(false, rowMask.size() - sampleMask.size()); } else { sampleMask = rowMask; } // Compute the count of each class in the sample set. auto readClassCountsAndRecalls = core::bindRetrievableState( [&](TDoubleVector& state, const TRowItr& beginRows, const TRowItr& endRows) { for (auto row = beginRows; row != endRows; ++row) { if (isMissing((*row)[targetColumn]) == false) { int j{static_cast<int>((*row)[targetColumn])}; int k; readPrediction(*row).maxCoeff(&k); state(j) += 1.0; state(numberClasses + j) += j == k ? 1.0 : 0.0; } } }, TDoubleVector{TDoubleVector::Zero(2 * numberClasses)}); auto copyClassCountsAndRecalls = [](TDoubleVector state, TDoubleVector& result) { result = std::move(state); }; auto reduceClassCountsAndRecalls = [&](TDoubleVector state, TDoubleVector& result) { result += state; }; TDoubleVector classCountsAndRecalls; doReduce(frame.readRows(numberThreads, 0, frame.numberRows(), readClassCountsAndRecalls, &sampleMask), copyClassCountsAndRecalls, reduceClassCountsAndRecalls, classCountsAndRecalls); TDoubleVector classCounts{classCountsAndRecalls.topRows(numberClasses)}; TDoubleVector classRecalls{classCountsAndRecalls.bottomRows(numberClasses)}; // If a class count is zero the derivative of the loss functin w.r.t. that // class is not well defined. The objective is independent of such a class // so the choice for its count and recall is not important. However, its // count must be non-zero so that we don't run into a NaN cascade. classCounts = classCounts.cwiseMax(1.0); classRecalls.array() /= classCounts.array(); LOG_TRACE(<< "class counts = " << classCounts.transpose()); LOG_TRACE(<< "class recalls = " << classRecalls.transpose()); TSizeVec recallOrder(numberClasses); std::iota(recallOrder.begin(), recallOrder.end(), 0); std::sort(recallOrder.begin(), recallOrder.end(), [&](std::size_t lhs, std::size_t rhs) { return classRecalls(lhs) > classRecalls(rhs); }); LOG_TRACE(<< "decreasing recall order = " << recallOrder); // We want to solve max_w{min_j{recall(class_j)}} = max_w{min_j{c_j(w) / n_j}} // where c_j(w) and n_j are correct predictions for weight w and count of class_j // in the sample set, respectively. We use an equivalent formulation // // min_w{max_j{f_j(w)}} = min_w{max_j{1 - c_j(w) / n_j}} // // We can write f_j(w) as // // max_j{sum_i{1 - 1{argmax_i(w_i p_i) == j}} / n_j} (1) // // where 1{.} denotes the indicator function. (1) has a smooth relaxation given // by f_j(w) = max_j{sum_i{1 - softmax_j(w_i p_i)} / n_j}. Note that this isn't // convex so we use multiple restarts. auto objective = [&](const TDoubleVector& weights) -> double { TDoubleVector probabilities; TDoubleVector scores; auto computeObjective = core::bindRetrievableState( [=](TDoubleVector& state, const TRowItr& beginRows, const TRowItr& endRows) mutable { for (auto row = beginRows; row != endRows; ++row) { if (isMissing((*row)[targetColumn]) == false) { std::size_t j{static_cast<std::size_t>((*row)[targetColumn])}; probabilities = readPrediction(*row); if (allFinite(probabilities)) { common::CTools::inplaceSoftmax(probabilities); scores = probabilities.cwiseProduct(weights); common::CTools::inplaceSoftmax(scores); state(j) += (1.0 - scores(j)) / classCounts(j); } else { LOG_WARN(<< "Ignoring unexpected probabilities " << probabilities); } } } }, TDoubleVector{TDoubleVector::Zero(numberClasses)}); auto copyObjective = [](TDoubleVector state, TDoubleVector& result) { result = std::move(state); }; auto reduceObjective = [&](TDoubleVector state, TDoubleVector& result) { result += state; }; TDoubleVector objective_; doReduce(frame.readRows(numberThreads, 0, frame.numberRows(), computeObjective, &sampleMask), copyObjective, reduceObjective, objective_); return objective_.maxCoeff(); }; auto objectiveGradient = [&](const TDoubleVector& weights) -> TDoubleVector { TDoubleVector probabilities; TDoubleVector scores; auto computeObjectiveAndGradient = core::bindRetrievableState( [=](TDoubleMatrix& state, const TRowItr& beginRows, const TRowItr& endRows) mutable { for (auto row = beginRows; row != endRows; ++row) { if (isMissing((*row)[targetColumn]) == false) { std::size_t j{static_cast<std::size_t>((*row)[targetColumn])}; probabilities = readPrediction(*row); if (allFinite(probabilities)) { common::CTools::inplaceSoftmax(probabilities); scores = probabilities.cwiseProduct(weights); common::CTools::inplaceSoftmax(scores); state(j, 0) += (1.0 - scores(j)) / classCounts(j); state.col(j + 1) += scores(j) * probabilities .cwiseProduct(scores - TDoubleVector::Unit(numberClasses, j)) .cwiseQuotient(classCounts); } else { LOG_WARN(<< "Ignoring unexpected probabilities " << probabilities); } } } }, TDoubleMatrix{TDoubleMatrix::Zero(numberClasses, numberClasses + 1)}); auto copyObjectiveAndGradient = [](TDoubleMatrix state, TDoubleMatrix& result) { result = std::move(state); }; auto reduceObjectiveAndGradient = [&](TDoubleMatrix state, TDoubleMatrix& result) { result += state; }; TDoubleMatrix objectiveAndGradient; doReduce(frame.readRows(numberThreads, 0, frame.numberRows(), computeObjectiveAndGradient, &sampleMask), copyObjectiveAndGradient, reduceObjectiveAndGradient, objectiveAndGradient); std::size_t max; objectiveAndGradient.col(0).maxCoeff(&max); return objectiveAndGradient.col(max + 1); }; // We always try initialising with all weights equal because our minimization // algorithm ensures we'll only decrease the initial value of the optimisation // objective. This means we'll never do worse than not reweighting. Also, we // expect weights to be (roughly) monotonic increasing for decreasing recall // and use this to bias initial values. TMinAccumulator minLoss; TDoubleVector minLossWeights; TDoubleVector w0{TDoubleVector::Ones(numberClasses)}; for (std::size_t i = 0; i < 5; ++i) { common::CLbfgs<TDoubleVector> lbfgs{5}; double loss; std::tie(w0, loss) = lbfgs.minimize(objective, objectiveGradient, std::move(w0)); LOG_TRACE(<< "weights* = " << w0.transpose() << ", loss* = " << loss); if (minLoss.add(loss)) { minLossWeights = std::move(w0); } w0 = TDoubleVector::Ones(numberClasses); for (std::size_t j = 1; j < numberClasses; ++j) { w0(j) = w0(j - 1) * common::CSampling::uniformSample(rng, 0.9, 1.3); } } // Since we take argmax_i w_i p_i we can multiply by a constant. We arrange for // the largest weight to always be one. minLossWeights.array() /= minLossWeights.maxCoeff(); LOG_TRACE(<< "weights = " << minLossWeights.transpose()); return minLossWeights; } void CDataFrameUtils::removeMetricColumns(const core::CDataFrame& frame, TSizeVec& columnMask) { const auto& columnIsCategorical = frame.columnIsCategorical(); columnMask.erase(std::remove_if(columnMask.begin(), columnMask.end(), [&columnIsCategorical](std::size_t i) { return columnIsCategorical[i] == false; }), columnMask.end()); } void CDataFrameUtils::removeCategoricalColumns(const core::CDataFrame& frame, TSizeVec& columnMask) { const auto& columnIsCategorical = frame.columnIsCategorical(); columnMask.erase(std::remove_if(columnMask.begin(), columnMask.end(), [&columnIsCategorical](std::size_t i) { return columnIsCategorical[i]; }), columnMask.end()); } double CDataFrameUtils::unitWeight(const TRowRef&) { return 1.0; } } } }