lib/model/CEventRateBucketGatherer.cc:
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <model/CEventRateBucketGatherer.h>
#include <core/CFunctional.h>
#include <core/CMemoryDefStd.h>
#include <core/CProgramCounters.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/CompressUtils.h>
#include <core/Constants.h>
#include <core/RestoreMacros.h>
#include <maths/common/CBasicStatistics.h>
#include <maths/common/CBasicStatisticsPersist.h>
#include <maths/common/CChecksum.h>
#include <maths/common/COrderings.h>
#include <model/CDataGatherer.h>
#include <model/CEventData.h>
#include <model/CResourceMonitor.h>
#include <model/FunctionTypes.h>
#include <boost/unordered_set.hpp>
#include <algorithm>
#include <atomic>
#include <functional>
#include <map>
#include <string>
#include <utility>
namespace ml {
namespace model {
namespace {
using TSizeVec = std::vector<std::size_t>;
using TStrVec = std::vector<std::string>;
using TStrUInt64Map = std::map<std::string, std::uint64_t>;
using TSizeSizePr = std::pair<std::size_t, std::size_t>;
using TSizeSizePrVec = std::vector<TSizeSizePr>;
using TUInt64Vec = std::vector<std::uint64_t>;
using TSizeUSet = boost::unordered_set<std::size_t>;
using TSizeUSetCItr = TSizeUSet::const_iterator;
using TSizeUSetVec = std::vector<TSizeUSet>;
using TMeanAccumulator = maths::common::CBasicStatistics::SSampleMean<double>::TAccumulator;
using TSizeSizePrMeanAccumulatorUMap = boost::unordered_map<TSizeSizePr, TMeanAccumulator>;
using TSizeSizePrUInt64Map = std::map<TSizeSizePr, std::uint64_t>;
using TSizeSizePrMeanAccumulatorUMapQueue = CBucketQueue<TSizeSizePrMeanAccumulatorUMap>;
using TCategoryAnyMap = CEventRateBucketGatherer::TCategoryAnyMap;
using TSizeSizePrStrDataUMap = boost::unordered_map<TSizeSizePr, CUniqueStringFeatureData>;
using TSizeSizePrStrDataUMapQueue = CBucketQueue<TSizeSizePrStrDataUMap>;
using TOptionalStrVec = CBucketGatherer::TOptionalStrVec;
// We use short field names to reduce the state size
const std::string BASE_TAG("a");
const std::string ATTRIBUTE_PEOPLE_TAG("b");
const std::string UNIQUE_VALUES_TAG("c");
const std::string TIMES_OF_DAY_TAG("d");
const std::string EMPTY_STRING;
// Nested tags.
const std::string ATTRIBUTE_TAG("a");
const std::string PERSON_TAG("b");
const std::string STRING_ITEM_TAG("h");
const std::string MEAN_TIMES_TAG("i");
// Unique strings tags.
const std::string INFLUENCER_UNIQUE_STRINGS_TAG("a");
const std::string UNIQUE_STRINGS_TAG("b");
//! \brief Manages persistence of time-of-day feature data maps.
struct STimesBucketSerializer {
void operator()(const TSizeSizePrMeanAccumulatorUMap& times,
core::CStatePersistInserter& inserter) {
std::vector<TSizeSizePrMeanAccumulatorUMap::const_iterator> ordered;
ordered.reserve(times.size());
for (auto i = times.begin(); i != times.end(); ++i) {
ordered.push_back(i);
}
std::sort(ordered.begin(), ordered.end(),
core::CFunctional::SDereference<maths::common::COrderings::SFirstLess>());
for (std::size_t i = 0; i < ordered.size(); ++i) {
inserter.insertValue(PERSON_TAG, CDataGatherer::extractPersonId(*ordered[i]));
inserter.insertValue(ATTRIBUTE_TAG,
CDataGatherer::extractAttributeId(*ordered[i]));
inserter.insertValue(MEAN_TIMES_TAG,
CDataGatherer::extractData(*ordered[i]).toDelimited());
}
}
bool operator()(TSizeSizePrMeanAccumulatorUMap& times,
core::CStateRestoreTraverser& traverser) const {
std::size_t pid = 0;
std::size_t cid = 0;
do {
const std::string& name = traverser.name();
RESTORE_BUILT_IN(PERSON_TAG, pid)
RESTORE_BUILT_IN(ATTRIBUTE_TAG, cid)
RESTORE(MEAN_TIMES_TAG,
times[TSizeSizePr(pid, cid)].fromDelimited(traverser.value()))
} while (traverser.next());
return true;
}
};
//! \brief Manages persistence of unique string feature data maps.
struct SStrDataBucketSerializer {
void operator()(const TSizeSizePrStrDataUMap& strings,
core::CStatePersistInserter& inserter) {
std::vector<TSizeSizePrStrDataUMap::const_iterator> ordered;
ordered.reserve(strings.size());
for (auto i = strings.begin(); i != strings.end(); ++i) {
ordered.push_back(i);
}
std::sort(ordered.begin(), ordered.end(),
core::CFunctional::SDereference<maths::common::COrderings::SFirstLess>());
for (std::size_t i = 0; i != ordered.size(); ++i) {
inserter.insertValue(PERSON_TAG, CDataGatherer::extractPersonId(*ordered[i]));
inserter.insertValue(ATTRIBUTE_TAG,
CDataGatherer::extractAttributeId(*ordered[i]));
inserter.insertLevel(
STRING_ITEM_TAG,
std::bind(&CUniqueStringFeatureData::acceptPersistInserter,
std::cref(CDataGatherer::extractData(*ordered[i])),
std::placeholders::_1));
}
}
bool operator()(TSizeSizePrStrDataUMap& map, core::CStateRestoreTraverser& traverser) const {
std::size_t pid = 0;
std::size_t cid = 0;
do {
const std::string& name = traverser.name();
RESTORE_BUILT_IN(PERSON_TAG, pid)
RESTORE_BUILT_IN(ATTRIBUTE_TAG, cid)
RESTORE(STRING_ITEM_TAG,
traverser.traverseSubLevel(std::bind(
&CUniqueStringFeatureData::acceptRestoreTraverser,
std::ref(map[TSizeSizePr(pid, cid)]), std::placeholders::_1)))
} while (traverser.next());
return true;
}
};
//! Serialize \p data.
void persistAttributePeopleData(const TSizeUSetVec& data,
core::CStatePersistInserter& inserter) {
// Persist the vector in reverse order, because it means we'll
// find out the correct size more efficiently on restore.
std::size_t index = data.size();
while (index > 0) {
--index;
inserter.insertValue(ATTRIBUTE_TAG, index);
const TSizeUSet& people = data[index];
// Persist the person identifiers in sorted order to make
// it easier to compare state records.
TSizeVec orderedPeople(people.begin(), people.end());
std::sort(orderedPeople.begin(), orderedPeople.end());
for (std::size_t i = 0; i < orderedPeople.size(); ++i) {
inserter.insertValue(PERSON_TAG, orderedPeople[i]);
}
}
}
//! Serialize \p featureData.
void persistFeatureData(const TCategoryAnyMap& featureData,
core::CStatePersistInserter& inserter) {
for (const auto& data_ : featureData) {
model_t::EEventRateCategory category = data_.first;
const std::any& data = data_.second;
try {
switch (category) {
case model_t::E_DiurnalTimes:
inserter.insertLevel(
TIMES_OF_DAY_TAG,
std::bind<void>(
TSizeSizePrMeanAccumulatorUMapQueue::CSerializer<STimesBucketSerializer>(),
std::cref(std::any_cast<const TSizeSizePrMeanAccumulatorUMapQueue&>(data)),
std::placeholders::_1));
break;
case model_t::E_MeanArrivalTimes:
// TODO
break;
case model_t::E_AttributePeople:
inserter.insertLevel(
ATTRIBUTE_PEOPLE_TAG,
std::bind(&persistAttributePeopleData,
std::cref(std::any_cast<const TSizeUSetVec&>(data)),
std::placeholders::_1));
break;
case model_t::E_UniqueValues:
inserter.insertLevel(
UNIQUE_VALUES_TAG,
std::bind<void>(
TSizeSizePrStrDataUMapQueue::CSerializer<SStrDataBucketSerializer>(),
std::cref(std::any_cast<const TSizeSizePrStrDataUMapQueue&>(data)),
std::placeholders::_1));
break;
}
} catch (const std::exception& e) {
LOG_ERROR(<< "Failed to serialize data for " << category << ": " << e.what());
}
}
}
//! Extract \p data from a state document.
bool restoreAttributePeopleData(core::CStateRestoreTraverser& traverser, TSizeUSetVec& data) {
std::size_t lastCid = 0;
bool seenCid = false;
do {
const std::string& name = traverser.name();
if (name == ATTRIBUTE_TAG) {
if (core::CStringUtils::stringToType(traverser.value(), lastCid) == false) {
LOG_ERROR(<< "Invalid attribute ID in " << traverser.value());
return false;
}
seenCid = true;
if (lastCid >= data.size()) {
data.resize(lastCid + 1);
}
} else if (name == PERSON_TAG) {
if (seenCid == false) {
LOG_ABORT(<< "Incorrect format - person ID before attribute ID in "
<< traverser.value());
}
std::size_t pid = 0;
if (core::CStringUtils::stringToType(traverser.value(), pid) == false) {
LOG_ERROR(<< "Invalid person ID in " << traverser.value());
return false;
}
data[lastCid].insert(pid);
}
} while (traverser.next());
return true;
}
//! Extract \p featureData from a state document.
bool restoreFeatureData(core::CStateRestoreTraverser& traverser,
TCategoryAnyMap& featureData,
std::size_t latencyBuckets,
core_t::TTime bucketLength,
core_t::TTime currentBucketStartTime) {
const std::string& name = traverser.name();
if (name == ATTRIBUTE_PEOPLE_TAG) {
auto* data{std::any_cast<TSizeUSetVec>(
&featureData.emplace(model_t::E_AttributePeople, TSizeUSetVec())
.first->second)};
if (traverser.traverseSubLevel(std::bind(&restoreAttributePeopleData, std::placeholders::_1,
std::ref(*data))) == false) {
LOG_ERROR(<< "Invalid attribute/people mapping in " << traverser.value());
return false;
}
} else if (name == UNIQUE_VALUES_TAG) {
featureData.erase(model_t::E_UniqueValues);
auto* data{std::any_cast<TSizeSizePrStrDataUMapQueue>(
&featureData
.emplace(model_t::E_UniqueValues,
TSizeSizePrStrDataUMapQueue(latencyBuckets, bucketLength, currentBucketStartTime,
TSizeSizePrStrDataUMap(1)))
.first->second)};
if (traverser.traverseSubLevel(std::bind<bool>(
TSizeSizePrStrDataUMapQueue::CSerializer<SStrDataBucketSerializer>(
TSizeSizePrStrDataUMap(1)),
std::ref(*data), std::placeholders::_1)) == false) {
LOG_ERROR(<< "Invalid unique value mapping in " << traverser.value());
return false;
}
} else if (name == TIMES_OF_DAY_TAG) {
featureData.erase(model_t::E_DiurnalTimes);
auto* data{std::any_cast<TSizeSizePrMeanAccumulatorUMapQueue>(
&featureData
.emplace(model_t::E_DiurnalTimes,
TSizeSizePrMeanAccumulatorUMapQueue(latencyBuckets, bucketLength, currentBucketStartTime))
.first->second)};
if (traverser.traverseSubLevel(std::bind<bool>(
TSizeSizePrMeanAccumulatorUMapQueue::CSerializer<STimesBucketSerializer>(),
std::ref(*data), std::placeholders::_1)) == false) {
LOG_ERROR(<< "Invalid times mapping in " << traverser.value());
return false;
}
}
return true;
}
//! Get the by field name.
const std::string& byField(bool population, const TStrVec& fieldNames) {
return population ? fieldNames[1] : fieldNames[0];
}
//! Get the over field name.
const std::string& overField(bool population, const TStrVec& fieldNames) {
return population ? fieldNames[0] : EMPTY_STRING;
}
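//! \brief Maps an iterator type to a const or non-const reference to the
//! feature data type, so that applyFunc casts the std::any payload with the
//! correct constness.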
template<typename ITR, typename T>
struct SMaybeConst {};
template<typename T>
struct SMaybeConst<TCategoryAnyMap::iterator, T> {
using TRef = T&;
};
template<typename T>
struct SMaybeConst<TCategoryAnyMap::const_iterator, T> {
using TRef = const T&;
};
//! Apply a function \p f to all the data held in [\p begin, \p end).
template<typename ITR, typename F>
void applyFunc(ITR begin, ITR end, const F& f) {
for (ITR itr = begin; itr != end; ++itr) {
model_t::EEventRateCategory category = itr->first;
try {
switch (category) {
case model_t::E_DiurnalTimes: {
f(std::any_cast<typename SMaybeConst<ITR, TSizeSizePrMeanAccumulatorUMapQueue>::TRef>(
itr->second));
break;
}
case model_t::E_MeanArrivalTimes: {
// TODO
break;
}
case model_t::E_AttributePeople: {
f(std::any_cast<typename SMaybeConst<ITR, TSizeUSetVec>::TRef>(itr->second));
break;
}
case model_t::E_UniqueValues:
f(std::any_cast<typename SMaybeConst<ITR, TSizeSizePrStrDataUMapQueue>::TRef>(
itr->second));
break;
}
} catch (const std::exception& e) {
LOG_ERROR(<< "Apply failed for " << category << ": " << e.what());
}
}
}
//! Apply a function \p f to all the data held in \p featureData.
template<typename T, typename F>
void applyFunc(T& featureData, const F& f) {
applyFunc(featureData.begin(), featureData.end(), f);
}
//! \brief Removes people from the feature data.
struct SRemovePeople {
void operator()(TSizeUSetVec& attributePeople,
std::size_t lowestPersonToRemove,
std::size_t endPeople) const {
for (std::size_t cid = 0; cid < attributePeople.size(); ++cid) {
for (std::size_t pid = lowestPersonToRemove; pid < endPeople; ++pid) {
attributePeople[cid].erase(pid);
}
}
}
void operator()(TSizeUSetVec& attributePeople, const TSizeVec& peopleToRemove) const {
for (std::size_t cid = 0; cid < attributePeople.size(); ++cid) {
for (std::size_t i = 0; i < peopleToRemove.size(); ++i) {
attributePeople[cid].erase(peopleToRemove[i]);
}
}
}
void operator()(TSizeSizePrStrDataUMapQueue& peopleAttributeUniqueValues,
std::size_t lowestPersonToRemove,
std::size_t endPeople) const {
for (auto& bucket : peopleAttributeUniqueValues) {
for (auto i = bucket.begin(); i != bucket.end(); /**/) {
if (CDataGatherer::extractPersonId(*i) >= lowestPersonToRemove &&
CDataGatherer::extractPersonId(*i) < endPeople) {
i = bucket.erase(i);
} else {
++i;
}
}
}
}
void operator()(TSizeSizePrStrDataUMapQueue& peopleAttributeUniqueValues,
const TSizeVec& peopleToRemove) const {
CBucketGatherer::remove(peopleToRemove, CDataGatherer::SExtractPersonId(),
peopleAttributeUniqueValues);
}
void operator()(TSizeSizePrMeanAccumulatorUMapQueue& arrivalTimes,
std::size_t lowestPersonToRemove,
std::size_t endPeople) const {
for (auto& bucket : arrivalTimes) {
for (auto i = bucket.begin(); i != bucket.end(); /**/) {
if (CDataGatherer::extractPersonId(*i) >= lowestPersonToRemove &&
CDataGatherer::extractPersonId(*i) < endPeople) {
i = bucket.erase(i);
} else {
++i;
}
}
}
}
void operator()(TSizeSizePrMeanAccumulatorUMapQueue& arrivalTimes,
const TSizeVec& peopleToRemove) const {
CBucketGatherer::remove(peopleToRemove, CDataGatherer::SExtractPersonId(), arrivalTimes);
}
};
//! \brief Removes attributes from the feature data.
struct SRemoveAttributes {
void operator()(TSizeUSetVec& attributePeople, std::size_t lowestAttributeToRemove) const {
if (lowestAttributeToRemove < attributePeople.size()) {
attributePeople.erase(attributePeople.begin() + lowestAttributeToRemove,
attributePeople.end());
}
}
void operator()(TSizeUSetVec& attributePeople, const TSizeVec& attributesToRemove) const {
for (std::size_t i = 0; i < attributesToRemove.size(); ++i) {
attributePeople[attributesToRemove[i]].clear();
}
}
void operator()(TSizeSizePrStrDataUMapQueue& peopleAttributeUniqueValues,
std::size_t lowestAttributeToRemove) const {
for (auto& bucket : peopleAttributeUniqueValues) {
for (auto i = bucket.begin(); i != bucket.end(); /**/) {
if (CDataGatherer::extractAttributeId(*i) >= lowestAttributeToRemove) {
i = bucket.erase(i);
} else {
++i;
}
}
}
}
void operator()(TSizeSizePrStrDataUMapQueue& peopleAttributeUniqueValues,
const TSizeVec& attributesToRemove) const {
CBucketGatherer::remove(attributesToRemove, CDataGatherer::SExtractAttributeId(),
peopleAttributeUniqueValues);
}
void operator()(TSizeSizePrMeanAccumulatorUMapQueue& arrivalTimes,
std::size_t lowestAttributeToRemove) const {
for (auto& bucket : arrivalTimes) {
for (auto i = bucket.begin(); i != bucket.end(); /**/) {
if (CDataGatherer::extractAttributeId(*i) >= lowestAttributeToRemove) {
i = bucket.erase(i);
} else {
++i;
}
}
}
}
void operator()(TSizeSizePrMeanAccumulatorUMapQueue& arrivalTimes,
const TSizeVec& attributesToRemove) const {
CBucketGatherer::remove(attributesToRemove,
CDataGatherer::SExtractAttributeId(), arrivalTimes);
}
};
//! \brief Computes a checksum for the feature data.
struct SChecksum {
void operator()(const TSizeUSetVec& attributePeople,
const CDataGatherer& gatherer,
TStrUInt64Map& hashes) const {
using TStrCRef = std::reference_wrapper<const std::string>;
using TStrCRefVec = std::vector<TStrCRef>;
for (std::size_t cid = 0; cid < attributePeople.size(); ++cid) {
if (gatherer.isAttributeActive(cid)) {
TStrCRefVec people;
people.reserve(attributePeople[cid].size());
for (const auto& person : attributePeople[cid]) {
if (gatherer.isPersonActive(person)) {
people.emplace_back(gatherer.personName(person));
}
}
std::sort(people.begin(), people.end(),
maths::common::COrderings::SReferenceLess());
std::uint64_t& hash = hashes[gatherer.attributeName(cid)];
hash = maths::common::CChecksum::calculate(hash, people);
}
}
}
void operator()(const TSizeSizePrStrDataUMapQueue& peopleAttributeUniqueValues,
const CDataGatherer& gatherer,
TStrUInt64Map& hashes) const {
for (const auto& uniques : peopleAttributeUniqueValues) {
this->checksum(uniques, gatherer, hashes);
}
}
void operator()(const TSizeSizePrMeanAccumulatorUMapQueue& arrivalTimes,
const CDataGatherer& gatherer,
TStrUInt64Map& hashes) const {
for (const auto& time : arrivalTimes) {
this->checksum(time, gatherer, hashes);
}
}
template<typename DATA>
void checksum(const boost::unordered_map<TSizeSizePr, DATA>& bucket,
const CDataGatherer& gatherer,
TStrUInt64Map& hashes) const {
using TSizeUInt64VecUMap = boost::unordered_map<std::size_t, TUInt64Vec>;
TSizeUInt64VecUMap attributeHashes;
for (const auto& value : bucket) {
std::size_t pid = CDataGatherer::extractPersonId(value);
std::size_t cid = CDataGatherer::extractAttributeId(value);
if (gatherer.isPersonActive(pid) && gatherer.isAttributeActive(cid)) {
attributeHashes[cid].push_back(
maths::common::CChecksum::calculate(0, value.second));
}
}
for (auto& hash_ : attributeHashes) {
std::sort(hash_.second.begin(), hash_.second.end());
std::uint64_t& hash = hashes[gatherer.attributeName(hash_.first)];
hash = maths::common::CChecksum::calculate(hash, hash_.second);
}
}
};
//! \brief Resize the feature data to accommodate a specified
//! person and attribute identifier.
struct SResize {
void operator()(TSizeUSetVec& attributePeople, std::size_t /*pid*/, std::size_t cid) const {
if (cid >= attributePeople.size()) {
attributePeople.resize(cid + 1);
}
}
void operator()(TSizeSizePrStrDataUMapQueue& /*data*/,
std::size_t /*pid*/,
std::size_t /*cid*/) const {
// Not needed
}
void operator()(const TSizeSizePrMeanAccumulatorUMapQueue& /*arrivalTimes*/,
std::size_t /*pid*/,
std::size_t /*cid*/) const {
// Not needed
}
};
//! \brief Updates the feature data with some aggregated records.
struct SAddValue {
void operator()(TSizeUSetVec& attributePeople,
std::size_t pid,
std::size_t cid,
core_t::TTime /*time*/,
std::size_t /*count*/,
const CEventData::TDouble1VecArray& /*values*/,
const CEventData::TOptionalStr& /*uniqueStrings*/,
const TOptionalStrVec& /*influences*/) const {
attributePeople[cid].insert(pid);
}
void operator()(TSizeSizePrStrDataUMapQueue& personAttributeUniqueCounts,
std::size_t pid,
std::size_t cid,
core_t::TTime time,
std::size_t /*count*/,
const CEventData::TDouble1VecArray& /*values*/,
const CEventData::TOptionalStr& uniqueString,
const TOptionalStrVec& influences) const {
if (!uniqueString) {
return;
}
if (time > personAttributeUniqueCounts.latestBucketEnd()) {
LOG_ERROR(<< "No queue item for time " << time << ", end of latest bucket "
<< personAttributeUniqueCounts.latestBucketEnd() << ", bucket length "
<< personAttributeUniqueCounts.bucketLength());
personAttributeUniqueCounts.push(TSizeSizePrStrDataUMap(1), time);
}
TSizeSizePrStrDataUMap& counts = personAttributeUniqueCounts.get(time);
counts[{pid, cid}].insert(*uniqueString, influences);
}
void operator()(TSizeSizePrMeanAccumulatorUMapQueue& arrivalTimes,
std::size_t pid,
std::size_t cid,
core_t::TTime time,
std::size_t count,
const CEventData::TDouble1VecArray& values,
const CEventData::TOptionalStr& /*uniqueStrings*/,
const TOptionalStrVec& /*influences*/) const {
if (time > arrivalTimes.latestBucketEnd()) {
LOG_ERROR(<< "No queue item for time " << time << ", end of latest bucket "
<< arrivalTimes.latestBucketEnd() << ", bucket length "
<< arrivalTimes.bucketLength());
arrivalTimes.push(TSizeSizePrMeanAccumulatorUMap(1), time);
}
TSizeSizePrMeanAccumulatorUMap& times = arrivalTimes.get(time);
for (std::size_t i = 0; i < count; i++) {
times[{pid, cid}].add(values[i][0]);
}
}
};
//! \brief Updates the feature data for the start of a new bucket.
struct SNewBucket {
void operator()(TSizeUSetVec& /*attributePeople*/, core_t::TTime /*time*/) const {}
void operator()(TSizeSizePrStrDataUMapQueue& personAttributeUniqueCounts,
core_t::TTime time) const {
if (time > personAttributeUniqueCounts.latestBucketEnd()) {
personAttributeUniqueCounts.push(TSizeSizePrStrDataUMap(1), time);
} else {
personAttributeUniqueCounts.get(time).clear();
}
}
void operator()(TSizeSizePrMeanAccumulatorUMapQueue& arrivalTimes,
core_t::TTime time) const {
if (time > arrivalTimes.latestBucketEnd()) {
arrivalTimes.push(TSizeSizePrMeanAccumulatorUMap(1), time);
} else {
arrivalTimes.get(time).clear();
}
}
};
//! Nested tags.
const std::string DICTIONARY_WORD_TAG("a");
const std::string UNIQUE_WORD_TAG("b");
//! Persist a collection of unique strings.
void persistUniqueStrings(const CUniqueStringFeatureData::TWordStringUMap& map,
core::CStatePersistInserter& inserter) {
using TWordVec = std::vector<CUniqueStringFeatureData::TWord>;
if (!map.empty()) {
// Order the map keys to ensure consistent persistence
TWordVec keys;
keys.reserve(map.size());
for (const auto& value : map) {
keys.push_back(value.first);
}
std::sort(keys.begin(), keys.end());
for (const auto& key : keys) {
inserter.insertValue(DICTIONARY_WORD_TAG, key.toDelimited());
inserter.insertValue(UNIQUE_WORD_TAG, map.at(key));
}
}
}
//! Restore a collection of unique strings.
bool restoreUniqueStrings(core::CStateRestoreTraverser& traverser,
CUniqueStringFeatureData::TWordStringUMap& map) {
CUniqueStringFeatureData::TWord word;
do {
const std::string& name = traverser.name();
RESTORE(DICTIONARY_WORD_TAG, word.fromDelimited(traverser.value()))
RESTORE_NO_ERROR(UNIQUE_WORD_TAG, map[word] = traverser.value())
} while (traverser.next());
return true;
}
//! Persist influencer collections of unique strings.
void persistInfluencerUniqueStrings(const CUniqueStringFeatureData::TOptionalStrWordSetUMap& map,
core::CStatePersistInserter& inserter) {
if (!map.empty()) {
// Order the map keys to ensure consistent persistence
TOptionalStrVec keys;
keys.reserve(map.size());
for (const auto& influence : map) {
keys.push_back(influence.first);
}
std::sort(keys.begin(), keys.end(), maths::common::COrderings::SLess());
for (const auto& key : keys) {
inserter.insertValue(DICTIONARY_WORD_TAG, *key);
for (const auto& word : map.at(key)) {
inserter.insertValue(UNIQUE_WORD_TAG, word.toDelimited());
}
}
}
}
//! Restore influencer collections of unique strings.
bool restoreInfluencerUniqueStrings(core::CStateRestoreTraverser& traverser,
CUniqueStringFeatureData::TOptionalStrWordSetUMap& data) {
std::string key;
do {
const std::string& name = traverser.name();
if (name == DICTIONARY_WORD_TAG) {
key = traverser.value();
} else if (name == UNIQUE_WORD_TAG) {
CUniqueStringFeatureData::TWord value;
if (value.fromDelimited(traverser.value()) == false) {
LOG_ERROR(<< "Failed to restore word " << traverser.value());
return false;
}
auto i = data.begin();
for (/**/; i != data.end(); ++i) {
if (*i->first == key) {
i->second.insert(value);
break;
}
}
if (i == data.end()) {
data[key].insert(value);
}
}
} while (traverser.next());
return true;
}
//! Register the callbacks for computing the size of feature data gatherers
//! with \p visitor.
template<typename VISITOR>
void registerMemoryCallbacks(VISITOR& visitor) {
visitor.template registerCallback<TSizeUSetVec>();
visitor.template registerCallback<TSizeSizePrStrDataUMapQueue>();
visitor.template registerCallback<TSizeSizePrMeanAccumulatorUMapQueue>();
}
//! Register the callbacks for computing the size of feature data gatherers.
void registerMemoryCallbacks() {
static std::atomic_flag once = ATOMIC_FLAG_INIT;
if (once.test_and_set() == false) {
registerMemoryCallbacks(core::memory::anyVisitor());
registerMemoryCallbacks(core::memory_debug::anyVisitor());
}
}
} // unnamed::
CEventRateBucketGatherer::CEventRateBucketGatherer(CDataGatherer& dataGatherer,
const SBucketGathererInitData& initData)
: CBucketGatherer(dataGatherer, initData) {
this->initializeFieldNames(initData);
this->initializeFeatureData();
}
CEventRateBucketGatherer::CEventRateBucketGatherer(CDataGatherer& dataGatherer,
const SBucketGathererInitData& initData,
core::CStateRestoreTraverser& traverser)
: CBucketGatherer(dataGatherer, initData) {
this->initializeFieldNames(initData);
if (traverser.traverseSubLevel(std::bind(&CEventRateBucketGatherer::acceptRestoreTraverser,
this, std::placeholders::_1)) == false) {
traverser.setBadState();
}
}
CEventRateBucketGatherer::CEventRateBucketGatherer(bool isForPersistence,
const CEventRateBucketGatherer& other)
: CBucketGatherer(isForPersistence, other), m_FieldNames(other.m_FieldNames),
m_BeginInfluencingFields(other.m_BeginInfluencingFields),
m_BeginValueField(other.m_BeginValueField),
m_BeginSummaryFields(other.m_BeginSummaryFields),
m_FeatureData(other.m_FeatureData) {
if (!isForPersistence) {
LOG_ABORT(<< "This constructor only creates clones for persistence");
}
}
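// Restore the state written by acceptPersistInserter: the base gatherer
// state under BASE_TAG followed by any per-feature data containers.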
bool CEventRateBucketGatherer::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
this->clear();
do {
const std::string& name = traverser.name();
RESTORE(BASE_TAG,
traverser.traverseSubLevel(std::bind(&CBucketGatherer::baseAcceptRestoreTraverser,
this, std::placeholders::_1)))
if (restoreFeatureData(traverser, m_FeatureData, m_DataGatherer.params().s_LatencyBuckets,
this->bucketLength(), this->currentBucketStartTime()) == false) {
LOG_ERROR(<< "Invalid feature data in " << traverser.value());
return false;
}
} while (traverser.next());
return true;
}
void CEventRateBucketGatherer::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertLevel(BASE_TAG, std::bind(&CBucketGatherer::baseAcceptPersistInserter,
this, std::placeholders::_1));
persistFeatureData(m_FeatureData, inserter);
}
CBucketGatherer* CEventRateBucketGatherer::cloneForPersistence() const {
return new CEventRateBucketGatherer(true, *this);
}
const std::string& CEventRateBucketGatherer::persistenceTag() const {
return CBucketGatherer::EVENTRATE_BUCKET_GATHERER_TAG;
}
const std::string& CEventRateBucketGatherer::personFieldName() const {
return m_FieldNames[0];
}
const std::string& CEventRateBucketGatherer::attributeFieldName() const {
return m_DataGatherer.isPopulation() ? m_FieldNames[1] : EMPTY_STRING;
}
const std::string& CEventRateBucketGatherer::valueFieldName() const {
return m_BeginValueField != m_BeginSummaryFields ? m_FieldNames[m_BeginValueField]
: EMPTY_STRING;
}
CEventRateBucketGatherer::TStrVecCItr CEventRateBucketGatherer::beginInfluencers() const {
return m_FieldNames.begin() + m_BeginInfluencingFields;
}
CEventRateBucketGatherer::TStrVecCItr CEventRateBucketGatherer::endInfluencers() const {
return m_FieldNames.begin() + m_BeginValueField;
}
const CEventRateBucketGatherer::TStrVec& CEventRateBucketGatherer::fieldsOfInterest() const {
return m_FieldNames;
}
std::string CEventRateBucketGatherer::description() const {
return function_t::name(function_t::function(m_DataGatherer.features())) +
(m_BeginValueField == m_BeginSummaryFields
? ""
: (" " + m_FieldNames[m_BeginValueField])) +
(byField(m_DataGatherer.isPopulation(), m_FieldNames).empty() ? "" : " by ") +
byField(m_DataGatherer.isPopulation(), m_FieldNames) +
(overField(m_DataGatherer.isPopulation(), m_FieldNames).empty() ? "" : " over ") +
overField(m_DataGatherer.isPopulation(), m_FieldNames) +
(m_DataGatherer.partitionFieldName().empty() ? "" : " partition=") +
m_DataGatherer.partitionFieldName();
}
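// Parse one record's field values into \p result: resolve the person (and,
// for population analysis, the attribute) identifier, copy any influencer
// values, extract the optional value field and, for pre-summarised input,
// the summary count. Time-of-day and time-of-week features store the
// record's offset into the day or week as the value.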
bool CEventRateBucketGatherer::processFields(const TStrCPtrVec& fieldValues,
CEventData& result,
CResourceMonitor& resourceMonitor) {
using TOptionalSize = std::optional<std::size_t>;
using TOptionalStr = std::optional<std::string>;
if (fieldValues.size() != m_FieldNames.size()) {
LOG_ERROR(<< "Unexpected field values: " << fieldValues
<< ", for field names: " << m_FieldNames);
return false;
}
const std::string* person = (fieldValues[0] == nullptr && m_DataGatherer.useNull())
? &EMPTY_STRING
: fieldValues[0];
if (person == nullptr) {
// Just ignore: the "person" field wasn't present in the
// record. Note that we don't warn here since we'll permit
// a small fraction of records to have missing field
// values.
return false;
}
for (std::size_t i = m_DataGatherer.isPopulation() ? 2 : 1; i < m_BeginValueField; ++i) {
result.addInfluence(fieldValues[i] ? TOptionalStr(*fieldValues[i]) : std::nullopt);
}
if (m_BeginValueField != m_BeginSummaryFields) {
if (const std::string* value = fieldValues[m_BeginValueField]) {
result.stringValue(*value);
}
}
std::size_t count = 1;
if (m_DataGatherer.summaryMode() != model_t::E_None) {
if (m_DataGatherer.extractCountFromField(m_FieldNames[m_BeginSummaryFields],
fieldValues[m_BeginSummaryFields],
count) == false) {
result.addValue();
return true;
}
}
if (count == CDataGatherer::EXPLICIT_NULL_SUMMARY_COUNT) {
result.setExplicitNull();
} else {
model_t::EFeature feature = m_DataGatherer.feature(0);
if ((feature == model_t::E_IndividualTimeOfDayByBucketAndPerson) ||
(feature == model_t::E_PopulationTimeOfDayByBucketPersonAndAttribute)) {
double t = static_cast<double>(result.time() % core::constants::DAY);
result.addValue(TDouble1Vec(1, t));
} else if ((feature == model_t::E_IndividualTimeOfWeekByBucketAndPerson) ||
(feature == model_t::E_PopulationTimeOfWeekByBucketPersonAndAttribute)) {
double t = static_cast<double>(result.time() % core::constants::WEEK);
result.addValue(TDouble1Vec(1, t));
} else {
result.addCountStatistic(count);
}
}
bool addedPerson = false;
std::size_t personId = CDynamicStringIdRegistry::INVALID_ID;
if (result.isExplicitNull()) {
m_DataGatherer.personId(*person, personId);
} else {
personId = m_DataGatherer.addPerson(*person, resourceMonitor, addedPerson);
}
if (personId == CDynamicStringIdRegistry::INVALID_ID) {
if (!result.isExplicitNull()) {
LOG_TRACE(<< "Couldn't create a person, over memory limit");
}
return false;
}
if (addedPerson) {
resourceMonitor.addExtraMemory(m_DataGatherer.isPopulation()
? CDataGatherer::ESTIMATED_MEM_USAGE_PER_OVER_FIELD
: CDataGatherer::ESTIMATED_MEM_USAGE_PER_BY_FIELD);
++(m_DataGatherer.isPopulation()
? core::CProgramCounters::counter(counter_t::E_TSADNumberOverFields)
: core::CProgramCounters::counter(counter_t::E_TSADNumberByFields));
}
if (!result.person(personId)) {
LOG_ERROR(<< "Bad by field value: " << *person);
return false;
}
if (m_DataGatherer.isPopulation()) {
const std::string* attribute =
(fieldValues[1] == nullptr && m_DataGatherer.useNull()) ? &EMPTY_STRING
: fieldValues[1];
if (attribute == nullptr) {
// Just ignore: the "by" field wasn't present in the
// record. This doesn't necessarily stop other models
// from processing the record, so we don't return false.
// Note that we don't warn here since we'll permit a
// small fraction of records to have missing field
// values.
result.addAttribute();
return true;
}
bool addedAttribute = false;
std::size_t newAttribute = CDynamicStringIdRegistry::INVALID_ID;
if (result.isExplicitNull()) {
m_DataGatherer.attributeId(*attribute, newAttribute);
} else {
newAttribute = m_DataGatherer.addAttribute(*attribute, resourceMonitor,
addedAttribute);
}
result.addAttribute(TOptionalSize(newAttribute));
if (addedAttribute) {
resourceMonitor.addExtraMemory(CDataGatherer::ESTIMATED_MEM_USAGE_PER_BY_FIELD);
++core::CProgramCounters::counter(counter_t::E_TSADNumberByFields);
}
} else {
result.addAttribute(std::size_t(0));
}
return true;
}
void CEventRateBucketGatherer::recyclePeople(const TSizeVec& peopleToRemove) {
if (peopleToRemove.empty()) {
return;
}
applyFunc(m_FeatureData, [&, remove = SRemovePeople{} ](auto& data) {
remove(data, peopleToRemove);
});
this->CBucketGatherer::recyclePeople(peopleToRemove);
}
void CEventRateBucketGatherer::removePeople(std::size_t lowestPersonToRemove) {
applyFunc(m_FeatureData, [&, remove = SRemovePeople{} ](auto& data) {
remove(data, lowestPersonToRemove, m_DataGatherer.numberPeople());
});
this->CBucketGatherer::removePeople(lowestPersonToRemove);
}
void CEventRateBucketGatherer::recycleAttributes(const TSizeVec& attributesToRemove) {
if (attributesToRemove.empty()) {
return;
}
applyFunc(m_FeatureData, [&, remove = SRemoveAttributes{} ](auto& data) {
remove(data, attributesToRemove);
});
this->CBucketGatherer::recycleAttributes(attributesToRemove);
}
void CEventRateBucketGatherer::removeAttributes(std::size_t lowestAttributeToRemove) {
applyFunc(m_FeatureData, [&, remove = SRemoveAttributes{} ](auto& data) {
remove(data, lowestAttributeToRemove);
});
this->CBucketGatherer::removeAttributes(lowestAttributeToRemove);
}
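// Combine the base gatherer checksum with per-attribute hashes of each
// feature data container computed by SChecksum.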
std::uint64_t CEventRateBucketGatherer::checksum() const {
std::uint64_t seed = this->CBucketGatherer::checksum();
TStrUInt64Map hashes;
applyFunc(m_FeatureData, [&, checksum = SChecksum{} ](const auto& data) {
checksum(data, m_DataGatherer, hashes);
});
LOG_TRACE(<< "seed = " << seed);
LOG_TRACE(<< "hashes = " << hashes);
core::CHashing::CSafeMurmurHash2String64 hasher;
return core::CHashing::hashCombine(seed, hasher(core::CContainerPrinter::print(hashes)));
}
void CEventRateBucketGatherer::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
registerMemoryCallbacks();
mem->setName("CPopulationEventRateDataGatherer");
CBucketGatherer::debugMemoryUsage(mem->addChild());
core::memory_debug::dynamicSize("m_FieldNames", m_FieldNames, mem);
core::memory_debug::dynamicSize("m_FeatureData", m_FeatureData, mem);
}
std::size_t CEventRateBucketGatherer::memoryUsage() const {
registerMemoryCallbacks();
std::size_t mem = CBucketGatherer::memoryUsage();
mem += core::memory::dynamicSize(m_FieldNames);
mem += core::memory::dynamicSize(m_FeatureData);
return mem;
}
std::size_t CEventRateBucketGatherer::staticSize() const {
return sizeof(*this);
}
void CEventRateBucketGatherer::clear() {
this->CBucketGatherer::clear();
m_FeatureData.clear();
this->initializeFeatureData();
}
bool CEventRateBucketGatherer::resetBucket(core_t::TTime bucketStart) {
return this->CBucketGatherer::resetBucket(bucketStart);
}
void CEventRateBucketGatherer::releaseMemory(core_t::TTime /*samplingCutoffTime*/) {
// Nothing to release
}
void CEventRateBucketGatherer::sample(core_t::TTime /*time*/) {
// Nothing to sample
}
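// Extract the feature data for the bucket containing \p time, dispatching
// each feature configured on the data gatherer to the relevant helper below.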
void CEventRateBucketGatherer::featureData(core_t::TTime time,
core_t::TTime /*bucketLength*/,
TFeatureAnyPrVec& result) const {
result.clear();
if (!this->dataAvailable(time) ||
time >= this->currentBucketStartTime() + this->bucketLength()) {
LOG_DEBUG(<< "No data available at " << time
<< ", current bucket = " << this->printCurrentBucket()
<< ", bucket length = " << this->bucketLength());
return;
}
for (std::size_t i = 0, n = m_DataGatherer.numberFeatures(); i < n; ++i) {
const model_t::EFeature feature = m_DataGatherer.feature(i);
switch (feature) {
case model_t::E_IndividualCountByBucketAndPerson:
this->personCounts(feature, time, result);
break;
case model_t::E_IndividualNonZeroCountByBucketAndPerson:
case model_t::E_IndividualTotalBucketCountByPerson:
this->nonZeroPersonCounts(feature, time, result);
break;
case model_t::E_IndividualIndicatorOfBucketPerson:
this->personIndicator(feature, time, result);
break;
case model_t::E_IndividualLowCountsByBucketAndPerson:
case model_t::E_IndividualHighCountsByBucketAndPerson:
this->personCounts(feature, time, result);
break;
case model_t::E_IndividualArrivalTimesByPerson:
case model_t::E_IndividualLongArrivalTimesByPerson:
case model_t::E_IndividualShortArrivalTimesByPerson:
this->personArrivalTimes(feature, time, result);
break;
case model_t::E_IndividualLowNonZeroCountByBucketAndPerson:
case model_t::E_IndividualHighNonZeroCountByBucketAndPerson:
this->nonZeroPersonCounts(feature, time, result);
break;
case model_t::E_IndividualUniqueCountByBucketAndPerson:
case model_t::E_IndividualLowUniqueCountByBucketAndPerson:
case model_t::E_IndividualHighUniqueCountByBucketAndPerson:
this->bucketUniqueValuesPerPerson(feature, time, result);
break;
case model_t::E_IndividualInfoContentByBucketAndPerson:
case model_t::E_IndividualHighInfoContentByBucketAndPerson:
case model_t::E_IndividualLowInfoContentByBucketAndPerson:
this->bucketCompressedLengthPerPerson(feature, time, result);
break;
case model_t::E_IndividualTimeOfDayByBucketAndPerson:
case model_t::E_IndividualTimeOfWeekByBucketAndPerson:
this->bucketMeanTimesPerPerson(feature, time, result);
break;
CASE_INDIVIDUAL_METRIC:
LOG_ERROR(<< "Unexpected feature = " << model_t::print(feature));
break;
case model_t::E_PopulationAttributeTotalCountByPerson:
case model_t::E_PopulationCountByBucketPersonAndAttribute:
this->nonZeroAttributeCounts(feature, time, result);
break;
case model_t::E_PopulationIndicatorOfBucketPersonAndAttribute:
this->attributeIndicator(feature, time, result);
break;
case model_t::E_PopulationUniquePersonCountByAttribute:
this->peoplePerAttribute(feature, result);
break;
case model_t::E_PopulationUniqueCountByBucketPersonAndAttribute:
case model_t::E_PopulationLowUniqueCountByBucketPersonAndAttribute:
case model_t::E_PopulationHighUniqueCountByBucketPersonAndAttribute:
this->bucketUniqueValuesPerPersonAttribute(feature, time, result);
break;
case model_t::E_PopulationLowCountsByBucketPersonAndAttribute:
case model_t::E_PopulationHighCountsByBucketPersonAndAttribute:
this->nonZeroAttributeCounts(feature, time, result);
break;
case model_t::E_PopulationInfoContentByBucketPersonAndAttribute:
case model_t::E_PopulationLowInfoContentByBucketPersonAndAttribute:
case model_t::E_PopulationHighInfoContentByBucketPersonAndAttribute:
this->bucketCompressedLengthPerPersonAttribute(feature, time, result);
break;
case model_t::E_PopulationTimeOfDayByBucketPersonAndAttribute:
case model_t::E_PopulationTimeOfWeekByBucketPersonAndAttribute:
this->bucketMeanTimesPerPersonAttribute(feature, time, result);
break;
CASE_POPULATION_METRIC:
LOG_ERROR(<< "Unexpected feature = " << model_t::print(feature));
break;
}
}
}
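// Build (person id, bucket count) feature data, sorted by person id, for
// every active person; people whose records in the bucket are all explicit
// nulls are skipped.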
void CEventRateBucketGatherer::personCounts(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result_) const {
if (m_DataGatherer.isPopulation()) {
LOG_ERROR(<< "Function does not support population analysis.");
return;
}
result_.emplace_back(feature, TSizeFeatureDataPrVec());
auto& result = *std::any_cast<TSizeFeatureDataPrVec>(&result_.back().second);
result.reserve(m_DataGatherer.numberActivePeople());
for (std::size_t pid = 0, n = m_DataGatherer.numberPeople(); pid < n; ++pid) {
if (!m_DataGatherer.isPersonActive(pid) ||
this->hasExplicitNullsOnly(time, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID)) {
continue;
}
result.emplace_back(pid, 0);
}
for (const auto& count_ : this->bucketCounts(time)) {
std::uint64_t& count = std::lower_bound(result.begin(), result.end(),
CDataGatherer::extractPersonId(count_),
maths::common::COrderings::SFirstLess())
->second.s_Count;
count += CDataGatherer::extractData(count_);
}
this->addInfluencerCounts(time, result);
}
void CEventRateBucketGatherer::nonZeroPersonCounts(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result_) const {
result_.emplace_back(feature, TSizeFeatureDataPrVec());
auto& result = *std::any_cast<TSizeFeatureDataPrVec>(&result_.back().second);
const TSizeSizePrUInt64UMap& personAttributeCounts = this->bucketCounts(time);
result.reserve(personAttributeCounts.size());
for (const auto& count : personAttributeCounts) {
result.emplace_back(CDataGatherer::extractPersonId(count),
CDataGatherer::extractData(count));
}
std::sort(result.begin(), result.end(), maths::common::COrderings::SFirstLess());
this->addInfluencerCounts(time, result);
}
void CEventRateBucketGatherer::personIndicator(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result_) const {
result_.emplace_back(feature, TSizeFeatureDataPrVec());
auto& result = *std::any_cast<TSizeFeatureDataPrVec>(&result_.back().second);
const TSizeSizePrUInt64UMap& personAttributeCounts = this->bucketCounts(time);
result.reserve(personAttributeCounts.size());
for (const auto& count : personAttributeCounts) {
result.emplace_back(CDataGatherer::extractPersonId(count), 1);
}
std::sort(result.begin(), result.end(), maths::common::COrderings::SFirstLess());
this->addInfluencerCounts(time, result);
}
void CEventRateBucketGatherer::personArrivalTimes(model_t::EFeature feature,
core_t::TTime /*time*/,
TFeatureAnyPrVec& result_) const {
// TODO
result_.emplace_back(feature, TSizeFeatureDataPrVec());
}
void CEventRateBucketGatherer::nonZeroAttributeCounts(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result_) const {
result_.emplace_back(feature, TSizeSizePrFeatureDataPrVec());
auto& result = *std::any_cast<TSizeSizePrFeatureDataPrVec>(&result_.back().second);
const TSizeSizePrUInt64UMap& personAttributeCounts = this->bucketCounts(time);
result.reserve(personAttributeCounts.size());
for (const auto& count : personAttributeCounts) {
if (CDataGatherer::extractData(count) > 0) {
result.emplace_back(count.first, CDataGatherer::extractData(count));
}
}
std::sort(result.begin(), result.end(), maths::common::COrderings::SFirstLess());
this->addInfluencerCounts(time, result);
}
void CEventRateBucketGatherer::peoplePerAttribute(model_t::EFeature feature,
TFeatureAnyPrVec& result_) const {
result_.emplace_back(feature, TSizeSizePrFeatureDataPrVec());
auto& result = *std::any_cast<TSizeSizePrFeatureDataPrVec>(&result_.back().second);
auto i = m_FeatureData.find(model_t::E_AttributePeople);
if (i == m_FeatureData.end()) {
return;
}
try {
const auto& attributePeople = std::any_cast<const TSizeUSetVec&>(i->second);
result.reserve(attributePeople.size());
for (std::size_t cid = 0; cid < attributePeople.size(); ++cid) {
if (m_DataGatherer.isAttributeActive(cid)) {
result.emplace_back(TSizeSizePr(0, cid), attributePeople[cid].size());
}
}
} catch (const std::exception& e) {
LOG_ERROR(<< "Failed to extract "
<< model_t::print(model_t::E_PopulationUniquePersonCountByAttribute)
<< ": " << e.what());
}
}
void CEventRateBucketGatherer::attributeIndicator(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result_) const {
result_.emplace_back(feature, TSizeSizePrFeatureDataPrVec());
auto& result = *std::any_cast<TSizeSizePrFeatureDataPrVec>(&result_.back().second);
const TSizeSizePrUInt64UMap& counts = this->bucketCounts(time);
result.reserve(counts.size());
for (const auto& count : counts) {
if (CDataGatherer::extractData(count) > 0) {
result.emplace_back(count.first, 1);
}
}
std::sort(result.begin(), result.end(), maths::common::COrderings::SFirstLess());
this->addInfluencerCounts(time, result);
for (std::size_t i = 0; i < result.size(); ++i) {
SEventRateFeatureData& data = result[i].second;
for (std::size_t j = 0; j < data.s_InfluenceValues.size(); ++j) {
for (std::size_t k = 0; k < data.s_InfluenceValues[j].size(); ++k) {
data.s_InfluenceValues[j][k].second.first = TDouble1Vec{1.0};
}
}
}
}
void CEventRateBucketGatherer::bucketUniqueValuesPerPerson(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result_) const {
result_.emplace_back(feature, TSizeFeatureDataPrVec());
auto& result = *std::any_cast<TSizeFeatureDataPrVec>(&result_.back().second);
auto i = m_FeatureData.find(model_t::E_UniqueValues);
if (i == m_FeatureData.end()) {
return;
}
try {
const auto& personAttributeUniqueValues =
std::any_cast<const TSizeSizePrStrDataUMapQueue&>(i->second).get(time);
result.reserve(personAttributeUniqueValues.size());
for (const auto& uniques : personAttributeUniqueValues) {
result.emplace_back(CDataGatherer::extractPersonId(uniques), 0);
CDataGatherer::extractData(uniques).populateDistinctCountFeatureData(
result.back().second);
}
std::sort(result.begin(), result.end(), maths::common::COrderings::SFirstLess());
} catch (const std::exception& e) {
LOG_ERROR(<< "Failed to extract "
<< model_t::print(model_t::E_IndividualUniqueCountByBucketAndPerson)
<< ": " << e.what());
}
}
void CEventRateBucketGatherer::bucketUniqueValuesPerPersonAttribute(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result_) const {
result_.emplace_back(feature, TSizeSizePrFeatureDataPrVec());
auto& result = *std::any_cast<TSizeSizePrFeatureDataPrVec>(&result_.back().second);
auto i = m_FeatureData.find(model_t::E_UniqueValues);
if (i == m_FeatureData.end()) {
return;
}
try {
const auto& personAttributeUniqueValues =
std::any_cast<const TSizeSizePrStrDataUMapQueue&>(i->second).get(time);
result.reserve(personAttributeUniqueValues.size());
for (const auto& uniques : personAttributeUniqueValues) {
result.emplace_back(uniques.first, 0);
CDataGatherer::extractData(uniques).populateDistinctCountFeatureData(
result.back().second);
}
std::sort(result.begin(), result.end(), maths::common::COrderings::SFirstLess());
} catch (const std::exception& e) {
LOG_ERROR(<< "Failed to extract "
<< model_t::print(model_t::E_PopulationUniqueCountByBucketPersonAndAttribute)
<< ": " << e.what());
}
}
void CEventRateBucketGatherer::bucketCompressedLengthPerPerson(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result_) const {
result_.emplace_back(feature, TSizeFeatureDataPrVec());
auto& result = *std::any_cast<TSizeFeatureDataPrVec>(&result_.back().second);
auto i = m_FeatureData.find(model_t::E_UniqueValues);
if (i == m_FeatureData.end()) {
return;
}
try {
const auto& personAttributeUniqueValues =
std::any_cast<const TSizeSizePrStrDataUMapQueue&>(i->second).get(time);
result.reserve(personAttributeUniqueValues.size());
for (const auto& uniques : personAttributeUniqueValues) {
result.emplace_back(CDataGatherer::extractPersonId(uniques), 0);
CDataGatherer::extractData(uniques).populateInfoContentFeatureData(
result.back().second);
}
std::sort(result.begin(), result.end(), maths::common::COrderings::SFirstLess());
} catch (const std::exception& e) {
LOG_ERROR(<< "Failed to extract "
<< model_t::print(model_t::E_IndividualInfoContentByBucketAndPerson)
<< ": " << e.what());
}
}
void CEventRateBucketGatherer::bucketCompressedLengthPerPersonAttribute(
model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result_) const {
result_.emplace_back(feature, TSizeSizePrFeatureDataPrVec());
auto& result = *std::any_cast<TSizeSizePrFeatureDataPrVec>(&result_.back().second);
auto i = m_FeatureData.find(model_t::E_UniqueValues);
if (i == m_FeatureData.end()) {
return;
}
try {
const auto& personAttributeUniqueValues =
std::any_cast<const TSizeSizePrStrDataUMapQueue&>(i->second).get(time);
result.reserve(personAttributeUniqueValues.size());
for (const auto& uniques : personAttributeUniqueValues) {
result.emplace_back(uniques.first, 0);
CDataGatherer::extractData(uniques).populateInfoContentFeatureData(
result.back().second);
}
std::sort(result.begin(), result.end(), maths::common::COrderings::SFirstLess());
} catch (const std::exception& e) {
LOG_ERROR(<< "Failed to extract "
<< model_t::print(model_t::E_PopulationInfoContentByBucketPersonAndAttribute)
<< ": " << e.what());
}
}
void CEventRateBucketGatherer::bucketMeanTimesPerPerson(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result_) const {
result_.emplace_back(feature, TSizeFeatureDataPrVec());
auto& result = *std::any_cast<TSizeFeatureDataPrVec>(&result_.back().second);
auto i = m_FeatureData.find(model_t::E_DiurnalTimes);
if (i == m_FeatureData.end()) {
return;
}
try {
const auto& arrivalTimes =
std::any_cast<const TSizeSizePrMeanAccumulatorUMapQueue&>(i->second).get(time);
result.reserve(arrivalTimes.size());
for (const auto& time_ : arrivalTimes) {
result.emplace_back(CDataGatherer::extractPersonId(time_),
static_cast<std::uint64_t>(maths::common::CBasicStatistics::mean(
CDataGatherer::extractData(time_))));
}
std::sort(result.begin(), result.end(), maths::common::COrderings::SFirstLess());
// We don't bother to gather the influencer bucket means
// so the best we can do is use the person and attribute
// bucket mean.
this->addInfluencerCounts(time, result);
for (std::size_t j = 0; j < result.size(); ++j) {
SEventRateFeatureData& data = result[j].second;
for (std::size_t k = 0; k < data.s_InfluenceValues.size(); ++k) {
for (std::size_t l = 0; l < data.s_InfluenceValues[k].size(); ++l) {
data.s_InfluenceValues[k][l].second.first =
TDouble1Vec{static_cast<double>(data.s_Count)};
}
}
}
} catch (const std::exception& e) {
LOG_ERROR(<< "Failed to extract "
<< model_t::print(model_t::E_DiurnalTimes) << ": " << e.what());
}
}
void CEventRateBucketGatherer::bucketMeanTimesPerPersonAttribute(model_t::EFeature feature,
core_t::TTime time,
TFeatureAnyPrVec& result_) const {
result_.emplace_back(feature, TSizeSizePrFeatureDataPrVec());
auto& result = *std::any_cast<TSizeSizePrFeatureDataPrVec>(&result_.back().second);
auto i = m_FeatureData.find(model_t::E_DiurnalTimes);
if (i == m_FeatureData.end()) {
return;
}
try {
const auto& arrivalTimes =
std::any_cast<const TSizeSizePrMeanAccumulatorUMapQueue&>(i->second).get(time);
result.reserve(arrivalTimes.size());
for (const auto& time_ : arrivalTimes) {
result.emplace_back(time_.first,
static_cast<std::uint64_t>(maths::common::CBasicStatistics::mean(
CDataGatherer::extractData(time_))));
}
std::sort(result.begin(), result.end(), maths::common::COrderings::SFirstLess());
// We don't bother to gather the influencer bucket means
// so the best we can do is use the person and attribute
// bucket mean.
this->addInfluencerCounts(time, result);
for (std::size_t j = 0; j < result.size(); ++j) {
SEventRateFeatureData& data = result[j].second;
for (std::size_t k = 0; k < data.s_InfluenceValues.size(); ++k) {
for (std::size_t l = 0; l < data.s_InfluenceValues[k].size(); ++l) {
data.s_InfluenceValues[k][l].second.first =
TDouble1Vec{static_cast<double>(data.s_Count)};
}
}
}
} catch (const std::exception& e) {
LOG_ERROR(<< "Failed to extract "
<< model_t::print(model_t::E_DiurnalTimes) << ": " << e.what());
}
}
void CEventRateBucketGatherer::resize(std::size_t pid, std::size_t cid) {
applyFunc(m_FeatureData,
[&, resize = SResize{} ](auto& data) { resize(data, pid, cid); });
}
void CEventRateBucketGatherer::addValue(std::size_t pid,
std::size_t cid,
core_t::TTime time,
const CEventData::TDouble1VecArray& values,
std::size_t count,
const CEventData::TOptionalStr& stringValue,
const TOptionalStrVec& influences) {
// Check that we are correctly sized - a person/attribute might have been added
this->resize(pid, cid);
applyFunc(m_FeatureData, [&, addValue = SAddValue{} ](auto& data) {
addValue(data, pid, cid, time, count, values, stringValue, influences);
});
}
void CEventRateBucketGatherer::startNewBucket(core_t::TTime time, bool /*skipUpdates*/) {
applyFunc(m_FeatureData, [&, newBucket = SNewBucket{} ](auto& data) {
newBucket(data, time);
});
}
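// Lay out m_FieldNames as: person field, attribute field (population
// analysis only), influencer fields, value field (if any) and finally the
// summary count field for manually summarised input. m_BeginInfluencingFields,
// m_BeginValueField and m_BeginSummaryFields record the boundaries.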
void CEventRateBucketGatherer::initializeFieldNames(const CBucketGatherer::SBucketGathererInitData& initData) {
m_FieldNames.push_back(initData.s_PersonFieldName);
if (m_DataGatherer.isPopulation()) {
m_FieldNames.push_back(initData.s_AttributeFieldName);
}
m_BeginInfluencingFields = m_FieldNames.size();
m_FieldNames.insert(m_FieldNames.end(), initData.s_InfluenceFieldNames.begin(),
initData.s_InfluenceFieldNames.end());
m_BeginValueField = m_FieldNames.size();
if (!initData.s_ValueFieldName.empty()) {
m_FieldNames.push_back(initData.s_ValueFieldName);
}
m_BeginSummaryFields = m_FieldNames.size();
switch (m_DataGatherer.summaryMode()) {
case model_t::E_None:
break;
case model_t::E_Manual:
m_FieldNames.push_back(initData.s_SummaryCountFieldName);
break;
}
m_FieldNames.shrink_to_fit();
}
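// Allocate the per-feature containers needed by the configured features.
// Plain count features need no extra state here because the base class
// already gathers the bucket counts.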
void CEventRateBucketGatherer::initializeFeatureData() {
for (std::size_t i = 0, n = m_DataGatherer.numberFeatures(); i < n; ++i) {
switch (m_DataGatherer.feature(i)) {
case model_t::E_IndividualCountByBucketAndPerson:
case model_t::E_IndividualNonZeroCountByBucketAndPerson:
case model_t::E_IndividualTotalBucketCountByPerson:
case model_t::E_IndividualIndicatorOfBucketPerson:
case model_t::E_IndividualLowCountsByBucketAndPerson:
case model_t::E_IndividualHighCountsByBucketAndPerson:
// We always gather person counts.
break;
case model_t::E_IndividualArrivalTimesByPerson:
case model_t::E_IndividualLongArrivalTimesByPerson:
case model_t::E_IndividualShortArrivalTimesByPerson:
// TODO
break;
case model_t::E_IndividualTimeOfDayByBucketAndPerson:
case model_t::E_IndividualTimeOfWeekByBucketAndPerson:
m_FeatureData[model_t::E_DiurnalTimes] = TSizeSizePrMeanAccumulatorUMapQueue(
m_DataGatherer.params().s_LatencyBuckets, this->bucketLength(),
this->currentBucketStartTime());
break;
case model_t::E_IndividualLowNonZeroCountByBucketAndPerson:
case model_t::E_IndividualHighNonZeroCountByBucketAndPerson:
// We always gather person counts.
break;
case model_t::E_IndividualUniqueCountByBucketAndPerson:
case model_t::E_IndividualLowUniqueCountByBucketAndPerson:
case model_t::E_IndividualHighUniqueCountByBucketAndPerson:
case model_t::E_IndividualInfoContentByBucketAndPerson:
case model_t::E_IndividualHighInfoContentByBucketAndPerson:
case model_t::E_IndividualLowInfoContentByBucketAndPerson:
m_FeatureData[model_t::E_UniqueValues] = TSizeSizePrStrDataUMapQueue(
m_DataGatherer.params().s_LatencyBuckets, this->bucketLength(),
this->currentBucketStartTime(), TSizeSizePrStrDataUMap(1));
break;
case model_t::E_PopulationAttributeTotalCountByPerson:
case model_t::E_PopulationCountByBucketPersonAndAttribute:
case model_t::E_PopulationIndicatorOfBucketPersonAndAttribute:
case model_t::E_PopulationLowCountsByBucketPersonAndAttribute:
case model_t::E_PopulationHighCountsByBucketPersonAndAttribute:
// We always gather person attribute counts.
break;
case model_t::E_PopulationUniquePersonCountByAttribute:
m_FeatureData[model_t::E_AttributePeople] = TSizeUSetVec();
break;
case model_t::E_PopulationUniqueCountByBucketPersonAndAttribute:
case model_t::E_PopulationLowUniqueCountByBucketPersonAndAttribute:
case model_t::E_PopulationHighUniqueCountByBucketPersonAndAttribute:
case model_t::E_PopulationInfoContentByBucketPersonAndAttribute:
case model_t::E_PopulationLowInfoContentByBucketPersonAndAttribute:
case model_t::E_PopulationHighInfoContentByBucketPersonAndAttribute:
m_FeatureData[model_t::E_UniqueValues] = TSizeSizePrStrDataUMapQueue(
m_DataGatherer.params().s_LatencyBuckets, this->bucketLength(),
this->currentBucketStartTime(), TSizeSizePrStrDataUMap(1));
break;
case model_t::E_PopulationTimeOfDayByBucketPersonAndAttribute:
case model_t::E_PopulationTimeOfWeekByBucketPersonAndAttribute:
m_FeatureData[model_t::E_DiurnalTimes] = TSizeSizePrMeanAccumulatorUMapQueue(
m_DataGatherer.params().s_LatencyBuckets, this->bucketLength(),
this->currentBucketStartTime());
break;
CASE_INDIVIDUAL_METRIC:
CASE_POPULATION_METRIC:
LOG_ERROR(<< "Unexpected feature = "
<< model_t::print(m_DataGatherer.feature(i)));
break;
}
}
}
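// Attach the per-influencer counts for the bucket containing \p time to the
// feature data in \p result, which must be sorted by person id.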
void CEventRateBucketGatherer::addInfluencerCounts(core_t::TTime time,
TSizeFeatureDataPrVec& result) const {
const TSizeSizePrOptionalStrPrUInt64UMapVec& influencers = this->influencerCounts(time);
if (influencers.empty()) {
return;
}
for (std::size_t i = 0; i < result.size(); ++i) {
result[i].second.s_InfluenceValues.resize(influencers.size());
}
for (std::size_t i = 0; i < influencers.size(); ++i) {
for (const auto& influence : influencers[i]) {
std::size_t pid = CDataGatherer::extractPersonId(influence.first);
auto k = std::lower_bound(result.begin(), result.end(), pid,
maths::common::COrderings::SFirstLess());
if (k == result.end() || k->first != pid) {
LOG_ERROR(<< "Missing feature data for person "
<< m_DataGatherer.personName(pid));
continue;
}
k->second.s_InfluenceValues[i].emplace_back(
TStrCRef(CDataGatherer::extractData(influence.first)),
TDouble1VecDoublePr(TDouble1Vec{static_cast<double>(influence.second)}, 1.0));
}
}
}
void CEventRateBucketGatherer::addInfluencerCounts(core_t::TTime time,
TSizeSizePrFeatureDataPrVec& result) const {
const TSizeSizePrOptionalStrPrUInt64UMapVec& influencers = this->influencerCounts(time);
if (influencers.empty()) {
return;
}
for (std::size_t i = 0; i < result.size(); ++i) {
result[i].second.s_InfluenceValues.resize(influencers.size());
}
for (std::size_t i = 0; i < influencers.size(); ++i) {
for (const auto& influence : influencers[i]) {
auto k = std::lower_bound(result.begin(), result.end(),
influence.first.first,
maths::common::COrderings::SFirstLess());
if (k == result.end() || k->first != influence.first.first) {
std::size_t pid = CDataGatherer::extractPersonId(influence.first);
std::size_t cid = CDataGatherer::extractAttributeId(influence.first);
LOG_ERROR(<< "Missing feature data for person "
<< m_DataGatherer.personName(pid) << " and attribute "
<< m_DataGatherer.attributeName(cid));
continue;
}
k->second.s_InfluenceValues[i].emplace_back(
TStrCRef(CDataGatherer::extractData(influence.first)),
TDouble1VecDoublePr(TDouble1Vec{static_cast<double>(influence.second)}, 1.0));
}
}
}
////// CUniqueStringFeatureData //////
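// Record a unique string value: the string is mapped to a dictionary word
// (a compact hash) which is stored alongside the original string, and the
// word is also added to the set for each influencer value that is present.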
void CUniqueStringFeatureData::insert(const std::string& value,
const TOptionalStrVec& influences) {
TWord valueHash = m_Dictionary1.word(value);
m_UniqueStrings.emplace(valueHash, value);
if (influences.size() > m_InfluencerUniqueStrings.size()) {
m_InfluencerUniqueStrings.resize(influences.size());
}
for (std::size_t i = 0; i < influences.size(); ++i) {
// The influence strings are optional.
if (influences[i]) {
m_InfluencerUniqueStrings[i][influences[i]].insert(valueHash);
}
}
}
void CUniqueStringFeatureData::populateDistinctCountFeatureData(SEventRateFeatureData& featureData) const {
featureData.s_Count = m_UniqueStrings.size();
featureData.s_InfluenceValues.clear();
featureData.s_InfluenceValues.resize(m_InfluencerUniqueStrings.size());
for (std::size_t i = 0; i < m_InfluencerUniqueStrings.size(); ++i) {
TStrCRefDouble1VecDoublePrPrVec& data = featureData.s_InfluenceValues[i];
data.reserve(m_InfluencerUniqueStrings[i].size());
for (const auto& influence : m_InfluencerUniqueStrings[i]) {
data.emplace_back(
TStrCRef(*influence.first),
TDouble1VecDoublePr(
TDouble1Vec{static_cast<double>(influence.second.size())}, 1.0));
}
}
}
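// The information content is approximated by the compressed length
// (core::CDeflator) of the sorted unique strings; the same is computed per
// influencer value using only the strings attributed to that influencer.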
void CUniqueStringFeatureData::populateInfoContentFeatureData(SEventRateFeatureData& featureData) const {
using TStrCRefVec = std::vector<TStrCRef>;
featureData.s_InfluenceValues.clear();
core::CDeflator compressor(true);
try {
TStrCRefVec strings;
strings.reserve(m_UniqueStrings.size());
for (const auto& string : m_UniqueStrings) {
strings.emplace_back(string.second);
}
std::sort(strings.begin(), strings.end(), maths::common::COrderings::SLess());
std::for_each(strings.begin(), strings.end(), [&compressor](const std::string& string) {
compressor.addString(string);
});
std::size_t length = 0;
if (compressor.length(true, length) == false) {
LOG_ERROR(<< "Failed to get compressed length");
compressor.reset();
}
featureData.s_Count = length;
featureData.s_InfluenceValues.reserve(m_InfluencerUniqueStrings.size());
for (std::size_t i = 0; i < m_InfluencerUniqueStrings.size(); ++i) {
featureData.s_InfluenceValues.push_back(TStrCRefDouble1VecDoublePrPrVec());
TStrCRefDouble1VecDoublePrPrVec& data =
featureData.s_InfluenceValues.back();
for (const auto& influence : m_InfluencerUniqueStrings[i]) {
strings.clear();
strings.reserve(influence.second.size());
for (const auto& word : influence.second) {
strings.emplace_back(m_UniqueStrings.at(word));
}
std::sort(strings.begin(), strings.end(),
maths::common::COrderings::SLess());
std::for_each(strings.begin(), strings.end(),
[&compressor](const std::string& string) {
compressor.addString(string);
});
length = 0;
if (compressor.length(true, length) == false) {
LOG_ERROR(<< "Failed to get compressed length");
compressor.reset();
}
data.emplace_back(
TStrCRef(*influence.first),
TDouble1VecDoublePr(TDouble1Vec{static_cast<double>(length)}, 1.0));
}
}
} catch (const std::exception& e) {
LOG_ERROR(<< "Failed to get info content: " << e.what());
}
}
void CUniqueStringFeatureData::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertLevel(UNIQUE_STRINGS_TAG, std::bind(&persistUniqueStrings,
std::cref(m_UniqueStrings),
std::placeholders::_1));
for (std::size_t i = 0; i < m_InfluencerUniqueStrings.size(); ++i) {
inserter.insertLevel(INFLUENCER_UNIQUE_STRINGS_TAG,
std::bind(&persistInfluencerUniqueStrings,
std::cref(m_InfluencerUniqueStrings[i]),
std::placeholders::_1));
}
}
bool CUniqueStringFeatureData::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
do {
const std::string& name = traverser.name();
RESTORE(UNIQUE_STRINGS_TAG,
traverser.traverseSubLevel(std::bind(&restoreUniqueStrings, std::placeholders::_1,
std::ref(m_UniqueStrings))))
RESTORE_SETUP_TEARDOWN(
INFLUENCER_UNIQUE_STRINGS_TAG,
m_InfluencerUniqueStrings.push_back(TOptionalStrWordSetUMap()),
traverser.traverseSubLevel(
std::bind(&restoreInfluencerUniqueStrings, std::placeholders::_1,
std::ref(m_InfluencerUniqueStrings.back()))),
/**/)
} while (traverser.next());
return true;
}
std::uint64_t CUniqueStringFeatureData::checksum() const {
std::uint64_t seed = maths::common::CChecksum::calculate(0, m_UniqueStrings);
return maths::common::CChecksum::calculate(seed, m_InfluencerUniqueStrings);
}
void CUniqueStringFeatureData::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CUniqueStringFeatureData", sizeof(*this));
core::memory_debug::dynamicSize("s_NoInfluenceUniqueStrings", m_UniqueStrings, mem);
core::memory_debug::dynamicSize("s_InfluenceUniqueStrings",
m_InfluencerUniqueStrings, mem);
}
std::size_t CUniqueStringFeatureData::memoryUsage() const {
std::size_t mem = sizeof(*this);
mem += core::memory::dynamicSize(m_UniqueStrings);
mem += core::memory::dynamicSize(m_InfluencerUniqueStrings);
return mem;
}
std::string CUniqueStringFeatureData::print() const {
return "(" + core::CContainerPrinter::print(m_UniqueStrings) + ", " +
core::CContainerPrinter::print(m_InfluencerUniqueStrings) + ")";
}
}
}