lib/model/unittest/CMetricPopulationDataGathererTest.cc (920 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <core/CContainerPrinter.h>
#include <core/CJsonStatePersistInserter.h>
#include <core/CJsonStateRestoreTraverser.h>
#include <core/CoreTypes.h>
#include <core/UnwrapRef.h>
#include <model/CDataGatherer.h>
#include <model/CEventData.h>
#include <model/CInterimBucketCorrector.h>
#include <model/CMetricPopulationModelFactory.h>
#include <model/CResourceMonitor.h>
#include <model/CSampleGatherer.h>
#include <model/ModelTypes.h>
#include <test/BoostTestCloseAbsolute.h>
#include <test/CRandomNumbers.h>
#include <algorithm>
#include <boost/lexical_cast.hpp>
#include <boost/test/unit_test.hpp>
#include <map>
#include <ranges>
#include <string>
#include <utility>
#include <vector>
#include "ModelTestHelpers.h"
BOOST_AUTO_TEST_SUITE(CMetricPopulationDataGathererTest)
using namespace ml;
using namespace model;
namespace {
using TDoubleVec = std::vector<double>;
using TStrVec = std::vector<std::string>;
using TStrStrPr = std::pair<std::string, std::string>;
using TStrStrPrDoubleMap = std::map<TStrStrPr, double>;
using TSizeSizePr = std::pair<std::size_t, std::size_t>;
using TSizeSizePrFeatureDataPr = std::pair<TSizeSizePr, SMetricFeatureData>;
using TSizeSizePrFeatureDataPrVec = std::vector<TSizeSizePrFeatureDataPr>;
using TFeatureSizeSizePrFeatureDataPrVecPr =
std::pair<model_t::EFeature, TSizeSizePrFeatureDataPrVec>;
using TFeatureSizeSizePrFeatureDataPrVecPrVec = std::vector<TFeatureSizeSizePrFeatureDataPrVecPr>;
struct SMessage {
SMessage(const core_t::TTime& time,
std::string person,
std::string attribute,
const double& value,
TStrVec influences = TStrVec())
: s_Time(time), s_Person(std::move(person)),
s_Attribute(std::move(attribute)), s_Value(value),
s_Influences(std::move(influences)) {}
core_t::TTime s_Time;
std::string s_Person;
std::string s_Attribute;
double s_Value;
TStrVec s_Influences;
};
using TMessageVec = std::vector<SMessage>;
TStrVec vec(const std::string& s1, const std::string& s2) {
TStrVec result(1, s1);
result.push_back(s2);
return result;
}
void generateTestMessages(const core_t::TTime& startTime, TMessageVec& result) {
constexpr std::size_t numberMessages = 100000;
constexpr std::size_t numberPeople = 40;
constexpr std::size_t numberCategories = 10;
constexpr std::array locations = {1.0, 2.0, 5.0, 15.0, 3.0,
0.5, 10.0, 17.0, 8.5, 1.5};
constexpr std::array scales = {1.0, 1.0, 3.0, 2.0, 0.5,
0.5, 2.0, 3.0, 4.0, 1.0};
result.clear();
result.reserve(numberMessages);
test::CRandomNumbers rng;
TDoubleVec times;
rng.generateUniformSamples(0.0, 86399.99, numberMessages, times);
std::sort(times.begin(), times.end());
TDoubleVec people;
rng.generateUniformSamples(0.0, static_cast<double>(numberPeople) - 0.01,
numberMessages, people);
TDoubleVec categories;
rng.generateUniformSamples(0.0, static_cast<double>(numberCategories) - 0.01,
numberMessages, categories);
for (std::size_t i = 0; i < numberMessages; ++i) {
core_t::TTime const time = startTime + static_cast<core_t::TTime>(times[i]);
auto const person = static_cast<std::size_t>(people[i]);
auto const attribute = static_cast<std::size_t>(categories[i]);
TDoubleVec value;
rng.generateNormalSamples(locations[attribute], scales[attribute], 1U, value);
result.emplace_back(
time, std::string("p") + boost::lexical_cast<std::string>(person),
std::string("c") + boost::lexical_cast<std::string>(attribute), value[0]);
}
}
void addArrival(const SMessage& message, CDataGatherer& gatherer, CResourceMonitor& resourceMonitor) {
CDataGatherer::TStrCPtrVec fields;
fields.push_back(&message.s_Person);
fields.push_back(&message.s_Attribute);
for (const auto& s_Influence : message.s_Influences) {
if (s_Influence.empty()) {
fields.push_back(static_cast<std::string*>(nullptr));
} else {
fields.push_back(&s_Influence);
}
}
std::string const value = core::CStringUtils::typeToStringPrecise(
message.s_Value, core::CIEEE754::E_DoublePrecision);
fields.push_back(&value);
CEventData result;
result.time(message.s_Time);
gatherer.addArrival(fields, result, resourceMonitor);
}
const CSearchKey searchKey;
const std::string EMPTY_STRING;
} // unnamed::
class CTestFixture {
protected:
CResourceMonitor m_ResourceMonitor;
};
BOOST_FIXTURE_TEST_CASE(testMean, CTestFixture) {
// Test that we correctly sample the bucket means.
using TMeanAccumulator = maths::common::CBasicStatistics::SSampleMean<double>::TAccumulator;
using TStrStrPrMeanAccumulatorMap = std::map<TStrStrPr, TMeanAccumulator>;
constexpr core_t::TTime startTime = 1373932800;
constexpr core_t::TTime bucketLength = 3600;
TMessageVec messages;
generateTestMessages(startTime, messages);
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
CMetricPopulationModelFactory factory(params, interimBucketCorrector);
model_t::TFeatureVec features{model_t::E_PopulationMeanByPersonAndAttribute};
factory.features(features);
CModelFactory::SGathererInitializationData const initData(startTime);
CModelFactory::TDataGathererPtr const gathererPtr(factory.makeDataGatherer(initData));
CDataGatherer& gatherer(*gathererPtr);
BOOST_TEST_REQUIRE(gatherer.isPopulation());
TStrStrPrMeanAccumulatorMap accumulators;
core_t::TTime bucketStart = startTime;
for (auto& message : messages) {
if (message.s_Time >= bucketStart + bucketLength) {
LOG_DEBUG(<< "Processing bucket [" << bucketStart << ", "
<< bucketStart + bucketLength << ")");
TFeatureSizeSizePrFeatureDataPrVecPrVec tmp;
gatherer.featureData(bucketStart, bucketLength, tmp);
BOOST_REQUIRE_EQUAL(features.size(), tmp.size());
BOOST_REQUIRE_EQUAL(features[0], tmp[0].first);
const TSizeSizePrFeatureDataPrVec& data = tmp[0].second;
BOOST_REQUIRE_EQUAL(accumulators.size(), data.size());
TStrStrPrDoubleMap means;
for (const auto& j : data) {
if (j.second.s_BucketValue) {
means[TStrStrPr(gatherer.personName(j.first.first),
gatherer.attributeName(j.first.second))] =
j.second.s_BucketValue->value()[0];
}
}
TStrStrPrDoubleMap expectedMeans;
for (const auto & [ fst, snd ] : accumulators) {
expectedMeans[fst] = maths::common::CBasicStatistics::mean(snd);
}
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedMeans),
core::CContainerPrinter::print(means));
bucketStart += bucketLength;
accumulators.clear();
}
addArrival(message, gatherer, m_ResourceMonitor);
accumulators[TStrStrPr(message.s_Person, message.s_Attribute)].add(
message.s_Value);
}
}
BOOST_FIXTURE_TEST_CASE(testMin, CTestFixture) {
// Test that we correctly sample the bucket minimums.
using TMinAccumulator =
maths::common::CBasicStatistics::COrderStatisticsStack<double, 1U>;
using TStrStrPrMinAccumulatorMap = std::map<TStrStrPr, TMinAccumulator>;
constexpr core_t::TTime startTime = 1373932800;
constexpr core_t::TTime bucketLength = 3600;
TMessageVec messages;
generateTestMessages(startTime, messages);
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
CMetricPopulationModelFactory factory(params, interimBucketCorrector);
model_t::TFeatureVec features{model_t::E_PopulationMinByPersonAndAttribute};
factory.features(features);
CModelFactory::SGathererInitializationData const initData(startTime);
CModelFactory::TDataGathererPtr const gathererPtr(factory.makeDataGatherer(initData));
CDataGatherer& gatherer(*gathererPtr);
TStrStrPrMinAccumulatorMap accumulators;
core_t::TTime bucketStart = startTime;
for (auto& message : messages) {
if (message.s_Time >= bucketStart + bucketLength) {
LOG_DEBUG(<< "Processing bucket [" << bucketStart << ", "
<< bucketStart + bucketLength << ")");
TFeatureSizeSizePrFeatureDataPrVecPrVec tmp;
gatherer.featureData(bucketStart, bucketLength, tmp);
BOOST_REQUIRE_EQUAL(features.size(), tmp.size());
BOOST_REQUIRE_EQUAL(features[0], tmp[0].first);
const TSizeSizePrFeatureDataPrVec& data = tmp[0].second;
BOOST_REQUIRE_EQUAL(accumulators.size(), data.size());
TStrStrPrDoubleMap mins;
for (const auto& j : data) {
if (j.second.s_BucketValue) {
mins[TStrStrPr(gatherer.personName(j.first.first),
gatherer.attributeName(j.first.second))] =
j.second.s_BucketValue->value()[0];
}
}
TStrStrPrDoubleMap expectedMins;
for (auto& accumulator : accumulators) {
expectedMins[accumulator.first] = accumulator.second[0];
}
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedMins),
core::CContainerPrinter::print(mins));
bucketStart += bucketLength;
accumulators.clear();
}
addArrival(message, gatherer, m_ResourceMonitor);
accumulators[TStrStrPr(message.s_Person, message.s_Attribute)].add(
message.s_Value);
}
}
BOOST_FIXTURE_TEST_CASE(testMax, CTestFixture) {
// Test that we correctly sample the bucket maximums.
using TMaxAccumulator =
maths::common::CBasicStatistics::COrderStatisticsStack<double, 1U, std::greater<>>;
using TStrStrPrMaxAccumulatorMap = std::map<TStrStrPr, TMaxAccumulator>;
constexpr core_t::TTime startTime = 1373932800;
constexpr core_t::TTime bucketLength = 3600;
TMessageVec messages;
generateTestMessages(startTime, messages);
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
CMetricPopulationModelFactory factory(params, interimBucketCorrector);
model_t::TFeatureVec features{model_t::E_PopulationMaxByPersonAndAttribute};
factory.features(features);
CModelFactory::SGathererInitializationData const initData(startTime);
CModelFactory::TDataGathererPtr const gathererPtr(factory.makeDataGatherer(initData));
CDataGatherer& gatherer(*gathererPtr);
TStrStrPrMaxAccumulatorMap accumulators;
core_t::TTime bucketStart = startTime;
for (auto& message : messages) {
if (message.s_Time >= bucketStart + bucketLength) {
LOG_DEBUG(<< "Processing bucket [" << bucketStart << ", "
<< bucketStart + bucketLength << ")");
TFeatureSizeSizePrFeatureDataPrVecPrVec tmp;
gatherer.featureData(bucketStart, bucketLength, tmp);
BOOST_REQUIRE_EQUAL(features.size(), tmp.size());
BOOST_REQUIRE_EQUAL(features[0], tmp[0].first);
const TSizeSizePrFeatureDataPrVec& data = tmp[0].second;
BOOST_REQUIRE_EQUAL(accumulators.size(), data.size());
TStrStrPrDoubleMap maxs;
for (const auto& j : data) {
if (j.second.s_BucketValue) {
maxs[TStrStrPr(gatherer.personName(j.first.first),
gatherer.attributeName(j.first.second))] =
j.second.s_BucketValue->value()[0];
}
}
TStrStrPrDoubleMap expectedMaxs;
for (auto& accumulator : accumulators) {
expectedMaxs[accumulator.first] = accumulator.second[0];
}
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedMaxs),
core::CContainerPrinter::print(maxs));
bucketStart += bucketLength;
accumulators.clear();
}
addArrival(message, gatherer, m_ResourceMonitor);
accumulators[TStrStrPr(message.s_Person, message.s_Attribute)].add(
message.s_Value);
}
}
BOOST_FIXTURE_TEST_CASE(testSum, CTestFixture) {
// Test that we correctly sample the bucket sums.
constexpr core_t::TTime startTime = 1373932800;
constexpr core_t::TTime bucketLength = 3600;
TMessageVec messages;
generateTestMessages(startTime, messages);
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
CMetricPopulationModelFactory factory(params, interimBucketCorrector);
model_t::TFeatureVec features{model_t::E_PopulationSumByBucketPersonAndAttribute};
factory.features(features);
CModelFactory::SGathererInitializationData const initData(startTime);
CModelFactory::TDataGathererPtr const gathererPtr(factory.makeDataGatherer(initData));
CDataGatherer& gatherer(*gathererPtr);
TStrStrPrDoubleMap expectedSums;
core_t::TTime bucketStart = startTime;
for (auto& message : messages) {
if (message.s_Time >= bucketStart + bucketLength) {
LOG_DEBUG(<< "Processing bucket [" << bucketStart << ", "
<< bucketStart + bucketLength << ")");
TFeatureSizeSizePrFeatureDataPrVecPrVec tmp;
gatherer.featureData(bucketStart, bucketLength, tmp);
BOOST_REQUIRE_EQUAL(features.size(), tmp.size());
BOOST_REQUIRE_EQUAL(features[0], tmp[0].first);
const TSizeSizePrFeatureDataPrVec& data = tmp[0].second;
BOOST_REQUIRE_EQUAL(expectedSums.size(), data.size());
TStrStrPrDoubleMap sums;
for (const auto& j : data) {
if (j.second.s_BucketValue) {
sums[TStrStrPr(gatherer.personName(j.first.first),
gatherer.attributeName(j.first.second))] =
j.second.s_BucketValue->value()[0];
}
}
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedSums),
core::CContainerPrinter::print(sums));
bucketStart += bucketLength;
expectedSums.clear();
}
addArrival(message, gatherer, m_ResourceMonitor);
expectedSums[TStrStrPr(message.s_Person, message.s_Attribute)] +=
message.s_Value;
}
}
BOOST_FIXTURE_TEST_CASE(testSampleCount, CTestFixture) {
// Test that we set sensible sample counts for each attribute.
constexpr core_t::TTime startTime = 1373932800;
constexpr core_t::TTime bucketLength = 3600;
SModelParams const params(bucketLength);
const std::string attribute("c1");
const std::string person("p1");
constexpr std::size_t numberBuckets = 40;
constexpr std::array personMessageCount = {
11, 11, 11, 11, 110, 110, 110, 110, 110, 110, 110, 110, 110, 110,
110, 110, 110, 110, 110, 110, 110, 110, 110, 110, 110, 110, 110, 110,
110, 110, 110, 97, 97, 97, 97, 97, 97, 97, 97, 97};
constexpr std::array expectedSampleCounts = {
0.0, 0.0, 0.0, 11.0, 11.0, 11.0, 11.0, 11.0,
11.0, 11.0, 11.0, 11.0, 11.0, 11.0, 11.0, 11.0,
11.0, 11.0, 11.0, 11.0, 11.0, 11.0, 11.0, 11.0,
11.0, 11.0, 11.0, 11.0, 11.0, 11.0, 11.3597, 11.7164,
12.0701, 12.421, 12.7689, 13.114, 13.4562, 13.7957, 14.1325, 14.4665};
constexpr double tolerance = 5e-4;
TMessageVec messages;
for (std::size_t bucket = 0; bucket < numberBuckets; ++bucket) {
core_t::TTime const bucketStart =
startTime + (static_cast<core_t::TTime>(bucket) * bucketLength);
std::size_t const n = personMessageCount[bucket];
for (std::size_t i = 0; i < n; ++i) {
core_t::TTime const time =
bucketStart + (bucketLength * static_cast<core_t::TTime>(i) /
static_cast<core_t::TTime>(n));
messages.emplace_back(time, person, attribute, 1.0);
}
}
auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
CMetricPopulationModelFactory factory(params, interimBucketCorrector);
factory.features({model_t::E_PopulationMeanByPersonAndAttribute});
CModelFactory::SGathererInitializationData const initData(startTime);
CModelFactory::TDataGathererPtr const gathererPtr(factory.makeDataGatherer(initData));
CDataGatherer& gatherer(*gathererPtr);
std::size_t bucket = 0;
for (const auto& message : messages) {
if (core_t::TTime const bucketStart =
startTime + (static_cast<core_t::TTime>(bucket) * bucketLength);
message.s_Time >= bucketStart + bucketLength) {
gatherer.sampleNow(bucketStart);
LOG_DEBUG(<< gatherer.effectiveSampleCount(0));
BOOST_REQUIRE_CLOSE_ABSOLUTE(expectedSampleCounts[bucket],
gatherer.effectiveSampleCount(0), tolerance);
++bucket;
}
addArrival(message, gatherer, m_ResourceMonitor);
}
core_t::TTime const bucketStart =
startTime + (static_cast<core_t::TTime>(bucket) * bucketLength);
gatherer.sampleNow(bucketStart);
BOOST_REQUIRE_CLOSE_ABSOLUTE(expectedSampleCounts[bucket],
gatherer.effectiveSampleCount(0), tolerance);
}
BOOST_FIXTURE_TEST_CASE(testFeatureData, CTestFixture) {
// Test we correctly sample the mean, minimum and maximum statistics.
using TMeanAccumulator = maths::common::CBasicStatistics::SSampleMean<double>::TAccumulator;
using TStrStrPrMeanAccumulatorMap = std::map<TStrStrPr, TMeanAccumulator>;
using TMinAccumulator =
maths::common::CBasicStatistics::COrderStatisticsStack<double, 1U>;
using TStrStrPrMinAccumulatorMap = std::map<TStrStrPr, TMinAccumulator>;
using TMaxAccumulator =
maths::common::CBasicStatistics::COrderStatisticsStack<double, 1U, std::greater<>>;
using TStrStrPrMaxAccumulatorMap = std::map<TStrStrPr, TMaxAccumulator>;
using TStrStrPrDoubleVecMap = std::map<TStrStrPr, TDoubleVec>;
constexpr core_t::TTime startTime = 1373932800;
constexpr core_t::TTime bucketLength = 3600;
TMessageVec messages;
generateTestMessages(startTime, messages);
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
CMetricPopulationModelFactory factory(params, interimBucketCorrector);
model_t::TFeatureVec features;
features.push_back(model_t::E_PopulationMeanByPersonAndAttribute);
features.push_back(model_t::E_PopulationMinByPersonAndAttribute);
features.push_back(model_t::E_PopulationMaxByPersonAndAttribute);
factory.features(features);
CModelFactory::SGathererInitializationData const initData(startTime);
CModelFactory::TDataGathererPtr const gathererPtr(factory.makeDataGatherer(initData));
CDataGatherer& gatherer(*gathererPtr);
TStrStrPrMeanAccumulatorMap bucketMeanAccumulators;
TStrStrPrMeanAccumulatorMap sampleMeanAccumulators;
TStrStrPrDoubleVecMap expectedMeanSamples;
TStrStrPrMinAccumulatorMap bucketMinAccumulators;
TStrStrPrMinAccumulatorMap sampleMinAccumulators;
TStrStrPrDoubleVecMap expectedMinSamples;
TStrStrPrMaxAccumulatorMap bucketMaxAccumulators;
TStrStrPrMaxAccumulatorMap sampleMaxAccumulators;
TStrStrPrDoubleVecMap expectedMaxSamples;
core_t::TTime bucketStart = startTime;
for (auto& message : messages) {
if (message.s_Time >= bucketStart + bucketLength) {
LOG_DEBUG(<< "Processing bucket [" << bucketStart << ", "
<< bucketStart + bucketLength << ")");
gatherer.sampleNow(bucketStart);
TFeatureSizeSizePrFeatureDataPrVecPrVec tmp;
gatherer.featureData(bucketStart, bucketLength, tmp);
BOOST_REQUIRE_EQUAL(static_cast<std::size_t>(3), tmp.size());
BOOST_REQUIRE_EQUAL(model_t::E_PopulationMeanByPersonAndAttribute,
tmp[0].first);
TStrStrPrDoubleMap means;
TStrStrPrDoubleVecMap meanSamples;
for (const auto& data : tmp[0].second) {
TStrStrPr const key(gatherer.personName(data.first.first),
gatherer.attributeName(data.first.second));
if (data.second.s_BucketValue) {
means[key] = data.second.s_BucketValue->value()[0];
}
TDoubleVec& samples = meanSamples[key];
for (const auto& k : core::unwrap_ref(data.second.s_Samples)) {
samples.push_back(k.value()[0]);
}
}
BOOST_REQUIRE_EQUAL(model_t::E_PopulationMinByPersonAndAttribute,
tmp[1].first);
TStrStrPrDoubleMap mins;
TStrStrPrDoubleVecMap minSamples;
for (const auto& data : tmp[1].second) {
TStrStrPr const key(gatherer.personName(data.first.first),
gatherer.attributeName(data.first.second));
if (data.second.s_BucketValue) {
mins[key] = data.second.s_BucketValue->value()[0];
}
TDoubleVec& samples = minSamples[key];
for (const auto& k : core::unwrap_ref(data.second.s_Samples)) {
samples.push_back(k.value()[0]);
}
}
BOOST_REQUIRE_EQUAL(model_t::E_PopulationMaxByPersonAndAttribute,
tmp[2].first);
TStrStrPrDoubleMap maxs;
TStrStrPrDoubleVecMap maxSamples;
for (const auto& data : tmp[2].second) {
TStrStrPr const key(gatherer.personName(data.first.first),
gatherer.attributeName(data.first.second));
if (data.second.s_BucketValue) {
maxs[key] = data.second.s_BucketValue->value()[0];
}
TDoubleVec& samples = maxSamples[key];
for (const auto& k : core::unwrap_ref(data.second.s_Samples)) {
samples.push_back(k.value()[0]);
}
}
TStrStrPrDoubleMap expectedMeans;
for (const auto & [ key, values ] : bucketMeanAccumulators) {
expectedMeans[key] = maths::common::CBasicStatistics::mean(values);
}
TStrStrPrDoubleMap expectedMins;
for (const auto & [ key, values ] : bucketMinAccumulators) {
expectedMins[key] = values[0];
}
TStrStrPrDoubleMap expectedMaxs;
for (const auto & [ key, values ] : bucketMaxAccumulators) {
expectedMaxs[key] = values[0];
}
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedMeans),
core::CContainerPrinter::print(means));
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedMins),
core::CContainerPrinter::print(mins));
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedMaxs),
core::CContainerPrinter::print(maxs));
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedMeanSamples),
core::CContainerPrinter::print(meanSamples));
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedMinSamples),
core::CContainerPrinter::print(minSamples));
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedMaxSamples),
core::CContainerPrinter::print(maxSamples));
bucketStart += bucketLength;
bucketMeanAccumulators.clear();
expectedMeanSamples.clear();
bucketMinAccumulators.clear();
expectedMinSamples.clear();
bucketMaxAccumulators.clear();
expectedMaxSamples.clear();
}
addArrival(message, gatherer, m_ResourceMonitor);
TStrStrPr const key(message.s_Person, message.s_Attribute);
bucketMeanAccumulators[key].add(message.s_Value);
bucketMinAccumulators[key].add(message.s_Value);
bucketMaxAccumulators[key].add(message.s_Value);
expectedMeanSamples.insert(TStrStrPrDoubleVecMap::value_type(key, TDoubleVec()));
expectedMinSamples.insert(TStrStrPrDoubleVecMap::value_type(key, TDoubleVec()));
expectedMaxSamples.insert(TStrStrPrDoubleVecMap::value_type(key, TDoubleVec()));
std::size_t cid;
BOOST_TEST_REQUIRE(gatherer.attributeId(message.s_Attribute, cid));
double const sampleCount = gatherer.effectiveSampleCount(cid);
if (sampleCount > 0.0) {
sampleMeanAccumulators[key].add(message.s_Value);
sampleMinAccumulators[key].add(message.s_Value);
sampleMaxAccumulators[key].add(message.s_Value);
if (maths::common::CBasicStatistics::count(sampleMeanAccumulators[key]) ==
std::floor(sampleCount + 0.5)) {
expectedMeanSamples[key].push_back(
maths::common::CBasicStatistics::mean(sampleMeanAccumulators[key]));
expectedMinSamples[key].push_back(sampleMinAccumulators[key][0]);
expectedMaxSamples[key].push_back(sampleMaxAccumulators[key][0]);
sampleMeanAccumulators[key] = TMeanAccumulator();
sampleMinAccumulators[key] = TMinAccumulator();
sampleMaxAccumulators[key] = TMaxAccumulator();
}
}
}
}
BOOST_FIXTURE_TEST_CASE(testRemovePeople, CTestFixture) {
// Check that all the state is correctly updated when some
// people are removed.
using TSizeVec = std::vector<std::size_t>;
using TSizeUInt64Pr = std::pair<std::size_t, std::uint64_t>;
using TSizeUInt64PrVec = std::vector<TSizeUInt64Pr>;
using TStrFeatureDataPr = std::pair<std::string, SMetricFeatureData>;
using TStrFeatureDataPrVec = std::vector<TStrFeatureDataPr>;
using TStrSizeMap = std::map<std::string, std::uint64_t>;
constexpr core_t::TTime startTime = 1367280000;
constexpr core_t::TTime bucketLength = 3600;
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
model_t::TFeatureVec features;
features.push_back(model_t::E_PopulationMeanByPersonAndAttribute);
features.push_back(model_t::E_PopulationMinByPersonAndAttribute);
features.push_back(model_t::E_PopulationMaxByPersonAndAttribute);
features.push_back(model_t::E_PopulationSumByBucketPersonAndAttribute);
CDataGatherer gatherer = CDataGathererBuilder(model_t::E_PopulationMetric, features,
params, searchKey, startTime)
.build();
TMessageVec messages;
generateTestMessages(startTime, messages);
core_t::TTime bucketStart = startTime;
for (const auto& message : messages) {
if (message.s_Time >= bucketStart + bucketLength) {
LOG_DEBUG(<< "Processing bucket [" << bucketStart << ", "
<< bucketStart + bucketLength << ")");
gatherer.sampleNow(bucketStart);
bucketStart += bucketLength;
}
addArrival(message, gatherer, m_ResourceMonitor);
}
// Remove people 0, 1, 9, 10, 11 and 39 (i.e. last).
TSizeVec peopleToRemove;
peopleToRemove.push_back(0U);
peopleToRemove.push_back(1U);
peopleToRemove.push_back(9U);
peopleToRemove.push_back(10U);
peopleToRemove.push_back(11U);
peopleToRemove.push_back(39U);
std::size_t const numberPeople = gatherer.numberActivePeople();
BOOST_REQUIRE_EQUAL(numberPeople, gatherer.numberOverFieldValues());
TStrVec expectedPersonNames;
TSizeVec expectedPersonIds;
for (std::size_t i = 0; i < numberPeople; ++i) {
if (!std::binary_search(peopleToRemove.begin(), peopleToRemove.end(), i)) {
expectedPersonNames.push_back(gatherer.personName(i));
expectedPersonIds.push_back(i);
} else {
LOG_DEBUG(<< "Removing " << gatherer.personName(i));
}
}
TStrSizeMap expectedNonZeroCounts;
{
TSizeUInt64PrVec nonZeroCounts;
gatherer.personNonZeroCounts(bucketStart, nonZeroCounts);
for (const auto& nonZeroCount : nonZeroCounts) {
if (!std::binary_search(peopleToRemove.begin(),
peopleToRemove.end(), nonZeroCount.first)) {
const std::string& name = gatherer.personName(nonZeroCount.first);
expectedNonZeroCounts[name] = nonZeroCount.second;
}
}
}
LOG_DEBUG(<< "expectedNonZeroCounts = " << expectedNonZeroCounts);
LOG_TRACE(<< "Expected");
TStrFeatureDataPrVec expectedFeatureData;
{
TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
gatherer.featureData(bucketStart, bucketLength, featureData);
for (const auto & [ featureType, data ] : featureData) {
for (const auto& j : data) {
if (!std::binary_search(peopleToRemove.begin(),
peopleToRemove.end(), j.first.first)) {
std::string const key = model_t::print(featureType) + " " +
gatherer.personName(j.first.first) + " " +
gatherer.attributeName(j.first.second);
expectedFeatureData.emplace_back(key, j.second);
LOG_TRACE(<< " " << key);
LOG_TRACE(<< " " << j.second.print());
}
}
}
}
gatherer.recyclePeople(peopleToRemove);
BOOST_REQUIRE_EQUAL(numberPeople - peopleToRemove.size(), gatherer.numberActivePeople());
for (std::size_t i = 0; i < expectedPersonNames.size(); ++i) {
std::size_t pid;
BOOST_TEST_REQUIRE(gatherer.personId(expectedPersonNames[i], pid));
BOOST_REQUIRE_EQUAL(expectedPersonIds[i], pid);
}
TStrSizeMap actualNonZeroCounts;
TSizeUInt64PrVec nonZeroCounts;
gatherer.personNonZeroCounts(bucketStart, nonZeroCounts);
for (const auto & [ id, count ] : nonZeroCounts) {
const std::string& name = gatherer.personName(id);
actualNonZeroCounts[name] = count;
}
LOG_DEBUG(<< "actualNonZeroCounts = " << actualNonZeroCounts);
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedNonZeroCounts),
core::CContainerPrinter::print(actualNonZeroCounts));
LOG_TRACE(<< "Actual");
TStrFeatureDataPrVec actualFeatureData;
{
TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
gatherer.featureData(bucketStart, bucketLength, featureData);
for (const auto & [ featureType, data ] : featureData) {
for (const auto& j : data) {
std::string const key = model_t::print(featureType) + " " +
gatherer.personName(j.first.first) + " " +
gatherer.attributeName(j.first.second);
actualFeatureData.emplace_back(key, j.second);
LOG_TRACE(<< " " << key);
LOG_TRACE(<< " " << j.second.print());
}
}
}
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedFeatureData),
core::CContainerPrinter::print(actualFeatureData));
}
BOOST_FIXTURE_TEST_CASE(testRemoveAttributes, CTestFixture) {
// Check that all the state is correctly updated when some
// attributes are removed.
using TSizeVec = std::vector<std::size_t>;
using TStrFeatureDataPr = std::pair<std::string, SMetricFeatureData>;
using TStrFeatureDataPrVec = std::vector<TStrFeatureDataPr>;
constexpr core_t::TTime startTime = 1367280000;
constexpr core_t::TTime bucketLength = 3600;
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
model_t::TFeatureVec features;
features.push_back(model_t::E_PopulationMeanByPersonAndAttribute);
features.push_back(model_t::E_PopulationMinByPersonAndAttribute);
features.push_back(model_t::E_PopulationMaxByPersonAndAttribute);
features.push_back(model_t::E_PopulationSumByBucketPersonAndAttribute);
CDataGatherer gatherer = CDataGathererBuilder(model_t::E_PopulationMetric, features,
params, searchKey, startTime)
.build();
TMessageVec messages;
generateTestMessages(startTime, messages);
core_t::TTime bucketStart = startTime;
for (const auto& message : messages) {
if (message.s_Time >= bucketStart + bucketLength) {
LOG_DEBUG(<< "Processing bucket [" << bucketStart << ", "
<< bucketStart + bucketLength << ")");
gatherer.sampleNow(bucketStart);
bucketStart += bucketLength;
}
addArrival(message, gatherer, m_ResourceMonitor);
}
// Remove attributes 0, 2, 3, 9 (i.e. last).
TSizeVec attributesToRemove;
attributesToRemove.push_back(0U);
attributesToRemove.push_back(2U);
attributesToRemove.push_back(3U);
attributesToRemove.push_back(9U);
std::size_t numberAttributes = gatherer.numberActiveAttributes();
BOOST_REQUIRE_EQUAL(numberAttributes, gatherer.numberByFieldValues());
TStrVec expectedAttributeNames;
TSizeVec expectedAttributeIds;
TDoubleVec expectedSampleCounts;
for (std::size_t i = 0; i < numberAttributes; ++i) {
if (!std::binary_search(attributesToRemove.begin(), attributesToRemove.end(), i)) {
expectedAttributeNames.push_back(gatherer.attributeName(i));
expectedAttributeIds.push_back(i);
expectedSampleCounts.push_back(gatherer.effectiveSampleCount(i));
} else {
LOG_DEBUG(<< "Removing " << gatherer.attributeName(i));
}
}
std::string expectedFeatureData;
{
LOG_TRACE(<< "Expected");
TStrFeatureDataPrVec expected;
TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
gatherer.featureData(bucketStart, bucketLength, featureData);
for (const auto & [ featureType, data ] : featureData) {
for (const auto& j : data) {
if (!std::binary_search(attributesToRemove.begin(),
attributesToRemove.end(), j.first.second)) {
std::string const key = model_t::print(featureType) + " " +
gatherer.personName(j.first.first) + " " +
gatherer.attributeName(j.first.second);
expected.emplace_back(key, j.second);
LOG_TRACE(<< " " << key);
LOG_TRACE(<< " " << j.second.print());
}
}
}
expectedFeatureData = core::CContainerPrinter::print(expected);
}
gatherer.recycleAttributes(attributesToRemove);
BOOST_REQUIRE_EQUAL(numberAttributes - attributesToRemove.size(),
gatherer.numberActiveAttributes());
for (std::size_t i = 0; i < expectedAttributeNames.size(); ++i) {
std::size_t cid;
BOOST_TEST_REQUIRE(gatherer.attributeId(expectedAttributeNames[i], cid));
BOOST_REQUIRE_EQUAL(expectedAttributeIds[i], cid);
}
numberAttributes = gatherer.numberActiveAttributes();
TDoubleVec actualSampleCounts;
for (std::size_t i = 0; i < numberAttributes; ++i) {
actualSampleCounts.push_back(gatherer.effectiveSampleCount(expectedAttributeIds[i]));
}
BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedSampleCounts),
core::CContainerPrinter::print(actualSampleCounts));
std::string actualFeatureData;
{
LOG_TRACE(<< "Actual");
TStrFeatureDataPrVec actual;
TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
gatherer.featureData(bucketStart, bucketLength, featureData);
for (const auto & [ featureType, data ] : featureData) {
for (const auto& j : data) {
std::string const key = model_t::print(featureType) + " " +
gatherer.personName(j.first.first) + " " +
gatherer.attributeName(j.first.second);
actual.emplace_back(key, j.second);
LOG_TRACE(<< " " << key);
LOG_TRACE(<< " " << j.second.print());
}
}
actualFeatureData = core::CContainerPrinter::print(actual);
}
BOOST_REQUIRE_EQUAL(expectedFeatureData, actualFeatureData);
}
BOOST_FIXTURE_TEST_CASE(testInfluenceStatistics, CTestFixture) {
using TDoubleDoublePr = std::pair<double, double>;
using TStrDoubleDoublePrPr = std::pair<std::string, TDoubleDoublePr>;
using TStrDoubleDoublePrPrVec = std::vector<TStrDoubleDoublePrPr>;
constexpr core_t::TTime startTime = 0;
constexpr core_t::TTime bucketLength = 600;
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
constexpr std::array influencerNames_ = {"i1", "i2"};
std::array<std::array<std::string, 3>, 2> const influencerValues = {
{{"i11", "i12", "i13"}, {"i21", "i22", "i23"}}};
const std::array data = {
SMessage(1, "p1", "", 1.0, vec(influencerValues[0][0], influencerValues[1][0])), // Bucket 1
SMessage(150, "p1", "", 5.0, vec(influencerValues[0][1], influencerValues[1][1])),
SMessage(150, "p1", "", 3.0, vec(influencerValues[0][2], influencerValues[1][2])),
SMessage(550, "p2", "", 2.0, vec(influencerValues[0][0], influencerValues[1][0])),
SMessage(551, "p2", "", 2.1, vec(influencerValues[0][1], influencerValues[1][1])),
SMessage(552, "p2", "", 4.0, vec(influencerValues[0][2], influencerValues[1][2])),
SMessage(554, "p2", "", 2.2, vec(influencerValues[0][2], influencerValues[1][2])),
SMessage(600, "p1", "", 3.0, vec(influencerValues[0][1], influencerValues[1][0])), // Bucket 2
SMessage(660, "p2", "", 3.0, vec(influencerValues[0][0], influencerValues[1][2])),
SMessage(690, "p1", "", 7.3, vec(influencerValues[0][1], "")),
SMessage(700, "p2", "", 4.0, vec(influencerValues[0][0], influencerValues[1][2])),
SMessage(800, "p1", "", 2.2, vec(influencerValues[0][2], influencerValues[1][0])),
SMessage(900, "p2", "", 2.5, vec(influencerValues[0][1], influencerValues[1][0])),
SMessage(1000, "p1", "", 5.0, vec(influencerValues[0][1], influencerValues[1][0])),
SMessage(1200, "p2", "", 6.4, vec("", influencerValues[1][2])), // Bucket 3
SMessage(1210, "p2", "", 6.0, vec("", influencerValues[1][2])),
SMessage(1240, "p2", "", 7.0, vec("", influencerValues[1][1])),
SMessage(1600, "p2", "", 11.0, vec("", influencerValues[1][0])),
SMessage(1800, "p1", "", 11.0, vec("", "")) // Sentinel
};
std::array expectedStatistics = {
"[(i11, (1, 1)), (i12, (5, 1)), (i13, (3, 1)), (i21, (1, 1)), (i22, (5, 1)), (i23, (3, 1))]",
"[(i11, (2, 1)), (i12, (2.1, 1)), (i13, (3.1, 2)), (i21, (2, 1)), (i22, (2.1, 1)), (i23, (3.1, 2))]",
"[(i11, (1, 1)), (i12, (5, 1)), (i13, (3, 1)), (i21, (1, 1)), (i22, (5, 1)), (i23, (3, 1))]",
"[(i11, (2, 1)), (i12, (2.1, 1)), (i13, (2.2, 1)), (i21, (2, 1)), (i22, (2.1, 1)), (i23, (2.2, 1))]",
"[(i11, (1, 1)), (i12, (5, 1)), (i13, (3, 1)), (i21, (1, 1)), (i22, (5, 1)), (i23, (3, 1))]",
"[(i11, (2, 1)), (i12, (2.1, 1)), (i13, (4, 1)), (i21, (2, 1)), (i22, (2.1, 1)), (i23, (4, 1))]",
"[(i11, (1, 1)), (i12, (5, 1)), (i13, (3, 1)), (i21, (1, 1)), (i22, (5, 1)), (i23, (3, 1))]",
"[(i11, (2, 1)), (i12, (2.1, 1)), (i13, (6.2, 1)), (i21, (2, 1)), (i22, (2.1, 1)), (i23, (6.2, 1))]",
"[(i12, (5.1, 3)), (i13, (2.2, 1)), (i21, (3.4, 3))]",
"[(i11, (3.5, 2)), (i12, (2.5, 1)), (i21, (2.5, 1)), (i23, (3.5, 2))]",
"[(i12, (3, 1)), (i13, (2.2, 1)), (i21, (2.2, 1))]",
"[(i11, (3, 1)), (i12, (2.5, 1)), (i21, (2.5, 1)), (i23, (3, 1))]",
"[(i12, (7.3, 1)), (i13, (2.2, 1)), (i21, (5, 1))]",
"[(i11, (4, 1)), (i12, (2.5, 1)), (i21, (2.5, 1)), (i23, (4, 1))]",
"[(i12, (15.3, 1)), (i13, (2.2, 1)), (i21, (10.2, 1))]",
"[(i11, (7, 1)), (i12, (2.5, 1)), (i21, (2.5, 1)), (i23, (7, 1))]",
"[(i21, (11, 1)), (i22, (7, 1)), (i23, (6.2, 2))]",
"[(i21, (11, 1)), (i22, (7, 1)), (i23, (6, 1))]",
"[(i21, (11, 1)), (i22, (7, 1)), (i23, (6.4, 1))]",
"[(i21, (11, 1)), (i22, (7, 1)), (i23, (12.4, 1))]"};
const char** expected = expectedStatistics.data();
model_t::TFeatureVec features;
features.push_back(model_t::E_PopulationMeanByPersonAndAttribute);
features.push_back(model_t::E_PopulationMinByPersonAndAttribute);
features.push_back(model_t::E_PopulationMaxByPersonAndAttribute);
features.push_back(model_t::E_PopulationHighSumByBucketPersonAndAttribute);
TStrVec const influencerNames(std::begin(influencerNames_), std::end(influencerNames_));
CDataGatherer gatherer = CDataGathererBuilder(model_t::E_PopulationMetric, features,
params, searchKey, startTime)
.influenceFieldNames(influencerNames)
.build();
core_t::TTime bucketStart = startTime;
for (const auto& i : data) {
if (i.s_Time >= bucketStart + bucketLength) {
LOG_DEBUG(<< "*** processing bucket ***");
TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
gatherer.featureData(bucketStart, bucketLength, featureData);
for (const auto & [ feature, data_ ] : featureData) {
LOG_DEBUG(<< "feature = " << model_t::print(feature));
for (std::size_t k = 0; k < data_.size(); ++k) {
TStrDoubleDoublePrPrVec statistics;
for (const auto& s_InfluenceValue : data_[k].second.s_InfluenceValues) {
for (const auto& n : s_InfluenceValue) {
statistics.emplace_back(
n.first,
TDoubleDoublePr(n.second.first[0], n.second.second));
}
}
std::sort(statistics.begin(), statistics.end(),
maths::common::COrderings::SFirstLess());
LOG_DEBUG(<< "statistics = " << statistics);
LOG_DEBUG(<< "expected = " << *expected);
BOOST_REQUIRE_EQUAL(*(expected++),
core::CContainerPrinter::print(statistics));
}
}
bucketStart += bucketLength;
}
addArrival(i, gatherer, m_ResourceMonitor);
}
}
BOOST_FIXTURE_TEST_CASE(testPersistence, CTestFixture) {
constexpr core_t::TTime startTime = 1367280000;
constexpr core_t::TTime bucketLength = 3600;
SModelParams params(bucketLength);
params.s_DecayRate = 0.001;
model_t::TFeatureVec features;
features.push_back(model_t::E_PopulationMeanByPersonAndAttribute);
features.push_back(model_t::E_PopulationMinByPersonAndAttribute);
features.push_back(model_t::E_PopulationMaxByPersonAndAttribute);
features.push_back(model_t::E_PopulationHighSumByBucketPersonAndAttribute);
CDataGatherer origDataGatherer =
CDataGathererBuilder(model_t::E_PopulationMetric, features, params, searchKey, startTime)
.build();
TMessageVec messages;
generateTestMessages(startTime, messages);
core_t::TTime bucketStart = startTime;
for (const auto& message : messages) {
if (message.s_Time >= bucketStart + bucketLength) {
LOG_DEBUG(<< "Processing bucket [" << bucketStart << ", "
<< bucketStart + bucketLength << ")");
origDataGatherer.sampleNow(bucketStart);
bucketStart += bucketLength;
}
addArrival(message, origDataGatherer, m_ResourceMonitor);
}
origDataGatherer.sampleNow(bucketStart);
std::ostringstream origJson;
core::CJsonStatePersistInserter::persist(
origJson, [&origDataGatherer](core::CJsonStatePersistInserter& inserter) {
origDataGatherer.acceptPersistInserter(inserter);
});
LOG_DEBUG(<< "origJson length = " << origJson.str().size());
BOOST_TEST_REQUIRE(origJson.str().size() < 292000);
// Restore the JSON into a new data gatherer
// The traverser expects the state json in a embedded document
std::stringstream origJsonStrm{"{\"topLevel\" : " + origJson.str() + "}"};
core::CJsonStateRestoreTraverser traverser(origJsonStrm);
CBucketGatherer::SBucketGathererInitData bucketGathererInitData{
EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, {}, 0, 0};
CDataGatherer restoredDataGatherer(model_t::E_PopulationMetric, model_t::E_None,
params, EMPTY_STRING, searchKey,
bucketGathererInitData, traverser);
// The JSON representation of the new data gatherer should be the same as the
// original
std::ostringstream newJson;
core::CJsonStatePersistInserter::persist(
newJson, [&restoredDataGatherer](core::CJsonStatePersistInserter& inserter) {
restoredDataGatherer.acceptPersistInserter(inserter);
});
LOG_DEBUG(<< "newJson length = " << newJson.str().size());
BOOST_REQUIRE_EQUAL(origJson.str(), newJson.str());
}
BOOST_FIXTURE_TEST_CASE(testReleaseMemory, CTestFixture) {
constexpr core_t::TTime startTime = 1373932800;
constexpr core_t::TTime bucketLength = 3600;
SModelParams params(bucketLength);
params.s_LatencyBuckets = 3;
auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
CMetricPopulationModelFactory factory(params, interimBucketCorrector);
factory.features({model_t::E_PopulationMeanByPersonAndAttribute});
CModelFactory::SGathererInitializationData const initData(startTime);
CModelFactory::TDataGathererPtr const gathererPtr(factory.makeDataGatherer(initData));
CDataGatherer& gatherer(*gathererPtr);
BOOST_TEST_REQUIRE(gatherer.isPopulation());
core_t::TTime bucketStart = startTime;
// Add a few buckets with count of 10 so that sample count gets estimated
for (std::size_t i = 0; i < 10; ++i) {
// Add 10 events
for (std::size_t j = 0; j < 10; ++j) {
addArrival(SMessage(bucketStart, "p1", "", 10.0), gatherer, m_ResourceMonitor);
addArrival(SMessage(bucketStart, "p2", "", 10.0), gatherer, m_ResourceMonitor);
}
gatherer.sampleNow(bucketStart - (params.s_LatencyBuckets * bucketLength));
bucketStart += bucketLength;
}
// Add a bucket with not enough data to sample for p2
for (std::size_t j = 0; j < 10; ++j) {
addArrival(SMessage(bucketStart, "p1", "", 10.0), gatherer, m_ResourceMonitor);
}
addArrival(SMessage(bucketStart, "p2", "", 10.0), gatherer, m_ResourceMonitor);
gatherer.sampleNow(bucketStart - (params.s_LatencyBuckets * bucketLength));
bucketStart += bucketLength;
std::size_t const mem = gatherer.memoryUsage();
// Add 48 + 1 buckets ( > 2 days) to force incomplete samples out of consideration for p2
for (std::size_t i = 0; i < 49 + 1; ++i) {
for (std::size_t j = 0; j < 10; ++j) {
addArrival(SMessage(bucketStart, "p1", "", 10.0), gatherer, m_ResourceMonitor);
}
gatherer.sampleNow(bucketStart - (params.s_LatencyBuckets * bucketLength));
bucketStart += bucketLength;
gatherer.releaseMemory(bucketStart - params.s_SamplingAgeCutoff);
if (i <= 40) {
BOOST_TEST_REQUIRE(gatherer.memoryUsage() >= mem - 1000);
}
}
BOOST_TEST_REQUIRE(gatherer.memoryUsage() < mem - 1000);
}
BOOST_AUTO_TEST_SUITE_END()