include/maths/common/CXMeansOnline.h (894 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #ifndef INCLUDED_ml_maths_common_CXMeansOnline_h #define INCLUDED_ml_maths_common_CXMeansOnline_h #include <core/CLogger.h> #include <core/CStatePersistInserter.h> #include <core/CStateRestoreTraverser.h> #include <core/RestoreMacros.h> #include <maths/common/CBasicStatistics.h> #include <maths/common/CBasicStatisticsPersist.h> #include <maths/common/CClusterer.h> #include <maths/common/CInformationCriteria.h> #include <maths/common/CKMeansOnline.h> #include <maths/common/CLinearAlgebra.h> #include <maths/common/CLinearAlgebraPersist.h> #include <maths/common/COrderings.h> #include <maths/common/COrderingsSimultaneousSort.h> #include <maths/common/CPRNG.h> #include <maths/common/CRestoreParams.h> #include <maths/common/CSphericalCluster.h> #include <maths/common/CTypeTraits.h> #include <maths/common/Constants.h> #include <maths/common/MathsTypes.h> #include <cmath> #include <cstddef> #include <numeric> #include <optional> #include <vector> namespace ml { namespace maths { namespace common { //! \brief A single pass online clusterer based on the x-means //! algorithm of Pelleg and Moore. //! //! DESCRIPTION:\n //! This implements the CClusterer contract targeting the k-means //! optimization objective (the standard implementation is not possible //! because the data are processed using the turnstile model). //! //! As with x-means clustering proceeds top down, so we only create //! a split when we are confident. This minimizes our state size and //! is important for anomaly detection because over splitting the data //! significantly impairs anomaly detection. An information theoretic //! criterion is used to decide whether or not to split clusters; //! specifically, we compare the BIC of the most promising split with //! that of the unsplit data and require strong evidence for the split. //! A sketch data structure is used to keep state size down on which //! to run k-means. In addition, clusters can be merged if they are no //! longer worth keeping (again based on BIC, but with some hysteresis). //! //! Verses standard x-means this has one additional feature which is //! particularly desirable for anomaly detection: the minimum cluster //! size (in number of data points) and fraction (in proportion of data //! points) can both be controlled. Typically we don't want to create //! clusters for a small number of well separated points, since these //! may be genuine anomalies. This principally affects the search for //! candidate splits of the data. //! //! Note that this is a soft clustering so that we assign the soft //! membership of a point to a cluster based on the probability that //! it is generated by the corresponding normal. However, this is not //! trying to learn a mixture distribution representation of the data, //! which would require more involved inference scheme such as EM. It //! is expected to give largely order (of points processed) invariant //! unsupervised clustering of the data which identifies reasonably //! separated clusters. template<typename T, std::size_t N> class CXMeansOnline : public CClusterer<CVectorNx1<T, N>> { public: using TPoint = CVectorNx1<T, N>; using TPointVec = std::vector<TPoint>; using TClusterer = CClusterer<TPoint>; using TPointPrecise = typename TClusterer::TPointPrecise; using TPointPreciseVec = typename TClusterer::TPointPreciseVec; using TPointPreciseDoublePrVec = typename TClusterer::TPointPreciseDoublePrVec; using TSizeDoublePr = typename TClusterer::TSizeDoublePr; using TSizeDoublePr2Vec = typename TClusterer::TSizeDoublePr2Vec; using TDoubleVec = std::vector<double>; using TDoubleVecVec = std::vector<TDoubleVec>; using TSizeVec = std::vector<std::size_t>; using TSizeVecVec = std::vector<TSizeVec>; using TPrecise = typename SPromoted<T>::Type; using TMatrixPrecise = CSymmetricMatrixNxN<TPrecise, N>; using TCovariances = CBasicStatistics::SSampleCovariances<TPointPrecise>; using TSphericalCluster = typename CSphericalCluster<TPoint>::Type; using TSphericalClusterVec = std::vector<TSphericalCluster>; using TSphericalClusterVecVec = std::vector<TSphericalClusterVec>; using TKMeansOnline = CKMeansOnline<TPoint>; using TKMeansOnlineVec = std::vector<TKMeansOnline>; class CCluster; using TClusterClusterPr = std::pair<CCluster, CCluster>; using TOptionalClusterClusterPr = std::optional<TClusterClusterPr>; //! \brief Represents a cluster. class CCluster { public: explicit CCluster(const CXMeansOnline& clusterer) : m_Index{clusterer.m_ClusterIndexGenerator.next()}, m_DataType{clusterer.m_DataType}, m_DecayRate{clusterer.m_DecayRate}, m_Covariances{N}, m_Structure{STRUCTURE_SIZE, clusterer.m_DecayRate, MINIMUM_CATEGORY_COUNT, STRUCTURE_SIZE / 2} {} //! Initialize by traversing a state document. bool acceptRestoreTraverser(const SDistributionRestoreParams& params, core::CStateRestoreTraverser& traverser) { do { const std::string& name = traverser.name(); RESTORE_BUILT_IN(INDEX_TAG, m_Index) RESTORE(COVARIANCES_TAG, m_Covariances.fromDelimited(traverser.value())) RESTORE(STRUCTURE_TAG, traverser.traverseSubLevel([&](auto& traverser_) { return m_Structure.acceptRestoreTraverser(params, traverser_); })) } while (traverser.next()); return true; } //! Persist state by passing information to the supplied inserter. void acceptPersistInserter(core::CStatePersistInserter& inserter) const { inserter.insertValue(INDEX_TAG, m_Index); inserter.insertValue(COVARIANCES_TAG, m_Covariances.toDelimited()); inserter.insertLevel(STRUCTURE_TAG, [this](auto& inserter_) { m_Structure.acceptPersistInserter(inserter_); }); } //! Efficiently swap the contents of this and \p other. void swap(CCluster& other) { std::swap(m_Index, other.m_Index); std::swap(m_DataType, other.m_DataType); std::swap(m_DecayRate, other.m_DecayRate); std::swap(m_Covariances, other.m_Covariances); m_Structure.swap(other.m_Structure); } //! Set the type of data in the cluster. void dataType(maths_t::EDataType value) { m_DataType = value; } //! Set the rate at which information is aged out. void decayRate(double value) { m_DecayRate = value; m_Structure.decayRate(value); } //! Add \p x_ to this cluster. void add(const TPointPrecise& x, double count) { switch (m_DataType) { case maths_t::E_IntegerData: { TSphericalCluster x_(x, SCountAndVariance(count, 1.0 / 12.0)); m_Covariances.add(x_); break; } case maths_t::E_DiscreteData: case maths_t::E_ContinuousData: case maths_t::E_MixedData: m_Covariances.add(x, TPointPrecise(count)); break; } m_Structure.add(x, count); } //! Propagate the cluster forwards by \p time. void propagateForwardsByTime(double time) { double alpha = std::exp(-this->scaledDecayRate() * time); m_Covariances.age(alpha); m_Structure.age(alpha); } //! Get the unique index of this cluster. std::size_t index() const { return m_Index; } //! Get the centre of the cluster. //! //! This is defined as the sample mean. const TPointPrecise& centre() const { return CBasicStatistics::mean(m_Covariances); } //! Get the spread of the cluster. //! //! This is defined as the trace of the sample covariance matrix. double spread() const { return std::sqrt( CBasicStatistics::maximumLikelihoodCovariances(m_Covariances).trace() / static_cast<double>(N)); } //! Get the sample covariance matrix this cluster. const TCovariances& covariances() const { return m_Covariances; } //! Get the total count of values added to the cluster. double count() const { return CBasicStatistics::count(m_Covariances); } //! Get the weight of the cluster. double weight(maths_t::EClusterWeightCalc calc) const { switch (calc) { case maths_t::E_ClustersEqualWeight: return 1.0; case maths_t::E_ClustersFractionWeight: return this->count(); } LOG_ABORT(<< "Unexpected calculation style " << calc); return 1.0; } //! Get the likelihood that \p x is from this cluster. double logLikelihoodFromCluster(maths_t::EClusterWeightCalc calc, const TPointPrecise& x) const { double likelihood; const TPointPrecise& mean = CBasicStatistics::mean(m_Covariances); const TMatrixPrecise& covariances = CBasicStatistics::maximumLikelihoodCovariances(m_Covariances); maths_t::EFloatingPointErrorStatus status = gaussianLogLikelihood(covariances, x - mean, likelihood, false); if ((status & maths_t::E_FpFailed) != 0) { LOG_ERROR(<< "Unable to compute likelihood for " << x << " and cluster " << m_Index); return core::constants::LOG_MIN_DOUBLE - 1.0; } if ((status & maths_t::E_FpOverflowed) != 0) { return likelihood; } return likelihood + std::log(this->weight(calc)); } //! Get \p numberSamples from this cluster. void sample(std::size_t numberSamples, TPointPreciseVec& samples) const { m_Structure.sample(numberSamples, samples); } //! Try and find a split by a full search of the binary tree //! of possible optimal 2-splits of the data. //! //! \param[in] minimumCount The minimum count of a cluster //! in the split. //! \param[in] indexGenerator The unique cluster identifier //! generator. TOptionalClusterClusterPr split(CPRNG::CXorOShiro128Plus& rng, double minimumCount, CClustererTypes::CIndexGenerator& indexGenerator) { // We do our clustering top down to minimize space and avoid // making splits before we are confident they exist. This is // important for anomaly detection because we do *not* want // to fit a cluster to an outlier and judge it to be not // anomalous as a result. // // By analogy to x-means we choose a candidate split of the // data by minimizing the total within class deviation of the // two classes. In order to decide whether or not to split we // 1) impose minimum count on the smaller cluster 2) use an // information theoretic criterion. Specifically, we threshold // the BIC gain of using the multi-mode distribution verses // the single mode distribution. LOG_TRACE(<< "split"); if (m_Structure.buffering()) { return {}; } std::size_t n = m_Structure.size(); if (n < 2) { return {}; } TSizeVecVec split; if (this->splitSearch(rng, minimumCount, split) == false) { return {}; } LOG_TRACE(<< "split = " << split); TCovariances covariances[]{TCovariances(N), TCovariances(N)}; TSphericalClusterVec clusters; this->sphericalClusters(clusters); for (std::size_t i = 0; i < 2; ++i) { for (std::size_t j = 0; j < split[i].size(); ++j) { covariances[i].add(clusters[split[i][j]]); } } TKMeansOnlineVec structure; m_Structure.split(split, structure); LOG_TRACE(<< "Splitting cluster " << this->index() << " at " << this->centre() << " left = " << structure[0].print() << ", right = " << structure[1].print()); std::size_t index[]{indexGenerator.next(), indexGenerator.next()}; indexGenerator.recycle(m_Index); return TOptionalClusterClusterPr{ std::in_place, CCluster{index[0], m_DataType, m_DecayRate, covariances[0], std::move(structure[0])}, CCluster{index[1], m_DataType, m_DecayRate, covariances[1], std::move(structure[1])}}; } //! Check if this and \p other cluster should merge. //! //! \param[in] other The cluster to merge with this one. bool shouldMerge(CCluster& other) { return BICGain(*this, other) <= MAXIMUM_MERGE_DISTANCE; } //! Merge this and \p other cluster. CCluster merge(const CCluster& other, CClustererTypes::CIndexGenerator& indexGenerator) { TKMeansOnline structure{std::move(m_Structure)}; structure.merge(other.m_Structure); CCluster result{indexGenerator.next(), m_DataType, m_DecayRate, m_Covariances + other.m_Covariances, std::move(structure)}; indexGenerator.recycle(m_Index); indexGenerator.recycle(other.m_Index); return result; } //! Get a checksum for this object. std::uint64_t checksum(std::uint64_t seed) const { seed = CChecksum::calculate(seed, m_Index); seed = CChecksum::calculate(seed, m_DataType); seed = CChecksum::calculate(seed, m_DecayRate); seed = CChecksum::calculate(seed, m_Covariances); return CChecksum::calculate(seed, m_Structure); } //! Debug the memory used by this component. void debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const { mem->setName("CXMeansOnline"); core::memory_debug::dynamicSize("m_Structure", m_Structure, mem); } //! Get the memory used by this component. std::size_t memoryUsage() const { return core::memory::dynamicSize(m_Structure); } //! Get Bayes Information Criterion decrease in going from one //! to two clusters. //! //! \note This is not necessarily positive. static double BICGain(const CCluster& lhs, const CCluster& rhs) { return BICGain(lhs.m_Covariances, rhs.m_Covariances); } protected: CCluster(std::size_t index, maths_t::EDataType dataType, double decayRate, const TCovariances& covariances, TKMeansOnline structure) : m_Index(index), m_DataType(dataType), m_DecayRate(decayRate), m_Covariances(covariances), m_Structure(std::move(structure)) {} //! Search for a split of the data that satisfies the constraints //! on both the BIC divergence and minimum count. //! //! In order to handle the constraint on minimum count, we do a //! breadth first search of the binary tree of optimal 2-splits //! of subsets of the data looking for splits which satisfy the //! constraints on *both* BIC divergence and count. The search //! terminates at any node which can't be split subject to BIC. //! //! The intention of this is to find "natural" splits of the data //! which would be obscured when splitting into the optimal 2-split. //! This can occur when a small number of points (less than minimum //! count) are sufficient far from the other that they split off //! in preference to some other natural split of the data. Although //! we can impose the count constraint when finding the optimal //! 2-split this has associated problems. In particular, extreme //! outliers then tend to rip sufficient points away from their //! natural cluster in order to generate a new cluster. bool splitSearch(CPRNG::CXorOShiro128Plus& rng, double minimumCount, TSizeVecVec& result) { result.clear(); // The search visits a binary tree of candidate 2-splits of // the data breadth first. If a suitable split is found on a // level of the tree then the search terminates returning that // split. Note that if a subset of the data can be split we // also check that the corresponding full 2-split can be split // subject to the same constraints (to avoid merging the two // clusters straight away). TSphericalClusterVec node; this->sphericalClusters(node); TSphericalClusterVec remainder; remainder.reserve(node.size()); TSphericalClusterVecVec candidate; for (;;) { TKMeansOnline::kmeans(rng, node, 2, candidate); LOG_TRACE(<< "candidate = " << candidate); if (candidate.size() > 2) { LOG_ERROR(<< "Expected 2-split: " << candidate); break; } if (candidate[0].empty() || candidate[1].empty()) { // This can happen if all the points are co-located, // in which case we can't split this node anyway. break; } // We use the Ledoit and Wolf optimal shrinkage estimate // because the sample sizes here might be quite small in // which case the variance of the covariance estimates can // be large. TCovariances covariances[]{TCovariances(N), TCovariances(N)}; CBasicStatistics::covariancesLedoitWolf(candidate[0], covariances[0]); CBasicStatistics::covariancesLedoitWolf(candidate[1], covariances[1]); double n[]{CBasicStatistics::count(covariances[0]), CBasicStatistics::count(covariances[1])}; double nmin = std::min(n[0], n[1]); // Check the count constraint. bool satisfiesCount = (nmin >= minimumCount); LOG_TRACE(<< "count = " << nmin << " (to split " << minimumCount << ")"); // Check the gain constraint. double gain{BICGain(covariances[0], covariances[1])}; bool satisfiesGain{gain > MINIMUM_SPLIT_GAIN}; LOG_TRACE(<< "BIC(1) - BIC(2) = " << gain << " (to split " << MINIMUM_SPLIT_GAIN << ")"); if (satisfiesCount == false) { // Recurse to the (one) node with sufficient count. if (n[0] > minimumCount && candidate[0].size() > 1) { node.swap(candidate[0]); remainder.insert(remainder.end(), candidate[1].begin(), candidate[1].end()); continue; } if (n[1] > minimumCount && candidate[1].size() > 1) { node.swap(candidate[1]); remainder.insert(remainder.end(), candidate[0].begin(), candidate[0].end()); continue; } } else if (satisfiesGain) { LOG_TRACE(<< "Checking full split"); TSizeVec assignment(remainder.size()); for (std::size_t i = 0; i < remainder.size(); ++i) { assignment[i] = nearest(remainder[i], covariances); } for (std::size_t i = 0; i < assignment.size(); ++i) { std::size_t j = assignment[i]; TCovariances ci(N); ci.add(remainder[i]); candidate[j].push_back(remainder[i]); covariances[j] += ci; n[j] += CBasicStatistics::count(ci); } gain = BICGain(covariances[0], covariances[1]); LOG_TRACE(<< "BIC(1) - BIC(2) = " << gain << " (to split " << MINIMUM_SPLIT_GAIN << ")"); if (gain > MINIMUM_SPLIT_GAIN) { LOG_TRACE(<< "splitting"); typename CSphericalCluster<TPoint>::SLess less; result.resize(candidate.size()); TSphericalClusterVec clusters; this->sphericalClusters(clusters); TSizeVec indexes(clusters.size()); std::iota(indexes.begin(), indexes.end(), 0); COrderings::simultaneousSortWith( typename CSphericalCluster<TPoint>::SLess(), clusters, indexes); for (std::size_t i = 0; i < candidate.size(); ++i) { for (const auto& x : candidate[i]) { std::size_t j = std::lower_bound(clusters.begin(), clusters.end(), x, less) - clusters.begin(); if (j >= clusters.size()) { LOG_ERROR(<< "Missing " << x << " from clusters = " << clusters); return false; } result[i].push_back(indexes[j]); } } } } break; } return result.size() > 0; } //! Get the spherical clusters being maintained in the fine- //! grained structure model of this cluster. void sphericalClusters(TSphericalClusterVec& result) const { m_Structure.clusters(result); switch (m_DataType) { case maths_t::E_IntegerData: for (auto& x : result) { x.annotation().s_Variance += 1.0 / 12.0; } break; case maths_t::E_DiscreteData: case maths_t::E_ContinuousData: case maths_t::E_MixedData: break; } } //! Get the closest (in Mahalanobis distance) cluster to \p x. static std::size_t nearest(const TSphericalCluster& x, const TCovariances (&c)[2]) { TPrecise d[]{0, 0}; TPointPrecise x_(x); inverseQuadraticForm(CBasicStatistics::maximumLikelihoodCovariances(c[0]), x_ - CBasicStatistics::mean(c[0]), d[0]); inverseQuadraticForm(CBasicStatistics::maximumLikelihoodCovariances(c[1]), x_ - CBasicStatistics::mean(c[1]), d[1]); return d[0] < d[1] ? 0 : 1; } //! Get the Bayes Information Criterion gain, subject to Gaussian //! assumptions, in representing \p lhs and \p rhs using one or //! two modes. static double BICGain(const TCovariances& lhs, const TCovariances& rhs) { CGaussianInfoCriterion<TPoint, E_BIC> BIC1; BIC1.add(lhs + rhs); CGaussianInfoCriterion<TPoint, E_BIC> BIC2; BIC2.add(lhs); BIC2.add(rhs); return BIC1.calculate() - BIC2.calculate(); } private: //! Get the scaled decay rate for use by propagateForwardsByTime. double scaledDecayRate() const { return std::pow(0.5, static_cast<double>(N)) * m_DecayRate; } private: //! A unique identifier for this cluster. std::size_t m_Index; //! The type of data which will be clustered. maths_t::EDataType m_DataType; //! Controls the rate at which information is lost. double m_DecayRate; //! The mean, covariances of the data in this cluster. TCovariances m_Covariances; //! The data representing the internal structure of this cluster. TKMeansOnline m_Structure; }; using TClusterVec = std::vector<CCluster>; using TClusterVecItr = typename TClusterVec::iterator; public: //! \name Life-cycle //@{ //! Construct a new clusterer. //! //! \param[in] dataType The type of data which will be clustered. //! \param[in] weightCalc The style of the cluster weight calculation //! (see maths_t::EClusterWeightCalc for details). //! \param[in] decayRate Controls the rate at which information is //! lost from the clusters. //! \param[in] minimumClusterFraction The minimum fractional count //! of points in a cluster. //! \param[in] minimumClusterCount The minimum count of points in a //! cluster. //! \param[in] minimumCategoryCount The minimum count for a category //! in the sketch to cluster. //! \param[in] splitFunc Optional callback for when a cluster is split. //! \param[in] mergeFunc Optional callback for when two clusters are CXMeansOnline(maths_t::EDataType dataType, maths_t::EClusterWeightCalc weightCalc, double decayRate = 0.0, double minimumClusterFraction = MINIMUM_CLUSTER_SPLIT_FRACTION, double minimumClusterCount = MINIMUM_CLUSTER_SPLIT_COUNT, double minimumCategoryCount = MINIMUM_CATEGORY_COUNT, const CClustererTypes::TSplitFunc& splitFunc = CClustererTypes::CDoNothing(), const CClustererTypes::TMergeFunc& mergeFunc = CClustererTypes::CDoNothing()) : TClusterer(splitFunc, mergeFunc), m_DataType(dataType), m_WeightCalc(weightCalc), m_InitialDecayRate(decayRate), m_DecayRate(decayRate), m_MinimumClusterFraction(minimumClusterFraction), m_MinimumClusterCount(minimumClusterCount), m_MinimumCategoryCount(minimumCategoryCount), m_Clusters(1, CCluster{*this}) {} //! Construct by traversing a state document. CXMeansOnline(const SDistributionRestoreParams& params, core::CStateRestoreTraverser& traverser) : TClusterer(CClustererTypes::CDoNothing(), CClustererTypes::CDoNothing()), m_DataType(params.s_DataType), m_InitialDecayRate(params.s_DecayRate), m_DecayRate(params.s_DecayRate), m_MinimumCategoryCount(params.s_MinimumCategoryCount) { if (traverser.traverseSubLevel([&](auto& traverser_) { return this->acceptRestoreTraverser(params, traverser_); })) { traverser.setBadState(); } } //! Construct by traversing a state document. CXMeansOnline(const SDistributionRestoreParams& params, const CClustererTypes::TSplitFunc& splitFunc, const CClustererTypes::TMergeFunc& mergeFunc, core::CStateRestoreTraverser& traverser) : TClusterer(splitFunc, mergeFunc), m_DataType(params.s_DataType), m_InitialDecayRate(params.s_DecayRate), m_DecayRate(params.s_DecayRate), m_MinimumCategoryCount(params.s_MinimumCategoryCount) { if (traverser.traverseSubLevel([&](auto& traverser_) { return this->acceptRestoreTraverser(params, traverser_); }) == false) { traverser.setBadState(); } } //! The x-means clusterer has value semantics. CXMeansOnline(const CXMeansOnline& other) : TClusterer(other.splitFunc(), other.mergeFunc()), m_Rng(other.m_Rng), m_DataType(other.m_DataType), m_WeightCalc(other.m_WeightCalc), m_InitialDecayRate(other.m_InitialDecayRate), m_DecayRate(other.m_DecayRate), m_HistoryLength(other.m_HistoryLength), m_MinimumClusterFraction(other.m_MinimumClusterFraction), m_MinimumClusterCount(other.m_MinimumClusterCount), m_MinimumCategoryCount(other.m_MinimumCategoryCount), m_ClusterIndexGenerator(other.m_ClusterIndexGenerator.deepCopy()), m_Clusters(other.m_Clusters) {} CXMeansOnline(CXMeansOnline&&) = default; //! The x-means clusterer has value semantics. CXMeansOnline& operator=(const CXMeansOnline& other) { if (this != &other) { CXMeansOnline tmp(other); this->swap(tmp); } return *this; } CXMeansOnline& operator=(CXMeansOnline&&) = default; //! Efficiently swap the contents of two x-means objects. void swap(CXMeansOnline& other) { this->TClusterer::swap(other); std::swap(m_Rng, other.m_Rng); std::swap(m_DataType, other.m_DataType); std::swap(m_WeightCalc, other.m_WeightCalc); std::swap(m_InitialDecayRate, other.m_InitialDecayRate); std::swap(m_DecayRate, other.m_DecayRate); std::swap(m_HistoryLength, other.m_HistoryLength); std::swap(m_MinimumClusterFraction, other.m_MinimumClusterFraction); std::swap(m_MinimumClusterCount, other.m_MinimumClusterCount); std::swap(m_MinimumCategoryCount, other.m_MinimumCategoryCount); std::swap(m_ClusterIndexGenerator, other.m_ClusterIndexGenerator); m_Clusters.swap(other.m_Clusters); } //! \name Clusterer Contract //@{ //! Get the tag name for this clusterer. const core::TPersistenceTag& persistenceTag() const override { return CClustererTypes::X_MEANS_ONLINE_TAG; } //! Persist state by passing information to the supplied inserter. void acceptPersistInserter(core::CStatePersistInserter& inserter) const override { for (const auto& cluster : m_Clusters) { inserter.insertLevel(CLUSTER_TAG, [&cluster](auto& inserter_) { cluster.acceptPersistInserter(inserter_); }); } inserter.insertValue(DECAY_RATE_TAG, m_DecayRate.toString()); inserter.insertValue(HISTORY_LENGTH_TAG, m_HistoryLength.toString()); inserter.insertValue(RNG_TAG, m_Rng.toString()); inserter.insertValue(WEIGHT_CALC_TAG, static_cast<int>(m_WeightCalc)); inserter.insertValue(MINIMUM_CLUSTER_FRACTION_TAG, m_MinimumClusterFraction.toString()); inserter.insertValue(MINIMUM_CLUSTER_COUNT_TAG, m_MinimumClusterCount.toString()); inserter.insertLevel(CLUSTER_INDEX_GENERATOR_TAG, [this](auto& inserter_) { m_ClusterIndexGenerator.acceptPersistInserter(inserter_); }); } //! Creates a copy of the clusterer. //! //! \warning Caller owns returned object. CXMeansOnline* clone() const override { return new CXMeansOnline(*this); } //! Clear the current clusterer state. void clear() override { *this = CXMeansOnline(m_DataType, m_WeightCalc, m_InitialDecayRate, m_MinimumClusterFraction, m_MinimumClusterCount, m_MinimumCategoryCount, this->splitFunc(), this->mergeFunc()); } //! Remove the cluster with \p index. bool remove(std::size_t index) override { if (m_Clusters.size() <= 1) { return false; } auto i = std::find_if(m_Clusters.begin(), m_Clusters.end(), [&](const auto& cluster) { return cluster.index() == index; }); if (i != m_Clusters.end()) { CCluster& cluster = *i; CCluster* nearest = this->nearest(cluster); if (nearest != nullptr) { LOG_TRACE(<< "Merging cluster " << cluster.index() << " at " << cluster.centre() << " and cluster " << nearest->index() << " at " << nearest->centre()); CCluster merge = nearest->merge(cluster, m_ClusterIndexGenerator); (this->mergeFunc())(cluster.index(), nearest->index(), merge.index()); nearest->swap(merge); } m_Clusters.erase(i); return true; } return false; } //! Get the number of clusters. std::size_t numberClusters() const override { return m_Clusters.size(); } //! Set the type of data being clustered. void dataType(maths_t::EDataType dataType) override { m_DataType = dataType; for (auto& cluster : m_Clusters) { cluster.dataType(dataType); } } //! Set the rate at which information is aged out. void decayRate(double decayRate) override { m_DecayRate = decayRate; for (auto& cluster : m_Clusters) { cluster.decayRate(decayRate); } } //! Check if the cluster identified by \p index exists. bool hasCluster(std::size_t index) const override { return this->cluster(index) != nullptr; } //! Get the centre of the cluster identified by \p index. bool clusterCentre(std::size_t index, TPointPrecise& result) const override { const CCluster* cluster = this->cluster(index); if (cluster == nullptr) { LOG_ERROR(<< "Cluster " << index << " doesn't exist"); return false; } result = cluster->centre(); return true; } //! Get the spread of the cluster identified by \p index. bool clusterSpread(std::size_t index, double& result) const override { const CCluster* cluster = this->cluster(index); if (cluster == nullptr) { LOG_ERROR(<< "Cluster " << index << " doesn't exist"); return false; } result = cluster->spread(); return true; } //! Gets the index of the cluster(s) to which \p point belongs //! together with their weighting factor. void cluster(const TPointPrecise& point, TSizeDoublePr2Vec& result, double count = 1.0) const override { result.clear(); if (m_Clusters.empty()) { LOG_ERROR(<< "No clusters"); return; } // This does a soft assignment. Given we are finding a // partitioning clustering (as a result of targeting // the k-means objective) we only consider the case that // the point comes from either the left or right cluster. // A-priori the probability a randomly selected point // comes from a cluster is proportional to its weight: // P(i) = n(i) / Sum_j{ n(j) } // // Bayes theorem then immediately gives that the probability // that a given point is from the i'th cluster // P(i | x) = L(x | i) * P(i) / Z // // where Z is the normalization constant: // Z = Sum_i{ P(i | x) } result.reserve(m_Clusters.size()); double lmax = -std::numeric_limits<double>::max(); for (const auto& cluster : m_Clusters) { double likelihood = cluster.logLikelihoodFromCluster(m_WeightCalc, point); result.emplace_back(cluster.index(), likelihood); lmax = std::max(lmax, likelihood); } double Z = 0.0; for (auto& x : result) { x.second = std::exp(x.second - lmax); Z += x.second; } double pmax = 0.0; for (auto& x : result) { x.second /= Z; pmax = std::max(pmax, x.second); } result.erase(std::remove_if(result.begin(), result.end(), CProbabilityLessThan(HARD_ASSIGNMENT_THRESHOLD * pmax)), result.end()); Z = 0.0; for (const auto& x : result) { Z += x.second; } for (auto& x : result) { x.second *= count / Z; } } //! Update the clustering with \p point and return its cluster(s) //! together with their weighting factor. void add(const TPointPrecise& x, TSizeDoublePr2Vec& clusters, double count = 1.0) override { if (m_Clusters.size() == 1) { LOG_TRACE(<< "Adding " << x << " to " << m_Clusters[0].centre()); m_Clusters[0].add(x, count); clusters.emplace_back(m_Clusters[0].index(), count); if (this->maybeSplit(m_Clusters.begin())) { this->cluster(x, clusters, count); } } else { using TDoubleSizePr = std::pair<double, std::size_t>; using TMaxAccumulator = CBasicStatistics::SMax<TDoubleSizePr, 2>::TAccumulator; TMaxAccumulator closest; for (std::size_t i = 0; i < m_Clusters.size(); ++i) { closest.add({m_Clusters[i].logLikelihoodFromCluster(m_WeightCalc, x), i}); } closest.sort(); LOG_TRACE(<< "closest = " << closest); double likelihood0 = closest[0].first; double likelihood1 = closest[1].first; // Normalize the likelihood values. double p0 = 1.0; double p1 = std::exp(likelihood1 - likelihood0); double normalizer = p0 + p1; p0 /= normalizer; p1 /= normalizer; LOG_TRACE(<< "probabilities = [" << p0 << "," << p1 << "]"); auto cluster0 = m_Clusters.begin() + closest[0].second; auto cluster1 = m_Clusters.begin() + closest[1].second; if (p1 < HARD_ASSIGNMENT_THRESHOLD * p0) { LOG_TRACE(<< "Adding " << x << " to " << cluster0->centre()); cluster0->add(x, count); clusters.emplace_back(cluster0->index(), count); if (this->maybeSplit(cluster0) || this->maybeMerge(cluster0)) { this->cluster(x, clusters, count); } } else { // Get the weighted counts. double count0 = count * p0; double count1 = count * p1; LOG_TRACE(<< "Soft adding " << x << " " << count0 << " to " << cluster0->centre() << " and " << count1 << " to " << cluster1->centre()); cluster0->add(x, count0); cluster1->add(x, count1); clusters.emplace_back(cluster0->index(), count0); clusters.emplace_back(cluster1->index(), count1); if (this->maybeSplit(cluster0) || this->maybeSplit(cluster1) || this->maybeMerge(cluster0) || this->maybeMerge(cluster1)) { this->cluster(x, clusters, count); } } } if (this->prune()) { this->cluster(x, clusters, count); } } //! Update the clustering with \p points. void add(const TPointPreciseDoublePrVec& x) override { if (m_Clusters.empty()) { m_Clusters.emplace_back(*this); } TSizeDoublePr2Vec dummy; for (const auto& x_ : x) { this->add(x_.first, dummy, x_.second); } } //! Propagate the clustering forwards by \p time. //! //! The cluster priors relax back to non-informative and the //! cluster probabilities become less at a rate controlled by //! the decay rate parameter (optionally supplied to the constructor). //! //! \param time The time increment to apply. //! \note \p time must be non negative. void propagateForwardsByTime(double time) override { if (time < 0.0) { LOG_ERROR(<< "Can't propagate backwards in time"); return; } m_HistoryLength = (m_HistoryLength + time) * std::exp(-m_DecayRate * time); for (auto& cluster : m_Clusters) { cluster.propagateForwardsByTime(time); } } //! Sample the cluster with index \p index. //! //! \param[in] index The index of the cluster to sample. //! \param[in] numberSamples The desired number of samples. //! \param[out] samples Filled in with the samples. //! \return True if the cluster could be sampled and false otherwise. bool sample(std::size_t index, std::size_t numberSamples, TPointPreciseVec& samples) const override { const CCluster* cluster = this->cluster(index); if (cluster == nullptr) { LOG_ERROR(<< "Cluster " << index << " doesn't exist"); return false; } cluster->sample(numberSamples, samples); return true; } //! Get the probability of the cluster with index \p index. //! //! \param[in] index The index of the cluster of interest. //! \return The probability of the cluster identified by \p index. double probability(std::size_t index) const override { double weight = 0.0; double Z = 0.0; for (const auto& cluster : m_Clusters) { if (cluster.index() == index) { weight = cluster.weight(maths_t::E_ClustersFractionWeight); } Z += cluster.weight(maths_t::E_ClustersFractionWeight); } return Z == 0.0 ? 0.0 : weight / Z; } //! Debug the memory used by the object. void debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const override { mem->setName("CXMeansOnline"); core::memory_debug::dynamicSize("m_ClusterIndexGenerator", m_ClusterIndexGenerator, mem); core::memory_debug::dynamicSize("m_Clusters", m_Clusters, mem); } //! Get the memory used by the object. std::size_t memoryUsage() const override { std::size_t mem = core::memory::dynamicSize(m_ClusterIndexGenerator); mem += core::memory::dynamicSize(m_Clusters); return mem; } //! Get the static size of this object - used for virtual hierarchies std::size_t staticSize() const override { return sizeof(*this); } //! Get a checksum for this object. std::uint64_t checksum(std::uint64_t seed = 0) const override { seed = CChecksum::calculate(seed, m_DataType); seed = CChecksum::calculate(seed, m_DecayRate); seed = CChecksum::calculate(seed, m_HistoryLength); seed = CChecksum::calculate(seed, m_WeightCalc); return CChecksum::calculate(seed, m_Clusters); } //@} //! The total count of points. double count() const { return std::accumulate(m_Clusters.begin(), m_Clusters.end(), 0.0, [](double count, const CCluster& cluster) { return count + cluster.count(); }); } //! Get the index generator. CClustererTypes::CIndexGenerator& indexGenerator() { return m_ClusterIndexGenerator; } protected: //! Restore by traversing a state document bool acceptRestoreTraverser(const SDistributionRestoreParams& params, core::CStateRestoreTraverser& traverser) { do { const std::string& name = traverser.name(); RESTORE_SETUP_TEARDOWN(CLUSTER_TAG, CCluster cluster(*this), traverser.traverseSubLevel([&](auto& traverser_) { return cluster.acceptRestoreTraverser(params, traverser_); }), m_Clusters.push_back(cluster)) RESTORE_SETUP_TEARDOWN(DECAY_RATE_TAG, double decayRate, core::CStringUtils::stringToType(traverser.value(), decayRate), this->decayRate(decayRate)) RESTORE(HISTORY_LENGTH_TAG, m_HistoryLength.fromString(traverser.value())) RESTORE(RNG_TAG, m_Rng.fromString(traverser.value())); RESTORE(CLUSTER_INDEX_GENERATOR_TAG, traverser.traverseSubLevel([this](auto& traverser_) { return m_ClusterIndexGenerator.acceptRestoreTraverser(traverser_); })) RESTORE_ENUM(WEIGHT_CALC_TAG, m_WeightCalc, maths_t::EClusterWeightCalc) RESTORE(MINIMUM_CLUSTER_FRACTION_TAG, m_MinimumClusterFraction.fromString(traverser.value())) RESTORE(MINIMUM_CLUSTER_COUNT_TAG, m_MinimumClusterCount.fromString(traverser.value())) } while (traverser.next()); return true; } //! Get the cluster with the index \p index. const CCluster* cluster(std::size_t index) const { for (const auto& cluster : m_Clusters) { if (cluster.index() == index) { return &cluster; } } return nullptr; } //! Compute the minimum split count. double minimumSplitCount() const { double result = m_MinimumClusterCount; if (m_MinimumClusterFraction > 0.0) { double count = this->count(); double scale = m_HistoryLength * (1.0 - std::exp(-m_InitialDecayRate)); count *= m_MinimumClusterFraction / std::max(scale, 1.0); result = std::max(result, count); } LOG_TRACE(<< "minimumSplitCount = " << result); return result; } //! Split \p cluster if we find a good split. bool maybeSplit(TClusterVecItr cluster) { if (cluster == m_Clusters.end()) { return false; } if (TOptionalClusterClusterPr split = cluster->split( m_Rng, this->minimumSplitCount(), m_ClusterIndexGenerator)) { LOG_TRACE(<< "Splitting cluster " << cluster->index() << " at " << cluster->centre()); std::size_t index = cluster->index(); *cluster = split->first; m_Clusters.push_back(split->second); (this->splitFunc())(index, split->first.index(), split->second.index()); return true; } return false; } //! Merge \p cluster and \p adjacentCluster if they are close enough. bool maybeMerge(TClusterVecItr cluster) { if (cluster == m_Clusters.end()) { return false; } CCluster* nearest = this->nearest(*cluster); if (nearest != nullptr && nearest->shouldMerge(*cluster)) { LOG_TRACE(<< "Merging cluster " << nearest->index() << " at " << nearest->centre() << " and cluster " << cluster->index() << " at " << cluster->centre()); std::size_t index1 = nearest->index(); std::size_t index2 = cluster->index(); CCluster merged = nearest->merge(*cluster, m_ClusterIndexGenerator); *nearest = merged; m_Clusters.erase(cluster); (this->mergeFunc())(index1, index2, merged.index()); return true; } return false; } //! Remove any clusters which are effectively dead. bool prune() { if (m_Clusters.size() <= 1) { return false; } using TDoubleSizePr = std::pair<double, std::size_t>; using TMinAccumulator = CBasicStatistics::COrderStatisticsStack<TDoubleSizePr, 1, COrderings::SFirstLess>; bool result = false; double minimumCount = this->minimumSplitCount() * CLUSTER_DELETE_FRACTION; // Get the clusters to prune. for (;;) { TMinAccumulator prune; for (std::size_t i = 0; i < m_Clusters.size(); ++i) { if (m_Clusters[i].count() < minimumCount) { prune.add({m_Clusters[i].count(), i}); } } if (prune.count() == 0) { break; } LOG_TRACE(<< "prune = " << prune); result = true; // Merge the clusters to prune in increasing count order. CCluster& cluster = m_Clusters[prune[0].second]; CCluster* nearest = this->nearest(cluster); if (nearest != nullptr) { LOG_TRACE(<< "Merging cluster " << cluster.index() << " at " << cluster.centre() << " and cluster " << nearest->index() << " at " << nearest->centre()); CCluster merge = nearest->merge(cluster, m_ClusterIndexGenerator); (this->mergeFunc())(cluster.index(), nearest->index(), merge.index()); nearest->swap(merge); } // Actually remove the pruned clusters. m_Clusters.erase(m_Clusters.begin() + prune[0].second); } return result; } //! Get the cluster closest to \p cluster. CCluster* nearest(const CCluster& cluster) { if (m_Clusters.size() == 1) { return &m_Clusters[0]; } using TMinAccumulator = CBasicStatistics::SMin<double>::TAccumulator; CCluster* result = nullptr; TMinAccumulator min; for (auto& candidate : m_Clusters) { if (cluster.index() == candidate.index()) { continue; } if (min.add(CCluster::BICGain(cluster, candidate))) { result = &candidate; } } if (result == nullptr) { LOG_ERROR(<< "Couldn't find nearest cluster"); } return result; } //! Get the clusters. const TClusterVec& clusters() const { return m_Clusters; } private: //! \brief Checks if probabilities are less than a specified threshold. class CProbabilityLessThan { public: explicit CProbabilityLessThan(double threshold) : m_Threshold(threshold) {} bool operator()(const TSizeDoublePr& p) const { return p.second < m_Threshold; } private: double m_Threshold; }; private: //! \name Tags for Persisting CXMeansOnline //@{ static const core::TPersistenceTag WEIGHT_CALC_TAG; static const core::TPersistenceTag MINIMUM_CLUSTER_FRACTION_TAG; static const core::TPersistenceTag MINIMUM_CLUSTER_COUNT_TAG; static const core::TPersistenceTag WINSORIZATION_CONFIDENCE_INTERVAL_TAG; static const core::TPersistenceTag CLUSTER_INDEX_GENERATOR_TAG; static const core::TPersistenceTag CLUSTER_TAG; static const core::TPersistenceTag RNG_TAG; static const core::TPersistenceTag DECAY_RATE_TAG; static const core::TPersistenceTag HISTORY_LENGTH_TAG; //@} //! \name Tags for Persisting CXMeansOnline::CCluster //@{ static const core::TPersistenceTag INDEX_TAG; static const core::TPersistenceTag COVARIANCES_TAG; static const core::TPersistenceTag STRUCTURE_TAG; //@} //! The minimum Kullback-Leibler divergence at which we'll //! split a cluster. static const double MINIMUM_SPLIT_GAIN; //! The maximum Kullback-Leibler divergence for which we'll //! merge two cluster. This is intended to introduce hysteresis //! in the cluster creation and deletion process and so should //! be less than the minimum split distance. static const double MAXIMUM_MERGE_DISTANCE; //! The default fraction of the minimum cluster split count //! for which we'll delete a cluster. This is intended to //! introduce hysteresis in the cluster creation and deletion //! process and so should be in the range (0, 1). static const double CLUSTER_DELETE_FRACTION; //! The size of the data we use to maintain cluster detail. static const std::size_t STRUCTURE_SIZE; //! 1 - "smallest hard assignment weight". static const double HARD_ASSIGNMENT_THRESHOLD; private: //! The random number generator. CPRNG::CXorOShiro128Plus m_Rng; //! The type of data being clustered. maths_t::EDataType m_DataType; //! The style of the cluster weight calculation (see maths_t::EClusterWeightCalc). maths_t::EClusterWeightCalc m_WeightCalc = maths_t::E_ClustersEqualWeight; //! The initial rate at which information is lost. CFloatStorage m_InitialDecayRate; //! The rate at which information is lost. CFloatStorage m_DecayRate; //! A measure of the length of history of the data clustered. CFloatStorage m_HistoryLength = 0.0; //! The minimum cluster fractional count. CFloatStorage m_MinimumClusterFraction = 0.0; //! The minimum cluster count. CFloatStorage m_MinimumClusterCount = 0.0; //! The minimum count for a category in the sketch to cluster. CFloatStorage m_MinimumCategoryCount = 0.0; //! A generator of unique cluster indices. CClustererTypes::CIndexGenerator m_ClusterIndexGenerator; //! The clusters. TClusterVec m_Clusters; }; template<typename T, std::size_t N> const core::TPersistenceTag CXMeansOnline<T, N>::WEIGHT_CALC_TAG("a", "weight_calc"); template<typename T, std::size_t N> const core::TPersistenceTag CXMeansOnline<T, N>::MINIMUM_CLUSTER_FRACTION_TAG("b", "minimum_cluster_fraction"); template<typename T, std::size_t N> const core::TPersistenceTag CXMeansOnline<T, N>::MINIMUM_CLUSTER_COUNT_TAG("c", "minimum_cluster_count"); template<typename T, std::size_t N> const core::TPersistenceTag CXMeansOnline<T, N>::CLUSTER_INDEX_GENERATOR_TAG("e", "cluster_index_generator"); template<typename T, std::size_t N> const core::TPersistenceTag CXMeansOnline<T, N>::CLUSTER_TAG("f", "cluster"); template<typename T, std::size_t N> const core::TPersistenceTag CXMeansOnline<T, N>::RNG_TAG("g", "rng"); template<typename T, std::size_t N> const core::TPersistenceTag CXMeansOnline<T, N>::DECAY_RATE_TAG("h", "decay_rate"); template<typename T, std::size_t N> const core::TPersistenceTag CXMeansOnline<T, N>::HISTORY_LENGTH_TAG("i", "history_length"); template<typename T, std::size_t N> const core::TPersistenceTag CXMeansOnline<T, N>::INDEX_TAG("a", "index"); template<typename T, std::size_t N> const core::TPersistenceTag CXMeansOnline<T, N>::COVARIANCES_TAG("b", "covariances"); template<typename T, std::size_t N> const core::TPersistenceTag CXMeansOnline<T, N>::STRUCTURE_TAG("c", "structure"); template<typename T, std::size_t N> const double CXMeansOnline<T, N>::MINIMUM_SPLIT_GAIN(6.0); template<typename T, std::size_t N> const double CXMeansOnline<T, N>::MAXIMUM_MERGE_DISTANCE(2.0); template<typename T, std::size_t N> const double CXMeansOnline<T, N>::CLUSTER_DELETE_FRACTION(0.8); template<typename T, std::size_t N> const std::size_t CXMeansOnline<T, N>::STRUCTURE_SIZE(24); template<typename T, std::size_t N> const double CXMeansOnline<T, N>::HARD_ASSIGNMENT_THRESHOLD(0.01); } } } #endif // INCLUDED_ml_maths_common_CXMeansOnline_h