lib/model/unittest/CResourceLimitTest.cc (456 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <core/CRegex.h>
#include <maths/common/CIntegerTools.h>
#include <maths/common/CMultivariatePrior.h>
#include <model/CAnomalyDetector.h>
#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CDataGatherer.h>
#include <model/CEventRateModel.h>
#include <model/CEventRateModelFactory.h>
#include <model/CEventRatePopulationModel.h>
#include <model/CEventRatePopulationModelFactory.h>
#include <model/CHierarchicalResults.h>
#include <model/CHierarchicalResultsAggregator.h>
#include <model/CHierarchicalResultsPopulator.h>
#include <model/CHierarchicalResultsProbabilityFinalizer.h>
#include <model/CInterimBucketCorrector.h>
#include <model/CLimits.h>
#include <model/CMetricModel.h>
#include <model/CMetricModelFactory.h>
#include <model/CResourceMonitor.h>
#include <boost/test/unit_test.hpp>
#include <fstream>
#include <tuple>
#include <vector>
BOOST_AUTO_TEST_SUITE(CResourceLimitTest)
using namespace ml;
using namespace model;
using TStrVec = std::vector<std::string>;
class CResultWriter : public ml::model::CHierarchicalResultsVisitor {
public:
using TResultsTp =
std::tuple<core_t::TTime, double /* probability */, std::string /* byFieldName*/, std::string /* overFieldName */, std::string /* partitionFieldName */>;
using TResultsVec = std::vector<TResultsTp>;
public:
CResultWriter(const CAnomalyDetectorModelConfig& modelConfig, const CLimits& limits)
: m_ModelConfig(modelConfig), m_Limits(limits) {}
void operator()(CAnomalyDetector& detector, core_t::TTime start, core_t::TTime end) {
CHierarchicalResults results;
detector.buildResults(start, end, results);
results.buildHierarchy();
CHierarchicalResultsAggregator aggregator(m_ModelConfig);
results.bottomUpBreadthFirst(aggregator);
model::CHierarchicalResultsProbabilityFinalizer finalizer;
results.bottomUpBreadthFirst(finalizer);
model::CHierarchicalResultsPopulator populator(m_Limits);
results.bottomUpBreadthFirst(populator);
results.bottomUpBreadthFirst(*this);
}
void visit(const ml::model::CHierarchicalResults& results,
const ml::model::CHierarchicalResults::TNode& node,
bool pivot) override {
if (pivot) {
return;
}
if (!this->shouldWriteResult(m_Limits, results, node, pivot)) {
return;
}
if (this->isSimpleCount(node)) {
return;
}
if (!this->isLeaf(node)) {
return;
}
LOG_DEBUG(<< "Got anomaly @ " << node.s_BucketStartTime << ": "
<< node.probability());
ml::model::SAnnotatedProbability::TAttributeProbability1Vec& attributes =
node.s_AnnotatedProbability.s_AttributeProbabilities;
m_Results.push_back(TResultsTp(
node.s_BucketStartTime, node.probability(),
(attributes.empty() ? "" : *attributes[0].s_Attribute),
*node.s_Spec.s_PersonFieldValue, *node.s_Spec.s_PartitionFieldValue));
}
bool operator()(ml::core_t::TTime time,
const ml::model::CHierarchicalResults::TNode& node,
bool isBucketInfluencer) {
LOG_DEBUG(<< (isBucketInfluencer ? "BucketInfluencer" : "Influencer ")
<< node.s_Spec.print() << " initial score "
<< node.probability() << ", time: " << time);
return true;
}
const TResultsVec& results() const { return m_Results; }
private:
const CAnomalyDetectorModelConfig& m_ModelConfig;
const CLimits& m_Limits;
TResultsVec m_Results;
};
class CMockModelInterface {
public:
CMockModelInterface() = default;
virtual ~CMockModelInterface() = default;
virtual void createNewModels(std::size_t n, std::size_t m) = 0;
virtual void test(core_t::TTime time) = 0;
virtual std::size_t getNewPeople() const = 0;
virtual std::size_t getNewAttributes() const = 0;
};
//! A test wrapper around a real model that tracks calls to createNewModels
//! and simulates taking lots of memory
class CMockEventRateModel : public CEventRateModel, public CMockModelInterface {
public:
CMockEventRateModel(const SModelParams& params,
const TDataGathererPtr& dataGatherer,
const TFeatureMathsModelSPtrPrVec& newFeatureModels,
const maths::common::CMultinomialConjugate& personProbabilityPrior,
const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators,
CResourceMonitor& resourceMonitor)
: CEventRateModel(params,
dataGatherer,
newFeatureModels,
TFeatureMultivariatePriorSPtrPrVec(),
TFeatureCorrelationsPtrPrVec(),
personProbabilityPrior,
influenceCalculators,
std::make_shared<CInterimBucketCorrector>(params.s_BucketLength)),
m_ResourceMonitor(resourceMonitor), m_NewPeople(0), m_NewAttributes(0) {}
void updateRecycledModels() override {
// Do nothing
}
void createNewModels(std::size_t n, std::size_t m) override {
m_NewPeople += n;
m_NewAttributes += m;
this->CEventRateModel::createNewModels(n, m);
}
void test(core_t::TTime time) override {
m_ResourceMonitor.clearExtraMemory();
this->createUpdateNewModels(time, m_ResourceMonitor);
}
std::size_t getNewPeople() const override { return m_NewPeople; }
std::size_t getNewAttributes() const override { return m_NewAttributes; }
private:
CResourceMonitor& m_ResourceMonitor;
std::size_t m_NewPeople;
std::size_t m_NewAttributes;
};
//! A test wrapper around a real model that tracks calls to createNewModels
//! and simulates taking lots of memory
class CMockMetricModel : public CMetricModel, public CMockModelInterface {
public:
CMockMetricModel(const SModelParams& params,
const TDataGathererPtr& dataGatherer,
const TFeatureMathsModelSPtrPrVec& newFeatureModels,
const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators,
CResourceMonitor& resourceMonitor)
: CMetricModel(params,
dataGatherer,
newFeatureModels,
TFeatureMultivariatePriorSPtrPrVec(),
TFeatureCorrelationsPtrPrVec(),
influenceCalculators,
std::make_shared<CInterimBucketCorrector>(params.s_BucketLength)),
m_ResourceMonitor(resourceMonitor), m_NewPeople(0), m_NewAttributes(0) {}
void updateRecycledModels() override {
// Do nothing
}
void createNewModels(std::size_t n, std::size_t m) override {
m_NewPeople += n;
m_NewAttributes += m;
this->CMetricModel::createNewModels(n, m);
}
void test(core_t::TTime time) override {
m_ResourceMonitor.clearExtraMemory();
this->createUpdateNewModels(time, m_ResourceMonitor);
}
std::size_t getNewPeople() const override { return m_NewPeople; }
std::size_t getNewAttributes() const override { return m_NewAttributes; }
private:
CResourceMonitor& m_ResourceMonitor;
std::size_t m_NewPeople;
std::size_t m_NewAttributes;
};
void addArrival(core_t::TTime time,
const std::string& p,
CDataGatherer& gatherer,
CResourceMonitor& resourceMonitor) {
CDataGatherer::TStrCPtrVec fields;
fields.push_back(&p);
CEventData result;
result.time(time);
gatherer.addArrival(fields, result, resourceMonitor);
}
void addPersonData(std::size_t start,
std::size_t end,
core_t::TTime time,
CDataGatherer& gatherer,
CResourceMonitor& resourceMonitor) {
for (std::size_t i = start; i < end; i++) {
std::ostringstream ssA;
ssA << "person" << i;
addArrival(time, ssA.str(), gatherer, resourceMonitor);
}
}
const std::string VALUE("23");
void addMetricArrival(core_t::TTime time,
const std::string& p,
CDataGatherer& gatherer,
CResourceMonitor& resourceMonitor) {
CDataGatherer::TStrCPtrVec fields;
fields.push_back(&p);
fields.push_back(&VALUE);
CEventData result;
result.time(time);
gatherer.addArrival(fields, result, resourceMonitor);
}
void addPersonMetricData(std::size_t start,
std::size_t end,
core_t::TTime time,
CDataGatherer& gatherer,
CResourceMonitor& resourceMonitor) {
for (std::size_t i = start; i < end; i++) {
std::ostringstream ssA;
ssA << "person" << i;
addMetricArrival(time, ssA.str(), gatherer, resourceMonitor);
}
}
using TAddPersonDataFunc =
std::function<void(std::size_t, std::size_t, core_t::TTime, CDataGatherer&, CResourceMonitor&)>;
using TMockModelInterfacePtr = std::shared_ptr<CMockModelInterface>;
using TModelFactoryPtr = std::shared_ptr<CModelFactory>;
TAddPersonDataFunc createModel(model_t::EModelType modelType,
const std::string& emptyString,
core_t::TTime firstTime,
core_t::TTime bucketLength,
CResourceMonitor& resourceMonitor,
TMockModelInterfacePtr& model,
CModelFactory::TDataGathererPtr& gatherer,
TModelFactoryPtr& factory) {
switch (modelType) {
case model_t::E_EventRateOnline: {
// Test CEventRateModel::createUpdateNewModels()
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
factory = std::make_shared<CEventRateModelFactory>(params, interimBucketCorrector);
factory->detectorIndex(1);
factory->fieldNames(emptyString, emptyString, "pers", emptyString, TStrVec());
CModelFactory::TFeatureVec features;
features.push_back(model_t::E_IndividualCountByBucketAndPerson);
factory->features(features);
gatherer = factory->makeDataGatherer(firstTime);
const maths::common::CMultinomialConjugate conjugate;
std::shared_ptr<CMockEventRateModel> model_ = std::make_shared<CMockEventRateModel>(
factory->modelParams(), gatherer,
factory->defaultFeatureModels(features, bucketLength, 0.4, true), conjugate,
CAnomalyDetectorModel::TFeatureInfluenceCalculatorCPtrPrVecVec(),
resourceMonitor);
BOOST_REQUIRE_EQUAL(model_t::E_EventRateOnline, model_->category());
BOOST_TEST_REQUIRE(model_->isPopulation() == false);
model = model_;
return addPersonData;
}
case model_t::E_MetricOnline: {
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
factory = std::make_shared<CMetricModelFactory>(params, interimBucketCorrector);
factory->detectorIndex(1);
factory->fieldNames(emptyString, emptyString, "peep", "val", TStrVec());
factory->useNull(true);
CModelFactory::TFeatureVec features;
features.push_back(model_t::E_IndividualMeanByPerson);
features.push_back(model_t::E_IndividualMinByPerson);
features.push_back(model_t::E_IndividualMaxByPerson);
factory->features(features);
gatherer = factory->makeDataGatherer(firstTime);
std::shared_ptr<CMockMetricModel> model_ = std::make_shared<CMockMetricModel>(
factory->modelParams(), gatherer,
factory->defaultFeatureModels(features, bucketLength, 0.4, true),
CAnomalyDetectorModel::TFeatureInfluenceCalculatorCPtrPrVecVec(),
resourceMonitor);
BOOST_REQUIRE_EQUAL(model_t::E_MetricOnline, model_->category());
BOOST_TEST_REQUIRE(model_->isPopulation() == false);
model = model_;
return addPersonMetricData;
}
case model_t::E_Counting:
return nullptr;
}
return nullptr;
}
struct SLargeAllocationTestParams {
bool m_PersistInForeground;
std::size_t m_MemoryLimit;
std::size_t m_NumPeopleToBreachLimit;
std::size_t m_NewPeopleLowerBound;
std::size_t m_NewPeopleUpperBound;
model_t::EModelType m_ModelType;
};
void doTestLargeAllocations(SLargeAllocationTestParams& param) {
const std::string EMPTY_STRING("");
const core_t::TTime FIRST_TIME(358556400);
const core_t::TTime BUCKET_LENGTH(3600);
CResourceMonitor resourceMonitor(param.m_PersistInForeground, 1.0);
resourceMonitor.memoryLimit(std::size_t(param.m_MemoryLimit));
TMockModelInterfacePtr model;
CModelFactory::TDataGathererPtr gatherer;
TModelFactoryPtr factory;
TAddPersonDataFunc personAdder =
createModel(param.m_ModelType, EMPTY_STRING, FIRST_TIME, BUCKET_LENGTH,
resourceMonitor, model, gatherer, factory);
core_t::TTime time = FIRST_TIME;
BOOST_TEST_REQUIRE(resourceMonitor.areAllocationsAllowed());
// Add some people & attributes to the gatherer
// Run a sample
// Check that the models can create the right number of people/attributes
personAdder(0, 400, time, *gatherer, resourceMonitor);
BOOST_REQUIRE_EQUAL(400, gatherer->numberActivePeople());
LOG_DEBUG(<< "Testing for 1st time");
model->test(time);
BOOST_REQUIRE_EQUAL(400, gatherer->numberActivePeople());
BOOST_REQUIRE_EQUAL(400, model->getNewPeople());
BOOST_REQUIRE_EQUAL(0, model->getNewAttributes());
time += BUCKET_LENGTH;
personAdder(400, 1000, time, *gatherer, resourceMonitor);
model->test(time);
// This should add enough people to go over the memory limit
personAdder(1000, param.m_NumPeopleToBreachLimit, time, *gatherer, resourceMonitor);
LOG_DEBUG(<< "Testing for 2nd time");
model->test(time);
LOG_DEBUG(<< "# new people = " << model->getNewPeople());
BOOST_TEST_REQUIRE(model->getNewPeople() > param.m_NewPeopleLowerBound);
BOOST_TEST_REQUIRE(model->getNewPeople() < param.m_NewPeopleUpperBound);
BOOST_REQUIRE_EQUAL(0, model->getNewAttributes());
BOOST_REQUIRE_EQUAL(model->getNewPeople(), gatherer->numberActivePeople());
// Adding a small number of new people should be fine though,
// as they're allowed in
time += BUCKET_LENGTH;
std::size_t oldNumberPeople{model->getNewPeople()};
personAdder(param.m_NumPeopleToBreachLimit, param.m_NumPeopleToBreachLimit + 10,
time, *gatherer, resourceMonitor);
LOG_DEBUG(<< "Testing for 3rd time");
model->test(time);
BOOST_REQUIRE_EQUAL(oldNumberPeople + 10, model->getNewPeople());
BOOST_REQUIRE_EQUAL(0, model->getNewAttributes());
BOOST_REQUIRE_EQUAL(model->getNewPeople(), gatherer->numberActivePeople());
}
class CTestFixture {
protected:
void importCsvDataWithLimiter(core_t::TTime firstTime,
core_t::TTime bucketLength,
CResultWriter& outputResults,
const std::string& fileName,
CAnomalyDetector& detector,
std::size_t limitCutoff,
CResourceMonitor& resourceMonitor) {
using TifstreamPtr = std::shared_ptr<std::ifstream>;
TifstreamPtr ifs(new std::ifstream(fileName.c_str()));
BOOST_TEST_REQUIRE(ifs->is_open());
core::CRegex regex;
BOOST_TEST_REQUIRE(regex.init(","));
std::string line;
// read the header
BOOST_TEST_REQUIRE(std::getline(*ifs, line).good());
core_t::TTime lastBucketTime = firstTime;
std::size_t i = 0;
while (std::getline(*ifs, line)) {
if (i == limitCutoff) {
LOG_INFO(<< "Setting Limit cutoff now");
resourceMonitor.m_ByteLimitHigh = 0;
resourceMonitor.m_ByteLimitLow = 0;
}
LOG_TRACE(<< "Got string: " << line);
core::CRegex::TStrVec tokens;
regex.split(line, tokens);
core_t::TTime time;
BOOST_TEST_REQUIRE(core::CStringUtils::stringToType(tokens[0], time));
for (/**/; lastBucketTime + bucketLength <= time; lastBucketTime += bucketLength) {
outputResults(detector, lastBucketTime, lastBucketTime + bucketLength);
}
CAnomalyDetector::TStrCPtrVec fieldValues;
for (std::size_t t = tokens.size() - 1; t > 0; t--) {
fieldValues.push_back(&tokens[t]);
}
detector.addRecord(time, fieldValues);
++i;
}
outputResults(detector, lastBucketTime, lastBucketTime + bucketLength);
ifs.reset();
}
};
BOOST_FIXTURE_TEST_CASE(testLimitBy, CTestFixture) {
// Check that we can get some results from a test data set, then
// turn on resource limiting and still get the same results
static const core_t::TTime BUCKET_LENGTH(3600);
static const core_t::TTime FIRST_TIME(maths::common::CIntegerTools::ceil(
core_t::TTime(1407428000), BUCKET_LENGTH));
CResultWriter::TResultsVec results;
{
CAnomalyDetectorModelConfig modelConfig =
CAnomalyDetectorModelConfig::defaultConfig(BUCKET_LENGTH);
modelConfig.useMultibucketFeatures(false);
CLimits limits;
CSearchKey key(1, // detectorIndex
function_t::E_IndividualMetric, false,
model_t::E_XF_None, "value", "colour");
CAnomalyDetector detector(limits, modelConfig, "", FIRST_TIME,
modelConfig.factory(key));
CResultWriter writer(modelConfig, limits);
importCsvDataWithLimiter(FIRST_TIME, BUCKET_LENGTH, writer,
"testfiles/resource_limits_8_series.csv",
detector, std::numeric_limits<std::size_t>::max(),
limits.resourceMonitor());
results = writer.results();
// expect there to be 2 anomalies
BOOST_REQUIRE_EQUAL(2, results.size());
BOOST_REQUIRE_EQUAL(core_t::TTime(1407571200), std::get<0>(results[0]));
BOOST_REQUIRE_EQUAL(core_t::TTime(1407715200), std::get<0>(results[1]));
BOOST_REQUIRE_EQUAL(8, detector.numberActivePeople());
}
{
// This time, repeat the test but set a resource limit to prevent
// any models from being created.
CAnomalyDetectorModelConfig modelConfig =
CAnomalyDetectorModelConfig::defaultConfig(BUCKET_LENGTH);
CLimits limits;
CSearchKey key(1, // detectorIndex
function_t::E_IndividualMetric, false,
model_t::E_XF_None, "value", "colour");
CAnomalyDetector detector(limits, modelConfig, "", FIRST_TIME,
modelConfig.factory(key));
CResultWriter writer(modelConfig, limits);
importCsvDataWithLimiter(FIRST_TIME, BUCKET_LENGTH, writer,
"testfiles/resource_limits_8_series.csv",
detector, 1, limits.resourceMonitor());
const CResultWriter::TResultsVec& secondResults = writer.results();
BOOST_REQUIRE_EQUAL(0, secondResults.size());
}
}
BOOST_FIXTURE_TEST_CASE(testLimitByOver, CTestFixture) {
// Check that we can get some results from a test data set, then
// turn on resource limiting and still get the results from
// non-limited data, but not results from limited data
static const core_t::TTime BUCKET_LENGTH(3600);
static const core_t::TTime FIRST_TIME(maths::common::CIntegerTools::ceil(
core_t::TTime(1407441600), BUCKET_LENGTH));
CResultWriter::TResultsVec results;
{
CAnomalyDetectorModelConfig modelConfig =
CAnomalyDetectorModelConfig::defaultConfig(BUCKET_LENGTH);
CLimits limits;
CSearchKey key(1, // detectorIndex
function_t::E_PopulationMetric, false,
model_t::E_XF_None, "value", "colour", "species");
CAnomalyDetector detector(limits, modelConfig, "", FIRST_TIME,
modelConfig.factory(key));
CResultWriter writer(modelConfig, limits);
importCsvDataWithLimiter(FIRST_TIME, BUCKET_LENGTH, writer,
"testfiles/resource_limits_8_2over.csv", detector,
std::numeric_limits<std::size_t>::max(),
limits.resourceMonitor());
results = writer.results();
// check we have the expected 4 anomalies
BOOST_REQUIRE_EQUAL(4, results.size());
BOOST_REQUIRE_EQUAL(2, detector.numberActivePeople());
BOOST_REQUIRE_EQUAL(3, detector.numberActiveAttributes());
}
// Now limit after 1 sample, so only expect no results
CAnomalyDetectorModelConfig modelConfig =
CAnomalyDetectorModelConfig::defaultConfig(BUCKET_LENGTH);
CLimits limits;
CSearchKey key(1, // detectorIndex
function_t::E_PopulationMetric, false, model_t::E_XF_None,
"value", "colour", "species");
CAnomalyDetector detector(limits, modelConfig, "", FIRST_TIME,
modelConfig.factory(key));
CResultWriter writer(modelConfig, limits);
importCsvDataWithLimiter(FIRST_TIME, BUCKET_LENGTH, writer,
"testfiles/resource_limits_8_2over.csv", detector,
1, limits.resourceMonitor());
const CResultWriter::TResultsVec& secondResults = writer.results();
// should only have red flowers as results now
BOOST_REQUIRE_EQUAL(0, secondResults.size());
}
BOOST_FIXTURE_TEST_CASE(testLargeAllocations, CTestFixture) {
SLargeAllocationTestParams params[] = {
{false, 70, 3000, 2700, 2900, model_t::E_EventRateOnline},
{true, 70, 5000, 4500, 4700, model_t::E_EventRateOnline},
{false, 100, 4000, 3400, 3700, model_t::E_MetricOnline},
{true, 100, 7000, 5900, 6100, model_t::E_MetricOnline}};
for (auto& param : params) {
doTestLargeAllocations(param);
}
}
BOOST_AUTO_TEST_SUITE_END()