/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <core/CJsonOutputStreamWrapper.h>
#include <core/CLogger.h>
#include <core/CStringUtils.h>
#include <core/Constants.h>
#include <core/CoreTypes.h>

#include <maths/common/CIntegerTools.h>

#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CLimits.h>

#include <api/CAnomalyJobConfig.h>
#include <api/CCsvInputParser.h>
#include <api/CHierarchicalResultsWriter.h>
#include <api/CJsonOutputWriter.h>

#include <test/BoostTestCloseAbsolute.h>
#include <test/CRandomNumbers.h>

#include "CTestAnomalyJob.h"

#include <boost/json.hpp>
#include <boost/math/constants/constants.hpp>
#include <boost/test/unit_test.hpp>

#include <cmath>
#include <cstdint>
#include <fstream>
#include <functional>
#include <optional>
#include <set>
#include <sstream>
#include <string>
#include <vector>

// Boost.JSON is referred to via this alias throughout the tests.
namespace json = boost::json;
BOOST_TEST_DONT_PRINT_LOG_VALUE(json::object::iterator)

BOOST_AUTO_TEST_SUITE(CAnomalyJobLimitTest)

using namespace ml;
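// Collect the distinct values of the field named 'key' from the causes of
// every record in every bucket of the job's JSON output.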
std::set<std::string> getUniqueValues(const std::string& key, const std::string& output) {
std::set<std::string> values;
json::error_code ec;
LOG_DEBUG(<< "Parsing: [ " << output << " ]");
json::value doc = json::parse(output, ec);
BOOST_TEST_REQUIRE(ec.failed() == false);
BOOST_TEST_REQUIRE(doc.is_array());
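    // The output is an array of bucket results. Walk buckets, records and
    // causes via JSON pointers, stopping at each level when a lookup fails.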
    for (std::size_t i = 0;
         doc.find_pointer("/" + std::to_string(i), ec) != nullptr; ++i) {
        for (std::size_t j = 0;
             doc.find_pointer("/" + std::to_string(i) + "/records/" + std::to_string(j),
                              ec) != nullptr;
             ++j) {
            for (std::size_t k = 0;; ++k) {
                json::value* p = doc.find_pointer(
                    "/" + std::to_string(i) + "/records/" + std::to_string(j) +
                        "/causes/" + std::to_string(k) + "/" + key,
                    ec);
                if (p == nullptr) {
                    break;
                }
                values.insert(std::string(p->as_string()));
            }
        }
    }
return values;
}

BOOST_AUTO_TEST_CASE(testAccuracy) {
// Check that the amount of memory used when we go over the
// resource limit is close enough to the limit that we specified
std::size_t nonLimitedUsage{0};
std::size_t limitedUsage{0};
{
// Without limits, this data set should make the models around
// 1230000 bytes
// Run the data once to find out what the current platform uses
ml::api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig(
"metric", "value", "colour", "species", "greenhouse");
model::CAnomalyDetectorModelConfig modelConfig =
model::CAnomalyDetectorModelConfig::defaultConfig(3600);
std::stringstream outputStrm;
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);
model::CLimits limits;
{
LOG_TRACE(<< "Setting up job");
CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream);
std::ifstream inputStrm("testfiles/resource_accuracy.csv");
BOOST_TEST_REQUIRE(inputStrm.is_open());
api::CCsvInputParser parser(inputStrm);
LOG_TRACE(<< "Reading file");
BOOST_TEST_REQUIRE(parser.readStreamIntoMaps(
[&job](const CTestAnomalyJob::TStrStrUMap& dataRowFields) {
return job.handleRecord(dataRowFields);
}));
LOG_TRACE(<< "Checking results");
BOOST_REQUIRE_EQUAL(uint64_t(18630), job.numRecordsHandled());
nonLimitedUsage = limits.resourceMonitor().totalMemory();
}
}
{
// Now run the data with limiting
ml::api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig(
"metric", "value", "colour", "species", "greenhouse");
model::CAnomalyDetectorModelConfig modelConfig =
model::CAnomalyDetectorModelConfig::defaultConfig(3600);
model::CLimits limits;
std::stringstream outputStrm;
{
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);
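            // Constrain the job to a tenth of the unlimited footprint measured
            // above; the low water mark sits 1kB below the high.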
limits.resourceMonitor().m_ByteLimitHigh = nonLimitedUsage / 10;
limits.resourceMonitor().m_ByteLimitLow =
limits.resourceMonitor().m_ByteLimitHigh - 1024;
LOG_TRACE(<< "Setting up job");
CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream);
std::ifstream inputStrm("testfiles/resource_accuracy.csv");
BOOST_TEST_REQUIRE(inputStrm.is_open());
api::CCsvInputParser parser(inputStrm);
LOG_TRACE(<< "Reading file");
BOOST_TEST_REQUIRE(parser.readStreamIntoMaps(
[&job](const CTestAnomalyJob::TStrStrUMap& dataRowFields) {
return job.handleRecord(dataRowFields);
}));
LOG_TRACE(<< "Checking results");
BOOST_REQUIRE_EQUAL(uint64_t(18630), job.numRecordsHandled());
// TODO this limit must be tightened once there is more granular
// control over the model memory creation
limitedUsage = limits.resourceMonitor().totalMemory();
}
LOG_TRACE(<< outputStrm.str());
LOG_DEBUG(<< "Non-limited usage: " << nonLimitedUsage << "; limited: " << limitedUsage);
BOOST_TEST_REQUIRE(limitedUsage < nonLimitedUsage);
}
}

BOOST_AUTO_TEST_CASE(testLimit) {
using TStrSet = std::set<std::string>;
std::stringstream outputStrm;
{
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);
// Run the data without any resource limits and check that
// all the expected fields are in the results set
model::CLimits limits;
api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig(
"metric", "value", "colour", "species", "greenhouse");
model::CAnomalyDetectorModelConfig modelConfig =
model::CAnomalyDetectorModelConfig::defaultConfig(3600);
LOG_TRACE(<< "Setting up job");
CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream);
std::ifstream inputStrm("testfiles/resource_limits_3_2over_3partition.csv");
BOOST_TEST_REQUIRE(inputStrm.is_open());
api::CCsvInputParser parser(inputStrm);
LOG_TRACE(<< "Reading file");
BOOST_TEST_REQUIRE(parser.readStreamIntoMaps(
[&job](const CTestAnomalyJob::TStrStrUMap& dataRowFields) {
return job.handleRecord(dataRowFields);
}));
LOG_TRACE(<< "Checking results");
BOOST_REQUIRE_EQUAL(uint64_t(1176), job.numRecordsHandled());
}
std::string out = outputStrm.str();
TStrSet partitions = getUniqueValues("partition_field_value", out);
TStrSet people = getUniqueValues("over_field_value", out);
TStrSet attributes = getUniqueValues("by_field_value", out);
BOOST_REQUIRE_EQUAL(3, partitions.size());
BOOST_REQUIRE_EQUAL(2, people.size());
BOOST_REQUIRE_EQUAL(2, attributes.size());
outputStrm.str("");
outputStrm.clear();
{
        // Run the data again, turning resource limiting on after the records
        // in the first file have been processed, and check that anomalies are
        // only reported for entities modelled before the limit kicked in
model::CLimits limits;
ml::api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig(
"metric", "value", "colour", "species", "greenhouse");
model::CAnomalyDetectorModelConfig modelConfig =
model::CAnomalyDetectorModelConfig::defaultConfig(3600);
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);
LOG_TRACE(<< "Setting up job");
CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream);
std::ifstream inputStrm("testfiles/resource_limits_3_2over_3partition_first8.csv");
BOOST_TEST_REQUIRE(inputStrm.is_open());
api::CCsvInputParser parser(inputStrm);
LOG_TRACE(<< "Reading file");
BOOST_TEST_REQUIRE(parser.readStreamIntoMaps(
[&job](const CTestAnomalyJob::TStrStrUMap& dataRowFields) {
return job.handleRecord(dataRowFields);
}));
// Now turn on the resource limiting
limits.resourceMonitor().m_ByteLimitHigh = 0;
limits.resourceMonitor().m_ByteLimitLow = 0;
limits.resourceMonitor().m_AllowAllocations = false;
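        // With allocations disallowed no new partitions, people or attributes
        // can be modelled from this point on.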
std::ifstream inputStrm2("testfiles/resource_limits_3_2over_3partition_last1169.csv");
BOOST_TEST_REQUIRE(inputStrm2.is_open());
api::CCsvInputParser parser2(inputStrm2);
LOG_TRACE(<< "Reading second file");
BOOST_TEST_REQUIRE(parser2.readStreamIntoMaps(
[&job](const CTestAnomalyJob::TStrStrUMap& dataRowFields) {
return job.handleRecord(dataRowFields);
}));
LOG_TRACE(<< "Checking results");
BOOST_REQUIRE_EQUAL(uint64_t(1180), job.numRecordsHandled());
}
out = outputStrm.str();
partitions = getUniqueValues("partition_field_value", out);
people = getUniqueValues("over_field_value", out);
attributes = getUniqueValues("by_field_value", out);
BOOST_REQUIRE_EQUAL(1, partitions.size());
BOOST_REQUIRE_EQUAL(2, people.size());
BOOST_REQUIRE_EQUAL(1, attributes.size());
}

BOOST_AUTO_TEST_CASE(testModelledEntityCountForFixedMemoryLimit) {
using TOptionalDouble = std::optional<double>;
using TDoubleVec = std::vector<double>;
using TSizeVec = std::vector<std::size_t>;
using TGenerator = std::function<TOptionalDouble(core_t::TTime)>;
using TGeneratorVec = std::vector<TGenerator>;
test::CRandomNumbers rng;
// Generators for a variety of data characteristics.
TGenerator periodic = [&rng](core_t::TTime time) {
TDoubleVec noise;
rng.generateNormalSamples(0.0, 3.0, 1, noise);
return TOptionalDouble{20.0 * std::sin(2.0 * boost::math::double_constants::pi *
static_cast<double>(time) /
static_cast<double>(core::constants::DAY)) +
noise[0]};
};
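    // The daily periodic signal with its amplitude damped on two days of the
    // week, mimicking trading days.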
TGenerator tradingDays = [&periodic](core_t::TTime time) {
double amplitude[]{1.0, 1.0, 0.7, 0.8, 1.0, 0.1, 0.1};
return TOptionalDouble{amplitude[(time % core::constants::WEEK) / core::constants::DAY] *
*periodic(time)};
};
TGenerator level = [&rng](core_t::TTime) {
TDoubleVec noise;
rng.generateNormalSamples(10.0, 5.0, 1, noise);
return TOptionalDouble{noise[0]};
};
TGenerator ramp = [&rng](core_t::TTime time) {
TDoubleVec noise;
rng.generateNormalSamples(0.0, 1.0, 1, noise);
return TOptionalDouble{static_cast<double>(time) /
static_cast<double>(core::constants::DAY) +
noise[0]};
};
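    // Produces a value in roughly 10% of buckets, otherwise nothing.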
TGenerator sparse = [&rng, &level](core_t::TTime time) {
TDoubleVec uniform01;
rng.generateUniformSamples(0.0, 1.0, 1, uniform01);
return uniform01[0] < 0.1 ? level(time) : std::nullopt;
};
// We assert on the number of by, partition and over fields we can
// create for a small(ish) memory limit to catch large changes in
// the memory used per partition of the data.
struct STestParams {
core_t::TTime s_BucketLength;
std::size_t s_ExpectedByFields;
std::size_t s_ExpectedOverFields;
std::size_t s_ExpectedPartitionFields;
std::size_t s_ExpectedByMemoryUsageRelativeErrorDivisor;
std::size_t s_ExpectedPartitionUsageRelativeErrorDivisor;
std::size_t s_ExpectedOverUsageRelativeErrorDivisor;
} testParams[]{{600, 500, 5200, 290, 27, 25, 2},
{3600, 500, 5500, 280, 27, 25, 2},
{172800, 65, 850, 50, 6, 5, 2}};
for (const auto& testParam : testParams) {
TGeneratorVec generators{periodic, tradingDays, level, ramp, sparse};
std::stringstream outputStrm;
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);
CTestAnomalyJob::TStrStrUMap dataRows;
TSizeVec generator;
LOG_DEBUG(<< "**** Test by with bucketLength = " << testParam.s_BucketLength << " ****");
{
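            // Many 'bar' values as the by field: check how many by fields can
            // be modelled within 10MB and that usage settles close to half
            // the limit.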
std::size_t memoryLimit{10 /*MB*/};
model::CLimits limits;
limits.resourceMonitor().memoryLimit(memoryLimit);
ml::api::CAnomalyJobConfig jobConfig =
CTestAnomalyJob::makeSimpleJobConfig("mean", "foo", "bar", "", "");
model::CAnomalyDetectorModelConfig modelConfig =
model::CAnomalyDetectorModelConfig::defaultConfig(testParam.s_BucketLength);
CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream);
core_t::TTime startTime{1495110323};
core_t::TTime endTime{1495260323};
core_t::TTime time{startTime};
double reportProgress{0.0};
for (/**/; time < endTime; time += testParam.s_BucketLength) {
double progress{static_cast<double>(time - startTime) /
static_cast<double>(endTime - startTime)};
if (progress >= reportProgress) {
LOG_DEBUG(<< "Processed " << std::floor(100.0 * progress) << "%");
reportProgress += 0.1;
}
for (std::size_t i = 0; i < 900; ++i) {
rng.generateUniformSamples(0, generators.size(), 1, generator);
TOptionalDouble value{generators[generator[0]](time)};
if (value) {
dataRows["time"] = core::CStringUtils::typeToString(time);
dataRows["foo"] = core::CStringUtils::typeToString(*value);
dataRows["bar"] = "b" + core::CStringUtils::typeToString(i + 1);
BOOST_TEST_REQUIRE(job.handleRecord(dataRows));
}
}
}
core_t::TTime startOfBucket{
maths::common::CIntegerTools::floor(time, testParam.s_BucketLength)};
auto used = limits.resourceMonitor().createMemoryUsageReport(startOfBucket);
LOG_DEBUG(<< "# by = " << used.s_ByFields);
LOG_DEBUG(<< "# partition = " << used.s_PartitionFields);
LOG_DEBUG(<< "Memory status = " << used.s_MemoryStatus);
LOG_DEBUG(<< "Memory usage bytes = " << used.s_Usage);
LOG_DEBUG(<< "Memory limit bytes = "
<< memoryLimit * core::constants::BYTES_IN_MEGABYTES);
BOOST_TEST_REQUIRE(used.s_ByFields > testParam.s_ExpectedByFields);
BOOST_TEST_REQUIRE(used.s_ByFields < 800);
BOOST_REQUIRE_EQUAL(2, used.s_PartitionFields);
BOOST_REQUIRE_CLOSE_ABSOLUTE(
memoryLimit * core::constants::BYTES_IN_MEGABYTES / 2, used.s_Usage,
memoryLimit * core::constants::BYTES_IN_MEGABYTES /
testParam.s_ExpectedByMemoryUsageRelativeErrorDivisor);
}
LOG_DEBUG(<< "**** Test partition with bucketLength = " << testParam.s_BucketLength
<< " ****");
{
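            // The same data keyed on 'bar' as the partition field: check how
            // many partitions can be modelled within 10MB and that the by and
            // partition field counts track one another.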
std::size_t memoryLimit{10 /*MB*/};
model::CLimits limits;
limits.resourceMonitor().memoryLimit(memoryLimit);
ml::api::CAnomalyJobConfig jobConfig =
CTestAnomalyJob::makeSimpleJobConfig("mean", "foo", "", "", "bar");
model::CAnomalyDetectorModelConfig modelConfig =
model::CAnomalyDetectorModelConfig::defaultConfig(testParam.s_BucketLength);
CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream);
core_t::TTime startTime{1495110323};
core_t::TTime endTime{1495260323};
core_t::TTime time{startTime};
double reportProgress{0.0};
for (/**/; time < endTime; time += testParam.s_BucketLength) {
double progress{static_cast<double>(time - startTime) /
static_cast<double>(endTime - startTime)};
if (progress >= reportProgress) {
LOG_DEBUG(<< "Processed " << std::floor(100.0 * progress) << "%");
reportProgress += 0.1;
}
for (std::size_t i = 0; i < 500; ++i) {
rng.generateUniformSamples(0, generators.size(), 1, generator);
TOptionalDouble value{generators[generator[0]](time)};
if (value) {
dataRows["time"] = core::CStringUtils::typeToString(time);
dataRows["foo"] = core::CStringUtils::typeToString(*value);
dataRows["bar"] = "b" + core::CStringUtils::typeToString(i + 1);
BOOST_TEST_REQUIRE(job.handleRecord(dataRows));
}
}
}
core_t::TTime startOfBucket{
maths::common::CIntegerTools::floor(time, testParam.s_BucketLength)};
auto used = limits.resourceMonitor().createMemoryUsageReport(startOfBucket);
LOG_DEBUG(<< "# by = " << used.s_ByFields);
LOG_DEBUG(<< "# partition = " << used.s_PartitionFields);
LOG_DEBUG(<< "Memory status = " << used.s_MemoryStatus);
LOG_DEBUG(<< "Memory usage = " << used.s_Usage);
LOG_DEBUG(<< "Memory limit bytes = " << memoryLimit * 1024 * 1024);
BOOST_TEST_REQUIRE(used.s_PartitionFields >= testParam.s_ExpectedPartitionFields);
BOOST_TEST_REQUIRE(used.s_PartitionFields < 450);
BOOST_TEST_REQUIRE(static_cast<double>(used.s_ByFields) >
0.96 * static_cast<double>(used.s_PartitionFields));
BOOST_REQUIRE_CLOSE_ABSOLUTE(
memoryLimit * core::constants::BYTES_IN_MEGABYTES / 2, used.s_Usage,
memoryLimit * core::constants::BYTES_IN_MEGABYTES /
testParam.s_ExpectedPartitionUsageRelativeErrorDivisor);
}
LOG_DEBUG(<< "**** Test over with bucketLength = " << testParam.s_BucketLength
<< " ****");
{
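            // Sparse data with 'bar' as the over field: population analysis
            // models far more entities per byte, so check the entity count
            // for a 20MB limit.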
std::size_t memoryLimit{20 /*MB*/};
model::CLimits limits;
limits.resourceMonitor().memoryLimit(memoryLimit);
ml::api::CAnomalyJobConfig jobConfig =
CTestAnomalyJob::makeSimpleJobConfig("mean", "foo", "", "bar", "");
model::CAnomalyDetectorModelConfig modelConfig =
model::CAnomalyDetectorModelConfig::defaultConfig(testParam.s_BucketLength);
CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream);
core_t::TTime startTime{1495110323};
core_t::TTime endTime{1495230323};
core_t::TTime time{startTime};
double reportProgress{0.0};
for (/**/; time < endTime; time += testParam.s_BucketLength) {
double progress{static_cast<double>(time - startTime) /
static_cast<double>(endTime - startTime)};
if (progress >= reportProgress) {
LOG_DEBUG(<< "Processed " << std::floor(100.0 * progress) << "%");
reportProgress += 0.1;
}
for (std::size_t i = 0; i < 9000; ++i) {
TOptionalDouble value{sparse(time)};
if (value) {
dataRows["time"] = core::CStringUtils::typeToString(time);
dataRows["foo"] = core::CStringUtils::typeToString(*value);
dataRows["bar"] = "b" + core::CStringUtils::typeToString(i + 1);
BOOST_TEST_REQUIRE(job.handleRecord(dataRows));
}
}
}
core_t::TTime startOfBucket{
maths::common::CIntegerTools::floor(time, testParam.s_BucketLength)};
auto used = limits.resourceMonitor().createMemoryUsageReport(startOfBucket);
LOG_DEBUG(<< "# over = " << used.s_OverFields);
LOG_DEBUG(<< "Memory status = " << used.s_MemoryStatus);
LOG_DEBUG(<< "Memory usage = " << used.s_Usage);
LOG_DEBUG(<< "Memory limit bytes = " << memoryLimit * 1024 * 1024);
BOOST_TEST_REQUIRE(used.s_OverFields > testParam.s_ExpectedOverFields);
BOOST_TEST_REQUIRE(used.s_OverFields <= 9000);
BOOST_REQUIRE_CLOSE_ABSOLUTE(
memoryLimit * core::constants::BYTES_IN_MEGABYTES / 2, used.s_Usage,
memoryLimit * core::constants::BYTES_IN_MEGABYTES /
testParam.s_ExpectedOverUsageRelativeErrorDivisor);
}
}
}

BOOST_AUTO_TEST_SUITE_END()