/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#ifndef INCLUDED_ml_maths_common_CXMeansOnline_h
#define INCLUDED_ml_maths_common_CXMeansOnline_h
#include <core/CLogger.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/RestoreMacros.h>
#include <maths/common/CBasicStatistics.h>
#include <maths/common/CBasicStatisticsPersist.h>
#include <maths/common/CClusterer.h>
#include <maths/common/CInformationCriteria.h>
#include <maths/common/CKMeansOnline.h>
#include <maths/common/CLinearAlgebra.h>
#include <maths/common/CLinearAlgebraPersist.h>
#include <maths/common/COrderings.h>
#include <maths/common/COrderingsSimultaneousSort.h>
#include <maths/common/CPRNG.h>
#include <maths/common/CRestoreParams.h>
#include <maths/common/CSphericalCluster.h>
#include <maths/common/CTypeTraits.h>
#include <maths/common/Constants.h>
#include <maths/common/MathsTypes.h>
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <numeric>
#include <optional>
#include <string>
#include <utility>
#include <vector>
namespace ml {
namespace maths {
namespace common {
//! \brief A single pass online clusterer based on the x-means
//! algorithm of Pelleg and Moore.
//!
//! DESCRIPTION:\n
//! This implements the CClusterer contract targeting the k-means
//! optimization objective (the standard implementation is not possible
//! because the data are processed using the turnstile model).
//!
//! As with x-means, clustering proceeds top down, so we only create
//! a split when we are confident it exists. This minimizes our state
//! size, which is important for anomaly detection because over-splitting
//! the data significantly impairs it. An information theoretic
//! criterion is used to decide whether or not to split clusters;
//! specifically, we compare the BIC of the most promising split with
//! that of the unsplit data and require strong evidence for the split.
//! A sketch data structure, on which k-means is run, is used to keep
//! the state size down. In addition, clusters can be merged if they
//! are no longer worth keeping (again based on BIC, but with some
//! hysteresis).
//!
//! Compared with standard x-means, this has one additional feature which is
//! particularly desirable for anomaly detection: the minimum cluster
//! size (in number of data points) and fraction (in proportion of data
//! points) can both be controlled. Typically we don't want to create
//! clusters for a small number of well separated points, since these
//! may be genuine anomalies. This principally affects the search for
//! candidate splits of the data.
//!
//! Note that this is a soft clustering: we assign the soft membership
//! of a point to a cluster based on the probability that it is
//! generated by the corresponding normal. However, this is not trying
//! to learn a mixture distribution representation of the data, which
//! would require a more involved inference scheme, such as EM. It is
//! expected to give an unsupervised clustering of the data which is
//! largely invariant to the order in which points are processed and
//! which identifies reasonably separated clusters.
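//!
//! For example, the following is a minimal usage sketch (the scalar
//! fill construction of the point is illustrative):
//! \code
//! CXMeansOnline<double, 2> clusterer{maths_t::E_ContinuousData,
//!                                    maths_t::E_ClustersFractionWeight};
//! CXMeansOnline<double, 2>::TSizeDoublePr2Vec assignment;
//! // Add a point at (1.5, 1.5); assignment is filled with the
//! // (cluster index, weight) pairs of its soft assignment.
//! clusterer.add(CXMeansOnline<double, 2>::TPointPrecise{1.5}, assignment);
//! \endcode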
template<typename T, std::size_t N>
class CXMeansOnline : public CClusterer<CVectorNx1<T, N>> {
public:
using TPoint = CVectorNx1<T, N>;
using TPointVec = std::vector<TPoint>;
using TClusterer = CClusterer<TPoint>;
using TPointPrecise = typename TClusterer::TPointPrecise;
using TPointPreciseVec = typename TClusterer::TPointPreciseVec;
using TPointPreciseDoublePrVec = typename TClusterer::TPointPreciseDoublePrVec;
using TSizeDoublePr = typename TClusterer::TSizeDoublePr;
using TSizeDoublePr2Vec = typename TClusterer::TSizeDoublePr2Vec;
using TDoubleVec = std::vector<double>;
using TDoubleVecVec = std::vector<TDoubleVec>;
using TSizeVec = std::vector<std::size_t>;
using TSizeVecVec = std::vector<TSizeVec>;
using TPrecise = typename SPromoted<T>::Type;
using TMatrixPrecise = CSymmetricMatrixNxN<TPrecise, N>;
using TCovariances = CBasicStatistics::SSampleCovariances<TPointPrecise>;
using TSphericalCluster = typename CSphericalCluster<TPoint>::Type;
using TSphericalClusterVec = std::vector<TSphericalCluster>;
using TSphericalClusterVecVec = std::vector<TSphericalClusterVec>;
using TKMeansOnline = CKMeansOnline<TPoint>;
using TKMeansOnlineVec = std::vector<TKMeansOnline>;
class CCluster;
using TClusterClusterPr = std::pair<CCluster, CCluster>;
using TOptionalClusterClusterPr = std::optional<TClusterClusterPr>;
//! \brief Represents a cluster.
class CCluster {
public:
explicit CCluster(const CXMeansOnline& clusterer)
: m_Index{clusterer.m_ClusterIndexGenerator.next()},
m_DataType{clusterer.m_DataType}, m_DecayRate{clusterer.m_DecayRate},
m_Covariances{N}, m_Structure{STRUCTURE_SIZE, clusterer.m_DecayRate,
MINIMUM_CATEGORY_COUNT, STRUCTURE_SIZE / 2} {}
//! Initialize by traversing a state document.
bool acceptRestoreTraverser(const SDistributionRestoreParams& params,
core::CStateRestoreTraverser& traverser) {
do {
const std::string& name = traverser.name();
RESTORE_BUILT_IN(INDEX_TAG, m_Index)
RESTORE(COVARIANCES_TAG, m_Covariances.fromDelimited(traverser.value()))
RESTORE(STRUCTURE_TAG, traverser.traverseSubLevel([&](auto& traverser_) {
return m_Structure.acceptRestoreTraverser(params, traverser_);
}))
} while (traverser.next());
return true;
}
//! Persist state by passing information to the supplied inserter.
void acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(INDEX_TAG, m_Index);
inserter.insertValue(COVARIANCES_TAG, m_Covariances.toDelimited());
inserter.insertLevel(STRUCTURE_TAG, [this](auto& inserter_) {
m_Structure.acceptPersistInserter(inserter_);
});
}
//! Efficiently swap the contents of this and \p other.
void swap(CCluster& other) {
std::swap(m_Index, other.m_Index);
std::swap(m_DataType, other.m_DataType);
std::swap(m_DecayRate, other.m_DecayRate);
std::swap(m_Covariances, other.m_Covariances);
m_Structure.swap(other.m_Structure);
}
//! Set the type of data in the cluster.
void dataType(maths_t::EDataType value) { m_DataType = value; }
//! Set the rate at which information is aged out.
void decayRate(double value) {
m_DecayRate = value;
m_Structure.decayRate(value);
}
//! Add \p x_ to this cluster.
void add(const TPointPrecise& x, double count) {
switch (m_DataType) {
case maths_t::E_IntegerData: {
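// Integer data are treated as continuous values jittered
// uniformly on the unit interval, so each point contributes
// an extra variance of 1/12 (the variance of a uniform
// distribution on an interval of length one).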
TSphericalCluster x_(x, SCountAndVariance(count, 1.0 / 12.0));
m_Covariances.add(x_);
break;
}
case maths_t::E_DiscreteData:
case maths_t::E_ContinuousData:
case maths_t::E_MixedData:
m_Covariances.add(x, TPointPrecise(count));
break;
}
m_Structure.add(x, count);
}
//! Propagate the cluster forwards by \p time.
void propagateForwardsByTime(double time) {
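// Aging multiplies the accumulated statistics by a factor
// alpha = exp(-scaledDecayRate * time) so that older
// observations are progressively down-weighted.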
double alpha = std::exp(-this->scaledDecayRate() * time);
m_Covariances.age(alpha);
m_Structure.age(alpha);
}
//! Get the unique index of this cluster.
std::size_t index() const { return m_Index; }
//! Get the centre of the cluster.
//!
//! This is defined as the sample mean.
const TPointPrecise& centre() const {
return CBasicStatistics::mean(m_Covariances);
}
//! Get the spread of the cluster.
//!
//! This is defined as the trace of the sample covariance matrix.
double spread() const {
return std::sqrt(
CBasicStatistics::maximumLikelihoodCovariances(m_Covariances).trace() /
static_cast<double>(N));
}
//! Get the sample covariance matrix of this cluster.
const TCovariances& covariances() const { return m_Covariances; }
//! Get the total count of values added to the cluster.
double count() const { return CBasicStatistics::count(m_Covariances); }
//! Get the weight of the cluster.
double weight(maths_t::EClusterWeightCalc calc) const {
switch (calc) {
case maths_t::E_ClustersEqualWeight:
return 1.0;
case maths_t::E_ClustersFractionWeight:
return this->count();
}
LOG_ABORT(<< "Unexpected calculation style " << calc);
return 1.0;
}
//! Get the likelihood that \p x is from this cluster.
double logLikelihoodFromCluster(maths_t::EClusterWeightCalc calc,
const TPointPrecise& x) const {
double likelihood;
const TPointPrecise& mean = CBasicStatistics::mean(m_Covariances);
const TMatrixPrecise& covariances =
CBasicStatistics::maximumLikelihoodCovariances(m_Covariances);
maths_t::EFloatingPointErrorStatus status =
gaussianLogLikelihood(covariances, x - mean, likelihood, false);
if ((status & maths_t::E_FpFailed) != 0) {
LOG_ERROR(<< "Unable to compute likelihood for " << x
<< " and cluster " << m_Index);
return core::constants::LOG_MIN_DOUBLE - 1.0;
}
if ((status & maths_t::E_FpOverflowed) != 0) {
return likelihood;
}
return likelihood + std::log(this->weight(calc));
}
//! Get \p numberSamples from this cluster.
void sample(std::size_t numberSamples, TPointPreciseVec& samples) const {
m_Structure.sample(numberSamples, samples);
}
//! Try and find a split by a full search of the binary tree
//! of possible optimal 2-splits of the data.
//!
//! \param[in] minimumCount The minimum count of a cluster
//! in the split.
//! \param[in] indexGenerator The unique cluster identifier
//! generator.
TOptionalClusterClusterPr split(CPRNG::CXorOShiro128Plus& rng,
double minimumCount,
CClustererTypes::CIndexGenerator& indexGenerator) {
// We do our clustering top down to minimize space and avoid
// making splits before we are confident they exist. This is
// important for anomaly detection because we do *not* want
// to fit a cluster to an outlier and judge it to be not
// anomalous as a result.
//
// By analogy to x-means we choose a candidate split of the
// data by minimizing the total within class deviation of the
// two classes. In order to decide whether or not to split we
// 1) impose a minimum count on the smaller cluster and 2) use an
// information theoretic criterion. Specifically, we threshold
// the BIC gain of using the multi-mode distribution versus
// the single mode distribution.
LOG_TRACE(<< "split");
if (m_Structure.buffering()) {
return {};
}
std::size_t n = m_Structure.size();
if (n < 2) {
return {};
}
TSizeVecVec split;
if (this->splitSearch(rng, minimumCount, split) == false) {
return {};
}
LOG_TRACE(<< "split = " << split);
TCovariances covariances[]{TCovariances(N), TCovariances(N)};
TSphericalClusterVec clusters;
this->sphericalClusters(clusters);
for (std::size_t i = 0; i < 2; ++i) {
for (std::size_t j = 0; j < split[i].size(); ++j) {
covariances[i].add(clusters[split[i][j]]);
}
}
TKMeansOnlineVec structure;
m_Structure.split(split, structure);
LOG_TRACE(<< "Splitting cluster " << this->index() << " at "
<< this->centre() << " left = " << structure[0].print()
<< ", right = " << structure[1].print());
std::size_t index[]{indexGenerator.next(), indexGenerator.next()};
indexGenerator.recycle(m_Index);
return TOptionalClusterClusterPr{
std::in_place,
CCluster{index[0], m_DataType, m_DecayRate, covariances[0],
std::move(structure[0])},
CCluster{index[1], m_DataType, m_DecayRate, covariances[1],
std::move(structure[1])}};
}
//! Check if this and \p other cluster should merge.
//!
//! \param[in] other The cluster to merge with this one.
bool shouldMerge(CCluster& other) {
return BICGain(*this, other) <= MAXIMUM_MERGE_DISTANCE;
}
//! Merge this and \p other cluster.
CCluster merge(const CCluster& other, CClustererTypes::CIndexGenerator& indexGenerator) {
TKMeansOnline structure{std::move(m_Structure)};
structure.merge(other.m_Structure);
CCluster result{indexGenerator.next(), m_DataType, m_DecayRate,
m_Covariances + other.m_Covariances, std::move(structure)};
indexGenerator.recycle(m_Index);
indexGenerator.recycle(other.m_Index);
return result;
}
//! Get a checksum for this object.
std::uint64_t checksum(std::uint64_t seed) const {
seed = CChecksum::calculate(seed, m_Index);
seed = CChecksum::calculate(seed, m_DataType);
seed = CChecksum::calculate(seed, m_DecayRate);
seed = CChecksum::calculate(seed, m_Covariances);
return CChecksum::calculate(seed, m_Structure);
}
//! Debug the memory used by this component.
void debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CXMeansOnline");
core::memory_debug::dynamicSize("m_Structure", m_Structure, mem);
}
//! Get the memory used by this component.
std::size_t memoryUsage() const {
return core::memory::dynamicSize(m_Structure);
}
//! Get the Bayes Information Criterion decrease in going from
//! one to two clusters.
//!
//! \note This is not necessarily positive.
static double BICGain(const CCluster& lhs, const CCluster& rhs) {
return BICGain(lhs.m_Covariances, rhs.m_Covariances);
}
protected:
CCluster(std::size_t index,
maths_t::EDataType dataType,
double decayRate,
const TCovariances& covariances,
TKMeansOnline structure)
: m_Index(index), m_DataType(dataType), m_DecayRate(decayRate),
m_Covariances(covariances), m_Structure(std::move(structure)) {}
//! Search for a split of the data that satisfies the constraints
//! on both the BIC divergence and minimum count.
//!
//! In order to handle the constraint on minimum count, we do a
//! breadth first search of the binary tree of optimal 2-splits
//! of subsets of the data looking for splits which satisfy the
//! constraints on *both* BIC divergence and count. The search
//! terminates at any node which can't be split subject to BIC.
//!
//! The intention of this is to find "natural" splits of the data
//! which would be obscured when splitting into the optimal 2-split.
//! This can occur when a small number of points (fewer than the
//! minimum count) are sufficiently far from the others that they
//! split off in preference to some other natural split of the data. Although
//! we can impose the count constraint when finding the optimal
//! 2-split this has associated problems. In particular, extreme
//! outliers then tend to rip sufficient points away from their
//! natural cluster in order to generate a new cluster.
bool splitSearch(CPRNG::CXorOShiro128Plus& rng, double minimumCount, TSizeVecVec& result) {
result.clear();
// The search visits a binary tree of candidate 2-splits of
// the data breadth first. If a suitable split is found on a
// level of the tree then the search terminates returning that
// split. Note that if a subset of the data can be split we
// also check that the corresponding full 2-split satisfies
// the same constraints (to avoid merging the two
// clusters straight away).
TSphericalClusterVec node;
this->sphericalClusters(node);
TSphericalClusterVec remainder;
remainder.reserve(node.size());
TSphericalClusterVecVec candidate;
for (;;) {
TKMeansOnline::kmeans(rng, node, 2, candidate);
LOG_TRACE(<< "candidate = " << candidate);
if (candidate.size() != 2) {
LOG_ERROR(<< "Expected 2-split: " << candidate);
break;
}
if (candidate[0].empty() || candidate[1].empty()) {
// This can happen if all the points are co-located,
// in which case we can't split this node anyway.
break;
}
// We use the Ledoit and Wolf optimal shrinkage estimate
// because the sample sizes here might be quite small, in
// which case the variance of the covariance estimates can
// be large.
TCovariances covariances[]{TCovariances(N), TCovariances(N)};
CBasicStatistics::covariancesLedoitWolf(candidate[0], covariances[0]);
CBasicStatistics::covariancesLedoitWolf(candidate[1], covariances[1]);
double n[]{CBasicStatistics::count(covariances[0]),
CBasicStatistics::count(covariances[1])};
double nmin = std::min(n[0], n[1]);
// Check the count constraint.
bool satisfiesCount = (nmin >= minimumCount);
LOG_TRACE(<< "count = " << nmin << " (to split " << minimumCount << ")");
// Check the gain constraint.
double gain{BICGain(covariances[0], covariances[1])};
bool satisfiesGain{gain > MINIMUM_SPLIT_GAIN};
LOG_TRACE(<< "BIC(1) - BIC(2) = " << gain << " (to split "
<< MINIMUM_SPLIT_GAIN << ")");
if (satisfiesCount == false) {
// Recurse to the (one) node with sufficient count.
if (n[0] > minimumCount && candidate[0].size() > 1) {
node.swap(candidate[0]);
remainder.insert(remainder.end(), candidate[1].begin(),
candidate[1].end());
continue;
}
if (n[1] > minimumCount && candidate[1].size() > 1) {
node.swap(candidate[1]);
remainder.insert(remainder.end(), candidate[0].begin(),
candidate[0].end());
continue;
}
} else if (satisfiesGain) {
LOG_TRACE(<< "Checking full split");
TSizeVec assignment(remainder.size());
for (std::size_t i = 0; i < remainder.size(); ++i) {
assignment[i] = nearest(remainder[i], covariances);
}
for (std::size_t i = 0; i < assignment.size(); ++i) {
std::size_t j = assignment[i];
TCovariances ci(N);
ci.add(remainder[i]);
candidate[j].push_back(remainder[i]);
covariances[j] += ci;
n[j] += CBasicStatistics::count(ci);
}
gain = BICGain(covariances[0], covariances[1]);
LOG_TRACE(<< "BIC(1) - BIC(2) = " << gain << " (to split "
<< MINIMUM_SPLIT_GAIN << ")");
if (gain > MINIMUM_SPLIT_GAIN) {
LOG_TRACE(<< "splitting");
typename CSphericalCluster<TPoint>::SLess less;
result.resize(candidate.size());
TSphericalClusterVec clusters;
this->sphericalClusters(clusters);
TSizeVec indexes(clusters.size());
std::iota(indexes.begin(), indexes.end(), 0);
COrderings::simultaneousSortWith(less, clusters, indexes);
for (std::size_t i = 0; i < candidate.size(); ++i) {
for (const auto& x : candidate[i]) {
std::size_t j = std::lower_bound(clusters.begin(),
clusters.end(), x, less) -
clusters.begin();
if (j >= clusters.size()) {
LOG_ERROR(<< "Missing " << x
<< " from clusters = " << clusters);
return false;
}
result[i].push_back(indexes[j]);
}
}
}
}
break;
}
return result.size() > 0;
}
//! Get the spherical clusters being maintained in the fine-
//! grained structure model of this cluster.
void sphericalClusters(TSphericalClusterVec& result) const {
m_Structure.clusters(result);
switch (m_DataType) {
case maths_t::E_IntegerData:
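// Add back the 1/12 continuity correction variance which
// is used for integer data (matching CCluster::add above).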
for (auto& x : result) {
x.annotation().s_Variance += 1.0 / 12.0;
}
break;
case maths_t::E_DiscreteData:
case maths_t::E_ContinuousData:
case maths_t::E_MixedData:
break;
}
}
//! Get the closest (in Mahalanobis distance) cluster to \p x.
static std::size_t nearest(const TSphericalCluster& x,
const TCovariances (&c)[2]) {
TPrecise d[]{0, 0};
TPointPrecise x_(x);
inverseQuadraticForm(CBasicStatistics::maximumLikelihoodCovariances(c[0]),
x_ - CBasicStatistics::mean(c[0]), d[0]);
inverseQuadraticForm(CBasicStatistics::maximumLikelihoodCovariances(c[1]),
x_ - CBasicStatistics::mean(c[1]), d[1]);
return d[0] < d[1] ? 0 : 1;
}
//! Get the Bayes Information Criterion gain, subject to Gaussian
//! assumptions, in representing \p lhs and \p rhs using one or
//! two modes.
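//!
//! Here, for a model M with k free parameters and maximized
//! likelihood L fitted to n points, the criterion is taken to be
//! BIC(M) = k ln(n) - 2 ln(L), so the returned gain is positive
//! when the two mode model fits sufficiently better to justify
//! its extra parameters.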
static double BICGain(const TCovariances& lhs, const TCovariances& rhs) {
CGaussianInfoCriterion<TPoint, E_BIC> BIC1;
BIC1.add(lhs + rhs);
CGaussianInfoCriterion<TPoint, E_BIC> BIC2;
BIC2.add(lhs);
BIC2.add(rhs);
return BIC1.calculate() - BIC2.calculate();
}
private:
//! Get the scaled decay rate for use by propagateForwardsByTime.
double scaledDecayRate() const {
return std::pow(0.5, static_cast<double>(N)) * m_DecayRate;
}
private:
//! A unique identifier for this cluster.
std::size_t m_Index;
//! The type of data which will be clustered.
maths_t::EDataType m_DataType;
//! Controls the rate at which information is lost.
double m_DecayRate;
//! The mean and covariances of the data in this cluster.
TCovariances m_Covariances;
//! The data representing the internal structure of this cluster.
TKMeansOnline m_Structure;
};
using TClusterVec = std::vector<CCluster>;
using TClusterVecItr = typename TClusterVec::iterator;
public:
//! \name Life-cycle
//@{
//! Construct a new clusterer.
//!
//! \param[in] dataType The type of data which will be clustered.
//! \param[in] weightCalc The style of the cluster weight calculation
//! (see maths_t::EClusterWeightCalc for details).
//! \param[in] decayRate Controls the rate at which information is
//! lost from the clusters.
//! \param[in] minimumClusterFraction The minimum fractional count
//! of points in a cluster.
//! \param[in] minimumClusterCount The minimum count of points in a
//! cluster.
//! \param[in] minimumCategoryCount The minimum count for a category
//! in the sketch to cluster.
//! \param[in] splitFunc Optional callback for when a cluster is split.
//! \param[in] mergeFunc Optional callback for when two clusters are merged.
CXMeansOnline(maths_t::EDataType dataType,
maths_t::EClusterWeightCalc weightCalc,
double decayRate = 0.0,
double minimumClusterFraction = MINIMUM_CLUSTER_SPLIT_FRACTION,
double minimumClusterCount = MINIMUM_CLUSTER_SPLIT_COUNT,
double minimumCategoryCount = MINIMUM_CATEGORY_COUNT,
const CClustererTypes::TSplitFunc& splitFunc = CClustererTypes::CDoNothing(),
const CClustererTypes::TMergeFunc& mergeFunc = CClustererTypes::CDoNothing())
: TClusterer(splitFunc, mergeFunc), m_DataType(dataType),
m_WeightCalc(weightCalc), m_InitialDecayRate(decayRate),
m_DecayRate(decayRate), m_MinimumClusterFraction(minimumClusterFraction),
m_MinimumClusterCount(minimumClusterCount),
m_MinimumCategoryCount(minimumCategoryCount),
m_Clusters(1, CCluster{*this}) {}
//! Construct by traversing a state document.
CXMeansOnline(const SDistributionRestoreParams& params, core::CStateRestoreTraverser& traverser)
: TClusterer(CClustererTypes::CDoNothing(), CClustererTypes::CDoNothing()),
m_DataType(params.s_DataType), m_InitialDecayRate(params.s_DecayRate),
m_DecayRate(params.s_DecayRate),
m_MinimumCategoryCount(params.s_MinimumCategoryCount) {
if (traverser.traverseSubLevel([&](auto& traverser_) {
return this->acceptRestoreTraverser(params, traverser_);
}) == false) {
traverser.setBadState();
}
}
//! Construct by traversing a state document.
CXMeansOnline(const SDistributionRestoreParams& params,
const CClustererTypes::TSplitFunc& splitFunc,
const CClustererTypes::TMergeFunc& mergeFunc,
core::CStateRestoreTraverser& traverser)
: TClusterer(splitFunc, mergeFunc), m_DataType(params.s_DataType),
m_InitialDecayRate(params.s_DecayRate), m_DecayRate(params.s_DecayRate),
m_MinimumCategoryCount(params.s_MinimumCategoryCount) {
if (traverser.traverseSubLevel([&](auto& traverser_) {
return this->acceptRestoreTraverser(params, traverser_);
}) == false) {
traverser.setBadState();
}
}
//! The x-means clusterer has value semantics.
CXMeansOnline(const CXMeansOnline& other)
: TClusterer(other.splitFunc(), other.mergeFunc()), m_Rng(other.m_Rng),
m_DataType(other.m_DataType), m_WeightCalc(other.m_WeightCalc),
m_InitialDecayRate(other.m_InitialDecayRate),
m_DecayRate(other.m_DecayRate), m_HistoryLength(other.m_HistoryLength),
m_MinimumClusterFraction(other.m_MinimumClusterFraction),
m_MinimumClusterCount(other.m_MinimumClusterCount),
m_MinimumCategoryCount(other.m_MinimumCategoryCount),
m_ClusterIndexGenerator(other.m_ClusterIndexGenerator.deepCopy()),
m_Clusters(other.m_Clusters) {}
CXMeansOnline(CXMeansOnline&&) = default;
//! The x-means clusterer has value semantics.
CXMeansOnline& operator=(const CXMeansOnline& other) {
if (this != &other) {
CXMeansOnline tmp(other);
this->swap(tmp);
}
return *this;
}
CXMeansOnline& operator=(CXMeansOnline&&) = default;
//! Efficiently swap the contents of two x-means objects.
void swap(CXMeansOnline& other) {
this->TClusterer::swap(other);
std::swap(m_Rng, other.m_Rng);
std::swap(m_DataType, other.m_DataType);
std::swap(m_WeightCalc, other.m_WeightCalc);
std::swap(m_InitialDecayRate, other.m_InitialDecayRate);
std::swap(m_DecayRate, other.m_DecayRate);
std::swap(m_HistoryLength, other.m_HistoryLength);
std::swap(m_MinimumClusterFraction, other.m_MinimumClusterFraction);
std::swap(m_MinimumClusterCount, other.m_MinimumClusterCount);
std::swap(m_MinimumCategoryCount, other.m_MinimumCategoryCount);
std::swap(m_ClusterIndexGenerator, other.m_ClusterIndexGenerator);
m_Clusters.swap(other.m_Clusters);
}
//! \name Clusterer Contract
//@{
//! Get the tag name for this clusterer.
const core::TPersistenceTag& persistenceTag() const override {
return CClustererTypes::X_MEANS_ONLINE_TAG;
}
//! Persist state by passing information to the supplied inserter.
void acceptPersistInserter(core::CStatePersistInserter& inserter) const override {
for (const auto& cluster : m_Clusters) {
inserter.insertLevel(CLUSTER_TAG, [&cluster](auto& inserter_) {
cluster.acceptPersistInserter(inserter_);
});
}
inserter.insertValue(DECAY_RATE_TAG, m_DecayRate.toString());
inserter.insertValue(HISTORY_LENGTH_TAG, m_HistoryLength.toString());
inserter.insertValue(RNG_TAG, m_Rng.toString());
inserter.insertValue(WEIGHT_CALC_TAG, static_cast<int>(m_WeightCalc));
inserter.insertValue(MINIMUM_CLUSTER_FRACTION_TAG,
m_MinimumClusterFraction.toString());
inserter.insertValue(MINIMUM_CLUSTER_COUNT_TAG, m_MinimumClusterCount.toString());
inserter.insertLevel(CLUSTER_INDEX_GENERATOR_TAG, [this](auto& inserter_) {
m_ClusterIndexGenerator.acceptPersistInserter(inserter_);
});
}
//! Creates a copy of the clusterer.
//!
//! \warning Caller owns returned object.
CXMeansOnline* clone() const override { return new CXMeansOnline(*this); }
//! Clear the current clusterer state.
void clear() override {
*this = CXMeansOnline(m_DataType, m_WeightCalc, m_InitialDecayRate,
m_MinimumClusterFraction, m_MinimumClusterCount,
m_MinimumCategoryCount, this->splitFunc(),
this->mergeFunc());
}
//! Remove the cluster with \p index.
bool remove(std::size_t index) override {
if (m_Clusters.size() <= 1) {
return false;
}
auto i = std::find_if(m_Clusters.begin(), m_Clusters.end(), [&](const auto& cluster) {
return cluster.index() == index;
});
if (i != m_Clusters.end()) {
CCluster& cluster = *i;
CCluster* nearest = this->nearest(cluster);
if (nearest != nullptr) {
LOG_TRACE(<< "Merging cluster " << cluster.index() << " at "
<< cluster.centre() << " and cluster "
<< nearest->index() << " at " << nearest->centre());
CCluster merge = nearest->merge(cluster, m_ClusterIndexGenerator);
(this->mergeFunc())(cluster.index(), nearest->index(), merge.index());
nearest->swap(merge);
}
m_Clusters.erase(i);
return true;
}
return false;
}
//! Get the number of clusters.
std::size_t numberClusters() const override { return m_Clusters.size(); }
//! Set the type of data being clustered.
void dataType(maths_t::EDataType dataType) override {
m_DataType = dataType;
for (auto& cluster : m_Clusters) {
cluster.dataType(dataType);
}
}
//! Set the rate at which information is aged out.
void decayRate(double decayRate) override {
m_DecayRate = decayRate;
for (auto& cluster : m_Clusters) {
cluster.decayRate(decayRate);
}
}
//! Check if the cluster identified by \p index exists.
bool hasCluster(std::size_t index) const override {
return this->cluster(index) != nullptr;
}
//! Get the centre of the cluster identified by \p index.
bool clusterCentre(std::size_t index, TPointPrecise& result) const override {
const CCluster* cluster = this->cluster(index);
if (cluster == nullptr) {
LOG_ERROR(<< "Cluster " << index << " doesn't exist");
return false;
}
result = cluster->centre();
return true;
}
//! Get the spread of the cluster identified by \p index.
bool clusterSpread(std::size_t index, double& result) const override {
const CCluster* cluster = this->cluster(index);
if (cluster == nullptr) {
LOG_ERROR(<< "Cluster " << index << " doesn't exist");
return false;
}
result = cluster->spread();
return true;
}
//! Gets the index of the cluster(s) to which \p point belongs
//! together with their weighting factor.
void cluster(const TPointPrecise& point,
TSizeDoublePr2Vec& result,
double count = 1.0) const override {
result.clear();
if (m_Clusters.empty()) {
LOG_ERROR(<< "No clusters");
return;
}
// This does a soft assignment. Given we are finding a
// partitioning clustering (as a result of targeting
// the k-means objective) we only consider the case that
// the point comes from either the left or right cluster.
// A-priori the probability a randomly selected point
// comes from a cluster is proportional to its weight:
// P(i) = n(i) / Sum_j{ n(j) }
//
// Bayes theorem then immediately gives that the probability
// that a given point is from the i'th cluster
// P(i | x) = L(x | i) * P(i) / Z
//
// where Z is the normalization constant:
// Z = Sum_i{ L(x | i) * P(i) }
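//
// Subtracting the maximum log-likelihood before exponentiating
// (the log-sum-exp trick) guards against underflow when all the
// likelihoods are very small.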
result.reserve(m_Clusters.size());
double lmax = -std::numeric_limits<double>::max();
for (const auto& cluster : m_Clusters) {
double likelihood = cluster.logLikelihoodFromCluster(m_WeightCalc, point);
result.emplace_back(cluster.index(), likelihood);
lmax = std::max(lmax, likelihood);
}
double Z = 0.0;
for (auto& x : result) {
x.second = std::exp(x.second - lmax);
Z += x.second;
}
double pmax = 0.0;
for (auto& x : result) {
x.second /= Z;
pmax = std::max(pmax, x.second);
}
result.erase(std::remove_if(result.begin(), result.end(),
CProbabilityLessThan(HARD_ASSIGNMENT_THRESHOLD * pmax)),
result.end());
Z = 0.0;
for (const auto& x : result) {
Z += x.second;
}
for (auto& x : result) {
x.second *= count / Z;
}
}
//! Update the clustering with \p point and return its cluster(s)
//! together with their weighting factor.
void add(const TPointPrecise& x, TSizeDoublePr2Vec& clusters, double count = 1.0) override {
if (m_Clusters.size() == 1) {
LOG_TRACE(<< "Adding " << x << " to " << m_Clusters[0].centre());
m_Clusters[0].add(x, count);
clusters.emplace_back(m_Clusters[0].index(), count);
if (this->maybeSplit(m_Clusters.begin())) {
this->cluster(x, clusters, count);
}
} else {
using TDoubleSizePr = std::pair<double, std::size_t>;
using TMaxAccumulator = CBasicStatistics::SMax<TDoubleSizePr, 2>::TAccumulator;
TMaxAccumulator closest;
for (std::size_t i = 0; i < m_Clusters.size(); ++i) {
closest.add({m_Clusters[i].logLikelihoodFromCluster(m_WeightCalc, x), i});
}
closest.sort();
LOG_TRACE(<< "closest = " << closest);
double likelihood0 = closest[0].first;
double likelihood1 = closest[1].first;
// Convert the log-likelihoods into normalized probabilities
// for the two most likely clusters.
double p0 = 1.0;
double p1 = std::exp(likelihood1 - likelihood0);
double normalizer = p0 + p1;
p0 /= normalizer;
p1 /= normalizer;
LOG_TRACE(<< "probabilities = [" << p0 << "," << p1 << "]");
auto cluster0 = m_Clusters.begin() + closest[0].second;
auto cluster1 = m_Clusters.begin() + closest[1].second;
if (p1 < HARD_ASSIGNMENT_THRESHOLD * p0) {
LOG_TRACE(<< "Adding " << x << " to " << cluster0->centre());
cluster0->add(x, count);
clusters.emplace_back(cluster0->index(), count);
if (this->maybeSplit(cluster0) || this->maybeMerge(cluster0)) {
this->cluster(x, clusters, count);
}
} else {
// Get the weighted counts.
double count0 = count * p0;
double count1 = count * p1;
LOG_TRACE(<< "Soft adding " << x << " " << count0 << " to "
<< cluster0->centre() << " and " << count1 << " to "
<< cluster1->centre());
cluster0->add(x, count0);
cluster1->add(x, count1);
clusters.emplace_back(cluster0->index(), count0);
clusters.emplace_back(cluster1->index(), count1);
if (this->maybeSplit(cluster0) || this->maybeSplit(cluster1) ||
this->maybeMerge(cluster0) || this->maybeMerge(cluster1)) {
this->cluster(x, clusters, count);
}
}
}
if (this->prune()) {
this->cluster(x, clusters, count);
}
}
//! Update the clustering with \p points.
void add(const TPointPreciseDoublePrVec& x) override {
if (m_Clusters.empty()) {
m_Clusters.emplace_back(*this);
}
TSizeDoublePr2Vec dummy;
for (const auto& x_ : x) {
this->add(x_.first, dummy, x_.second);
}
}
//! Propagate the clustering forwards by \p time.
//!
//! The cluster priors relax back to non-informative and the
//! cluster weights decrease at a rate controlled by the decay
//! rate parameter (optionally supplied to the constructor).
//!
//! \param[in] time The time increment to apply.
//! \note \p time must be non-negative.
void propagateForwardsByTime(double time) override {
if (time < 0.0) {
LOG_ERROR(<< "Can't propagate backwards in time");
return;
}
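// The history length is aged in the same way as the data so
// that it measures the effective amount of history retained
// under exponential decay.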
m_HistoryLength = (m_HistoryLength + time) * std::exp(-m_DecayRate * time);
for (auto& cluster : m_Clusters) {
cluster.propagateForwardsByTime(time);
}
}
//! Sample the cluster with index \p index.
//!
//! \param[in] index The index of the cluster to sample.
//! \param[in] numberSamples The desired number of samples.
//! \param[out] samples Filled in with the samples.
//! \return True if the cluster could be sampled and false otherwise.
bool sample(std::size_t index, std::size_t numberSamples, TPointPreciseVec& samples) const override {
const CCluster* cluster = this->cluster(index);
if (cluster == nullptr) {
LOG_ERROR(<< "Cluster " << index << " doesn't exist");
return false;
}
cluster->sample(numberSamples, samples);
return true;
}
//! Get the probability of the cluster with index \p index.
//!
//! \param[in] index The index of the cluster of interest.
//! \return The probability of the cluster identified by \p index.
double probability(std::size_t index) const override {
double weight = 0.0;
double Z = 0.0;
for (const auto& cluster : m_Clusters) {
if (cluster.index() == index) {
weight = cluster.weight(maths_t::E_ClustersFractionWeight);
}
Z += cluster.weight(maths_t::E_ClustersFractionWeight);
}
return Z == 0.0 ? 0.0 : weight / Z;
}
//! Debug the memory used by the object.
void debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const override {
mem->setName("CXMeansOnline");
core::memory_debug::dynamicSize("m_ClusterIndexGenerator",
m_ClusterIndexGenerator, mem);
core::memory_debug::dynamicSize("m_Clusters", m_Clusters, mem);
}
//! Get the memory used by the object.
std::size_t memoryUsage() const override {
std::size_t mem = core::memory::dynamicSize(m_ClusterIndexGenerator);
mem += core::memory::dynamicSize(m_Clusters);
return mem;
}
//! Get the static size of this object - used for virtual hierarchies
std::size_t staticSize() const override { return sizeof(*this); }
//! Get a checksum for this object.
std::uint64_t checksum(std::uint64_t seed = 0) const override {
seed = CChecksum::calculate(seed, m_DataType);
seed = CChecksum::calculate(seed, m_DecayRate);
seed = CChecksum::calculate(seed, m_HistoryLength);
seed = CChecksum::calculate(seed, m_WeightCalc);
return CChecksum::calculate(seed, m_Clusters);
}
//@}
//! The total count of points.
double count() const {
return std::accumulate(m_Clusters.begin(), m_Clusters.end(), 0.0,
[](double count, const CCluster& cluster) {
return count + cluster.count();
});
}
//! Get the index generator.
CClustererTypes::CIndexGenerator& indexGenerator() {
return m_ClusterIndexGenerator;
}
protected:
//! Restore by traversing a state document.
bool acceptRestoreTraverser(const SDistributionRestoreParams& params,
core::CStateRestoreTraverser& traverser) {
do {
const std::string& name = traverser.name();
RESTORE_SETUP_TEARDOWN(CLUSTER_TAG, CCluster cluster(*this),
traverser.traverseSubLevel([&](auto& traverser_) {
return cluster.acceptRestoreTraverser(params, traverser_);
}),
m_Clusters.push_back(cluster))
RESTORE_SETUP_TEARDOWN(DECAY_RATE_TAG, double decayRate,
core::CStringUtils::stringToType(traverser.value(), decayRate),
this->decayRate(decayRate))
RESTORE(HISTORY_LENGTH_TAG, m_HistoryLength.fromString(traverser.value()))
RESTORE(RNG_TAG, m_Rng.fromString(traverser.value()));
RESTORE(CLUSTER_INDEX_GENERATOR_TAG,
traverser.traverseSubLevel([this](auto& traverser_) {
return m_ClusterIndexGenerator.acceptRestoreTraverser(traverser_);
}))
RESTORE_ENUM(WEIGHT_CALC_TAG, m_WeightCalc, maths_t::EClusterWeightCalc)
RESTORE(MINIMUM_CLUSTER_FRACTION_TAG,
m_MinimumClusterFraction.fromString(traverser.value()))
RESTORE(MINIMUM_CLUSTER_COUNT_TAG,
m_MinimumClusterCount.fromString(traverser.value()))
} while (traverser.next());
return true;
}
//! Get the cluster with the index \p index.
const CCluster* cluster(std::size_t index) const {
for (const auto& cluster : m_Clusters) {
if (cluster.index() == index) {
return &cluster;
}
}
return nullptr;
}
//! Compute the minimum split count.
double minimumSplitCount() const {
double result = m_MinimumClusterCount;
if (m_MinimumClusterFraction > 0.0) {
double count = this->count();
double scale = m_HistoryLength * (1.0 - std::exp(-m_InitialDecayRate));
count *= m_MinimumClusterFraction / std::max(scale, 1.0);
result = std::max(result, count);
}
LOG_TRACE(<< "minimumSplitCount = " << result);
return result;
}
//! Split \p cluster if we find a good split.
bool maybeSplit(TClusterVecItr cluster) {
if (cluster == m_Clusters.end()) {
return false;
}
if (TOptionalClusterClusterPr split = cluster->split(
m_Rng, this->minimumSplitCount(), m_ClusterIndexGenerator)) {
LOG_TRACE(<< "Splitting cluster " << cluster->index() << " at "
<< cluster->centre());
std::size_t index = cluster->index();
*cluster = split->first;
m_Clusters.push_back(split->second);
(this->splitFunc())(index, split->first.index(), split->second.index());
return true;
}
return false;
}
//! Merge \p cluster and \p adjacentCluster if they are close enough.
bool maybeMerge(TClusterVecItr cluster) {
if (cluster == m_Clusters.end()) {
return false;
}
CCluster* nearest = this->nearest(*cluster);
if (nearest != nullptr && nearest->shouldMerge(*cluster)) {
LOG_TRACE(<< "Merging cluster " << nearest->index() << " at "
<< nearest->centre() << " and cluster "
<< cluster->index() << " at " << cluster->centre());
std::size_t index1 = nearest->index();
std::size_t index2 = cluster->index();
CCluster merged = nearest->merge(*cluster, m_ClusterIndexGenerator);
*nearest = merged;
m_Clusters.erase(cluster);
(this->mergeFunc())(index1, index2, merged.index());
return true;
}
return false;
}
//! Remove any clusters which are effectively dead.
bool prune() {
if (m_Clusters.size() <= 1) {
return false;
}
using TDoubleSizePr = std::pair<double, std::size_t>;
using TMinAccumulator =
CBasicStatistics::COrderStatisticsStack<TDoubleSizePr, 1, COrderings::SFirstLess>;
bool result = false;
double minimumCount = this->minimumSplitCount() * CLUSTER_DELETE_FRACTION;
// Get the clusters to prune.
for (;;) {
TMinAccumulator prune;
for (std::size_t i = 0; i < m_Clusters.size(); ++i) {
if (m_Clusters[i].count() < minimumCount) {
prune.add({m_Clusters[i].count(), i});
}
}
if (prune.count() == 0) {
break;
}
LOG_TRACE(<< "prune = " << prune);
result = true;
// Merge the clusters to prune in increasing count order.
CCluster& cluster = m_Clusters[prune[0].second];
CCluster* nearest = this->nearest(cluster);
if (nearest != nullptr) {
LOG_TRACE(<< "Merging cluster " << cluster.index() << " at "
<< cluster.centre() << " and cluster "
<< nearest->index() << " at " << nearest->centre());
CCluster merge = nearest->merge(cluster, m_ClusterIndexGenerator);
(this->mergeFunc())(cluster.index(), nearest->index(), merge.index());
nearest->swap(merge);
}
// Actually remove the pruned clusters.
m_Clusters.erase(m_Clusters.begin() + prune[0].second);
}
return result;
}
//! Get the cluster closest to \p cluster.
CCluster* nearest(const CCluster& cluster) {
if (m_Clusters.size() == 1) {
return &m_Clusters[0];
}
using TMinAccumulator = CBasicStatistics::SMin<double>::TAccumulator;
CCluster* result = nullptr;
TMinAccumulator min;
for (auto& candidate : m_Clusters) {
if (cluster.index() == candidate.index()) {
continue;
}
if (min.add(CCluster::BICGain(cluster, candidate))) {
result = &candidate;
}
}
if (result == nullptr) {
LOG_ERROR(<< "Couldn't find nearest cluster");
}
return result;
}
//! Get the clusters.
const TClusterVec& clusters() const { return m_Clusters; }
private:
//! \brief Checks if probabilities are less than a specified threshold.
class CProbabilityLessThan {
public:
explicit CProbabilityLessThan(double threshold)
: m_Threshold(threshold) {}
bool operator()(const TSizeDoublePr& p) const {
return p.second < m_Threshold;
}
private:
double m_Threshold;
};
private:
//! \name Tags for Persisting CXMeansOnline
//@{
static const core::TPersistenceTag WEIGHT_CALC_TAG;
static const core::TPersistenceTag MINIMUM_CLUSTER_FRACTION_TAG;
static const core::TPersistenceTag MINIMUM_CLUSTER_COUNT_TAG;
static const core::TPersistenceTag WINSORIZATION_CONFIDENCE_INTERVAL_TAG;
static const core::TPersistenceTag CLUSTER_INDEX_GENERATOR_TAG;
static const core::TPersistenceTag CLUSTER_TAG;
static const core::TPersistenceTag RNG_TAG;
static const core::TPersistenceTag DECAY_RATE_TAG;
static const core::TPersistenceTag HISTORY_LENGTH_TAG;
//@}
//! \name Tags for Persisting CXMeansOnline::CCluster
//@{
static const core::TPersistenceTag INDEX_TAG;
static const core::TPersistenceTag COVARIANCES_TAG;
static const core::TPersistenceTag STRUCTURE_TAG;
//@}
//! The minimum BIC gain, in going from one cluster to two,
//! at which we'll split a cluster.
static const double MINIMUM_SPLIT_GAIN;
//! The maximum BIC gain for which we'll merge two clusters.
//! This is intended to introduce hysteresis in the cluster
//! creation and deletion process and so should be less than
//! the minimum split gain.
static const double MAXIMUM_MERGE_DISTANCE;
//! The default fraction of the minimum cluster split count
//! for which we'll delete a cluster. This is intended to
//! introduce hysteresis in the cluster creation and deletion
//! process and so should be in the range (0, 1).
static const double CLUSTER_DELETE_FRACTION;
//! The size of the data we use to maintain cluster detail.
static const std::size_t STRUCTURE_SIZE;
//! 1 - "smallest hard assignment weight".
static const double HARD_ASSIGNMENT_THRESHOLD;
private:
//! The random number generator.
CPRNG::CXorOShiro128Plus m_Rng;
//! The type of data being clustered.
maths_t::EDataType m_DataType;
//! The style of the cluster weight calculation (see maths_t::EClusterWeightCalc).
maths_t::EClusterWeightCalc m_WeightCalc = maths_t::E_ClustersEqualWeight;
//! The initial rate at which information is lost.
CFloatStorage m_InitialDecayRate;
//! The rate at which information is lost.
CFloatStorage m_DecayRate;
//! A measure of the length of history of the data clustered.
CFloatStorage m_HistoryLength = 0.0;
//! The minimum cluster fractional count.
CFloatStorage m_MinimumClusterFraction = 0.0;
//! The minimum cluster count.
CFloatStorage m_MinimumClusterCount = 0.0;
//! The minimum count for a category in the sketch to cluster.
CFloatStorage m_MinimumCategoryCount = 0.0;
//! A generator of unique cluster indices.
CClustererTypes::CIndexGenerator m_ClusterIndexGenerator;
//! The clusters.
TClusterVec m_Clusters;
};
template<typename T, std::size_t N>
const core::TPersistenceTag CXMeansOnline<T, N>::WEIGHT_CALC_TAG("a", "weight_calc");
template<typename T, std::size_t N>
const core::TPersistenceTag
CXMeansOnline<T, N>::MINIMUM_CLUSTER_FRACTION_TAG("b", "minimum_cluster_fraction");
template<typename T, std::size_t N>
const core::TPersistenceTag
CXMeansOnline<T, N>::MINIMUM_CLUSTER_COUNT_TAG("c", "minimum_cluster_count");
template<typename T, std::size_t N>
const core::TPersistenceTag
CXMeansOnline<T, N>::CLUSTER_INDEX_GENERATOR_TAG("e", "cluster_index_generator");
template<typename T, std::size_t N>
const core::TPersistenceTag CXMeansOnline<T, N>::CLUSTER_TAG("f", "cluster");
template<typename T, std::size_t N>
const core::TPersistenceTag CXMeansOnline<T, N>::RNG_TAG("g", "rng");
template<typename T, std::size_t N>
const core::TPersistenceTag CXMeansOnline<T, N>::DECAY_RATE_TAG("h", "decay_rate");
template<typename T, std::size_t N>
const core::TPersistenceTag CXMeansOnline<T, N>::HISTORY_LENGTH_TAG("i", "history_length");
template<typename T, std::size_t N>
const core::TPersistenceTag CXMeansOnline<T, N>::INDEX_TAG("a", "index");
template<typename T, std::size_t N>
const core::TPersistenceTag CXMeansOnline<T, N>::COVARIANCES_TAG("b", "covariances");
template<typename T, std::size_t N>
const core::TPersistenceTag CXMeansOnline<T, N>::STRUCTURE_TAG("c", "structure");
template<typename T, std::size_t N>
const double CXMeansOnline<T, N>::MINIMUM_SPLIT_GAIN(6.0);
template<typename T, std::size_t N>
const double CXMeansOnline<T, N>::MAXIMUM_MERGE_DISTANCE(2.0);
template<typename T, std::size_t N>
const double CXMeansOnline<T, N>::CLUSTER_DELETE_FRACTION(0.8);
template<typename T, std::size_t N>
const std::size_t CXMeansOnline<T, N>::STRUCTURE_SIZE(24);
template<typename T, std::size_t N>
const double CXMeansOnline<T, N>::HARD_ASSIGNMENT_THRESHOLD(0.01);
}
}
}
#endif // INCLUDED_ml_maths_common_CXMeansOnline_h