lib/model/unittest/CEventRatePopulationDataGathererTest.cc (692 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <core/CJsonStatePersistInserter.h>
#include <core/CJsonStateRestoreTraverser.h>
#include <core/CLogger.h>
#include <core/CompressUtils.h>

#include <model/CDataGatherer.h>
#include <model/CEventData.h>
#include <model/CEventRateBucketGatherer.h>
#include <model/CResourceMonitor.h>
#include <model/SModelParams.h>

#include <test/BoostTestCloseAbsolute.h>
#include <test/CRandomNumbers.h>

#include "ModelTestHelpers.h"

#include <boost/lexical_cast.hpp>
#include <boost/test/unit_test.hpp>

#include <algorithm>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <ranges>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
BOOST_AUTO_TEST_SUITE(CEventRatePopulationDataGathererTest)
using namespace ml;
using namespace model;
namespace {
struct SMessage {
SMessage(core_t::TTime time, std::string attribute, std::string person)
: s_Time(time), s_Attribute(std::move(attribute)),
s_Person(std::move(person)) {}
bool operator<(const SMessage& other) const {
return s_Time < other.s_Time;
}
core_t::TTime s_Time;
std::string s_Attribute;
std::string s_Person;
};
// Convenience aliases used throughout the tests in this file.
using TMessageVec = std::vector<SMessage>;
using TSizeVec = std::vector<std::size_t>;
using TStrVec = std::vector<std::string>;
// The tests below build this pair as (person id, attribute id).
using TSizeSizePr = std::pair<std::size_t, std::size_t>;
// Expected value for each (person id, attribute id) pair in a bucket.
using TSizeSizePrUInt64Map = std::map<TSizeSizePr, std::uint64_t>;
using TSizeSet = std::set<std::size_t>;
// Maps an id to a set of ids, e.g. an attribute id to the person ids
// which have been seen with that attribute.
using TSizeSizeSetMap = std::map<std::size_t, TSizeSet>;
using TStrSet = std::set<std::string>;
// Maps an id to a set of strings, e.g. a person id to the distinct
// values which have been seen for that person.
using TSizeStrSetMap = std::map<std::size_t, TStrSet>;
using TFeatureData = SEventRateFeatureData;
// Feature data labelled by a printable key; used to compare feature
// data before and after recycling ids.
using TStrFeatureDataPr = std::pair<std::string, TFeatureData>;
using TStrFeatureDataPrVec = std::vector<TStrFeatureDataPr>;
// The per (person id, attribute id) feature data for a single feature.
using TSizeSizePrFeatureDataPr = std::pair<TSizeSizePr, TFeatureData>;
using TSizeSizePrFeatureDataPrVec = std::vector<TSizeSizePrFeatureDataPr>;
// The collection filled in by CDataGatherer::featureData in these tests:
// one entry per feature.
using TFeatureSizeSizePrFeatureDataPrVecPr =
std::pair<model_t::EFeature, TSizeSizePrFeatureDataPrVec>;
using TFeatureSizeSizePrFeatureDataPrVecPrVec = std::vector<TFeatureSizeSizePrFeatureDataPrVecPr>;
//! Get the names of all the categories (attributes) which can appear in
//! the generated test data, i.e. "c0", "c1", ..., "c29".
//!
//! \return The category names in increasing index order.
TStrVec allCategories() {
    constexpr std::size_t numberCategories = 30;
    TStrVec categories;
    // The final size is known up front so avoid reallocating as we append.
    categories.reserve(numberCategories);
    for (std::size_t i = 0; i < numberCategories; ++i) {
        categories.push_back("c" + std::to_string(i));
    }
    return categories;
}
//! Get the names of all the people which can appear in the generated
//! test data, i.e. "p0", "p1", ..., "p4".
//!
//! \return The person names in increasing index order.
TStrVec allPeople() {
    constexpr std::size_t numberPeople = 5;
    TStrVec people;
    // The final size is known up front so avoid reallocating as we append.
    people.reserve(numberPeople);
    for (std::size_t i = 0; i < numberPeople; ++i) {
        people.push_back("p" + std::to_string(i));
    }
    return people;
}
//! Generate the random messages for the bucket [\p time, \p time + \p bucketLength).
//!
//! The number of messages for each category is a Poisson sample whose rate
//! cycles through a fixed list. A random contiguous range of people is
//! chosen for the bucket and each category's messages are dealt out to
//! those people in turn.
//!
//! \param[in,out] rng The random number generator to draw samples from.
//! \param[in] time The start time of the bucket.
//! \param[in] bucketLength The length of the bucket.
//! \param[in,out] messages The new messages are appended to this and the
//! whole collection is then sorted by time.
void generateTestMessages(test::CRandomNumbers& rng,
                          core_t::TTime time,
                          core_t::TTime bucketLength,
                          TMessageVec& messages) {
    using TUIntVec = std::vector<unsigned int>;
    using TDoubleVec = std::vector<double>;

    LOG_DEBUG(<< "bucket = [" << time << ", " << time + bucketLength << ")");

    TStrVec categoryNames = allCategories();
    TStrVec personNames = allPeople();

    // One Poisson distributed message count per category.
    const TDoubleVec categoryRates{1.0, 0.3, 10.1, 25.0, 105.0};
    TSizeVec messageCounts;
    for (std::size_t category = 0; category < categoryNames.size(); ++category) {
        TUIntVec poissonSample;
        rng.generatePoissonSamples(categoryRates[category % categoryRates.size()],
                                   1, poissonSample);
        messageCounts.push_back(poissonSample[0]);
    }

    // Choose the contiguous range of people [first, last) seen this bucket.
    TDoubleVec rangeEnds;
    rng.generateUniformSamples(0.0, static_cast<double>(personNames.size()) - 1e-3,
                               2U, rangeEnds);
    std::sort(rangeEnds.begin(), rangeEnds.end());
    auto const first = static_cast<std::size_t>(rangeEnds[0]);
    std::size_t const last = static_cast<std::size_t>(rangeEnds[1]) + 1;
    TSizeVec bucketPeople;
    for (std::size_t person = first; person < last; ++person) {
        bucketPeople.push_back(person);
    }
    LOG_DEBUG(<< "bucketPeople = " << bucketPeople);

    // Scatter each category's messages uniformly over the bucket.
    for (std::size_t category = 0; category < categoryNames.size(); ++category) {
        TDoubleVec timeOffsets;
        rng.generateUniformSamples(0.0, static_cast<double>(bucketLength) - 1.0,
                                   messageCounts[category], timeOffsets);
        std::size_t turn = 0;
        for (double const timeOffset : timeOffsets) {
            std::size_t const person = bucketPeople[turn % bucketPeople.size()];
            messages.emplace_back(time + static_cast<core_t::TTime>(timeOffset),
                                  categoryNames[category], personNames[person]);
            ++turn;
        }
    }

    // SMessage::operator< orders by time.
    std::sort(messages.begin(), messages.end());

    LOG_DEBUG(<< "Generated " << messages.size() << " messages");
}
//! Find the data gathered for \p feature in \p featureData.
//!
//! \return The data of the first entry whose feature is \p feature, or a
//! reference to a shared empty collection if there is no such entry.
const TSizeSizePrFeatureDataPrVec&
extract(const TFeatureSizeSizePrFeatureDataPrVecPrVec& featureData, model_t::EFeature feature) {
    const auto match = std::find_if(
        featureData.begin(), featureData.end(),
        [feature](const TFeatureSizeSizePrFeatureDataPrVecPr& entry) {
            return entry.first == feature;
        });
    if (match != featureData.end()) {
        return match->second;
    }
    static const TSizeSizePrFeatureDataPrVec EMPTY;
    return EMPTY;
}
//! Get the data for the unique person count by attribute feature, as
//! returned by extract().
const TSizeSizePrFeatureDataPrVec&
extractPeoplePerAttribute(const TFeatureSizeSizePrFeatureDataPrVecPrVec& featureData) {
    constexpr model_t::EFeature feature{model_t::E_PopulationUniquePersonCountByAttribute};
    return extract(featureData, feature);
}
//! Get the data for the count by bucket, person and attribute feature,
//! as returned by extract().
const TSizeSizePrFeatureDataPrVec&
extractNonZeroAttributeCounts(const TFeatureSizeSizePrFeatureDataPrVecPrVec& featureData) {
    constexpr model_t::EFeature feature{model_t::E_PopulationCountByBucketPersonAndAttribute};
    return extract(featureData, feature);
}
//! Get the data for the indicator of bucket, person and attribute
//! feature, as returned by extract().
const TSizeSizePrFeatureDataPrVec&
extractAttributeIndicator(const TFeatureSizeSizePrFeatureDataPrVecPrVec& featureData) {
    constexpr model_t::EFeature feature{model_t::E_PopulationIndicatorOfBucketPersonAndAttribute};
    return extract(featureData, feature);
}
//! Get the data for the unique count by bucket, person and attribute
//! feature, as returned by extract().
const TSizeSizePrFeatureDataPrVec&
extractBucketAttributesPerPerson(const TFeatureSizeSizePrFeatureDataPrVecPrVec& featureData) {
    constexpr model_t::EFeature feature{model_t::E_PopulationUniqueCountByBucketPersonAndAttribute};
    return extract(featureData, feature);
}
//! Get the data for the info content by bucket, person and attribute
//! feature, as returned by extract().
const TSizeSizePrFeatureDataPrVec&
extractCompressedLengthPerPerson(const TFeatureSizeSizePrFeatureDataPrVecPrVec& featureData) {
    constexpr model_t::EFeature feature{model_t::E_PopulationInfoContentByBucketPersonAndAttribute};
    return extract(featureData, feature);
}
//! Add an event for person \p p and attribute \p a at \p time to \p gatherer.
//!
//! \return The event data object which was passed to the gatherer.
CEventData addArrival(core_t::TTime time,
                      const std::string& p,
                      const std::string& a,
                      CDataGatherer& gatherer,
                      CResourceMonitor& resourceMonitor) {
    CEventData eventData;
    eventData.time(time);

    // The field values are passed in the order person, attribute.
    CDataGatherer::TStrCPtrVec fieldValues{&p, &a};
    gatherer.addArrival(fieldValues, eventData, resourceMonitor);

    return eventData;
}
//! Add an event for person \p p and attribute \p a with value \p v at
//! \p time to \p gatherer.
//!
//! \return The event data object which was passed to the gatherer.
CEventData addArrival(core_t::TTime time,
                      const std::string& p,
                      const std::string& a,
                      const std::string& v,
                      CDataGatherer& gatherer,
                      CResourceMonitor& resourceMonitor) {
    CEventData eventData;
    eventData.time(time);

    // The field values are passed in the order person, attribute, value.
    CDataGatherer::TStrCPtrVec fieldValues{&p, &a, &v};
    gatherer.addArrival(fieldValues, eventData, resourceMonitor);

    return eventData;
}
// Default constructed search key which is shared by every data gatherer
// created in this file.
CSearchKey searchKey;
// Empty string used as a placeholder argument when restoring a data
// gatherer from its persisted state.
const std::string EMPTY_STRING;
}
//! \brief Fixture for the test cases in this suite.
//!
//! Supplies the resource monitor which the tests pass when adding
//! arrivals to a data gatherer.
class CTestFixture {
protected:
CResourceMonitor m_ResourceMonitor;
};
BOOST_FIXTURE_TEST_CASE(testAttributeCounts, CTestFixture) {
    // We check that we correctly sample the unique people per
    // attribute and (attribute, person) pair counts.

    using TStrSizeMap = std::map<std::string, std::size_t>;

    constexpr core_t::TTime startTime = 1367280000;
    constexpr core_t::TTime bucketLength = 3600;
    constexpr std::size_t numberBuckets = 20;

    test::CRandomNumbers rng;

    CDataGatherer::TFeatureVec features;
    features.push_back(model_t::E_PopulationCountByBucketPersonAndAttribute);
    features.push_back(model_t::E_PopulationUniquePersonCountByAttribute);
    SModelParams const params(bucketLength);
    CDataGatherer dataGatherer = CDataGathererBuilder(model_t::E_PopulationEventRate,
                                                      features, params, searchKey, startTime)
                                     .build();
    BOOST_TEST_REQUIRE(dataGatherer.isPopulation());

    BOOST_REQUIRE_EQUAL(startTime, dataGatherer.currentBucketStartTime());
    BOOST_REQUIRE_EQUAL(bucketLength, dataGatherer.bucketLength());

    // These accumulate over all buckets: the people seen for each attribute
    // and the order in which each name was first seen.
    TSizeSizeSetMap expectedAttributePeople;
    std::size_t attributeOrder = 0;
    TStrSizeMap expectedAttributeOrder;
    std::size_t personOrder = 0;
    TStrSizeMap expectedPeopleOrder;

    core_t::TTime time = startTime;
    for (std::size_t i = 0; i < numberBuckets; ++i, time += bucketLength) {
        TMessageVec messages;
        generateTestMessages(rng, time, bucketLength, messages);

        TSizeSizePrUInt64Map expectedAttributeCounts;
        for (const auto& message : messages) {
            addArrival(message.s_Time, message.s_Person, message.s_Attribute,
                       dataGatherer, m_ResourceMonitor);

            // The ids are used as map keys below, so fail immediately rather
            // than read an uninitialised id if either lookup fails.
            std::size_t cid = 0;
            BOOST_TEST_REQUIRE(dataGatherer.attributeId(message.s_Attribute, cid));
            std::size_t pid = 0;
            BOOST_TEST_REQUIRE(dataGatherer.personId(message.s_Person, pid));

            ++expectedAttributeCounts[std::make_pair(pid, cid)];
            expectedAttributePeople[cid].insert(pid);
            if (expectedAttributeOrder
                    .try_emplace(message.s_Attribute, attributeOrder)
                    .second) {
                ++attributeOrder;
            }
            if (expectedPeopleOrder.try_emplace(message.s_Person, personOrder).second) {
                ++personOrder;
            }
        }

        BOOST_TEST_REQUIRE(dataGatherer.dataAvailable(time));
        BOOST_TEST_REQUIRE(!dataGatherer.dataAvailable(time - 1));

        TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
        dataGatherer.featureData(time, bucketLength, featureData);

        const TSizeSizePrFeatureDataPrVec& peoplePerAttribute =
            extractPeoplePerAttribute(featureData);
        BOOST_REQUIRE_EQUAL(expectedAttributePeople.size(), peoplePerAttribute.size());
        TSizeSizePrFeatureDataPrVec expectedPeoplePerAttribute;
        for (std::size_t j = 0; j < peoplePerAttribute.size(); ++j) {
            expectedPeoplePerAttribute.emplace_back(
                std::make_pair(static_cast<size_t>(0), j),
                expectedAttributePeople[j].size());
        }
        BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedPeoplePerAttribute),
                            core::CContainerPrinter::print(peoplePerAttribute));

        const TSizeSizePrFeatureDataPrVec& personAttributeCounts =
            extractNonZeroAttributeCounts(featureData);
        BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedAttributeCounts),
                            core::CContainerPrinter::print(personAttributeCounts));

        // The remaining features were not requested so must have no data.
        const TSizeSizePrFeatureDataPrVec& attributeIndicator =
            extractAttributeIndicator(featureData);
        BOOST_TEST_REQUIRE(attributeIndicator.empty());

        const TSizeSizePrFeatureDataPrVec& bucketAttributesPerPerson =
            extractBucketAttributesPerPerson(featureData);
        BOOST_TEST_REQUIRE(bucketAttributesPerPerson.empty());

        dataGatherer.timeNow(time + bucketLength);
    }

    // The attribute ids should match the order in which the attributes
    // were first seen.
    TStrVec const categories = allCategories();
    TSizeVec attributeIds;
    for (const auto& category : categories) {
        std::size_t cid = 0;
        BOOST_TEST_REQUIRE(dataGatherer.attributeId(category, cid));
        attributeIds.push_back(cid);
        BOOST_REQUIRE_EQUAL(expectedAttributeOrder[category], cid);
    }
    LOG_DEBUG(<< "attribute ids = " << attributeIds);
    LOG_DEBUG(<< "expected attribute ids = " << expectedAttributeOrder);

    // Likewise the person ids should match the order in which the people
    // were first seen.
    TStrVec const people = allPeople();
    TSizeVec peopleIds;
    for (const auto& person : people) {
        std::size_t pid = 0;
        BOOST_TEST_REQUIRE(dataGatherer.personId(person, pid));
        peopleIds.push_back(pid);
        BOOST_REQUIRE_EQUAL(expectedPeopleOrder[person], pid);
    }
    LOG_DEBUG(<< "people ids = " << peopleIds);
    LOG_DEBUG(<< "expected people ids = " << expectedPeopleOrder);
}
BOOST_FIXTURE_TEST_CASE(testAttributeIndicator, CTestFixture) {
    // We check that we correctly sample the (attribute, person)
    // indicator.

    constexpr core_t::TTime startTime = 1367280000;
    constexpr core_t::TTime bucketLength = 3600;
    constexpr std::size_t numberBuckets = 20;

    test::CRandomNumbers rng;

    CDataGatherer::TFeatureVec features;
    features.push_back(model_t::E_PopulationIndicatorOfBucketPersonAndAttribute);
    SModelParams const params(bucketLength);
    CDataGatherer dataGatherer = CDataGathererBuilder(model_t::E_PopulationEventRate,
                                                      features, params, searchKey, startTime)
                                     .build();

    core_t::TTime time = startTime;
    for (std::size_t i = 0; i < numberBuckets; ++i, time += bucketLength) {
        TMessageVec messages;
        generateTestMessages(rng, time, bucketLength, messages);

        TSizeSizePrUInt64Map expectedAttributeIndicator;
        for (auto const& message : messages) {
            addArrival(message.s_Time, message.s_Person, message.s_Attribute,
                       dataGatherer, m_ResourceMonitor);

            // The ids are used as a map key below, so fail immediately rather
            // than read an uninitialised id if either lookup fails.
            std::size_t cid = 0;
            BOOST_TEST_REQUIRE(dataGatherer.attributeId(message.s_Attribute, cid));
            std::size_t pid = 0;
            BOOST_TEST_REQUIRE(dataGatherer.personId(message.s_Person, pid));

            // The indicator is one for every pair with at least one message.
            expectedAttributeIndicator[std::make_pair(pid, cid)] = 1;
        }

        BOOST_TEST_REQUIRE(dataGatherer.dataAvailable(time));
        BOOST_TEST_REQUIRE(!dataGatherer.dataAvailable(time - 1));

        TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
        dataGatherer.featureData(time, bucketLength, featureData);

        const TSizeSizePrFeatureDataPrVec& peoplePerAttribute =
            extractPeoplePerAttribute(featureData);
        BOOST_TEST_REQUIRE(peoplePerAttribute.empty());

        const TSizeSizePrFeatureDataPrVec& attributeIndicator =
            extractAttributeIndicator(featureData);
        BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedAttributeIndicator),
                            core::CContainerPrinter::print(attributeIndicator));

        const TSizeSizePrFeatureDataPrVec& bucketAttributesPerPerson =
            extractBucketAttributesPerPerson(featureData);
        BOOST_TEST_REQUIRE(bucketAttributesPerPerson.empty());

        dataGatherer.timeNow(time + bucketLength);
    }
}
BOOST_FIXTURE_TEST_CASE(testUniqueValueCounts, CTestFixture) {
    // We check that we correctly sample the unique counts
    // of values per person.

    constexpr core_t::TTime startTime = 1367280000;
    constexpr core_t::TTime bucketLength = 3600;
    constexpr std::size_t numberBuckets = 20;

    test::CRandomNumbers rng;

    CDataGatherer::TFeatureVec features;
    features.push_back(model_t::E_PopulationUniqueCountByBucketPersonAndAttribute);
    SModelParams const params(bucketLength);
    CDataGatherer dataGatherer = CDataGathererBuilder(model_t::E_PopulationEventRate,
                                                      features, params, searchKey, startTime)
                                     .build();

    core_t::TTime time = startTime;
    for (std::size_t i = 0; i < numberBuckets; ++i, time += bucketLength) {
        TMessageVec messages;
        generateTestMessages(rng, time, bucketLength, messages);

        // Every message in the bucket carries the same value, which depends
        // only on the bucket, so build it once per bucket.
        std::ostringstream ss;
        ss << "thing"
           << "_" << time << "_" << i;
        std::string const value(ss.str());

        TSizeSizePrUInt64Map expectedUniqueCounts;
        for (auto const& message : messages) {
            addArrival(message.s_Time, message.s_Person, message.s_Attribute,
                       value, dataGatherer, m_ResourceMonitor);

            // The ids are used as a map key below, so fail immediately rather
            // than read an uninitialised id if either lookup fails.
            std::size_t cid = 0;
            BOOST_TEST_REQUIRE(dataGatherer.attributeId(message.s_Attribute, cid));
            std::size_t pid = 0;
            BOOST_TEST_REQUIRE(dataGatherer.personId(message.s_Person, pid));

            // There is a single distinct value per bucket so each pair
            // which has a message has a unique count of one.
            expectedUniqueCounts[std::make_pair(pid, cid)] = 1;
        }

        BOOST_TEST_REQUIRE(dataGatherer.dataAvailable(time));
        BOOST_TEST_REQUIRE(!dataGatherer.dataAvailable(time - 1));

        TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
        dataGatherer.featureData(time, bucketLength, featureData);

        const TSizeSizePrFeatureDataPrVec& peoplePerAttribute =
            extractPeoplePerAttribute(featureData);
        BOOST_TEST_REQUIRE(peoplePerAttribute.empty());

        const TSizeSizePrFeatureDataPrVec& attributeIndicator =
            extractAttributeIndicator(featureData);
        BOOST_TEST_REQUIRE(attributeIndicator.empty());

        const TSizeSizePrFeatureDataPrVec& bucketAttributesPerPerson =
            extractBucketAttributesPerPerson(featureData);
        BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedUniqueCounts),
                            core::CContainerPrinter::print(bucketAttributesPerPerson));

        dataGatherer.timeNow(time + bucketLength);
    }
}
BOOST_FIXTURE_TEST_CASE(testCompressedLength, CTestFixture) {
    // We check that we correctly sample the compressed length of unique
    // values per person.

    constexpr core_t::TTime startTime = 1367280000;
    constexpr core_t::TTime bucketLength = 3600;
    constexpr std::size_t numberBuckets = 20;

    test::CRandomNumbers rng;

    CDataGatherer::TFeatureVec features;
    features.push_back(model_t::E_PopulationInfoContentByBucketPersonAndAttribute);
    SModelParams const params(bucketLength);
    CDataGatherer dataGatherer = CDataGathererBuilder(model_t::E_PopulationEventRate,
                                                      features, params, searchKey, startTime)
                                     .build();

    core_t::TTime time = startTime;
    for (std::size_t i = 0; i < numberBuckets; ++i, time += bucketLength) {
        TMessageVec messages;
        generateTestMessages(rng, time, bucketLength, messages);

        // The distinct values seen for each person in this bucket.
        TSizeStrSetMap bucketPeopleCategories;
        for (auto const& message : messages) {
            // Every arrival uses the single attribute "attribute"; the
            // generated category name is supplied as the value.
            addArrival(message.s_Time, message.s_Person, "attribute",
                       message.s_Attribute, dataGatherer, m_ResourceMonitor);

            // The id is used as a map key below, so fail immediately rather
            // than read an uninitialised id if the lookup fails.
            std::size_t pid = 0;
            BOOST_TEST_REQUIRE(dataGatherer.personId(message.s_Person, pid));
            bucketPeopleCategories[pid].insert(message.s_Attribute);
        }

        BOOST_TEST_REQUIRE(dataGatherer.dataAvailable(time));
        BOOST_TEST_REQUIRE(!dataGatherer.dataAvailable(time - 1));

        TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
        dataGatherer.featureData(time, bucketLength, featureData);

        const TSizeSizePrFeatureDataPrVec& peoplePerAttribute =
            extractPeoplePerAttribute(featureData);
        BOOST_TEST_REQUIRE(peoplePerAttribute.empty());

        const TSizeSizePrFeatureDataPrVec& attributeIndicator =
            extractAttributeIndicator(featureData);
        BOOST_TEST_REQUIRE(attributeIndicator.empty());

        const TSizeSizePrFeatureDataPrVec& bucketCompressedLengthPerPerson =
            extractCompressedLengthPerPerson(featureData);
        BOOST_REQUIRE_EQUAL(bucketPeopleCategories.size(),
                            bucketCompressedLengthPerPerson.size());

        // Compute the expected values by compressing each person's distinct
        // values ourselves.
        TSizeSizePrUInt64Map expectedBucketCompressedLengthPerPerson;
        for (auto const& personValues : bucketPeopleCategories) {
            // Zero is the attribute component of the key since there is
            // only the one attribute.
            TSizeSizePr const key(personValues.first, 0);
            const TStrSet& uniqueValues = personValues.second;

            core::CDeflator compressor(false);
            // count_if feeds every value to the compressor and counts the
            // successful additions, all of which must succeed.
            BOOST_REQUIRE_EQUAL(
                uniqueValues.size(),
                static_cast<size_t>(std::count_if(
                    uniqueValues.begin(), uniqueValues.end(),
                    std::bind_front(&core::CCompressUtil::addString, &compressor))));
            std::size_t length(0);
            BOOST_TEST_REQUIRE(compressor.length(true, length));
            expectedBucketCompressedLengthPerPerson[key] = length;
        }
        LOG_DEBUG(<< "Time " << time << " bucketCompressedLengthPerPerson "
                  << bucketCompressedLengthPerPerson);
        BOOST_REQUIRE_EQUAL(expectedBucketCompressedLengthPerPerson.size(),
                            bucketCompressedLengthPerPerson.size());
        for (const auto& j : bucketCompressedLengthPerPerson) {
            auto const expectedLength =
                static_cast<double>(expectedBucketCompressedLengthPerPerson[j.first]);
            auto const actual = static_cast<double>(j.second.s_Count);
            // Allow the gathered length to differ by up to 10% of the
            // expected length.
            BOOST_REQUIRE_CLOSE_ABSOLUTE(expectedLength, actual, expectedLength * 0.1);
        }

        dataGatherer.timeNow(time + bucketLength);
    }
}
BOOST_FIXTURE_TEST_CASE(testRemovePeople, CTestFixture) {
    // We check that recycling some people leaves the ids, the non-zero
    // counts and the feature data of all the other people unchanged.

    using TStrSizeMap = std::map<std::string, std::size_t>;
    using TSizeUInt64Pr = std::pair<std::size_t, std::uint64_t>;
    using TSizeUInt64PrVec = std::vector<TSizeUInt64Pr>;

    constexpr core_t::TTime startTime = 1367280000;
    constexpr core_t::TTime bucketLength = 3600;
    constexpr std::size_t numberBuckets = 50;

    test::CRandomNumbers rng;

    CDataGatherer::TFeatureVec features{model_t::E_PopulationCountByBucketPersonAndAttribute};
    SModelParams const params(bucketLength);
    CDataGatherer gatherer = CDataGathererBuilder(model_t::E_PopulationEventRate,
                                                  features, params, searchKey, startTime)
                                 .build();

    core_t::TTime bucketStart = startTime;
    for (std::size_t bucket = 0; bucket < numberBuckets; ++bucket) {
        TMessageVec messages;
        generateTestMessages(rng, bucketStart, bucketLength, messages);
        for (const SMessage& message : messages) {
            addArrival(message.s_Time, message.s_Person, message.s_Attribute,
                       gatherer, m_ResourceMonitor);
        }
        bucketStart += bucketLength;
    }

    // The start of the last bucket which had data added.
    core_t::TTime const lastBucketStart = bucketStart - bucketLength;

    // Remove people 1 and 4. Note this must stay sorted for the binary
    // search below.
    TSizeVec peopleToRemove{1, 4};
    auto const isRemoved = [&peopleToRemove](std::size_t pid) {
        return std::binary_search(peopleToRemove.begin(), peopleToRemove.end(), pid);
    };

    std::size_t const numberPeople = gatherer.numberActivePeople();
    LOG_DEBUG(<< "numberPeople = " << numberPeople);
    BOOST_REQUIRE_EQUAL(numberPeople, gatherer.numberOverFieldValues());

    // Record the names and ids of the people which are being kept.
    TStrVec expectedPersonNames;
    TSizeVec expectedPersonIds;
    for (std::size_t pid = 0; pid < numberPeople; ++pid) {
        if (isRemoved(pid)) {
            LOG_DEBUG(<< "Removing " << gatherer.personName(pid));
        } else {
            expectedPersonNames.push_back(gatherer.personName(pid));
            expectedPersonIds.push_back(pid);
        }
    }

    // The non-zero counts of the people being kept, keyed by name.
    TStrSizeMap expectedNonZeroCounts;
    {
        TSizeUInt64PrVec countsBefore;
        gatherer.personNonZeroCounts(lastBucketStart, countsBefore);
        for (const TSizeUInt64Pr& personCount : countsBefore) {
            if (isRemoved(personCount.first)) {
                continue;
            }
            const std::string& name = gatherer.personName(personCount.first);
            expectedNonZeroCounts[name] = personCount.second;
        }
    }
    LOG_DEBUG(<< "expectedNonZeroCounts = " << expectedNonZeroCounts);

    // The feature data of the people being kept, keyed by printable names.
    std::string expectedFeatureData;
    {
        LOG_DEBUG(<< "Expected");
        TStrFeatureDataPrVec expected;
        TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
        gatherer.featureData(lastBucketStart, bucketLength, featureData);
        for (const TFeatureSizeSizePrFeatureDataPrVecPr& feature : featureData) {
            for (const TSizeSizePrFeatureDataPr& datum : feature.second) {
                const TSizeSizePr& ids = datum.first;
                if (isRemoved(ids.first)) {
                    continue;
                }
                std::string const key = model_t::print(feature.first) + " " +
                                        gatherer.personName(ids.first) + " " +
                                        gatherer.attributeName(ids.second);
                expected.emplace_back(key, datum.second);
                LOG_DEBUG(<< "  " << key << " = " << datum.second.s_Count);
            }
        }
        expectedFeatureData = core::CContainerPrinter::print(expected);
    }

    gatherer.recyclePeople(peopleToRemove);

    BOOST_REQUIRE_EQUAL(numberPeople - peopleToRemove.size(), gatherer.numberActivePeople());
    for (std::size_t k = 0; k < expectedPersonNames.size(); ++k) {
        std::size_t pid;
        BOOST_TEST_REQUIRE(gatherer.personId(expectedPersonNames[k], pid));
        BOOST_REQUIRE_EQUAL(expectedPersonIds[k], pid);
    }

    TStrSizeMap actualNonZeroCounts;
    TSizeUInt64PrVec countsAfter;
    gatherer.personNonZeroCounts(lastBucketStart, countsAfter);
    for (const TSizeUInt64Pr& personCount : countsAfter) {
        const std::string& name = gatherer.personName(personCount.first);
        actualNonZeroCounts[name] = personCount.second;
    }
    LOG_DEBUG(<< "actualNonZeroCounts = " << actualNonZeroCounts);

    BOOST_REQUIRE_EQUAL(core::CContainerPrinter::print(expectedNonZeroCounts),
                        core::CContainerPrinter::print(actualNonZeroCounts));

    std::string actualFeatureData;
    {
        LOG_DEBUG(<< "Actual");
        TStrFeatureDataPrVec actual;
        TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
        gatherer.featureData(lastBucketStart, bucketLength, featureData);
        for (const TFeatureSizeSizePrFeatureDataPrVecPr& feature : featureData) {
            for (const TSizeSizePrFeatureDataPr& datum : feature.second) {
                const TSizeSizePr& ids = datum.first;
                std::string const key = model_t::print(feature.first) + " " +
                                        gatherer.personName(ids.first) + " " +
                                        gatherer.attributeName(ids.second);
                actual.emplace_back(key, datum.second);
                LOG_DEBUG(<< "  " << key << " = " << datum.second.s_Count);
            }
        }
        actualFeatureData = core::CContainerPrinter::print(actual);
    }

    BOOST_REQUIRE_EQUAL(expectedFeatureData, actualFeatureData);
}
BOOST_FIXTURE_TEST_CASE(testRemoveAttributes, CTestFixture) {
    // We check that recycling some attributes leaves the ids and the
    // feature data of all the other attributes unchanged.

    constexpr core_t::TTime startTime = 1367280000;
    constexpr core_t::TTime bucketLength = 3600;

    test::CRandomNumbers rng;

    CDataGatherer::TFeatureVec features{model_t::E_PopulationCountByBucketPersonAndAttribute,
                                        model_t::E_PopulationUniquePersonCountByAttribute};
    SModelParams const params(bucketLength);
    CDataGatherer gatherer = CDataGathererBuilder(model_t::E_PopulationEventRate,
                                                  features, params, searchKey, startTime)
                                 .build();

    // Only a single bucket of data is added.
    TMessageVec messages;
    generateTestMessages(rng, startTime, bucketLength, messages);
    constexpr core_t::TTime bucketStart = startTime;
    for (const SMessage& message : messages) {
        addArrival(message.s_Time, message.s_Person, message.s_Attribute,
                   gatherer, m_ResourceMonitor);
    }

    // Remove attributes 1, 2, 3 and 15. Note this must stay sorted for the
    // binary search below.
    TSizeVec attributesToRemove{1, 2, 3, 15};
    auto const isRemoved = [&attributesToRemove](std::size_t cid) {
        return std::binary_search(attributesToRemove.begin(),
                                  attributesToRemove.end(), cid);
    };

    std::size_t const numberAttributes = gatherer.numberActiveAttributes();
    LOG_DEBUG(<< "numberAttributes = " << numberAttributes);
    BOOST_REQUIRE_EQUAL(numberAttributes, gatherer.numberByFieldValues());

    // Record the names and ids of the attributes which are being kept.
    TStrVec expectedAttributeNames;
    TSizeVec expectedAttributeIds;
    for (std::size_t cid = 0; cid < numberAttributes; ++cid) {
        if (isRemoved(cid)) {
            LOG_DEBUG(<< "Removing " << gatherer.attributeName(cid));
        } else {
            expectedAttributeNames.push_back(gatherer.attributeName(cid));
            expectedAttributeIds.push_back(cid);
        }
    }

    // The feature data of the attributes being kept, keyed by printable
    // names.
    std::string expectedFeatureData;
    {
        LOG_DEBUG(<< "Expected");
        TStrFeatureDataPrVec expected;
        TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
        gatherer.featureData(bucketStart, bucketLength, featureData);
        for (const TFeatureSizeSizePrFeatureDataPrVecPr& feature : featureData) {
            for (const TSizeSizePrFeatureDataPr& datum : feature.second) {
                const TSizeSizePr& ids = datum.first;
                if (isRemoved(ids.second)) {
                    continue;
                }
                std::string const key = model_t::print(feature.first) + " " +
                                        gatherer.personName(ids.first) + " " +
                                        gatherer.attributeName(ids.second);
                expected.emplace_back(key, datum.second);
                LOG_DEBUG(<< "  " << key << " = " << datum.second.s_Count);
            }
        }
        expectedFeatureData = core::CContainerPrinter::print(expected);
    }

    gatherer.recycleAttributes(attributesToRemove);

    BOOST_REQUIRE_EQUAL(numberAttributes - attributesToRemove.size(),
                        gatherer.numberActiveAttributes());
    for (std::size_t k = 0; k < expectedAttributeNames.size(); ++k) {
        std::size_t cid;
        BOOST_TEST_REQUIRE(gatherer.attributeId(expectedAttributeNames[k], cid));
        BOOST_REQUIRE_EQUAL(expectedAttributeIds[k], cid);
    }

    std::string actualFeatureData;
    {
        LOG_DEBUG(<< "Actual");
        TStrFeatureDataPrVec actual;
        TFeatureSizeSizePrFeatureDataPrVecPrVec featureData;
        gatherer.featureData(bucketStart, bucketLength, featureData);
        for (const TFeatureSizeSizePrFeatureDataPrVecPr& feature : featureData) {
            for (const TSizeSizePrFeatureDataPr& datum : feature.second) {
                const TSizeSizePr& ids = datum.first;
                std::string const key = model_t::print(feature.first) + " " +
                                        gatherer.personName(ids.first) + " " +
                                        gatherer.attributeName(ids.second);
                actual.emplace_back(key, datum.second);
                LOG_DEBUG(<< "  " << key << " = " << datum.second.s_Count);
            }
        }
        actualFeatureData = core::CContainerPrinter::print(actual);
    }

    BOOST_REQUIRE_EQUAL(expectedFeatureData, actualFeatureData);
}
namespace {
//! Check that persisting \p origDataGatherer, restoring a new gatherer from
//! that state and persisting again yields exactly the same JSON.
//!
//! \param[in] origDataGatherer The gatherer whose state is round tripped.
//! \param[in] params The model parameters used to construct the restored
//! gatherer.
void testPersistDataGatherer(const CDataGatherer& origDataGatherer,
                             const SModelParams& params) {
    std::ostringstream origJson;
    core::CJsonStatePersistInserter::persist(
        origJson, std::bind_front(&CDataGatherer::acceptPersistInserter, &origDataGatherer));
    std::string const origState = origJson.str();

    LOG_DEBUG(<< "origJson = " << origState);

    // Restore the Json into a new data gatherer
    // The traverser expects the state json in a embedded document
    std::istringstream origJsonStrm("{\"topLevel\" : " + origState + "}");
    core::CJsonStateRestoreTraverser traverser(origJsonStrm);

    CBucketGatherer::SBucketGathererInitData bucketGathererInitData{
        EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, {}, 0, 0};
    CDataGatherer restoredDataGatherer(model_t::E_PopulationEventRate,
                                       model_t::E_None, params, EMPTY_STRING,
                                       searchKey, bucketGathererInitData, traverser);

    // The Json representation of the new data gatherer should be the same as the
    // original
    std::ostringstream newJson;
    core::CJsonStatePersistInserter::persist(
        newJson, std::bind_front(&CDataGatherer::acceptPersistInserter, &restoredDataGatherer));
    std::string const restoredState = newJson.str();

    BOOST_REQUIRE_EQUAL(origState, restoredState);
}
}
BOOST_FIXTURE_TEST_CASE(testPersistence, CTestFixture) {
    // We check that the gatherer state survives a persist and restore
    // round trip for two different feature configurations.

    constexpr core_t::TTime startTime = 1367280000;
    constexpr core_t::TTime bucketLength = 3600;

    {
        // Check the count and unique person count features with a single
        // bucket of data.
        test::CRandomNumbers rng;

        CDataGatherer::TFeatureVec features;
        features.push_back(model_t::E_PopulationCountByBucketPersonAndAttribute);
        features.push_back(model_t::E_PopulationUniquePersonCountByAttribute);
        SModelParams const params(bucketLength);
        CDataGatherer origDataGatherer =
            CDataGathererBuilder(model_t::E_PopulationEventRate, features,
                                 params, searchKey, startTime)
                .build();

        TMessageVec messages;
        generateTestMessages(rng, startTime, bucketLength, messages);

        for (auto const& message : messages) {
            addArrival(message.s_Time, message.s_Person, message.s_Attribute,
                       origDataGatherer, m_ResourceMonitor);
        }

        testPersistDataGatherer(origDataGatherer, params);
    }
    {
        // Check the info content feature, which gathers the values seen for
        // the value field, over several buckets.
        constexpr std::size_t numberBuckets = 20;

        test::CRandomNumbers rng;

        CDataGatherer::TFeatureVec features;
        features.push_back(model_t::E_PopulationInfoContentByBucketPersonAndAttribute);
        SModelParams const params(bucketLength);
        CDataGatherer dataGatherer =
            CDataGathererBuilder(model_t::E_PopulationEventRate, features,
                                 params, searchKey, startTime)
                .valueFieldName("value")
                .build();

        core_t::TTime time = startTime;
        for (std::size_t i = 0; i < numberBuckets; ++i, time += bucketLength) {
            TMessageVec messages;
            generateTestMessages(rng, time, bucketLength, messages);

            for (auto const& message : messages) {
                // Every arrival uses the single attribute "attribute"; the
                // generated category name is supplied as the value.
                addArrival(message.s_Time, message.s_Person, "attribute",
                           message.s_Attribute, dataGatherer, m_ResourceMonitor);
            }

            BOOST_TEST_REQUIRE(dataGatherer.dataAvailable(time));
            BOOST_TEST_REQUIRE(!dataGatherer.dataAvailable(time - 1));

            dataGatherer.timeNow(time + bucketLength);
        }

        testPersistDataGatherer(dataGatherer, params);
    }
}
BOOST_AUTO_TEST_SUITE_END()