/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <core/CContainerPrinter.h>
#include <core/CIEEE754.h>
#include <core/CJsonStatePersistInserter.h>
#include <core/CJsonStateRestoreTraverser.h>
#include <core/CLogger.h>
#include <core/CPatternSet.h>
#include <core/CStringUtils.h>

#include <maths/common/CBasicStatistics.h>
#include <maths/common/CBasicStatisticsPersist.h>
#include <maths/common/COrderings.h>
#include <maths/common/COrderingsSimultaneousSort.h>
#include <maths/common/CSampling.h>

#include <maths/time_series/CTimeSeriesDecomposition.h>
#include <maths/time_series/CTimeSeriesModel.h>

#include <model/CAnnotatedProbabilityBuilder.h>
#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CDataGatherer.h>
#include <model/CDetectionRule.h>
#include <model/CEventData.h>
#include <model/CInterimBucketCorrector.h>
#include <model/CMetricPopulationModel.h>
#include <model/CMetricPopulationModelFactory.h>
#include <model/CModelDetailsView.h>
#include <model/CPartitioningFields.h>
#include <model/CResourceMonitor.h>

#include <test/BoostTestCloseAbsolute.h>
#include <test/BoostTestPointerOutput.h>
#include <test/CRandomNumbers.h>

#include "CModelTestFixtureBase.h"

#include <boost/test/unit_test.hpp>
#include <boost/unordered_map.hpp>

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
BOOST_AUTO_TEST_SUITE(CMetricPopulationModelTest)

using namespace ml;
using namespace model;

namespace {
using TMinAccumulator = maths::common::CBasicStatistics::COrderStatisticsStack<double, 1>;
using TMaxAccumulator =
    maths::common::CBasicStatistics::COrderStatisticsStack<double, 1, std::greater<double>>;

struct SValuesAndWeights {
    maths::common::CModel::TTimeDouble2VecSizeTrVec s_Values;
    maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec s_TrendWeights;
    maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec s_ResidualWeights;
};

const std::size_t numberAttributes{5};
const std::size_t numberPeople{10};
}

class CTestFixture : public CModelTestFixtureBase {
public:
void generateTestMessages(std::size_t dimension,
core_t::TTime startTime,
core_t::TTime bucketLength,
TMessageVec& messages) {
// The test case is as follows:
//
// attribute | 0 | 1 | 2 | 3 | 4
// ------------------+---------+---------+---------+---------+--------
// rate | 10 | 2 | 15 | 2 | 1
// ------------------+---------+---------+---------+---------+--------
// mean | 5 | 10 | 7 | 3 | 15
// ------------------+---------+---------+---------+---------+--------
// variance | 1 | 0.5 | 2 | 0.1 | 4
// ------------------+---------+---------+---------+---------+--------
// metric anomaly | (12,2), | - | (30,5), | (12,2), | (60,2)
// (bucket, people) | (15,3), | | (44,9) | (80,1) |
// | (40,6) | | | |
//
// There are 10 people, 5 attributes and 100 buckets.
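        //
        // An anomaly is injected by adding 4 to the person's Poisson count
        // in the scheduled (bucket, person) pairs and shifting every other
        // metric value by six standard deviations.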
const std::size_t numberBuckets{100};
TStrVec people;
for (std::size_t i = 0; i < numberPeople; ++i) {
people.push_back("p" + core::CStringUtils::typeToString(i));
}
LOG_DEBUG(<< "people = " << people);
TStrVec attributes;
for (std::size_t i = 0; i < numberAttributes; ++i) {
attributes.push_back("c" + core::CStringUtils::typeToString(i));
}
LOG_DEBUG(<< "attributes = " << attributes);
const TDoubleVec attributeRates{10.0, 2.0, 15.0, 2.0, 1.0};
const TDoubleVec means{5.0, 10.0, 7.0, 3.0, 15.0};
const TDoubleVec variances{1.0, 0.5, 2.0, 0.1, 4.0};
TSizeSizePrVecVec anomalies{{{40, 6}, {15, 3}, {12, 2}},
{},
{{44, 9}, {30, 5}},
{{80, 1}, {12, 2}},
{{60, 2}}};
test::CRandomNumbers rng;
for (std::size_t i = 0; i < numberBuckets; ++i, startTime += bucketLength) {
for (std::size_t j = 0; j < numberAttributes; ++j) {
TUIntVec samples;
rng.generatePoissonSamples(attributeRates[j], numberPeople, samples);
for (std::size_t k = 0; k < numberPeople; ++k) {
bool anomaly = !anomalies[j].empty() &&
anomalies[j].back().first == i &&
anomalies[j].back().second == k;
if (anomaly) {
samples[k] += 4;
anomalies[j].pop_back();
}
if (samples[k] == 0) {
continue;
}
TDoubleVec values;
rng.generateNormalSamples(means[j], variances[j],
dimension * samples[k], values);
for (std::size_t l = 0; l < values.size(); l += dimension) {
TDouble1Vec value(dimension);
for (std::size_t d = 0; d < dimension; ++d) {
double vd = values[l + d];
if (anomaly && (l % (2 * dimension)) == 0) {
vd += 6.0 * std::sqrt(variances[j]);
}
value[d] = this->roundToNearestPersisted(vd);
}
core_t::TTime dt = (static_cast<core_t::TTime>(l) * bucketLength) /
static_cast<core_t::TTime>(values.size());
SMessage message(startTime + dt, people[k], attributes[j], value);
messages.push_back(message);
}
}
}
}
LOG_DEBUG(<< "# messages = " << messages.size());
std::sort(messages.begin(), messages.end());
}
void makeModel(const SModelParams& params,
const model_t::TFeatureVec& features,
core_t::TTime startTime) {
this->makeModelT<CMetricPopulationModelFactory>(
params, features, startTime, model_t::E_MetricOnline, m_Gatherer, m_Model);
}
private:
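    // Round a value to the precision it has after a persist and restore
    // round trip through its string representation, so that test
    // expectations are unaffected by persistence.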
double roundToNearestPersisted(double value) {
std::string valueAsString{core::CStringUtils::typeToStringPrecise(
value, core::CIEEE754::E_DoublePrecision)};
double result{0.0};
core::CStringUtils::stringToType(valueAsString, result);
return result;
}
};

BOOST_FIXTURE_TEST_CASE(testBasicAccessors, CTestFixture) {
// Check the correct data is retrieved by the basic model accessors.
using TMeanAccumulatorVec = std::vector<TMeanAccumulator>;
using TMinAccumulatorVec = std::vector<TMinAccumulator>;
using TMaxAccumulatorVec = std::vector<TMaxAccumulator>;
core_t::TTime startTime{1367280000};
const core_t::TTime bucketLength{3600};
TMessageVec messages;
generateTestMessages(1, startTime, bucketLength, messages);
LOG_DEBUG(<< "# messages = " << messages.size());
SModelParams params(bucketLength);
model_t::TFeatureVec features{model_t::E_PopulationMeanByPersonAndAttribute,
model_t::E_PopulationMinByPersonAndAttribute,
model_t::E_PopulationMaxByPersonAndAttribute};
this->makeModel(params, features, startTime);
auto* model = dynamic_cast<CMetricPopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(model);
TStrUInt64Map expectedBucketPersonCounts;
TMeanAccumulatorVec expectedBucketMeans(numberPeople * numberAttributes);
TMinAccumulatorVec expectedBucketMins(numberPeople * numberAttributes);
TMaxAccumulatorVec expectedBucketMaxs(numberPeople * numberAttributes);
for (const auto& message : messages) {
if (message.s_Time >= startTime + bucketLength) {
model->sample(startTime, startTime + bucketLength, m_ResourceMonitor);
LOG_DEBUG(<< "Testing bucket = [" << startTime << ","
<< startTime + bucketLength << ")");
BOOST_REQUIRE_EQUAL(numberPeople, m_Gatherer->numberActivePeople());
BOOST_REQUIRE_EQUAL(numberAttributes, m_Gatherer->numberActiveAttributes());
// Test the person and attribute invariants.
for (std::size_t j = 0; j < m_Gatherer->numberActivePeople(); ++j) {
const std::string& name = model->personName(j);
std::size_t pid;
BOOST_TEST_REQUIRE(m_Gatherer->personId(name, pid));
BOOST_REQUIRE_EQUAL(j, pid);
}
for (std::size_t j = 0; j < m_Gatherer->numberActiveAttributes(); ++j) {
const std::string& name = model->attributeName(j);
std::size_t cid;
BOOST_TEST_REQUIRE(m_Gatherer->attributeId(name, cid));
BOOST_REQUIRE_EQUAL(j, cid);
}
LOG_DEBUG(<< "expected counts = " << expectedBucketPersonCounts);
TSizeVec expectedCurrentBucketPersonIds;
// Test the person counts.
for (const auto& count_ : expectedBucketPersonCounts) {
std::size_t pid;
BOOST_TEST_REQUIRE(m_Gatherer->personId(count_.first, pid));
expectedCurrentBucketPersonIds.push_back(pid);
TOptionalUInt64 count = model->currentBucketCount(pid, startTime);
BOOST_TEST_REQUIRE(count.has_value());
BOOST_REQUIRE_EQUAL(count_.second, *count);
}
std::sort(expectedCurrentBucketPersonIds.begin(),
expectedCurrentBucketPersonIds.end());
TSizeVec bucketPersonIds;
model->currentBucketPersonIds(startTime, bucketPersonIds);
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedCurrentBucketPersonIds),
core::CContainerPrinter::print(bucketPersonIds));
if ((startTime / bucketLength) % 10 == 0) {
LOG_DEBUG(<< "expected means = " << expectedBucketMeans);
LOG_DEBUG(<< "expected mins = " << expectedBucketMins);
LOG_DEBUG(<< "expected maxs = " << expectedBucketMaxs);
}
for (std::size_t cid = 0; cid < numberAttributes; ++cid) {
for (std::size_t pid = 0; pid < numberPeople; ++pid) {
const TMeanAccumulator& expectedMean =
expectedBucketMeans[pid * numberAttributes + cid];
const TMinAccumulator& expectedMin =
expectedBucketMins[pid * numberAttributes + cid];
const TMaxAccumulator& expectedMax =
expectedBucketMaxs[pid * numberAttributes + cid];
TDouble1Vec mean = model->currentBucketValue(
model_t::E_PopulationMeanByPersonAndAttribute, pid, cid, startTime);
TDouble1Vec min = model->currentBucketValue(
model_t::E_PopulationMinByPersonAndAttribute, pid, cid, startTime);
TDouble1Vec max = model->currentBucketValue(
model_t::E_PopulationMaxByPersonAndAttribute, pid, cid, startTime);
if (mean.empty()) {
BOOST_TEST_REQUIRE(maths::common::CBasicStatistics::count(
expectedMean) == 0.0);
} else {
BOOST_TEST_REQUIRE(
maths::common::CBasicStatistics::count(expectedMean) > 0.0);
BOOST_REQUIRE_EQUAL(maths::common::CBasicStatistics::mean(expectedMean),
mean[0]);
}
if (min.empty()) {
BOOST_TEST_REQUIRE(expectedMin.count() == 0);
} else {
BOOST_TEST_REQUIRE(expectedMin.count() > 0);
BOOST_REQUIRE_EQUAL(expectedMin[0], min[0]);
}
if (max.empty()) {
BOOST_TEST_REQUIRE(expectedMax.count() == 0);
} else {
BOOST_TEST_REQUIRE(expectedMax.count() > 0);
BOOST_REQUIRE_EQUAL(expectedMax[0], max[0]);
}
}
}
expectedBucketMeans = TMeanAccumulatorVec(numberPeople * numberAttributes);
expectedBucketMins = TMinAccumulatorVec(numberPeople * numberAttributes);
expectedBucketMaxs = TMaxAccumulatorVec(numberPeople * numberAttributes);
expectedBucketPersonCounts.clear();
startTime += bucketLength;
}
CEventData eventData = this->addArrival(message, m_Gatherer);
std::size_t pid = *eventData.personId();
std::size_t cid = *eventData.attributeId();
++expectedBucketPersonCounts[message.s_Person];
expectedBucketMeans[pid * numberAttributes + cid].add((*message.s_Dbl1Vec)[0]);
expectedBucketMins[pid * numberAttributes + cid].add((*message.s_Dbl1Vec)[0]);
expectedBucketMaxs[pid * numberAttributes + cid].add((*message.s_Dbl1Vec)[0]);
}
}

BOOST_FIXTURE_TEST_CASE(testMinMaxAndMean, CTestFixture) {
// Check the correct data is read from the gatherer into the model on sample.
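    // We do this by maintaining our own feature models, feeding them the
    // samples we expect the gatherer to produce and checking that their
    // checksums match those of the models held by the population model.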
using TSizeTimeUMap = boost::unordered_map<std::size_t, core_t::TTime>;
using TSizeValueAndWeightsMap = std::map<std::size_t, SValuesAndWeights>;
using TSizeSizeValueAndWeightsMapMap = std::map<std::size_t, TSizeValueAndWeightsMap>;
using TSizeSizePrDoubleVecMap = std::map<TSizeSizePr, TDoubleVec>;
    using TSizeSizePrMeanAccumulatorMap = std::map<TSizeSizePr, TMeanAccumulator>;
using TSizeSizePrMinAccumulatorMap = std::map<TSizeSizePr, TMinAccumulator>;
using TSizeSizePrMaxAccumulatorMap = std::map<TSizeSizePr, TMaxAccumulator>;
using TMathsModelPtr = std::shared_ptr<maths::common::CModel>;
using TSizeMathsModelPtrMap = std::map<std::size_t, TMathsModelPtr>;
core_t::TTime startTime{1367280000};
const core_t::TTime bucketLength{3600};
TMessageVec messages;
generateTestMessages(1, startTime, bucketLength, messages);
SModelParams params(bucketLength);
params.s_InitialDecayRateMultiplier = 1.0;
params.s_MaximumUpdatesPerBucket = 0.0;
model_t::TFeatureVec features{model_t::E_PopulationMeanByPersonAndAttribute,
model_t::E_PopulationMinByPersonAndAttribute,
model_t::E_PopulationMaxByPersonAndAttribute};
this->makeModel(params, features, startTime);
auto* model = dynamic_cast<CMetricPopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(model);
CModelFactory::TFeatureMathsModelPtrPrVec models{
m_Factory->defaultFeatureModels(features, bucketLength, 1.0, false)};
BOOST_REQUIRE_EQUAL(features.size(), models.size());
BOOST_REQUIRE_EQUAL(features[0], models[0].first);
BOOST_REQUIRE_EQUAL(features[1], models[1].first);
BOOST_REQUIRE_EQUAL(features[2], models[2].first);
TSizeTimeUMap attributeFirstValueTimes;
    TSizeSizePrMeanAccumulatorMap sampleTimes;
    TSizeSizePrMeanAccumulatorMap sampleMeans;
TSizeSizePrMinAccumulatorMap sampleMins;
TSizeSizePrMaxAccumulatorMap sampleMaxs;
TSizeSizePrDoubleVecMap expectedSampleTimes;
TSizeSizePrDoubleVecMap expectedSamples[3];
TSizeMathsModelPtrMap expectedPopulationModels[3];
bool isNonNegative = true;
for (const auto& message : messages) {
if (message.s_Time >= startTime + bucketLength) {
model->sample(startTime, startTime + bucketLength, m_ResourceMonitor);
TSizeSizeValueAndWeightsMapMap populationWeightedSamples;
for (std::size_t feature = 0; feature < features.size(); ++feature) {
for (const auto& samples_ : expectedSamples[feature]) {
std::size_t pid = samples_.first.first;
std::size_t cid = samples_.first.second;
attributeFirstValueTimes.emplace(cid, startTime);
auto& attribute = populationWeightedSamples[feature][cid];
TMathsModelPtr& attributeModel = expectedPopulationModels[feature][cid];
if (attributeModel == nullptr) {
attributeModel = m_Factory->defaultFeatureModel(
features[feature], bucketLength, 1.0, false);
}
for (std::size_t j = 0; j < samples_.second.size(); ++j) {
// We round to the nearest integer time (note this has to
// match the behaviour of CMetricPartialStatistic::time).
core_t::TTime time_ = static_cast<core_t::TTime>(
expectedSampleTimes[{pid, cid}][j] + 0.5);
TDouble2Vec sample{samples_.second[j]};
attribute.s_Values.emplace_back(time_, sample, pid);
attribute.s_TrendWeights.push_back(
maths_t::CUnitWeights::unit<TDouble2Vec>(1));
attribute.s_ResidualWeights.push_back(
maths_t::CUnitWeights::unit<TDouble2Vec>(1));
double countWeight{model->sampleRateWeight(pid, cid)};
attributeModel->countWeights(
time_, sample, countWeight, countWeight, 1.0, 1.0,
attribute.s_TrendWeights.back(),
attribute.s_ResidualWeights.back());
}
}
}
for (auto& feature : populationWeightedSamples) {
for (auto& attribute : feature.second) {
maths::common::COrderings::simultaneousSort(
attribute.second.s_Values, attribute.second.s_TrendWeights,
attribute.second.s_ResidualWeights);
maths::common::CModelAddSamplesParams params_;
params_.isInteger(false)
.isNonNegative(isNonNegative)
.propagationInterval(1.0)
.trendWeights(attribute.second.s_TrendWeights)
.priorWeights(attribute.second.s_ResidualWeights)
.firstValueTime(attributeFirstValueTimes[attribute.first]);
expectedPopulationModels[feature.first][attribute.first]->addSamples(
params_, attribute.second.s_Values);
}
}
for (std::size_t feature = 0; feature < features.size(); ++feature) {
for (std::size_t cid = 0; cid < numberAttributes; ++cid) {
if (expectedPopulationModels[feature].count(cid) > 0) {
BOOST_REQUIRE_EQUAL(
expectedPopulationModels[feature][cid]->checksum(),
model->details()->model(features[feature], cid)->checksum());
}
}
}
expectedSampleTimes.clear();
expectedSamples[0].clear();
expectedSamples[1].clear();
expectedSamples[2].clear();
startTime += bucketLength;
}
CEventData eventData = this->addArrival(message, m_Gatherer);
std::size_t pid = *eventData.personId();
std::size_t cid = *eventData.attributeId();
        isNonNegative &= (*message.s_Dbl1Vec)[0] >= 0.0;
double sampleCount = m_Gatherer->sampleCount(cid);
if (sampleCount > 0.0) {
TSizeSizePr key{pid, cid};
sampleTimes[key].add(static_cast<double>(message.s_Time));
sampleMeans[key].add((*message.s_Dbl1Vec)[0]);
sampleMins[key].add((*message.s_Dbl1Vec)[0]);
sampleMaxs[key].add((*message.s_Dbl1Vec)[0]);
if (maths::common::CBasicStatistics::count(sampleTimes[key]) == sampleCount) {
expectedSampleTimes[key].push_back(
maths::common::CBasicStatistics::mean(sampleTimes[key]));
expectedSamples[0][key].push_back(
maths::common::CBasicStatistics::mean(sampleMeans[key]));
expectedSamples[1][key].push_back(sampleMins[key][0]);
expectedSamples[2][key].push_back(sampleMaxs[key][0]);
sampleTimes[key] = TMeanAccumulator();
sampleMeans[key] = TMeanAccumulator();
sampleMins[key] = TMinAccumulator();
sampleMaxs[key] = TMaxAccumulator();
}
}
}
}

BOOST_FIXTURE_TEST_CASE(testVarp, CTestFixture) {
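    // Check that buckets with unusually variable values are assigned low
    // probability and that the responsible influencer is identified.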
core_t::TTime startTime{3600};
core_t::TTime bucketLength{3600};
SModelParams params(bucketLength);
model_t::TFeatureVec features{model_t::E_PopulationVarianceByPersonAndAttribute};
m_InterimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
m_Factory.reset(new CMetricPopulationModelFactory(params, m_InterimBucketCorrector));
m_Factory->features({model_t::E_PopulationVarianceByPersonAndAttribute});
m_Factory->fieldNames("", "P", "", "V", TStrVec{1, "I"});
this->makeModel(params, features, startTime);
CMetricPopulationModel& model =
static_cast<CMetricPopulationModel&>(*m_Model.get());
TDoubleStrPrVec b1{{1.0, "i1"}, {1.1, "i1"}, {1.01, "i2"}, {1.02, "i2"}};
TDoubleStrPrVec b2{{10.0, "i1"}};
TDoubleStrPrVec b3{{4.3, "i1"}, {4.4, "i1"}, {4.6, "i1"}, {4.2, "i1"}, {4.8, "i3"}};
TDoubleStrPrVec b4{{3.2, "i3"}, {3.3, "i3"}};
TDoubleStrPrVec b5{{20.1, "i2"}, {20.8, "i1"}, {20.9, "i1"}};
TDoubleStrPrVec b6{{4.1, "i1"}, {4.2, "i2"}, {3.9, "i2"}, {4.2, "i2"}};
TDoubleStrPrVec b7{{0.1, "i1"}, {0.3, "i1"}, {0.2, "i3"}};
TDoubleStrPrVec b8{{12.5, "i1"}, {12.3, "i2"}};
TDoubleStrPrVec b9{{6.9, "i1"}, {7.0, "i2"}, {7.1, "i3"},
{6.6, "i4"}, {7.1, "i5"}, {6.7, "i6"}};
// This last bucket is much more improbable, with influencer i2 being responsible
TDoubleStrPrVec b10{{0.3, "i2"}, {15.4, "i2"}, {77.62, "i2"},
{112.999, "i2"}, {5.1, "i1"}, {5.1, "i1"},
{5.1, "i1"}, {5.1, "i1"}, {5.1, "i1"}};
SAnnotatedProbability annotatedProbability;
core_t::TTime time = startTime;
processBucket(time, bucketLength, b1, m_Gatherer, model, annotatedProbability);
BOOST_TEST_REQUIRE(annotatedProbability.s_Probability > 0.8);
time += bucketLength;
processBucket(time, bucketLength, b2, m_Gatherer, model, annotatedProbability);
BOOST_TEST_REQUIRE(annotatedProbability.s_Probability > 0.8);
time += bucketLength;
processBucket(time, bucketLength, b3, m_Gatherer, model, annotatedProbability);
BOOST_TEST_REQUIRE(annotatedProbability.s_Probability > 0.8);
time += bucketLength;
processBucket(time, bucketLength, b4, m_Gatherer, model, annotatedProbability);
BOOST_TEST_REQUIRE(annotatedProbability.s_Probability > 0.8);
time += bucketLength;
processBucket(time, bucketLength, b5, m_Gatherer, model, annotatedProbability);
BOOST_TEST_REQUIRE(annotatedProbability.s_Probability > 0.8);
time += bucketLength;
processBucket(time, bucketLength, b6, m_Gatherer, model, annotatedProbability);
BOOST_TEST_REQUIRE(annotatedProbability.s_Probability > 0.8);
time += bucketLength;
processBucket(time, bucketLength, b7, m_Gatherer, model, annotatedProbability);
BOOST_TEST_REQUIRE(annotatedProbability.s_Probability > 0.8);
time += bucketLength;
processBucket(time, bucketLength, b8, m_Gatherer, model, annotatedProbability);
BOOST_TEST_REQUIRE(annotatedProbability.s_Probability > 0.8);
time += bucketLength;
processBucket(time, bucketLength, b9, m_Gatherer, model, annotatedProbability);
BOOST_TEST_REQUIRE(annotatedProbability.s_Probability < 0.85);
time += bucketLength;
processBucket(time, bucketLength, b10, m_Gatherer, model, annotatedProbability);
BOOST_TEST_REQUIRE(annotatedProbability.s_Probability < 0.1);
BOOST_REQUIRE_EQUAL(1, annotatedProbability.s_Influences.size());
BOOST_REQUIRE_EQUAL(std::string("I"),
*annotatedProbability.s_Influences[0].first.first);
BOOST_REQUIRE_EQUAL(std::string("i2"),
*annotatedProbability.s_Influences[0].first.second);
BOOST_REQUIRE_CLOSE_ABSOLUTE(1.0, annotatedProbability.s_Influences[0].second, 0.00001);
}

BOOST_FIXTURE_TEST_CASE(testComputeProbability, CTestFixture) {
maths::common::CSampling::CScopeMockRandomNumberGenerator scopeMockRng;
    // Test that we correctly pick out the synthetic anomalies,
    // their people and attributes.
core_t::TTime startTime{1367280000};
const core_t::TTime bucketLength{3600};
model_t::TFeatureVec features{model_t::E_PopulationMaxByPersonAndAttribute,
model_t::E_PopulationMeanLatLongByPersonAndAttribute};
for (auto& feature : features) {
LOG_DEBUG(<< "Testing " << model_t::print(feature));
TMessageVec messages;
generateTestMessages(model_t::dimension(feature), startTime, bucketLength, messages);
SModelParams params(bucketLength);
this->makeModel(params, {feature}, startTime);
        auto* model = dynamic_cast<CMetricPopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(model);
TStrVec expectedAnomalies{
"[12, p2, c0 c3]", "[15, p3, c0]", "[30, p5, c2]", "[40, p6, c0]",
"[44, p9, c2]", "[60, p2, c4]", "[80, p1, c3]"};
TAnomalyVec orderedAnomalies;
this->generateOrderedAnomalies(7, startTime, bucketLength, messages,
m_Gatherer, *model, orderedAnomalies);
BOOST_REQUIRE_EQUAL(expectedAnomalies.size(), orderedAnomalies.size());
for (std::size_t j = 0; j < orderedAnomalies.size(); ++j) {
BOOST_REQUIRE_EQUAL(expectedAnomalies[j], orderedAnomalies[j].print());
}
}
}

BOOST_FIXTURE_TEST_CASE(testPrune, CTestFixture) {
    // This test has four people and five attributes. We expect
    // person 1 and attributes 1, 2 and 5 to be deleted.
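    // Person 1 and attributes c1, c2 and c5 receive no data after the
    // earliest buckets, which is what makes them eligible for pruning.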
core_t::TTime startTime{1367280000};
const core_t::TTime bucketLength{3600};
const std::size_t numberBuckets{1000};
TStrVec people{"p1", "p2", "p3", "p4"};
TStrVec attributes{"c1", "c2", "c3", "c4", "c5"};
TStrSizePrVecVecVec eventCounts{{}, {}, {}, {}};
{
TStrSizePrVec attributeCounts{{attributes[0], 0}, {attributes[4], 0}};
eventCounts[0].resize(numberBuckets, attributeCounts);
eventCounts[0][0][0].second = 2; // p1, bucket 1, c1
eventCounts[0][2][0].second = 3; // p1, bucket 3, c1
eventCounts[0][4][0].second = 4; // p1, bucket 5, c1
eventCounts[0][8][0].second = 5; // p1, bucket 9, c1
eventCounts[0][9][0].second = 3; // p1, bucket 10, c1
eventCounts[0][0][1].second = 4; // p1, bucket 1, c5
eventCounts[0][3][1].second = 1; // p1, bucket 4, c5
eventCounts[0][7][1].second = 1; // p1, bucket 8, c5
eventCounts[0][8][1].second = 3; // p1, bucket 9, c5
}
{
TStrSizePrVec attributeCounts{
{attributes[0], 0}, {attributes[1], 0}, {attributes[2], 1}, {attributes[4], 0}};
eventCounts[1].resize(numberBuckets, attributeCounts);
eventCounts[1][1][0].second = 2; // p2, bucket 2, c1
        eventCounts[1][3][0].second = 4; // p2, bucket 4, c1
eventCounts[1][4][0].second = 4; // p2, bucket 5, c1
eventCounts[1][7][0].second = 3; // p2, bucket 8, c1
eventCounts[1][5][1].second = 4; // p2, bucket 6, c2
eventCounts[1][6][1].second = 4; // p2, bucket 7, c2
eventCounts[1][7][1].second = 3; // p2, bucket 8, c2
eventCounts[1][8][1].second = 3; // p2, bucket 9, c2
eventCounts[1][0][3].second = 3; // p2, bucket 1, c5
eventCounts[1][1][3].second = 3; // p2, bucket 2, c5
eventCounts[1][2][3].second = 3; // p2, bucket 3, c5
eventCounts[1][3][3].second = 3; // p2, bucket 4, c5
eventCounts[1][4][3].second = 2; // p2, bucket 5, c5
eventCounts[1][5][3].second = 2; // p2, bucket 6, c5
}
{
TStrSizePrVec attributeCounts{{attributes[2], 0}, {attributes[3], 2}};
eventCounts[2].resize(numberBuckets, attributeCounts);
eventCounts[2][0][0].second = 1; // p3, bucket 1, c3
eventCounts[2][20][0].second = 4; // p3, bucket 21, c3
eventCounts[2][25][0].second = 6; // p3, bucket 26, c3
eventCounts[2][80][0].second = 3; // p3, bucket 81, c3
eventCounts[2][180][0].second = 7; // p3, bucket 181, c3
eventCounts[2][200][0].second = 9; // p3, bucket 201, c3
eventCounts[2][800][0].second = 2; // p3, bucket 801, c3
}
{
TStrSizePrVec attributeCounts{{attributes[1], 0}, {attributes[3], 3}};
eventCounts[3].resize(numberBuckets, attributeCounts);
eventCounts[3][0][0].second = 2; // p4, bucket 1, c2
eventCounts[3][1][0].second = 1; // p4, bucket 2, c2
eventCounts[3][2][0].second = 5; // p4, bucket 3, c2
eventCounts[3][15][0].second = 3; // p4, bucket 16, c2
eventCounts[3][26][0].second = 1; // p4, bucket 27, c2
        eventCounts[3][70][0].second = 4; // p4, bucket 71, c2
}
const std::string expectedPeople[] = {people[1], people[2], people[3]};
const std::string expectedAttributes[] = {attributes[2], attributes[3]};
SModelParams params(bucketLength);
params.s_DecayRate = 0.01;
model_t::TFeatureVec features{model_t::E_PopulationMeanByPersonAndAttribute,
model_t::E_PopulationMinByPersonAndAttribute,
model_t::E_PopulationMaxByPersonAndAttribute};
this->makeModel(params, features, startTime);
CModelFactory::TDataGathererPtr expectedGatherer(m_Factory->makeDataGatherer({startTime}));
CModelFactory::SModelInitializationData expectedModelInitData(expectedGatherer);
CAnomalyDetectorModel::TModelPtr expectedModel(m_Factory->makeModel(expectedModelInitData));
BOOST_TEST_REQUIRE(expectedModel);
test::CRandomNumbers rng;
TMessageVec messages;
for (std::size_t i = 0; i < people.size(); ++i) {
core_t::TTime bucketStart{startTime};
for (std::size_t j = 0; j < numberBuckets; ++j, bucketStart += bucketLength) {
const TStrSizePrVec& attributeEventCounts = eventCounts[i][j];
for (auto& attributeEventCount : attributeEventCounts) {
if (attributeEventCount.second == 0) {
continue;
}
std::size_t n{attributeEventCount.second};
TDoubleVec samples;
rng.generateUniformSamples(0.0, 8.0, n, samples);
core_t::TTime time{bucketStart};
core_t::TTime dt{bucketLength / static_cast<core_t::TTime>(n)};
for (std::size_t l = 0; l < n; ++l, time += dt) {
messages.emplace_back(time, people[i], attributeEventCount.first,
                                          TDouble1Vec(1, samples[l]));
}
}
}
}
std::sort(messages.begin(), messages.end());
TMessageVec expectedMessages;
expectedMessages.reserve(messages.size());
for (const auto& message : messages) {
if (std::binary_search(std::begin(expectedPeople),
std::end(expectedPeople), message.s_Person) &&
std::binary_search(std::begin(expectedAttributes),
std::end(expectedAttributes), message.s_Attribute)) {
expectedMessages.push_back(message);
}
}
core_t::TTime bucketStart{startTime};
for (const auto& message : messages) {
if (message.s_Time >= bucketStart + bucketLength) {
m_Model->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
bucketStart += bucketLength;
}
this->addArrival(message, m_Gatherer);
}
m_Model->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
    std::size_t maxDimensionBeforePrune(m_Model->dataGatherer().maxDimension());
    m_Model->prune();
    std::size_t maxDimensionAfterPrune(m_Model->dataGatherer().maxDimension());
BOOST_REQUIRE_EQUAL(maxDimensionBeforePrune, maxDimensionAfterPrune);
bucketStart = startTime;
for (const auto& expectedMessage : expectedMessages) {
if (expectedMessage.s_Time >= bucketStart + bucketLength) {
expectedModel->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
bucketStart += bucketLength;
}
this->addArrival(expectedMessage, expectedGatherer);
}
expectedModel->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
LOG_DEBUG(<< "checksum = " << m_Model->checksum());
LOG_DEBUG(<< "expected checksum = " << expectedModel->checksum());
BOOST_REQUIRE_EQUAL(expectedModel->checksum(), m_Model->checksum());
// Now check that we recycle the person and attribute slots.
bucketStart = m_Gatherer->currentBucketStartTime() + bucketLength;
TMessageVec newMessages{
{bucketStart + 10, "p1", TOptionalStr{"c2"}, TDouble1Vec(1, 20.0)},
{bucketStart + 200, "p5", TOptionalStr{"c6"}, TDouble1Vec(1, 10.0)},
{bucketStart + 2100, "p5", TOptionalStr{"c6"}, TDouble1Vec(1, 15.0)}};
for (auto& newMessage : newMessages) {
this->addArrival(newMessage, m_Gatherer);
this->addArrival(newMessage, expectedGatherer);
}
m_Model->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
expectedModel->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
LOG_DEBUG(<< "checksum = " << m_Model->checksum());
LOG_DEBUG(<< "expected checksum = " << expectedModel->checksum());
BOOST_REQUIRE_EQUAL(expectedModel->checksum(), m_Model->checksum());
// Test that calling prune on a cloned model which has seen no new data does nothing
CAnomalyDetectorModel::TModelPtr clonedModelHolder(m_Model->cloneForPersistence());
std::size_t numberOfPeopleBeforePrune(
clonedModelHolder->dataGatherer().numberActivePeople());
BOOST_TEST_REQUIRE(numberOfPeopleBeforePrune > 0);
clonedModelHolder->prune(clonedModelHolder->defaultPruneWindow());
BOOST_REQUIRE_EQUAL(numberOfPeopleBeforePrune,
clonedModelHolder->dataGatherer().numberActivePeople());
}

BOOST_FIXTURE_TEST_CASE(testKey, CTestFixture) {
function_t::TFunctionVec countFunctions{
function_t::E_PopulationMetric, function_t::E_PopulationMetricMean,
function_t::E_PopulationMetricMin, function_t::E_PopulationMetricMax,
function_t::E_PopulationMetricSum};
std::string fieldName{"value"};
std::string overFieldName{"over"};
generateAndCompareKey(countFunctions, fieldName, overFieldName,
[](CSearchKey expectedKey, CSearchKey actualKey) {
BOOST_TEST_REQUIRE(expectedKey == actualKey);
});
}

BOOST_FIXTURE_TEST_CASE(testFrequency, CTestFixture) {
// Test we correctly compute frequencies for people and attributes.
struct SDatum {
std::string s_Attribute;
std::string s_Person;
std::size_t s_Period{0};
};
using TDataVec = std::vector<SDatum>;
TDataVec data{{"a1", "p1", 1}, {"a2", "p2", 1}, {"a3", "p3", 10},
{"a4", "p4", 3}, {"a5", "p5", 4}, {"a6", "p6", 5},
{"a7", "p7", 2}, {"a8", "p8", 1}, {"a9", "p9", 3},
{"a10", "p10", 7}};
const core_t::TTime bucketLength{600};
core_t::TTime startTime{0};
TMessageVec messages;
std::size_t bucket{0};
for (core_t::TTime bucketStart = startTime; bucketStart < 100 * bucketLength;
bucketStart += bucketLength, ++bucket) {
std::size_t i{0};
for (auto& datum : data) {
if (bucket % datum.s_Period == 0) {
for (std::size_t j = 0; j < i + 1; ++j) {
messages.emplace_back(bucketStart + bucketLength / 2, datum.s_Person,
                                          data[j].s_Attribute, TDouble1Vec(1, 0.0));
}
}
++i;
}
}
std::sort(messages.begin(), messages.end());
LOG_DEBUG(<< "# messages = " << messages.size());
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
this->makeModel(params, {model_t::E_PopulationMeanByPersonAndAttribute}, startTime);
core_t::TTime time{startTime};
for (const auto& message : messages) {
if (message.s_Time >= time + bucketLength) {
m_Model->sample(time, time + bucketLength, m_ResourceMonitor);
time += bucketLength;
}
this->addArrival(message, m_Gatherer);
}
{
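        // A person with period p sends messages in one bucket in every p,
        // so we expect its frequency to converge to 1 / p.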
TMeanAccumulator meanError;
for (auto& datum : data) {
LOG_DEBUG(<< "*** person = " << datum.s_Person << " ***");
std::size_t pid;
BOOST_TEST_REQUIRE(m_Gatherer->personId(datum.s_Person, pid));
LOG_DEBUG(<< "frequency = " << m_Model->personFrequency(pid));
LOG_DEBUG(<< "expected frequency = "
<< 1.0 / static_cast<double>(datum.s_Period));
BOOST_REQUIRE_CLOSE_ABSOLUTE(1.0 / static_cast<double>(datum.s_Period),
m_Model->personFrequency(pid),
0.1 / static_cast<double>(datum.s_Period));
meanError.add(std::fabs(m_Model->personFrequency(pid) -
1.0 / static_cast<double>(datum.s_Period)));
}
LOG_DEBUG(<< "error = " << maths::common::CBasicStatistics::mean(meanError));
BOOST_TEST_REQUIRE(maths::common::CBasicStatistics::mean(meanError) < 0.002);
}
{
std::size_t i{0};
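        // Attribute a(i+1) is sent by every person with index i or greater,
        // i.e. by (10 - i) of the 10 people, so its expected frequency is
        // (10 - i) / 10.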
for (auto& datum : data) {
LOG_DEBUG(<< "*** attributes = " << datum.s_Attribute << " ***");
std::size_t cid;
BOOST_TEST_REQUIRE(m_Gatherer->attributeId(datum.s_Attribute, cid));
LOG_DEBUG(<< "frequency = " << m_Model->attributeFrequency(cid));
LOG_DEBUG(<< "expected frequency = " << (10.0 - static_cast<double>(i)) / 10.0);
BOOST_REQUIRE_EQUAL((10.0 - static_cast<double>(i)) / 10.0,
m_Model->attributeFrequency(cid));
++i;
}
}
}

BOOST_FIXTURE_TEST_CASE(testSampleRateWeight, CTestFixture) {
// Test that we correctly compensate for heavy hitters.
// There are 10 attributes.
//
// People p1 and p5 generate messages for every attribute every bucket.
// The remaining 18 people only generate one message per bucket, i.e.
// one message per attribute per 10 buckets.
const core_t::TTime bucketLength{600};
const TStrVec attributes{"a1", "a2", "a3", "a4", "a5",
"a6", "a7", "a8", "a9", "a10"};
const TStrVec people{"p1", "p2", "p3", "p4", "p5", "p6", "p7",
"p8", "p9", "p10", "p11", "p12", "p13", "p14",
"p15", "p16", "p17", "p18", "p19", "p20"};
TSizeVec heavyHitters{0, 4};
TSizeVec normal{1, 2, 3, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19};
std::size_t messagesPerBucket = heavyHitters.size() * attributes.size() +
normal.size();
test::CRandomNumbers rng;
core_t::TTime startTime{0};
TMessageVec messages;
for (core_t::TTime bucketStart = startTime;
bucketStart < 100 * bucketLength; bucketStart += bucketLength) {
TSizeVec times;
rng.generateUniformSamples(static_cast<std::size_t>(bucketStart),
static_cast<std::size_t>(bucketStart + bucketLength),
messagesPerBucket, times);
std::size_t m{0};
for (auto& attribute : attributes) {
for (auto& heavyHitter : heavyHitters) {
messages.emplace_back(static_cast<core_t::TTime>(times[m++]),
people[heavyHitter], attribute,
                                      TDouble1Vec(1, 0.0));
}
}
TSizeVec attributeIndexes;
rng.generateUniformSamples(0, attributes.size(), normal.size(), attributeIndexes);
std::size_t i{0};
for (auto& norm : normal) {
messages.emplace_back(static_cast<core_t::TTime>(times[m++]),
people[norm], attributes[attributeIndexes[i++]],
                                      TDouble1Vec(1, 0.0));
}
}
std::sort(messages.begin(), messages.end());
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
this->makeModel(params, {model_t::E_PopulationSumByBucketPersonAndAttribute}, startTime);
auto* populationModel = dynamic_cast<CMetricPopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(populationModel);
core_t::TTime time{startTime};
for (const auto& message : messages) {
if (message.s_Time >= time + bucketLength) {
populationModel->sample(time, time + bucketLength, m_ResourceMonitor);
time += bucketLength;
}
this->addArrival(message, m_Gatherer);
}
    // The heavy hitters generate one value per attribute per bucket.
    // The rest generate one value per bucket. Therefore, we expect
    // the mean person rate per bucket to be:
    //   (  ("# people" - "# heavy hitters") / "# attributes"
    //    + "# heavy hitters") / "# people"
double expectedRateWeight = (static_cast<double>(normal.size()) /
static_cast<double>(attributes.size()) +
static_cast<double>(heavyHitters.size())) /
static_cast<double>(people.size());
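    // For example, with 18 normal people, 2 heavy hitters, 10 attributes
    // and 20 people in total this is (18 / 10 + 2) / 20 = 0.19.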
LOG_DEBUG(<< "expectedRateWeight = " << expectedRateWeight);
for (auto& heavyHitter : heavyHitters) {
LOG_DEBUG(<< "*** person = " << people[heavyHitter] << " ***");
std::size_t pid;
BOOST_TEST_REQUIRE(m_Gatherer->personId(people[heavyHitter], pid));
for (std::size_t cid = 0; cid < attributes.size(); ++cid) {
double sampleRateWeight = populationModel->sampleRateWeight(pid, cid);
LOG_DEBUG(<< "attribute = " << populationModel->attributeName(cid)
<< ", sampleRateWeight = " << sampleRateWeight);
BOOST_REQUIRE_CLOSE_ABSOLUTE(expectedRateWeight, sampleRateWeight,
0.15 * expectedRateWeight);
}
}
for (auto& norm : normal) {
LOG_DEBUG(<< "*** person = " << people[norm] << " ***");
std::size_t pid;
BOOST_TEST_REQUIRE(m_Gatherer->personId(people[norm], pid));
for (std::size_t cid = 0; cid < attributes.size(); ++cid) {
double sampleRateWeight = populationModel->sampleRateWeight(pid, cid);
LOG_DEBUG(<< "attribute = " << populationModel->attributeName(cid)
<< ", sampleRateWeight = " << sampleRateWeight);
BOOST_REQUIRE_EQUAL(1.0, sampleRateWeight);
}
}
}

BOOST_FIXTURE_TEST_CASE(testPeriodicity, CTestFixture) {
// Create a daily periodic population and check that the
// periodicity is learned and compensated (approximately).
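    // We compare each person's smallest probability in the first three days,
    // before the daily pattern can be modelled, with their smallest
    // probability after day five: once the periodic trend has been learned
    // the same behaviour should look much less anomalous.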
using TStrDoubleMap = std::map<std::string, double>;
static const core_t::TTime HOUR{3600};
static const core_t::TTime DAY{86400};
const core_t::TTime bucketLength{3600};
TDoubleVec baseline{1, 1, 2, 2, 3, 5, 6, 6, 20, 21, 4, 3,
4, 4, 8, 25, 7, 6, 5, 1, 1, 4, 1, 1};
TDoubleStrPrVec attribs{{2.0, "a1"}, {3.0, "a2"}};
const TStrVec people{"p1", "p2", "p3", "p4", "p5",
"p6", "p7", "p8", "p9", "p10"};
test::CRandomNumbers rng;
core_t::TTime startTime{0};
core_t::TTime endTime{604800};
TMessageVec messages;
for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
for (const auto& attrib : attribs) {
TDoubleVec values;
rng.generateNormalSamples(baseline[(time % DAY) / HOUR],
attrib.first * attrib.first, people.size(), values);
std::size_t j{0};
for (const auto& value : values) {
for (unsigned int t = 0; t < 4; ++t) {
messages.emplace_back(time + (t * bucketLength) / 4, people[j],
attrib.second, TDouble1Vec(1, value));
}
++j;
}
}
}
std::sort(messages.begin(), messages.end());
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
this->makeModel(params, {model_t::E_PopulationMeanByPersonAndAttribute}, startTime);
TStrDoubleMap personProbabilitiesWithoutPeriodicity;
TStrDoubleMap personProbabilitiesWithPeriodicity;
core_t::TTime time = startTime;
for (const auto& message : messages) {
if (message.s_Time >= time + bucketLength) {
m_Model->sample(time, time + bucketLength, m_ResourceMonitor);
for (const auto& person : people) {
std::size_t pid;
if (!m_Gatherer->personId(person, pid)) {
continue;
}
CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
SAnnotatedProbability annotatedProbability;
if (m_Model->computeProbability(pid, time, time + bucketLength, partitioningFields,
1, annotatedProbability) == false) {
continue;
}
if (time < startTime + 3 * DAY) {
double& minimumProbability = personProbabilitiesWithoutPeriodicity
.insert({person, 1.0})
.first->second;
minimumProbability = std::min(
minimumProbability, annotatedProbability.s_Probability);
} else if (time > startTime + 5 * DAY) {
double& minimumProbability =
personProbabilitiesWithPeriodicity.insert({person, 1.0})
.first->second;
minimumProbability = std::min(
minimumProbability, annotatedProbability.s_Probability);
}
}
time += bucketLength;
}
this->addArrival(message, m_Gatherer);
}
double totalw{0.0};
double totalwo{0.0};
for (const auto& person : people) {
auto wo = personProbabilitiesWithoutPeriodicity.find(person);
auto w = personProbabilitiesWithPeriodicity.find(person);
LOG_DEBUG(<< "person = " << person);
LOG_DEBUG(<< "minimum probability with periodicity = " << w->second);
LOG_DEBUG(<< "minimum probability without periodicity = " << wo->second);
totalwo += wo->second;
totalw += w->second;
}
LOG_DEBUG(<< "total minimum probability with periodicity = " << totalw);
LOG_DEBUG(<< "total minimum probability without periodicity = " << totalwo);
BOOST_TEST_REQUIRE(totalw > 3.0 * totalwo);
}

BOOST_FIXTURE_TEST_CASE(testPersistence, CTestFixture) {
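    // Check that persisting and restoring a model preserves its state, i.e.
    // the restored model has the same checksum and persists to identical JSON.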
core_t::TTime startTime{1367280000};
const core_t::TTime bucketLength{3600};
TMessageVec messages;
generateTestMessages(1, startTime, bucketLength, messages);
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
model_t::TFeatureVec features{model_t::E_PopulationMeanByPersonAndAttribute,
model_t::E_PopulationMinByPersonAndAttribute,
model_t::E_PopulationMaxByPersonAndAttribute};
this->makeModel(params, features, startTime);
auto* populationModel = dynamic_cast<CMetricPopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(populationModel);
for (auto& message : messages) {
if (message.s_Time >= startTime + bucketLength) {
m_Model->sample(startTime, startTime + bucketLength, m_ResourceMonitor);
startTime += bucketLength;
}
this->addArrival(message, m_Gatherer);
}
std::ostringstream origJson;
core::CJsonStatePersistInserter::persist(
origJson, [& m_Model = m_Model](core::CJsonStatePersistInserter & inserter) {
m_Model->acceptPersistInserter(inserter);
});
LOG_TRACE(<< "origJson = " << origJson.str());
    // Restore the JSON into a new model
std::istringstream origJsonStrm{"{\"topLevel\" : " + origJson.str() + "}"};
core::CJsonStateRestoreTraverser traverser(origJsonStrm);
CAnomalyDetectorModel::TModelPtr restoredModel(
m_Factory->makeModel({m_Gatherer}, traverser));
populationModel = dynamic_cast<CMetricPopulationModel*>(restoredModel.get());
BOOST_TEST_REQUIRE(populationModel);
    // The JSON representation of the restored model should be the same as
    // the original
std::ostringstream newJson;
core::CJsonStatePersistInserter::persist(
newJson, [&restoredModel](core::CJsonStatePersistInserter& inserter) {
restoredModel->acceptPersistInserter(inserter);
});
LOG_DEBUG(<< "original checksum = " << m_Model->checksum(false));
LOG_DEBUG(<< "restored checksum = " << restoredModel->checksum(false));
BOOST_REQUIRE_EQUAL(m_Model->checksum(false), restoredModel->checksum(false));
BOOST_REQUIRE_EQUAL(origJson.str(), newJson.str());
}

BOOST_FIXTURE_TEST_CASE(testIgnoreSamplingGivenDetectionRules, CTestFixture) {
    // Create two models, one of which has a skip sampling rule.
    // The skip sampling rule doesn't cause samples to be ignored completely;
    // instead, it applies a small multiplicative weighting when the rule
    // applies. Feed the same data into both models, including the case where
    // the rule applies for one model but not the other.
core_t::TTime startTime{0};
const core_t::TTime bucketLength{300};
core_t::TTime endTime = startTime + bucketLength * 100;
// Create a categorical rule to reduce the weight applied to samples for attribute c4
std::string filterJson("[\"c4\"]");
core::CPatternSet valueFilter;
valueFilter.initFromJson(filterJson);
CDetectionRule rule;
rule.action(CDetectionRule::E_SkipModelUpdate);
rule.includeScope("byFieldName", valueFilter);
model_t::TFeatureVec features{model_t::E_PopulationMeanByPersonAndAttribute};
CModelFactory::SGathererInitializationData gathererInitData(startTime);
SModelParams paramsNoRules(bucketLength);
auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
CMetricPopulationModelFactory factoryNoSkip(paramsNoRules, interimBucketCorrector);
factoryNoSkip.features(features);
factoryNoSkip.fieldNames("partitionFieldName", "", "byFieldName", "", {});
CModelFactory::TDataGathererPtr gathererNoSkip(
factoryNoSkip.makeDataGatherer(gathererInitData));
CModelFactory::SModelInitializationData modelNoSkipInitData(gathererNoSkip);
CAnomalyDetectorModel::TModelPtr modelNoSkip(factoryNoSkip.makeModel(modelNoSkipInitData));
SModelParams paramsWithRules(bucketLength);
SModelParams::TDetectionRuleVec rules{rule};
paramsWithRules.s_DetectionRules = SModelParams::TDetectionRuleVecCRef(rules);
auto interimBucketCorrectorWithRules =
std::make_shared<CInterimBucketCorrector>(bucketLength);
CMetricPopulationModelFactory factoryWithSkip(paramsWithRules, interimBucketCorrectorWithRules);
factoryWithSkip.features(features);
factoryWithSkip.fieldNames("partitionFieldName", "", "byFieldName", "", {});
CModelFactory::TDataGathererPtr gathererWithSkip(
factoryWithSkip.makeDataGatherer(gathererInitData));
CModelFactory::SModelInitializationData modelWithSkipInitData(gathererWithSkip);
CAnomalyDetectorModel::TModelPtr modelWithSkip(
factoryWithSkip.makeModel(modelWithSkipInitData));
// Use the existing test function to generate a set of messages sufficiently complex
// that we know some will cause samples to be added to the models.
TMessageVec messages;
generateTestMessages(1, startTime, bucketLength, messages);
using TDataGathererPtrModelPtrPr =
std::pair<CModelFactory::TDataGathererPtr, CAnomalyDetectorModel::TModelPtr&>;
std::vector<TDataGathererPtrModelPtrPr> configs{
TDataGathererPtrModelPtrPr{gathererNoSkip, modelNoSkip},
TDataGathererPtrModelPtrPr{gathererWithSkip, modelWithSkip}};
// Run the same data through both models, ignoring messages with the c4 attribute
// so the skip sampling rule won't apply
for (auto& config : configs) {
core_t::TTime start{startTime};
for (auto& message : messages) {
if (*message.s_Attribute == "c4") {
continue;
}
if (message.s_Time >= start + bucketLength) {
config.second->sample(start, start + bucketLength, m_ResourceMonitor);
start += bucketLength;
}
this->addArrival(message, config.first);
}
}
// The checksums should match
BOOST_REQUIRE_EQUAL(modelWithSkip->checksum(), modelNoSkip->checksum());
CAnomalyDetectorModel::TModelDetailsViewUPtr modelWithSkipView =
modelWithSkip->details();
CAnomalyDetectorModel::TModelDetailsViewUPtr modelNoSkipView = modelNoSkip->details();
const maths::common::CModel* mathsModelWithSkipView = nullptr;
const maths::common::CModel* mathsModelNoSkipView = nullptr;
// expect models for attributes c0 - c3...
for (std::size_t i = 0; i < 4; ++i) {
mathsModelWithSkipView = modelWithSkipView->model(
model_t::E_PopulationMeanByPersonAndAttribute, i);
BOOST_TEST_REQUIRE(mathsModelWithSkipView != nullptr);
mathsModelNoSkipView =
modelNoSkipView->model(model_t::E_PopulationMeanByPersonAndAttribute, i);
BOOST_TEST_REQUIRE(mathsModelNoSkipView != nullptr);
}
// ...But not for c4
mathsModelWithSkipView =
modelWithSkipView->model(model_t::E_PopulationMeanByPersonAndAttribute, 4);
BOOST_REQUIRE_EQUAL(mathsModelWithSkipView, nullptr);
mathsModelNoSkipView =
modelNoSkipView->model(model_t::E_PopulationMeanByPersonAndAttribute, 4);
BOOST_REQUIRE_EQUAL(mathsModelNoSkipView, nullptr);
messages.clear();
startTime = endTime;
generateTestMessages(1, startTime, bucketLength, messages);
// Now add messages with the c4 attribute only to both models.
// The model with the skip sampling rule will have a small weighting applied
// to the samples.
for (auto& config : configs) {
core_t::TTime start{startTime};
for (auto& message : messages) {
if (*message.s_Attribute != "c4") {
continue;
}
if (message.s_Time >= start + bucketLength) {
config.second->sample(start, start + bucketLength, m_ResourceMonitor);
start += bucketLength;
}
this->addArrival(message, config.first);
}
}
// This time we expect the model checksums to differ because of the
// different weightings applied to the samples for attribute c4
std::uint64_t withSkipChecksum = modelWithSkip->checksum();
std::uint64_t noSkipChecksum = modelNoSkip->checksum();
BOOST_TEST_REQUIRE(withSkipChecksum != noSkipChecksum);
// expect models for attributes c0 - c4
for (std::size_t i = 0; i < 5; ++i) {
mathsModelWithSkipView = modelWithSkipView->model(
model_t::E_PopulationMeanByPersonAndAttribute, i);
BOOST_TEST_REQUIRE(mathsModelWithSkipView != nullptr);
mathsModelNoSkipView =
modelNoSkipView->model(model_t::E_PopulationMeanByPersonAndAttribute, i);
BOOST_TEST_REQUIRE(mathsModelNoSkipView != nullptr);
}
// The underlying models for attributes c0 - c3 are the same
for (std::size_t i = 0; i < 4; ++i) {
mathsModelWithSkipView = modelWithSkipView->model(
model_t::E_PopulationMeanByPersonAndAttribute, i);
withSkipChecksum = mathsModelWithSkipView->checksum();
mathsModelNoSkipView =
modelNoSkipView->model(model_t::E_PopulationMeanByPersonAndAttribute, i);
noSkipChecksum = mathsModelNoSkipView->checksum();
BOOST_REQUIRE_EQUAL(withSkipChecksum, noSkipChecksum);
}
// While the underlying models for attribute c4 differ due to the different weighting applied to the samples
mathsModelWithSkipView =
modelWithSkipView->model(model_t::E_PopulationMeanByPersonAndAttribute, 4);
BOOST_TEST_REQUIRE(mathsModelWithSkipView != nullptr);
withSkipChecksum = mathsModelWithSkipView->checksum();
mathsModelNoSkipView =
modelNoSkipView->model(model_t::E_PopulationMeanByPersonAndAttribute, 4);
BOOST_TEST_REQUIRE(mathsModelNoSkipView != nullptr);
noSkipChecksum = mathsModelNoSkipView->checksum();
BOOST_TEST_REQUIRE(withSkipChecksum != noSkipChecksum);
// Check the last value times are the same for each of the underlying models with the skip rule
// and the corresponding model with no skip rule
for (std::size_t i = 0; i < 5; ++i) {
const auto* timeSeriesModel =
dynamic_cast<const maths::time_series::CUnivariateTimeSeriesModel*>(
modelWithSkipView->model(model_t::E_PopulationMeanByPersonAndAttribute, i));
BOOST_TEST_REQUIRE(timeSeriesModel != nullptr);
const auto* trendModel =
dynamic_cast<const maths::time_series::CTimeSeriesDecomposition*>(
&timeSeriesModel->trendModel());
BOOST_TEST_REQUIRE(trendModel != nullptr);
core_t::TTime modelWithSkipTime = trendModel->lastValueTime();
timeSeriesModel = dynamic_cast<const maths::time_series::CUnivariateTimeSeriesModel*>(
modelNoSkipView->model(model_t::E_PopulationMeanByPersonAndAttribute, i));
BOOST_TEST_REQUIRE(timeSeriesModel != nullptr);
trendModel = dynamic_cast<const maths::time_series::CTimeSeriesDecomposition*>(
&timeSeriesModel->trendModel());
BOOST_TEST_REQUIRE(trendModel != nullptr);
core_t::TTime modelNoSkipTime = trendModel->lastValueTime();
BOOST_REQUIRE_EQUAL(modelWithSkipTime, modelNoSkipTime);
}
}

BOOST_AUTO_TEST_SUITE_END()