/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <core/CJsonStatePersistInserter.h>
#include <core/CJsonStateRestoreTraverser.h>
#include <core/CLogger.h>
#include <core/CPatternSet.h>
#include <core/CSmallVector.h>
#include <maths/common/CModelWeight.h>
#include <maths/common/COrderings.h>
#include <maths/common/COrderingsSimultaneousSort.h>
#include <maths/time_series/CTimeSeriesDecomposition.h>
#include <model/CAnnotatedProbabilityBuilder.h>
#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CCountingModel.h>
#include <model/CDataGatherer.h>
#include <model/CDetectionRule.h>
#include <model/CEventData.h>
#include <model/CEventRatePopulationModel.h>
#include <model/CEventRatePopulationModelFactory.h>
#include <model/CFeatureData.h>
#include <model/CInterimBucketCorrector.h>
#include <model/CModelDetailsView.h>
#include <model/CPartitioningFields.h>
#include <model/CResourceMonitor.h>
#include <test/BoostTestCloseAbsolute.h>
#include <test/BoostTestPointerOutput.h>
#include <test/CRandomNumbers.h>
#include "CModelTestFixtureBase.h"
#include <boost/test/unit_test.hpp>
#include <boost/unordered_map.hpp>
#include <algorithm>
#include <cstddef>
#include <map>
#include <set>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
BOOST_AUTO_TEST_SUITE(CEventRatePopulationModelTest)
using namespace ml;
using namespace model;
namespace {
const CModelTestFixtureBase::TSizeDoublePr1Vec NO_CORRELATES;
}
class CTestFixture : public CModelTestFixtureBase {
public:
void generateTestMessages(core_t::TTime startTime,
core_t::TTime bucketLength,
TMessageVec& messages) {
// The test case is as follows:
//
        //   attribute   |    0    |    1    |    2    |    3    |    4
        // --------------+---------+---------+---------+---------+--------
        //     people    | [0-19]  | [0-2],  | [0-2],  |   3,4   |   3
        //               |         | [5-19]  | [4-19]  |         |
        // --------------+---------+---------+---------+---------+--------
        //      rate     |   10    |  0.02   |   15    |    2    |   1
        // --------------+---------+---------+---------+---------+--------
        //  rate anomaly |  1,11   |    -    |   4,5   |   n/a   |  n/a
        //     people    |         |         |         |         |
//
// There are 100 buckets.
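        //
        // Anomalies are injected by inflating the sampled Poisson count, by
        // 2.5x the attribute's base rate, for the (bucket, attribute, person)
        // triples listed in anomaliesAttributePerson below.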
using TStrVec = std::vector<std::string>;
        using TSizeSizeSizeTr = std::tuple<std::size_t, std::size_t, std::size_t>;
const std::size_t numberBuckets = 100;
const std::size_t numberAttributes = 5;
const std::size_t numberPeople = 20;
TStrVec attributes;
for (std::size_t i = 0; i < numberAttributes; ++i) {
attributes.push_back("c" + std::to_string(i));
}
TStrVec people;
for (std::size_t i = 0; i < numberPeople; ++i) {
people.push_back("p" + std::to_string(i));
}
TSizeVecVec attributePeople{
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
10, 11, 12, 13, 14, 15, 16, 17, 18, 19},
{0, 1, 2, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19},
{0, 1, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19},
{3, 4},
{3}};
TDoubleVec attributeRates{10.0, 0.02, 15.0, 2.0, 1.0};
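        // The (bucket, attribute, person) triples at which anomalies are
        // injected. This array must stay sorted because it is searched with
        // std::binary_search below.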
TSizeSizeSizeTr anomaliesAttributePerson[]{{10, 0, 1}, {15, 0, 11},
{30, 2, 4}, {35, 2, 5},
{50, 0, 11}, {75, 2, 5}};
test::CRandomNumbers rng;
for (std::size_t i = 0; i < numberBuckets; ++i, startTime += bucketLength) {
for (std::size_t j = 0; j < numberAttributes; ++j) {
TUIntVec samples;
rng.generatePoissonSamples(attributeRates[j],
attributePeople[j].size(), samples);
for (std::size_t k = 0; k < samples.size(); ++k) {
unsigned int n = samples[k];
if (std::binary_search(std::begin(anomaliesAttributePerson),
std::end(anomaliesAttributePerson),
TSizeSizeSizeTr(i, j, attributePeople[j][k]))) {
n += static_cast<unsigned int>(2.5 * attributeRates[j]);
LOG_DEBUG(<< i << " " << attributes[j]
<< " generating anomaly " << n);
}
TDoubleVec times;
rng.generateUniformSamples(
0.0, static_cast<double>(bucketLength - 1), n, times);
for (std::size_t l = 0; l < times.size(); ++l) {
core_t::TTime time = startTime +
static_cast<core_t::TTime>(times[l]);
messages.emplace_back(time, people[attributePeople[j][k]],
attributes[j]);
}
}
}
}
std::sort(messages.begin(), messages.end());
}
void makeModel(const SModelParams& params,
const model_t::TFeatureVec& features,
core_t::TTime startTime) {
this->makeModelT<CEventRatePopulationModelFactory>(
params, features, startTime, model_t::E_EventRateOnline, m_Gatherer, m_Model);
}
};
BOOST_FIXTURE_TEST_CASE(testBasicAccessors, CTestFixture) {
    // Check that the correct data is retrieved by the basic model accessors.
using TSizeUInt64Map = std::map<std::size_t, std::uint64_t>;
core_t::TTime startTime = 1367280000;
const core_t::TTime bucketLength = 3600;
TMessageVec messages;
generateTestMessages(startTime, bucketLength, messages);
LOG_DEBUG(<< "# messages = " << messages.size());
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
this->makeModel(params, {model_t::E_PopulationCountByBucketPersonAndAttribute}, startTime);
CEventRatePopulationModel* model =
dynamic_cast<CEventRatePopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(model);
TSizeUInt64Map expectedBucketPersonCounts;
TSizeSizePrUInt64Map expectedBucketPersonAttributeCounts;
for (const auto& message : messages) {
if (message.s_Time >= startTime + bucketLength) {
model->sample(startTime, startTime + bucketLength, m_ResourceMonitor);
LOG_DEBUG(<< "Testing bucket = [" << startTime << ","
<< startTime + bucketLength << ")");
// Test the person and attribute invariants.
for (std::size_t j = 0; j < m_Gatherer->numberActivePeople(); ++j) {
const std::string& name = model->personName(j);
std::size_t pid;
BOOST_TEST_REQUIRE(m_Gatherer->personId(name, pid));
BOOST_REQUIRE_EQUAL(j, pid);
}
for (std::size_t j = 0; j < m_Gatherer->numberActiveAttributes(); ++j) {
const std::string& name = model->attributeName(j);
std::size_t cid;
BOOST_TEST_REQUIRE(m_Gatherer->attributeId(name, cid));
BOOST_REQUIRE_EQUAL(j, cid);
}
TSizeVec expectedCurrentBucketPersonIds;
// Test the person counts.
for (const auto& expectedCount : expectedBucketPersonCounts) {
std::size_t pid = expectedCount.first;
expectedCurrentBucketPersonIds.push_back(pid);
auto count = model->currentBucketCount(pid, startTime);
BOOST_TEST_REQUIRE(count.has_value());
BOOST_REQUIRE_EQUAL(expectedCount.second, *count);
}
// Test the person attribute counts.
for (const auto& expectedCount : expectedBucketPersonAttributeCounts) {
std::size_t pid = expectedCount.first.first;
std::size_t cid = expectedCount.first.second;
TDouble1Vec count = model->currentBucketValue(
model_t::E_PopulationCountByBucketPersonAndAttribute, pid, cid, startTime);
BOOST_TEST_REQUIRE(!count.empty());
BOOST_REQUIRE_EQUAL(static_cast<double>(expectedCount.second), count[0]);
}
// Test the current bucket people.
std::sort(expectedCurrentBucketPersonIds.begin(),
expectedCurrentBucketPersonIds.end());
TSizeVec bucketPersonIds;
model->currentBucketPersonIds(startTime, bucketPersonIds);
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedCurrentBucketPersonIds),
core::CContainerPrinter::print(bucketPersonIds));
expectedBucketPersonCounts.clear();
expectedBucketPersonAttributeCounts.clear();
startTime += bucketLength;
}
this->addArrival(message, m_Gatherer);
std::size_t pid;
std::size_t cid;
BOOST_TEST_REQUIRE(m_Gatherer->personId(message.s_Person, pid));
BOOST_TEST_REQUIRE(m_Gatherer->attributeId(*message.s_Attribute, cid));
++expectedBucketPersonCounts[pid];
++expectedBucketPersonAttributeCounts[{pid, cid}];
}
}
BOOST_FIXTURE_TEST_CASE(testFeatures, CTestFixture) {
    // Check that the correct data is read from the gatherer into the model
    // when sample() is called.
using TSizeSet = std::set<std::size_t>;
using TSizeSizeSetMap = std::map<std::size_t, TSizeSet>;
using TSizeTimeUMap = boost::unordered_map<std::size_t, core_t::TTime>;
using TFeatureData = SEventRateFeatureData;
using TSizeSizePrFeatureDataPr = CEventRatePopulationModel::TSizeSizePrFeatureDataPr;
using TSizeSizePrFeatureDataPrVec = std::vector<TSizeSizePrFeatureDataPr>;
using TDouble2Vec = core::CSmallVector<double, 2>;
using TDouble2VecVec = std::vector<TDouble2Vec>;
using TDouble2VecWeightsAryVec = std::vector<maths_t::TDouble2VecWeightsAry>;
using TMathsModelPtr = std::shared_ptr<maths::common::CModel>;
using TSizeMathsModelPtrMap = std::map<std::size_t, TMathsModelPtr>;
// Manages de-duplication of values.
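    // This mirrors how the population model combines equal values in a
    // bucket into a single sample with accumulated count weights; we need
    // the same treatment here so the expected model state matches.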
class CUniqueValues {
public:
void add(double value,
const maths_t::TDouble2VecWeightsAry& trendWeight,
const maths_t::TDouble2VecWeightsAry& residualWeight) {
std::size_t duplicate =
m_Uniques.emplace(value, m_Uniques.size()).first->second;
if (duplicate < m_Values.size()) {
maths_t::addCount(maths_t::count(trendWeight), m_TrendWeights[duplicate]);
maths_t::addCount(maths_t::count(residualWeight),
m_ResidualWeights[duplicate]);
} else {
m_Values.push_back({value});
m_TrendWeights.push_back(trendWeight);
m_ResidualWeights.push_back(residualWeight);
}
}
TDouble2VecVec& values() { return m_Values; }
TDouble2VecWeightsAryVec& trendWeights() { return m_TrendWeights; }
TDouble2VecWeightsAryVec& residualWeights() {
return m_ResidualWeights;
}
private:
using TDoubleSizeUMap = boost::unordered_map<double, std::size_t>;
private:
TDoubleSizeUMap m_Uniques;
TDouble2VecVec m_Values;
TDouble2VecWeightsAryVec m_TrendWeights;
TDouble2VecWeightsAryVec m_ResidualWeights;
};
using TSizeUniqueValuesUMap = boost::unordered_map<std::size_t, CUniqueValues>;
core_t::TTime startTime = 1367280000;
const core_t::TTime bucketLength = 3600;
TMessageVec messages;
generateTestMessages(startTime, bucketLength, messages);
LOG_DEBUG(<< "# messages = " << messages.size());
    // Model the bucket (person, attribute) counts and the unique person
    // count per attribute.
SModelParams params(bucketLength);
params.s_InitialDecayRateMultiplier = 1.0;
model_t::TFeatureVec features{model_t::E_PopulationCountByBucketPersonAndAttribute,
model_t::E_PopulationUniquePersonCountByAttribute};
this->makeModel(params, features, startTime);
auto* model = dynamic_cast<CEventRatePopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(model);
model::CModelFactory::TFeatureMathsModelPtrPrVec models{
m_Factory->defaultFeatureModels(features, bucketLength, 1.0, false)};
BOOST_REQUIRE_EQUAL(1, models.size());
BOOST_REQUIRE_EQUAL(model_t::E_PopulationCountByBucketPersonAndAttribute,
models[0].first);
TSizeTimeUMap attributeFirstValueTimes;
std::size_t numberAttributes = 0;
std::size_t numberPeople = 0;
TSizeSizeSetMap attributePeople;
TSizeSizePrUInt64Map expectedCounts;
TSizeMathsModelPtrMap expectedPopulationModels;
for (const auto& message : messages) {
if (message.s_Time >= startTime + bucketLength) {
model->sample(startTime, startTime + bucketLength, m_ResourceMonitor);
for (const auto& expectedCount : expectedCounts) {
std::size_t pid = expectedCount.first.first;
std::size_t cid = expectedCount.first.second;
attributeFirstValueTimes.emplace(cid, startTime);
numberAttributes = std::max(numberAttributes, cid + 1);
numberPeople = std::max(numberPeople, pid + 1);
attributePeople[cid].insert(pid);
}
TSizeUniqueValuesUMap expectedValuesAndWeights;
for (const auto& expectedCount : expectedCounts) {
std::size_t pid = expectedCount.first.first;
std::size_t cid = expectedCount.first.second;
core_t::TTime time = startTime + bucketLength / 2;
double count = model_t::offsetCountToZero(
model_t::E_PopulationCountByBucketPersonAndAttribute,
static_cast<double>(expectedCount.second));
TMathsModelPtr& attributeModel = expectedPopulationModels[cid];
if (attributeModel == nullptr) {
attributeModel.reset(models[0].second->clone(cid));
}
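                    // sampleRateWeight down-weights values from people who
                    // generate disproportionately many events; route it
                    // through countWeights to obtain the trend and residual
                    // weights the model itself should apply.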
double countWeight{model->sampleRateWeight(pid, cid)};
maths_t::TDouble2VecWeightsAry trendWeight(
maths_t::CUnitWeights::unit<TDouble2Vec>(1));
maths_t::TDouble2VecWeightsAry residualWeight(
maths_t::CUnitWeights::unit<TDouble2Vec>(1));
attributeModel->countWeights(time, {count}, countWeight, countWeight,
1.0, 1.0, trendWeight, residualWeight);
expectedValuesAndWeights[cid].add(count, trendWeight, residualWeight);
}
for (auto& attributeExpectedValues : expectedValuesAndWeights) {
std::size_t cid = attributeExpectedValues.first;
TDouble2VecVec& values = attributeExpectedValues.second.values();
TDouble2VecWeightsAryVec& trendWeights =
attributeExpectedValues.second.trendWeights();
TDouble2VecWeightsAryVec& residualWeights =
attributeExpectedValues.second.residualWeights();
maths::common::COrderings::simultaneousSort(values, trendWeights, residualWeights);
maths::common::CModel::TTimeDouble2VecSizeTrVec samples;
for (const auto& sample : values) {
samples.emplace_back(startTime + bucketLength / 2, sample, 0);
}
maths::common::CModelAddSamplesParams params_;
params_.isInteger(true)
.isNonNegative(true)
.propagationInterval(1.0)
.trendWeights(trendWeights)
.priorWeights(residualWeights)
.bucketOccupancy(1.0)
.firstValueTime(attributeFirstValueTimes[cid]);
expectedPopulationModels[cid]->addSamples(params_, samples);
}
TSizeSizePrFeatureDataPrVec expectedPeoplePerAttribute;
expectedPeoplePerAttribute.reserve(numberAttributes);
for (std::size_t j = 0; j < numberAttributes; ++j) {
expectedPeoplePerAttribute.emplace_back(
std::make_pair(std::size_t(0), j), TFeatureData(j));
}
for (const auto& attribute : attributePeople) {
expectedPeoplePerAttribute[attribute.first].second =
attribute.second.size();
}
// Check the number of people per attribute.
const TSizeSizePrFeatureDataPrVec& peoplePerAttribute = model->featureData(
model_t::E_PopulationUniquePersonCountByAttribute, startTime);
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedPeoplePerAttribute),
core::CContainerPrinter::print(peoplePerAttribute));
// Check the non-zero (person, attribute) counts.
const TSizeSizePrFeatureDataPrVec& nonZeroCounts = model->featureData(
model_t::E_PopulationCountByBucketPersonAndAttribute, startTime);
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedCounts),
core::CContainerPrinter::print(nonZeroCounts));
for (std::size_t cid = 0; cid < numberAttributes; ++cid) {
const maths::common::CModel* populationModel = model->details()->model(
model_t::E_PopulationCountByBucketPersonAndAttribute, cid);
BOOST_TEST_REQUIRE(populationModel);
BOOST_REQUIRE_EQUAL(expectedPopulationModels[cid]->checksum(),
populationModel->checksum());
}
startTime += bucketLength;
expectedCounts.clear();
}
this->addArrival(message, m_Gatherer);
std::size_t pid;
std::size_t cid;
BOOST_TEST_REQUIRE(m_Gatherer->personId(message.s_Person, pid));
BOOST_TEST_REQUIRE(m_Gatherer->attributeId(*message.s_Attribute, cid));
++expectedCounts[{pid, cid}];
}
}
BOOST_FIXTURE_TEST_CASE(testComputeProbability, CTestFixture) {
// Check that we get the probabilities we expect.
core_t::TTime startTime{1367280000};
const core_t::TTime bucketLength{3600};
TMessageVec messages;
generateTestMessages(startTime, bucketLength, messages);
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
model_t::TFeatureVec features{model_t::E_PopulationCountByBucketPersonAndAttribute,
model_t::E_PopulationUniquePersonCountByAttribute};
this->makeModel(params, features, startTime);
auto* model = dynamic_cast<CEventRatePopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(model);
TStrVec expectedAnomalies{"[10, p1, c0]", "[15, p11, c0]", "[30, p4, c2]",
"[35, p5, c2]", "[50, p11, c0]", "[75, p5, c2]"};
TAnomalyVec orderedAnomalies;
this->generateOrderedAnomalies(6, startTime, bucketLength, messages,
m_Gatherer, *model, orderedAnomalies);
BOOST_REQUIRE_EQUAL(expectedAnomalies.size(), orderedAnomalies.size());
for (std::size_t i = 0; i < orderedAnomalies.size(); ++i) {
BOOST_REQUIRE_EQUAL(expectedAnomalies[i], orderedAnomalies[i].print());
}
}
BOOST_FIXTURE_TEST_CASE(testPrune, CTestFixture) {
// This test has four people and five attributes. We expect
// person 2 and attributes 1, 2 and 5 to be deleted.
core_t::TTime startTime = 1367280000;
const core_t::TTime bucketLength = 3600;
const std::size_t numberBuckets = 1000;
TStrVec people{"p1", "p2", "p3", "p4"};
TStrVec attributes{"c1", "c2", "c3", "c4", "c5"};
TStrSizePrVecVecVec eventCounts{{}, {}, {}, {}};
{
TStrSizePrVec attributeCounts;
attributeCounts.emplace_back(attributes[0], 0);
attributeCounts.emplace_back(attributes[1], 0);
attributeCounts.emplace_back(attributes[2], 1);
attributeCounts.emplace_back(attributes[4], 0);
eventCounts[0].resize(numberBuckets, attributeCounts);
eventCounts[0][1][0].second = 2; // p1, bucket 2, c1
eventCounts[0][3][0].second = 4; // p1, bucket 3, c1
eventCounts[0][5][1].second = 4; // p1, bucket 5, c2
eventCounts[0][5][3].second = 3; // p1, bucket 5, c5
}
{
TStrSizePrVec attributeCounts;
attributeCounts.emplace_back(attributes[0], 0);
attributeCounts.emplace_back(attributes[4], 0);
eventCounts[1].resize(numberBuckets, attributeCounts);
eventCounts[1][0][0].second = 2; // p2, bucket 1, c1
eventCounts[1][2][0].second = 3; // p2, bucket 3, c1
eventCounts[1][0][1].second = 4; // p2, bucket 1, c5
eventCounts[1][3][1].second = 1; // p2, bucket 4, c5
}
{
TStrSizePrVec attributeCounts;
attributeCounts.emplace_back(attributes[2], 0);
attributeCounts.emplace_back(attributes[3], 2);
eventCounts[2].resize(numberBuckets, attributeCounts);
eventCounts[2][0][0].second = 1; // p3, bucket 1, c3
eventCounts[2][20][0].second = 4; // p3, bucket 21, c3
eventCounts[2][25][0].second = 6; // p3, bucket 26, c3
eventCounts[2][80][0].second = 3; // p3, bucket 81, c3
eventCounts[2][180][0].second = 7; // p3, bucket 181, c3
eventCounts[2][200][0].second = 9; // p3, bucket 201, c3
eventCounts[2][800][0].second = 2; // p3, bucket 801, c3
}
{
TStrSizePrVec attributeCounts;
attributeCounts.emplace_back(attributes[1], 0);
attributeCounts.emplace_back(attributes[3], 3);
eventCounts[3].resize(numberBuckets, attributeCounts);
eventCounts[3][0][0].second = 2; // p4, bucket 1, c2
eventCounts[3][15][0].second = 3; // p4, bucket 16, c2
eventCounts[3][26][0].second = 1; // p4, bucket 27, c2
        eventCounts[3][70][0].second = 4; // p4, bucket 71, c2
}
const std::string expectedPeople[]{people[0], people[2], people[3]};
const std::string expectedAttributes[]{attributes[2], attributes[3]};
SModelParams params(bucketLength);
params.s_DecayRate = 0.01;
model_t::TFeatureVec features{model_t::E_PopulationCountByBucketPersonAndAttribute,
model_t::E_PopulationUniquePersonCountByAttribute};
this->makeModel(params, features, startTime);
auto* model = dynamic_cast<CEventRatePopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(model);
CModelFactory::TDataGathererPtr expectedGatherer(m_Factory->makeDataGatherer({startTime}));
CAnomalyDetectorModel::TModelPtr expectedModel(m_Factory->makeModel({expectedGatherer}));
BOOST_TEST_REQUIRE(expectedModel);
TMessageVec messages;
for (std::size_t i = 0; i < people.size(); ++i) {
core_t::TTime bucketStart = startTime;
for (std::size_t j = 0; j < numberBuckets; ++j, bucketStart += bucketLength) {
const TStrSizePrVec& attributeEventCounts = eventCounts[i][j];
for (std::size_t k = 0; k < attributeEventCounts.size(); ++k) {
if (attributeEventCounts[k].second == 0) {
continue;
}
std::size_t n = attributeEventCounts[k].second;
core_t::TTime time = bucketStart;
core_t::TTime dt = bucketLength / static_cast<core_t::TTime>(n);
for (std::size_t l = 0; l < n; ++l, time += dt) {
messages.emplace_back(time, people[i],
attributeEventCounts[k].first);
}
}
}
}
std::sort(messages.begin(), messages.end());
TMessageVec expectedMessages;
expectedMessages.reserve(messages.size());
for (const auto& message : messages) {
if (std::binary_search(std::begin(expectedPeople),
std::end(expectedPeople), message.s_Person) &&
std::binary_search(std::begin(expectedAttributes),
std::end(expectedAttributes), *message.s_Attribute)) {
expectedMessages.push_back(message);
}
}
core_t::TTime bucketStart = startTime;
for (const auto& message : messages) {
if (message.s_Time >= bucketStart + bucketLength) {
model->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
bucketStart += bucketLength;
}
this->addArrival(message, m_Gatherer);
}
model->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
    std::size_t maxDimensionBeforePrune(model->dataGatherer().maxDimension());
    model->prune(model->defaultPruneWindow());
    std::size_t maxDimensionAfterPrune(model->dataGatherer().maxDimension());
BOOST_REQUIRE_EQUAL(maxDimensionBeforePrune, maxDimensionAfterPrune);
bucketStart = startTime;
for (const auto& expectedMessage : expectedMessages) {
if (expectedMessage.s_Time >= bucketStart + bucketLength) {
expectedModel->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
bucketStart += bucketLength;
}
this->addArrival(expectedMessage, expectedGatherer);
}
expectedModel->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
LOG_DEBUG(<< "checksum = " << model->checksum());
LOG_DEBUG(<< "expected checksum = " << expectedModel->checksum());
BOOST_REQUIRE_EQUAL(expectedModel->checksum(), model->checksum());
// Now check that we recycle the person and attribute slots.
bucketStart = m_Gatherer->currentBucketStartTime() + bucketLength;
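    // p5 and c6 are new names, so they should reuse the person and attribute
    // IDs freed when p2 and the unused attributes were pruned.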
TMessageVec newMessages{{bucketStart + 10, "p1", "c2"},
{bucketStart + 200, "p5", "c6"},
{bucketStart + 2100, "p5", "c6"}};
for (const auto& newMessage : newMessages) {
this->addArrival(newMessage, m_Gatherer);
this->addArrival(newMessage, expectedGatherer);
}
model->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
expectedModel->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
LOG_DEBUG(<< "checksum = " << model->checksum());
LOG_DEBUG(<< "expected checksum = " << expectedModel->checksum());
BOOST_REQUIRE_EQUAL(expectedModel->checksum(), model->checksum());
// Test that calling prune on a cloned model which has seen no new data does nothing
CAnomalyDetectorModel::TModelPtr clonedModelHolder(model->cloneForPersistence());
std::size_t numberOfPeopleBeforePrune(
clonedModelHolder->dataGatherer().numberActivePeople());
BOOST_TEST_REQUIRE(numberOfPeopleBeforePrune > 0);
clonedModelHolder->prune(clonedModelHolder->defaultPruneWindow());
BOOST_REQUIRE_EQUAL(numberOfPeopleBeforePrune,
clonedModelHolder->dataGatherer().numberActivePeople());
}
BOOST_FIXTURE_TEST_CASE(testKey, CTestFixture) {
function_t::TFunctionVec countFunctions{function_t::E_PopulationCount,
function_t::E_PopulationDistinctCount,
function_t::E_PopulationRare,
function_t::E_PopulationRareCount,
function_t::E_PopulationFreqRare,
function_t::E_PopulationFreqRareCount,
function_t::E_PopulationLowCounts,
function_t::E_PopulationHighCounts};
std::string fieldName;
std::string overFieldName{"over"};
generateAndCompareKey(countFunctions, fieldName, overFieldName,
[](CSearchKey expectedKey, CSearchKey actualKey) {
BOOST_TEST_REQUIRE(expectedKey == actualKey);
});
}
BOOST_FIXTURE_TEST_CASE(testFrequency, CTestFixture) {
// Test we correctly compute frequencies for people and attributes.
struct SDatum {
std::string s_Attribute;
std::string s_Person;
std::size_t s_Period{0};
};
using TDataVec = std::vector<SDatum>;
TDataVec data{{"a1", "p1", 1}, {"a2", "p2", 1}, {"a3", "p3", 10},
{"a4", "p4", 3}, {"a5", "p5", 4}, {"a6", "p6", 5},
{"a7", "p7", 2}, {"a8", "p8", 1}, {"a9", "p9", 3},
{"a10", "p10", 7}};
const core_t::TTime bucketLength{600};
core_t::TTime startTime{0};
TMessageVec messages;
std::size_t bucket{0};
for (core_t::TTime bucketStart = startTime; bucketStart < 100 * bucketLength;
bucketStart += bucketLength, ++bucket) {
std::size_t i{0};
for (auto& datum : data) {
if (bucket % datum.s_Period == 0) {
for (std::size_t j = 0; j < i + 1; ++j) {
messages.emplace_back(bucketStart + bucketLength / 2,
datum.s_Person, data[j].s_Attribute);
}
}
++i;
}
}
std::sort(messages.begin(), messages.end());
LOG_DEBUG(<< "# messages = " << messages.size());
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
model_t::TFeatureVec features{model_t::E_PopulationCountByBucketPersonAndAttribute,
model_t::E_PopulationUniquePersonCountByAttribute};
this->makeModel(params, features, startTime);
auto* populationModel = dynamic_cast<CEventRatePopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(populationModel);
core_t::TTime time{startTime};
for (const auto& message : messages) {
if (message.s_Time >= time + bucketLength) {
populationModel->sample(time, time + bucketLength, m_ResourceMonitor);
time += bucketLength;
}
this->addArrival(message, m_Gatherer);
}
{
TMeanAccumulator meanError;
for (auto& datum : data) {
LOG_DEBUG(<< "*** person = " << datum.s_Person << " ***");
std::size_t pid;
BOOST_TEST_REQUIRE(m_Gatherer->personId(datum.s_Person, pid));
LOG_DEBUG(<< "frequency = " << populationModel->personFrequency(pid));
LOG_DEBUG(<< "expected frequency = "
<< 1.0 / static_cast<double>(datum.s_Period));
BOOST_REQUIRE_CLOSE_ABSOLUTE(1.0 / static_cast<double>(datum.s_Period),
populationModel->personFrequency(pid),
0.1 / static_cast<double>(datum.s_Period));
meanError.add(std::fabs(populationModel->personFrequency(pid) -
1.0 / static_cast<double>(datum.s_Period)));
}
LOG_DEBUG(<< "error = " << maths::common::CBasicStatistics::mean(meanError));
BOOST_TEST_REQUIRE(maths::common::CBasicStatistics::mean(meanError) < 0.002);
}
{
std::size_t i{0};
for (auto& datum : data) {
LOG_DEBUG(<< "*** attribute = " << datum.s_Attribute << " ***");
std::size_t cid;
BOOST_TEST_REQUIRE(m_Gatherer->attributeId(datum.s_Attribute, cid));
LOG_DEBUG(<< "frequency = " << populationModel->attributeFrequency(cid));
LOG_DEBUG(<< "expected frequency = " << (10.0 - static_cast<double>(i)) / 10.0);
BOOST_REQUIRE_EQUAL((10.0 - static_cast<double>(i)) / 10.0,
populationModel->attributeFrequency(cid));
++i;
}
}
}
BOOST_FIXTURE_TEST_CASE(testSampleRateWeight, CTestFixture) {
// Test that we correctly compensate for heavy hitters.
// There are 10 attributes.
//
// People p1 and p5 generate messages for every attribute every bucket.
    // The remaining 18 people generate only one message per bucket, i.e. on
    // average one message per attribute every 10 buckets.
const core_t::TTime bucketLength = 600;
const TStrVec attributes{"a1", "a2", "a3", "a4", "a5",
"a6", "a7", "a8", "a9", "a10"};
const TStrVec people{"p1", "p2", "p3", "p4", "p5", "p6", "p7",
"p8", "p9", "p10", "p11", "p12", "p13", "p14",
"p15", "p16", "p17", "p18", "p19", "p20"};
TSizeVec heavyHitters{0, 4};
TSizeVec normal{1, 2, 3, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19};
std::size_t messagesPerBucket = heavyHitters.size() * attributes.size() +
normal.size();
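    // Per bucket, every heavy hitter sends one message for each attribute
    // while each normal person sends a single message to a randomly chosen
    // attribute.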
test::CRandomNumbers rng;
core_t::TTime startTime{0};
TMessageVec messages;
for (core_t::TTime bucketStart = startTime;
bucketStart < 100 * bucketLength; bucketStart += bucketLength) {
TSizeVec times;
rng.generateUniformSamples(static_cast<std::size_t>(bucketStart),
static_cast<std::size_t>(bucketStart + bucketLength),
messagesPerBucket, times);
std::size_t m{0};
for (const auto& attribute : attributes) {
for (const auto& heavyHitter : heavyHitters) {
messages.emplace_back(static_cast<core_t::TTime>(times[m++]),
people[heavyHitter], attribute);
}
}
TSizeVec attributeIndexes;
rng.generateUniformSamples(0, attributes.size(), normal.size(), attributeIndexes);
std::size_t i{0};
for (auto& norm : normal) {
messages.emplace_back(static_cast<core_t::TTime>(times[m++]),
people[norm], attributes[attributeIndexes[i++]]);
}
}
std::sort(messages.begin(), messages.end());
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
model_t::TFeatureVec features{model_t::E_PopulationCountByBucketPersonAndAttribute,
model_t::E_PopulationUniquePersonCountByAttribute};
this->makeModel(params, features, startTime);
auto* populationModel = dynamic_cast<CEventRatePopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(populationModel);
core_t::TTime time{startTime};
for (const auto& message : messages) {
if (message.s_Time >= time + bucketLength) {
populationModel->sample(time, time + bucketLength, m_ResourceMonitor);
time += bucketLength;
}
this->addArrival(message, m_Gatherer);
}
// The heavy hitters generate one value per attribute per bucket.
// The rest generate one value per bucket. Therefore, we expect
// the mean person rate per bucket to be:
// ( ("# people" - "# heavy hitters" / "# attributes")
// + ("# heavy hitters"))
// / "# people"
double expectedRateWeight = (static_cast<double>(normal.size()) /
static_cast<double>(attributes.size()) +
static_cast<double>(heavyHitters.size())) /
static_cast<double>(people.size());
LOG_DEBUG(<< "expectedRateWeight = " << expectedRateWeight);
for (auto& heavyHitter : heavyHitters) {
LOG_DEBUG(<< "*** person = " << people[heavyHitter] << " ***");
std::size_t pid;
BOOST_TEST_REQUIRE(m_Gatherer->personId(people[heavyHitter], pid));
for (std::size_t cid = 0; cid < attributes.size(); ++cid) {
double sampleRateWeight = populationModel->sampleRateWeight(pid, cid);
LOG_DEBUG(<< "attribute = " << populationModel->attributeName(cid)
<< ", sampleRateWeight = " << sampleRateWeight);
BOOST_REQUIRE_CLOSE_ABSOLUTE(expectedRateWeight, sampleRateWeight,
0.15 * expectedRateWeight);
}
}
for (auto& norm : normal) {
LOG_DEBUG(<< "*** person = " << people[norm] << " ***");
std::size_t pid;
BOOST_TEST_REQUIRE(m_Gatherer->personId(people[norm], pid));
for (std::size_t cid = 0; cid < attributes.size(); ++cid) {
double sampleRateWeight = populationModel->sampleRateWeight(pid, cid);
LOG_DEBUG(<< "attribute = " << populationModel->attributeName(cid)
<< ", sampleRateWeight = " << sampleRateWeight);
BOOST_REQUIRE_EQUAL(1.0, sampleRateWeight);
}
}
}
BOOST_FIXTURE_TEST_CASE(testPeriodicity, CTestFixture) {
// Create a daily periodic population and check that the
// periodicity is learned and compensated (approximately).
using TStrDoubleMap = std::map<std::string, double>;
static const core_t::TTime HOUR{3600};
static const core_t::TTime DAY{86400};
const core_t::TTime bucketLength{3600};
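    // The mean hourly Poisson rate for each hour of the day; the peaks
    // around hours 8-9 and 15 give the population a pronounced daily
    // pattern.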
TDoubleVec rate{1, 1, 2, 2, 3, 5, 6, 6, 20, 21, 4, 3,
4, 4, 8, 25, 7, 6, 5, 1, 1, 4, 1, 1};
TDoubleStrPrVec attribs{{1.0, "a1"}, {1.5, "a2"}};
const TStrVec people{"p1", "p2", "p3", "p4", "p5",
"p6", "p7", "p8", "p9", "p10"};
test::CRandomNumbers rng;
core_t::TTime startTime{0};
core_t::TTime endTime{604800};
TMessageVec messages;
for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
for (const auto& attrib : attribs) {
TUIntVec rates;
rng.generatePoissonSamples(attrib.first * rate[(time % DAY) / HOUR],
people.size(), rates);
std::size_t j{0};
for (const auto& rate_ : rates) {
for (unsigned int t = 0; t < rate_; ++t) {
messages.emplace_back(time + (t * bucketLength) / (rate_ + 1),
people[j], attrib.second);
}
++j;
}
}
}
std::sort(messages.begin(), messages.end());
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
params.s_MinimumModeCount = 24.0;
model_t::TFeatureVec features{model_t::E_PopulationCountByBucketPersonAndAttribute,
model_t::E_PopulationUniquePersonCountByAttribute};
this->makeModel(params, features, startTime);
TStrDoubleMap personProbabilitiesWithoutPeriodicity;
TStrDoubleMap personProbabilitiesWithPeriodicity;
core_t::TTime time{startTime};
for (const auto& message : messages) {
if (message.s_Time >= time + bucketLength) {
m_Model->sample(time, time + bucketLength, m_ResourceMonitor);
for (const auto& person : people) {
std::size_t pid;
if (!m_Gatherer->personId(person, pid)) {
continue;
}
CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
SAnnotatedProbability annotatedProbability;
if (m_Model->computeProbability(pid, time, time + bucketLength, partitioningFields,
1, annotatedProbability) == false) {
continue;
}
if (time < startTime + 3 * DAY) {
double& minimumProbability =
personProbabilitiesWithoutPeriodicity
.insert(TStrDoubleMap::value_type(person, 1.0))
.first->second;
minimumProbability = std::min(
minimumProbability, annotatedProbability.s_Probability);
} else if (time > startTime + 5 * DAY) {
double& minimumProbability =
personProbabilitiesWithPeriodicity
.insert(TStrDoubleMap::value_type(person, 1.0))
.first->second;
minimumProbability = std::min(
minimumProbability, annotatedProbability.s_Probability);
}
}
time += bucketLength;
}
this->addArrival(message, m_Gatherer);
}
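    // Once the daily pattern has been learned (we allow five days for this)
    // the same diurnal variation should look much less anomalous, so the
    // minimum probabilities with periodicity modelled should be
    // significantly larger.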
double totalw{0.0};
double totalwo{0.0};
for (const auto& person : people) {
auto wo = personProbabilitiesWithoutPeriodicity.find(person);
auto w = personProbabilitiesWithPeriodicity.find(person);
LOG_DEBUG(<< "person = " << person);
LOG_DEBUG(<< "minimum probability with periodicity = " << w->second);
LOG_DEBUG(<< "minimum probability without periodicity = " << wo->second);
totalwo += wo->second;
totalw += w->second;
}
LOG_DEBUG(<< "total minimum probability with periodicity = " << totalw);
LOG_DEBUG(<< "total minimum probability without periodicity = " << totalwo);
BOOST_TEST_REQUIRE(totalw > 3.0 * totalwo);
}
BOOST_FIXTURE_TEST_CASE(testSkipSampling, CTestFixture) {
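    // Check that a model whose sampling is skipped over a gap ends up with
    // the same residual models as one which never saw the gap, and that
    // last-seen times are advanced by the gap duration so pruning behaves.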
core_t::TTime startTime{100};
std::size_t bucketLength{100};
std::size_t maxAgeBuckets{5};
SModelParams params(bucketLength);
auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
CEventRatePopulationModelFactory factory(params, interimBucketCorrector);
factory.features({model_t::E_PopulationCountByBucketPersonAndAttribute});
CModelFactory::SGathererInitializationData gathererNoGapInitData(startTime);
CModelFactory::TDataGathererPtr gathererNoGap(factory.makeDataGatherer(gathererNoGapInitData));
CModelFactory::SModelInitializationData modelNoGapInitData(gathererNoGap);
CAnomalyDetectorModel::TModelPtr modelNoGapHolder(factory.makeModel(modelNoGapInitData));
auto* modelNoGap = dynamic_cast<CEventRatePopulationModel*>(modelNoGapHolder.get());
this->addArrival(SMessage(100, "p1", "a1"), gathererNoGap);
this->addArrival(SMessage(100, "p1", "a2"), gathererNoGap);
this->addArrival(SMessage(100, "p2", "a1"), gathererNoGap);
modelNoGap->sample(100, 200, m_ResourceMonitor);
this->addArrival(SMessage(200, "p1", "a1"), gathererNoGap);
modelNoGap->sample(200, 300, m_ResourceMonitor);
this->addArrival(SMessage(300, "p1", "a1"), gathererNoGap);
modelNoGap->sample(300, 400, m_ResourceMonitor);
CModelFactory::SGathererInitializationData gathererWithGapInitData(startTime);
CModelFactory::TDataGathererPtr gathererWithGap(
factory.makeDataGatherer(gathererWithGapInitData));
CModelFactory::SModelInitializationData modelWithGapInitData(gathererWithGap);
CAnomalyDetectorModel::TModelPtr modelWithGapHolder(factory.makeModel(modelWithGapInitData));
auto* modelWithGap =
dynamic_cast<CEventRatePopulationModel*>(modelWithGapHolder.get());
this->addArrival(SMessage(100, "p1", "a1"), gathererWithGap);
this->addArrival(SMessage(100, "p1", "a2"), gathererWithGap);
this->addArrival(SMessage(100, "p2", "a1"), gathererWithGap);
modelWithGap->sample(100, 200, m_ResourceMonitor);
this->addArrival(SMessage(200, "p1", "a1"), gathererWithGap);
modelWithGap->skipSampling(1000);
LOG_DEBUG(<< "Calling sample over skipped interval should do nothing except print some ERRORs");
modelWithGap->sample(200, 1000, m_ResourceMonitor);
// Check prune does not remove people because last seen times are updated by adding gap duration
modelWithGap->prune(maxAgeBuckets);
BOOST_REQUIRE_EQUAL(2, gathererWithGap->numberActivePeople());
BOOST_REQUIRE_EQUAL(2, gathererWithGap->numberActiveAttributes());
this->addArrival(SMessage(1000, "p1", "a1"), gathererWithGap);
modelWithGap->sample(1000, 1100, m_ResourceMonitor);
this->addArrival(SMessage(1100, "p1", "a1"), gathererWithGap);
modelWithGap->sample(1100, 1200, m_ResourceMonitor);
// Check priors are the same
BOOST_REQUIRE_EQUAL(
static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>(
modelWithGap->details()->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 0))
->residualModel()
.checksum(),
static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>(
modelNoGap->details()->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 0))
->residualModel()
.checksum());
BOOST_REQUIRE_EQUAL(
static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>(
modelWithGap->details()->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 1))
->residualModel()
.checksum(),
static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>(
modelNoGap->details()->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 1))
->residualModel()
.checksum());
// Confirm last seen times are only updated by gap duration by forcing p2 and a2 to be pruned
modelWithGap->sample(1200, 1500, m_ResourceMonitor);
modelWithGap->prune(maxAgeBuckets);
// Age at this point will be 500 and since it's equal to maxAge it should still be here
BOOST_REQUIRE_EQUAL(2, gathererWithGap->numberActiveAttributes());
modelWithGap->sample(1500, 1600, m_ResourceMonitor);
modelWithGap->prune(maxAgeBuckets);
// Age at this point will be 600 so it should get pruned
BOOST_REQUIRE_EQUAL(1, gathererWithGap->numberActivePeople());
BOOST_REQUIRE_EQUAL(1, gathererWithGap->numberActiveAttributes());
}
BOOST_FIXTURE_TEST_CASE(testInterimCorrections, CTestFixture) {
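    // Check that interim results are corrected for partially complete
    // buckets; the counting model feeds the bucket completeness estimate to
    // the shared interim bucket corrector.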
core_t::TTime startTime{3600};
std::size_t bucketLength{3600};
SModelParams params(bucketLength);
model_t::TFeatureVec features{model_t::E_PopulationCountByBucketPersonAndAttribute};
this->makeModel(params, features, startTime);
auto* model = dynamic_cast<CEventRatePopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(model);
CCountingModel countingModel(params, m_Gatherer, m_InterimBucketCorrector);
test::CRandomNumbers rng;
core_t::TTime now{startTime};
core_t::TTime endTime = now + 2 * 24 * bucketLength;
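    // Train on two days of data in which p1 and p2 each generate roughly 60
    // events per bucket for attribute a1 and p3 roughly 60 per bucket for a2.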
TDoubleVec samples(3, 0.0);
while (now < endTime) {
rng.generateUniformSamples(50.0, 70.0, 3, samples);
for (std::size_t i = 0; i < static_cast<std::size_t>(samples[0] + 0.5); ++i) {
this->addArrival(SMessage(now, "p1", "a1"), m_Gatherer);
}
for (std::size_t i = 0; i < static_cast<std::size_t>(samples[1] + 0.5); ++i) {
this->addArrival(SMessage(now, "p2", "a1"), m_Gatherer);
}
for (std::size_t i = 0; i < static_cast<std::size_t>(samples[2] + 0.5); ++i) {
this->addArrival(SMessage(now, "p3", "a2"), m_Gatherer);
}
countingModel.sample(now, now + bucketLength, m_ResourceMonitor);
model->sample(now, now + bucketLength, m_ResourceMonitor);
now += bucketLength;
}
for (std::size_t i = 0; i < 35; ++i) {
this->addArrival(SMessage(now, "p1", "a1"), m_Gatherer);
}
for (std::size_t i = 0; i < 1; ++i) {
this->addArrival(SMessage(now, "p2", "a1"), m_Gatherer);
}
for (std::size_t i = 0; i < 100; ++i) {
this->addArrival(SMessage(now, "p3", "a2"), m_Gatherer);
}
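    // Treat the next, partially complete bucket as interim: p1 (35 events)
    // and p2 (1 event) are below their ~60 baselines while p3 (100 events)
    // is above.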
countingModel.sampleBucketStatistics(now, now + bucketLength, m_ResourceMonitor);
model->sampleBucketStatistics(now, now + bucketLength, m_ResourceMonitor);
CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
SAnnotatedProbability annotatedProbability1;
annotatedProbability1.s_ResultType.set(model_t::CResultType::E_Interim);
BOOST_TEST_REQUIRE(model->computeProbability(
0 /*pid*/, now, now + bucketLength, partitioningFields, 1, annotatedProbability1));
SAnnotatedProbability annotatedProbability2;
annotatedProbability2.s_ResultType.set(model_t::CResultType::E_Interim);
BOOST_TEST_REQUIRE(model->computeProbability(
1 /*pid*/, now, now + bucketLength, partitioningFields, 1, annotatedProbability2));
SAnnotatedProbability annotatedProbability3;
annotatedProbability3.s_ResultType.set(model_t::CResultType::E_Interim);
BOOST_TEST_REQUIRE(model->computeProbability(
2 /*pid*/, now, now + bucketLength, partitioningFields, 1, annotatedProbability3));
model_t::CResultType type(model_t::CResultType::E_Unconditional |
model_t::CResultType::E_Interim);
TDouble1Vec p1a1Baseline = model->baselineBucketMean(
model_t::E_PopulationCountByBucketPersonAndAttribute, 0, 0, type, NO_CORRELATES, now);
    TDouble1Vec p2a1Baseline = model->baselineBucketMean(
        model_t::E_PopulationCountByBucketPersonAndAttribute, 1, 0, type, NO_CORRELATES, now);
TDouble1Vec p3a2Baseline = model->baselineBucketMean(
model_t::E_PopulationCountByBucketPersonAndAttribute, 2, 1, type, NO_CORRELATES, now);
LOG_DEBUG(<< "p1 probability = " << annotatedProbability1.s_Probability);
LOG_DEBUG(<< "p2 probability = " << annotatedProbability2.s_Probability);
LOG_DEBUG(<< "p3 probability = " << annotatedProbability3.s_Probability);
LOG_DEBUG(<< "p1a1 baseline = " << p1a1Baseline[0]);
LOG_DEBUG(<< "p2a1 baseline = " << p2a1Baseline[0]);
LOG_DEBUG(<< "p3a2 baseline = " << p3a2Baseline[0]);
BOOST_TEST_REQUIRE(annotatedProbability1.s_Probability > 0.05);
BOOST_TEST_REQUIRE(annotatedProbability2.s_Probability < 0.05);
BOOST_TEST_REQUIRE(annotatedProbability3.s_Probability < 0.05);
BOOST_TEST_REQUIRE(p1a1Baseline[0] > 45.0);
BOOST_TEST_REQUIRE(p1a1Baseline[0] < 46.0);
BOOST_TEST_REQUIRE(p2a1Baseline[0] > 45.0);
BOOST_TEST_REQUIRE(p2a1Baseline[0] < 46.0);
BOOST_TEST_REQUIRE(p3a2Baseline[0] > 59.0);
BOOST_TEST_REQUIRE(p3a2Baseline[0] < 61.0);
}
BOOST_FIXTURE_TEST_CASE(testPersistence, CTestFixture) {
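    // Check that persisting and restoring the model preserves its state:
    // both the checksums and the persisted JSON should round trip exactly.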
core_t::TTime startTime{1367280000};
const core_t::TTime bucketLength{3600};
TMessageVec messages;
generateTestMessages(startTime, bucketLength, messages);
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
model_t::TFeatureVec features{model_t::E_PopulationCountByBucketPersonAndAttribute,
model_t::E_PopulationUniquePersonCountByAttribute};
this->makeModel(params, features, startTime);
auto* populationModel = dynamic_cast<CEventRatePopulationModel*>(m_Model.get());
BOOST_TEST_REQUIRE(populationModel);
for (const auto& message : messages) {
if (message.s_Time >= startTime + bucketLength) {
m_Model->sample(startTime, startTime + bucketLength, m_ResourceMonitor);
startTime += bucketLength;
}
this->addArrival(message, m_Gatherer);
}
std::ostringstream origJson;
core::CJsonStatePersistInserter::persist(
origJson, [& m_Model = m_Model](core::CJsonStatePersistInserter & inserter) {
m_Model->acceptPersistInserter(inserter);
});
LOG_TRACE(<< "origJson = " << origJson.str());
LOG_DEBUG(<< "origJson size = " << origJson.str().size());
    // The traverser expects the state JSON in an embedded document.
std::istringstream origJsonStrm{"{\"topLevel\" : " + origJson.str() + "}"};
core::CJsonStateRestoreTraverser traverser(origJsonStrm);
CAnomalyDetectorModel::TModelPtr restoredModel(
m_Factory->makeModel({m_Gatherer}, traverser));
populationModel = dynamic_cast<CEventRatePopulationModel*>(restoredModel.get());
BOOST_TEST_REQUIRE(populationModel);
    // The JSON representation of the restored model should be the same as
    // the original.
std::ostringstream newJson;
core::CJsonStatePersistInserter::persist(
newJson, [&restoredModel](core::CJsonStatePersistInserter& inserter) {
restoredModel->acceptPersistInserter(inserter);
});
LOG_DEBUG(<< "original checksum = " << m_Model->checksum(false));
LOG_DEBUG(<< "restored checksum = " << restoredModel->checksum(false));
BOOST_REQUIRE_EQUAL(m_Model->checksum(false), restoredModel->checksum(false));
BOOST_REQUIRE_EQUAL(origJson.str(), newJson.str());
}
BOOST_FIXTURE_TEST_CASE(testIgnoreSamplingGivenDetectionRules, CTestFixture) {
    // Create two models, one of which has a skip sampling rule.
    // The skip sampling rule doesn't cause the samples to be completely
    // ignored; instead it applies a small multiplicative weighting when the
    // rule applies. Feed the same data into both models, including a case
    // where the rule applies to one model but not the other.
core_t::TTime startTime{100};
std::size_t bucketLength{100};
    // Create a categorical rule to skip model updates for attribute a3.
std::string filterJson("[\"a3\"]");
core::CPatternSet valueFilter;
valueFilter.initFromJson(filterJson);
CDetectionRule rule;
rule.action(CDetectionRule::E_SkipModelUpdate);
rule.includeScope("byFieldName", valueFilter);
SModelParams paramsNoRules(bucketLength);
auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
CEventRatePopulationModelFactory factory(paramsNoRules, interimBucketCorrector);
model_t::TFeatureVec features{model_t::E_PopulationCountByBucketPersonAndAttribute};
factory.features(features);
factory.fieldNames("partitionFieldName", "", "byFieldName", "", {});
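    // The by-field carries the attribute, so the rule scope above matches
    // events whose attribute is a3.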
CModelFactory::SGathererInitializationData gathererNoSkipInitData(startTime);
CModelFactory::TDataGathererPtr gathererNoSkip(
factory.makeDataGatherer(gathererNoSkipInitData));
CModelFactory::SModelInitializationData modelNoSkipInitData(gathererNoSkip);
CAnomalyDetectorModel::TModelPtr modelNoSkip(factory.makeModel(modelNoSkipInitData));
SModelParams paramsWithRules(bucketLength);
SModelParams::TDetectionRuleVec rules{rule};
paramsWithRules.s_DetectionRules = SModelParams::TDetectionRuleVecCRef(rules);
auto interimBucketCorrectorWithRules =
std::make_shared<CInterimBucketCorrector>(bucketLength);
CEventRatePopulationModelFactory factoryWithSkipRule(
paramsWithRules, interimBucketCorrectorWithRules);
factoryWithSkipRule.features(features);
factoryWithSkipRule.fieldNames("partitionFieldName", "", "byFieldName", "", {});
CModelFactory::SGathererInitializationData gathererWithSkipInitData(startTime);
CModelFactory::TDataGathererPtr gathererWithSkip(
factoryWithSkipRule.makeDataGatherer(gathererWithSkipInitData));
CModelFactory::SModelInitializationData modelWithSkipInitData(gathererWithSkip);
CAnomalyDetectorModel::TModelPtr modelWithSkip(
factoryWithSkipRule.makeModel(modelWithSkipInitData));
this->addArrival(SMessage(100, "p1", "a1"), gathererNoSkip);
this->addArrival(SMessage(100, "p1", "a1"), gathererWithSkip);
this->addArrival(SMessage(100, "p1", "a2"), gathererNoSkip);
this->addArrival(SMessage(100, "p1", "a2"), gathererWithSkip);
this->addArrival(SMessage(100, "p2", "a1"), gathererNoSkip);
this->addArrival(SMessage(100, "p2", "a1"), gathererWithSkip);
modelNoSkip->sample(100, 200, m_ResourceMonitor);
modelWithSkip->sample(100, 200, m_ResourceMonitor);
BOOST_REQUIRE_EQUAL(modelWithSkip->checksum(), modelNoSkip->checksum());
this->addArrival(SMessage(200, "p1", "a1"), gathererNoSkip);
this->addArrival(SMessage(200, "p1", "a1"), gathererWithSkip);
this->addArrival(SMessage(200, "p1", "a2"), gathererNoSkip);
this->addArrival(SMessage(200, "p1", "a2"), gathererWithSkip);
this->addArrival(SMessage(200, "p2", "a1"), gathererNoSkip);
this->addArrival(SMessage(200, "p2", "a1"), gathererWithSkip);
// These should be added to the "with skip" model but with a small multiplicative weighting
this->addArrival(SMessage(200, "p1", "a3"), gathererWithSkip);
this->addArrival(SMessage(200, "p1", "a3"), gathererNoSkip);
this->addArrival(SMessage(200, "p2", "a3"), gathererWithSkip);
this->addArrival(SMessage(200, "p2", "a3"), gathererNoSkip);
// Add another attribute that must not be skipped
this->addArrival(SMessage(200, "p1", "a4"), gathererNoSkip);
this->addArrival(SMessage(200, "p1", "a4"), gathererWithSkip);
this->addArrival(SMessage(200, "p2", "a4"), gathererNoSkip);
this->addArrival(SMessage(200, "p2", "a4"), gathererWithSkip);
modelNoSkip->sample(200, 300, m_ResourceMonitor);
modelWithSkip->sample(200, 300, m_ResourceMonitor);
// Checksums will be different because of the different weightings applied to the samples for attribute a3
BOOST_TEST_REQUIRE(modelWithSkip->checksum() != modelNoSkip->checksum());
auto modelWithSkipView = modelWithSkip->details();
auto modelNoSkipView = modelNoSkip->details();
// but the underlying models for attributes a1 and a2 are the same
std::uint64_t withSkipChecksum{
modelWithSkipView
->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 0)
->checksum()};
std::uint64_t noSkipChecksum{
modelNoSkipView
->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 0)
->checksum()};
BOOST_REQUIRE_EQUAL(withSkipChecksum, noSkipChecksum);
withSkipChecksum = modelWithSkipView
->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 1)
->checksum();
noSkipChecksum = modelNoSkipView
->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 1)
->checksum();
BOOST_REQUIRE_EQUAL(withSkipChecksum, noSkipChecksum);
// The skip model did see the a3 attribute but, due to the lower weighting given to the samples,
// the model checksums will differ.
withSkipChecksum = modelWithSkipView
->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 2)
->checksum();
noSkipChecksum = modelNoSkipView
->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 2)
->checksum();
BOOST_TEST_REQUIRE(withSkipChecksum != noSkipChecksum);
// The a4 models should be the same.
withSkipChecksum = modelWithSkipView
->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 3)
->checksum();
noSkipChecksum = modelNoSkipView
->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 3)
->checksum();
BOOST_REQUIRE_EQUAL(withSkipChecksum, noSkipChecksum);
// Check the last value times of all the underlying models are the same
const maths::time_series::CUnivariateTimeSeriesModel* timeSeriesModel{
dynamic_cast<const maths::time_series::CUnivariateTimeSeriesModel*>(
modelNoSkipView->model(model_t::E_PopulationCountByBucketPersonAndAttribute, 0))};
BOOST_TEST_REQUIRE(timeSeriesModel);
const auto* trendModel = dynamic_cast<const maths::time_series::CTimeSeriesDecomposition*>(
&timeSeriesModel->trendModel());
BOOST_TEST_REQUIRE(trendModel);
core_t::TTime time = trendModel->lastValueTime();
BOOST_REQUIRE_EQUAL(model_t::sampleTime(model_t::E_PopulationCountByBucketPersonAndAttribute,
200, bucketLength),
time);
    // The last value times of all the underlying time series models should
    // be the same; re-extract the trend model from each feature model before
    // checking.
    auto lastValueTimeOf = [](const maths::common::CModel* featureModel) {
        const auto* seriesModel =
            dynamic_cast<const maths::time_series::CUnivariateTimeSeriesModel*>(featureModel);
        BOOST_TEST_REQUIRE(seriesModel);
        const auto* trend = dynamic_cast<const maths::time_series::CTimeSeriesDecomposition*>(
            &seriesModel->trendModel());
        BOOST_TEST_REQUIRE(trend);
        return trend->lastValueTime();
    };
    for (std::size_t cid = 1; cid < 4; ++cid) {
        BOOST_REQUIRE_EQUAL(time, lastValueTimeOf(modelNoSkipView->model(
                                       model_t::E_PopulationCountByBucketPersonAndAttribute, cid)));
    }
    for (std::size_t cid = 0; cid < 4; ++cid) {
        BOOST_REQUIRE_EQUAL(time, lastValueTimeOf(modelWithSkipView->model(
                                       model_t::E_PopulationCountByBucketPersonAndAttribute, cid)));
    }
}
BOOST_AUTO_TEST_SUITE_END()