lib/model/CBucketGatherer.cc (535 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <model/CBucketGatherer.h>
#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
#include <core/CStringUtils.h>
#include <core/RestoreMacros.h>
#include <maths/common/CChecksum.h>
#include <maths/common/CIntegerTools.h>
#include <maths/common/COrderings.h>
#include <model/CDataGatherer.h>
#include <boost/tuple/tuple.hpp>
#include <algorithm>
#include <map>
namespace ml {
namespace model {
namespace {
// We use short field names to reduce the state size
const std::string BUCKET_START_TAG("b");
const std::string BUCKET_COUNT_TAG("k");
const std::string INFLUENCERS_COUNT_TAG("l");
const std::string BUCKET_EXPLICIT_NULLS_TAG("m");
namespace detail {
using TSizeSizePr = std::pair<std::size_t, std::size_t>;
using TSizeSizePrUInt64Pr = std::pair<TSizeSizePr, std::uint64_t>;
using TSizeSizePrOptionalStrPrUInt64UMap = CBucketGatherer::TSizeSizePrOptionalStrPrUInt64UMap;
using TSizeSizePrOptionalStrPrUInt64UMapCItr = CBucketGatherer::TSizeSizePrOptionalStrPrUInt64UMapCItr;
const std::string PERSON_ATTRIBUTE_COUNT_TAG("f");
const std::string PERSON_UID_TAG("a");
const std::string ATTRIBUTE_UID_TAG("b");
const std::string COUNT_TAG("c");
const std::string INFLUENCER_TAG("d");
const std::string INFLUENCE_ITEM_TAG("a");
const std::string INFLUENCE_COUNT_TAG("b");
const std::string EMPTY_MAP_TAG("e");
//! Persist a person, attribute and count tuple.
void insertPersonAttributeCounts(const TSizeSizePrUInt64Pr& tuple,
core::CStatePersistInserter& inserter) {
inserter.insertValue(PERSON_UID_TAG, CDataGatherer::extractPersonId(tuple));
inserter.insertValue(ATTRIBUTE_UID_TAG, CDataGatherer::extractAttributeId(tuple));
inserter.insertValue(COUNT_TAG, CDataGatherer::extractData(tuple));
}
//! Restore a person, attribute and count.
bool restorePersonAttributeCounts(core::CStateRestoreTraverser& traverser,
TSizeSizePr& key,
std::uint64_t& count) {
do {
const std::string& name = traverser.name();
RESTORE_BUILT_IN(PERSON_UID_TAG, key.first)
RESTORE_BUILT_IN(ATTRIBUTE_UID_TAG, key.second)
RESTORE_BUILT_IN(COUNT_TAG, count)
} while (traverser.next());
return true;
}
//! Persist a collection of influencer person and attribute counts.
void insertInfluencerPersonAttributeCounts(const TSizeSizePrOptionalStrPrUInt64UMap& map,
core::CStatePersistInserter& inserter) {
std::vector<TSizeSizePrOptionalStrPrUInt64UMapCItr> ordered;
ordered.reserve(map.size());
for (auto i = map.begin(); i != map.end(); ++i) {
ordered.push_back(i);
}
std::sort(ordered.begin(), ordered.end(), [](auto lhs, auto rhs) {
return maths::common::COrderings::lexicographicalCompare(
lhs->first.first, lhs->first.second, lhs->second, rhs->first.first,
rhs->first.second, rhs->second);
});
if (ordered.empty()) {
inserter.insertValue(EMPTY_MAP_TAG, "");
}
for (auto& pair : ordered) {
inserter.insertValue(PERSON_UID_TAG, CDataGatherer::extractPersonId(pair->first));
inserter.insertValue(ATTRIBUTE_UID_TAG,
CDataGatherer::extractAttributeId(pair->first));
inserter.insertValue(INFLUENCER_TAG, CDataGatherer::extractData(pair->first));
inserter.insertValue(COUNT_TAG, pair->second);
}
}
//! Restore a collection of influencer person and attribute counts.
bool restoreInfluencerPersonAttributeCounts(core::CStateRestoreTraverser& traverser,
TSizeSizePrOptionalStrPrUInt64UMap& map) {
std::size_t person = 0;
std::size_t attribute = 0;
std::string influence;
std::uint64_t count = 0;
do {
const std::string& name = traverser.name();
RESTORE_BUILT_IN(PERSON_UID_TAG, person)
RESTORE_BUILT_IN(ATTRIBUTE_UID_TAG, attribute)
RESTORE_NO_ERROR(INFLUENCER_TAG, influence = traverser.value())
if (name == COUNT_TAG) {
if (core::CStringUtils::stringToType(traverser.value(), count) == false) {
LOG_ERROR(<< "Failed to restore COUNT_TAG, got " << traverser.value());
return false;
}
map[{{person, attribute}, influence}] = count;
}
} while (traverser.next());
return true;
}
//! \brief Manages persistence of bucket counts.
struct SBucketCountsPersister {
using TSizeSizePrUInt64UMap = CBucketGatherer::TSizeSizePrUInt64UMap;
void operator()(const TSizeSizePrUInt64UMap& bucketCounts,
core::CStatePersistInserter& inserter) {
CBucketGatherer::TSizeSizePrUInt64PrVec personAttributeCounts;
personAttributeCounts.reserve(bucketCounts.size());
personAttributeCounts.assign(bucketCounts.begin(), bucketCounts.end());
std::sort(personAttributeCounts.begin(), personAttributeCounts.end());
for (std::size_t i = 0; i < personAttributeCounts.size(); ++i) {
inserter.insertLevel(
PERSON_ATTRIBUTE_COUNT_TAG, [tuple = std::cref(personAttributeCounts[i])](
core::CStatePersistInserter & inserter_) {
insertPersonAttributeCounts(tuple, inserter_);
});
}
}
bool operator()(TSizeSizePrUInt64UMap& bucketCounts,
core::CStateRestoreTraverser& traverser) {
do {
TSizeSizePr key;
std::uint64_t count{0U};
if (!traverser.hasSubLevel()) {
continue;
}
if (traverser.traverseSubLevel([&key, &count](core::CStateRestoreTraverser& traverser_) {
return restorePersonAttributeCounts(traverser_, key, count);
}) == false) {
LOG_ERROR(<< "Invalid person attribute count");
continue;
}
bucketCounts[key] = count;
} while (traverser.next());
return true;
}
};
//! \brief Manages persistence influencer bucket counts.
struct SInfluencerCountsPersister {
using TSizeSizePrOptionalStrPrUInt64UMapVec = CBucketGatherer::TSizeSizePrOptionalStrPrUInt64UMapVec;
void operator()(const TSizeSizePrOptionalStrPrUInt64UMapVec& data,
core::CStatePersistInserter& inserter) {
for (std::size_t i = 0; i < data.size(); ++i) {
inserter.insertValue(INFLUENCE_COUNT_TAG, i);
inserter.insertLevel(INFLUENCE_ITEM_TAG, [map = std::cref(data[i])](
core::CStatePersistInserter & inserter_) {
insertInfluencerPersonAttributeCounts(map, inserter_);
});
}
}
bool operator()(TSizeSizePrOptionalStrPrUInt64UMapVec& data,
core::CStateRestoreTraverser& traverser) const {
std::size_t i = 0;
do {
const std::string name = traverser.name();
RESTORE_BUILT_IN(INFLUENCE_COUNT_TAG, i)
RESTORE_SETUP_TEARDOWN(
INFLUENCE_ITEM_TAG, data.resize(std::max(data.size(), i + 1)),
traverser.traverseSubLevel(
std::bind(&restoreInfluencerPersonAttributeCounts,
std::placeholders::_1, std::ref(data[i]))),
/**/)
} while (traverser.next());
return true;
}
};
} // detail::
} // unnamed::
const std::string CBucketGatherer::EVENTRATE_BUCKET_GATHERER_TAG("a");
const std::string CBucketGatherer::METRIC_BUCKET_GATHERER_TAG("b");
CBucketGatherer::CBucketGatherer(CDataGatherer& dataGatherer,
const SBucketGathererInitData& initData)
: m_DataGatherer(dataGatherer), m_EarliestTime(initData.s_StartTime),
m_BucketStart(initData.s_StartTime),
m_PersonAttributeCounts(dataGatherer.params().s_LatencyBuckets,
dataGatherer.params().s_BucketLength,
initData.s_StartTime,
TSizeSizePrUInt64UMap(1)),
m_PersonAttributeExplicitNulls(dataGatherer.params().s_LatencyBuckets,
dataGatherer.params().s_BucketLength,
initData.s_StartTime,
TSizeSizePrUSet(1)),
m_InfluencerCounts(dataGatherer.params().s_LatencyBuckets + 3,
dataGatherer.params().s_BucketLength,
initData.s_StartTime,
TSizeSizePrOptionalStrPrUInt64UMapVec(
initData.s_InfluenceFieldNames.size())) {
}
CBucketGatherer::CBucketGatherer(bool isForPersistence, const CBucketGatherer& other)
: m_DataGatherer(other.m_DataGatherer),
m_EarliestTime(other.m_EarliestTime), m_BucketStart(other.m_BucketStart),
m_PersonAttributeCounts(other.m_PersonAttributeCounts),
m_PersonAttributeExplicitNulls(other.m_PersonAttributeExplicitNulls),
m_InfluencerCounts(other.m_InfluencerCounts) {
if (!isForPersistence) {
LOG_ABORT(<< "This constructor only creates clones for persistence");
}
}
bool CBucketGatherer::addEventData(CEventData& data) {
core_t::TTime const time = data.time();
if (time < this->earliestBucketStartTime()) {
// Ignore records that are out of the latency window
// Records in an incomplete first bucket will end up here
LOG_TRACE(<< "Ignored = " << time);
return false;
}
this->timeNow(time);
if (!data.personId() || !data.attributeId() || !data.count()) {
// The record was incomplete.
return false;
}
std::size_t const pid = *data.personId();
std::size_t const cid = *data.attributeId();
std::size_t const count = *data.count();
if ((pid != CDynamicStringIdRegistry::INVALID_ID) &&
(cid != CDynamicStringIdRegistry::INVALID_ID)) {
// Has the person/attribute been deleted from the gatherer?
if (!m_DataGatherer.isPersonActive(pid)) {
LOG_DEBUG(<< "Not adding value for deleted person " << pid);
return false;
}
if (m_DataGatherer.isPopulation() && !m_DataGatherer.isAttributeActive(cid)) {
LOG_DEBUG(<< "Not adding value for deleted attribute " << cid);
return false;
}
TSizeSizePr const pidCid = std::make_pair(pid, cid);
// If record is explicit null just note that a null record has been seen
// for the given (pid, cid) pair.
if (data.isExplicitNull()) {
TSizeSizePrUSet& bucketExplicitNulls =
m_PersonAttributeExplicitNulls.get(time);
bucketExplicitNulls.insert(pidCid);
return true;
}
TSizeSizePrUInt64UMap& bucketCounts = m_PersonAttributeCounts.get(time);
if (count > 0) {
bucketCounts[pidCid] += count;
}
const CEventData::TOptionalStrVec& influences = data.influences();
auto& influencerCounts = m_InfluencerCounts.get(time);
if (influences.size() != influencerCounts.size()) {
LOG_ERROR(<< "Unexpected influences: " << influences << " expected "
<< core::CContainerPrinter::print(this->beginInfluencers(),
this->endInfluencers()));
return false;
}
TOptionalStrVec canonicalInfluences(influencerCounts.size());
for (std::size_t i = 0; i < influences.size(); ++i) {
const CEventData::TOptionalStr& influence = influences[i];
if (influence) {
const std::string& inf = *influence;
canonicalInfluences[i] = inf;
if (count > 0) {
influencerCounts[i]
.emplace(boost::unordered::piecewise_construct,
boost::make_tuple(pidCid, inf),
boost::make_tuple(static_cast<std::uint64_t>(0)))
.first->second += count;
}
}
}
this->addValue(pid, cid, time, data.values(), count, data.stringValue(),
canonicalInfluences);
}
return true;
}
void CBucketGatherer::timeNow(core_t::TTime time) {
this->hiddenTimeNow(time, false);
}
void CBucketGatherer::hiddenTimeNow(core_t::TTime time, bool skipUpdates) {
m_EarliestTime = std::min(m_EarliestTime, time);
core_t::TTime const n = (time - m_BucketStart) / this->bucketLength();
core_t::TTime newBucketStart = m_BucketStart;
for (core_t::TTime i = 0; i < n; ++i) {
newBucketStart += this->bucketLength();
// The order here is important. While starting new buckets
// the gatherers may finalise the earliest bucket within
// the latency window, thus we push a new count bucket only
// after startNewBucket has been called.
std::ptrdiff_t const numberInfluences{this->endInfluencers() -
this->beginInfluencers()};
this->startNewBucket(newBucketStart, skipUpdates);
m_PersonAttributeCounts.push(TSizeSizePrUInt64UMap(1), newBucketStart);
m_PersonAttributeExplicitNulls.push(TSizeSizePrUSet(1), newBucketStart);
m_InfluencerCounts.push(TSizeSizePrOptionalStrPrUInt64UMapVec(numberInfluences),
newBucketStart);
m_BucketStart = newBucketStart;
}
}
void CBucketGatherer::sampleNow(core_t::TTime sampleBucketStart) {
core_t::TTime const timeNow =
sampleBucketStart +
((m_DataGatherer.params().s_LatencyBuckets + 1) * this->bucketLength()) - 1;
this->timeNow(timeNow);
this->sample(sampleBucketStart);
}
void CBucketGatherer::skipSampleNow(core_t::TTime sampleBucketStart) {
core_t::TTime const timeNow =
sampleBucketStart +
((m_DataGatherer.params().s_LatencyBuckets + 1) * this->bucketLength()) - 1;
this->hiddenTimeNow(timeNow, true);
}
void CBucketGatherer::personNonZeroCounts(core_t::TTime time, TSizeUInt64PrVec& result) const {
using TSizeUInt64Map = std::map<std::size_t, std::uint64_t>;
result.clear();
if (!this->dataAvailable(time)) {
LOG_ERROR(<< "No statistics at " << time
<< ", current bucket = " << this->printCurrentBucket());
return;
}
TSizeUInt64Map personCounts;
for (const auto& count : this->bucketCounts(time)) {
personCounts[CDataGatherer::extractPersonId(count)] +=
CDataGatherer::extractData(count);
}
result.reserve(personCounts.size());
result.assign(personCounts.begin(), personCounts.end());
}
void CBucketGatherer::recyclePeople(const TSizeVec& peopleToRemove) {
if (!peopleToRemove.empty()) {
remove(peopleToRemove, CDataGatherer::SExtractPersonId(), m_PersonAttributeCounts);
remove(peopleToRemove, CDataGatherer::SExtractPersonId(), m_PersonAttributeExplicitNulls);
remove(peopleToRemove, CDataGatherer::SExtractPersonId(), m_InfluencerCounts);
}
}
void CBucketGatherer::removePeople(std::size_t lowestPersonToRemove) {
if (lowestPersonToRemove < m_DataGatherer.numberPeople()) {
TSizeVec peopleToRemove;
std::size_t const maxPersonId = m_DataGatherer.numberPeople();
peopleToRemove.reserve(maxPersonId - lowestPersonToRemove);
for (std::size_t pid = lowestPersonToRemove; pid < maxPersonId; ++pid) {
peopleToRemove.push_back(pid);
}
remove(peopleToRemove, CDataGatherer::SExtractPersonId(), m_PersonAttributeCounts);
remove(peopleToRemove, CDataGatherer::SExtractPersonId(), m_PersonAttributeExplicitNulls);
remove(peopleToRemove, CDataGatherer::SExtractPersonId(), m_InfluencerCounts);
}
}
void CBucketGatherer::recycleAttributes(const TSizeVec& attributesToRemove) {
if (!attributesToRemove.empty()) {
remove(attributesToRemove, CDataGatherer::SExtractAttributeId(), m_PersonAttributeCounts);
remove(attributesToRemove, CDataGatherer::SExtractAttributeId(),
m_PersonAttributeExplicitNulls);
remove(attributesToRemove, CDataGatherer::SExtractAttributeId(), m_InfluencerCounts);
}
}
void CBucketGatherer::removeAttributes(std::size_t lowestAttributeToRemove) {
if (lowestAttributeToRemove < m_DataGatherer.numberAttributes()) {
TSizeVec attributesToRemove;
const std::size_t numAttributes = m_DataGatherer.numberAttributes();
attributesToRemove.reserve(numAttributes - lowestAttributeToRemove);
for (std::size_t cid = lowestAttributeToRemove; cid < numAttributes; ++cid) {
attributesToRemove.push_back(cid);
}
remove(attributesToRemove, CDataGatherer::SExtractAttributeId(), m_PersonAttributeCounts);
remove(attributesToRemove, CDataGatherer::SExtractAttributeId(),
m_PersonAttributeExplicitNulls);
remove(attributesToRemove, CDataGatherer::SExtractAttributeId(), m_InfluencerCounts);
}
}
core_t::TTime CBucketGatherer::currentBucketStartTime() const {
return m_BucketStart;
}
core_t::TTime CBucketGatherer::earliestBucketStartTime() const {
return this->currentBucketStartTime() -
(m_DataGatherer.params().s_LatencyBuckets * this->bucketLength());
}
core_t::TTime CBucketGatherer::bucketLength() const {
return m_DataGatherer.params().s_BucketLength;
}
bool CBucketGatherer::dataAvailable(core_t::TTime time) const {
return time >= m_EarliestTime && time >= this->earliestBucketStartTime();
}
bool CBucketGatherer::validateSampleTimes(core_t::TTime& startTime, core_t::TTime endTime) const {
// Sanity checks:
// 1) The start and end times are aligned to bucket boundaries.
// 2) The end time is greater than the start time,
// 3) The start time is greater than or equal to the start time
// of the current bucket of the counter,
// 4) The start time is greater than or equal to the start time
// of the last sampled bucket
if (!maths::common::CIntegerTools::aligned(startTime - m_BucketStart,
this->bucketLength())) {
LOG_ERROR(<< "Sample start time " << startTime << " is not bucket aligned");
return false;
}
if (!maths::common::CIntegerTools::aligned(endTime - m_BucketStart, this->bucketLength())) {
LOG_ERROR(<< "Sample end time " << endTime << " is not bucket aligned");
return false;
}
if (endTime <= startTime) {
LOG_ERROR(<< "End time " << endTime
<< " is not greater than the start time " << startTime);
return false;
}
for (/**/; startTime < endTime; startTime += this->bucketLength()) {
if (!this->dataAvailable(startTime)) {
LOG_ERROR(<< "No counts available at " << startTime
<< ", current bucket = " << this->printCurrentBucket());
continue;
}
return true;
}
return false;
}
const CDataGatherer& CBucketGatherer::dataGatherer() const {
return m_DataGatherer;
}
std::string CBucketGatherer::printCurrentBucket() const {
std::ostringstream result;
result << "[" << m_BucketStart << "," << m_BucketStart + this->bucketLength() << ")";
return result.str();
}
const CBucketGatherer::TSizeSizePrUInt64UMap&
CBucketGatherer::bucketCounts(core_t::TTime time) const {
return m_PersonAttributeCounts.get(time);
}
const CBucketGatherer::TSizeSizePrOptionalStrPrUInt64UMapVec&
CBucketGatherer::influencerCounts(core_t::TTime time) const {
return m_InfluencerCounts.get(time);
}
bool CBucketGatherer::hasExplicitNullsOnly(core_t::TTime time, std::size_t pid, std::size_t cid) const {
const TSizeSizePrUSet& bucketExplicitNulls = m_PersonAttributeExplicitNulls.get(time);
if (bucketExplicitNulls.empty()) {
return false;
}
const TSizeSizePrUInt64UMap& bucketCounts = m_PersonAttributeCounts.get(time);
TSizeSizePr const pidCid = std::make_pair(pid, cid);
return bucketExplicitNulls.find(pidCid) != bucketExplicitNulls.end() &&
bucketCounts.find(pidCid) == bucketCounts.end();
}
std::uint64_t CBucketGatherer::checksum() const {
using TStrCRef = std::reference_wrapper<const std::string>;
using TStrCRefStrCRefPr = std::pair<TStrCRef, TStrCRef>;
using TStrCRefStrCRefPrVec = std::vector<TStrCRefStrCRefPr>;
using TStrCRefStrCRefPrUInt64Pr = std::pair<TStrCRefStrCRefPr, std::uint64_t>;
using TStrCRefStrCRefPrUInt64PrVec = std::vector<TStrCRefStrCRefPrUInt64Pr>;
std::uint64_t result = maths::common::CChecksum::calculate(0, m_BucketStart);
result = maths::common::CChecksum::calculate(
result, m_PersonAttributeCounts.latestBucketEnd());
for (const auto& bucketCounts : m_PersonAttributeCounts) {
TStrCRefStrCRefPrUInt64PrVec personAttributeCounts;
personAttributeCounts.reserve(bucketCounts.size());
for (const auto& count : bucketCounts) {
std::size_t const pid = CDataGatherer::extractPersonId(count);
std::size_t const cid = CDataGatherer::extractAttributeId(count);
TStrCRefStrCRefPr const key(TStrCRef(m_DataGatherer.personName(pid)),
TStrCRef(m_DataGatherer.attributeName(cid)));
personAttributeCounts.emplace_back(key, CDataGatherer::extractData(count));
}
std::sort(personAttributeCounts.begin(), personAttributeCounts.end(),
maths::common::COrderings::SLess());
result = maths::common::CChecksum::calculate(result, personAttributeCounts);
}
result = maths::common::CChecksum::calculate(
result, m_PersonAttributeExplicitNulls.latestBucketEnd());
for (const auto& bucketExplicitNulls : m_PersonAttributeExplicitNulls) {
TStrCRefStrCRefPrVec personAttributeExplicitNulls;
personAttributeExplicitNulls.reserve(bucketExplicitNulls.size());
for (const auto& nulls : bucketExplicitNulls) {
std::size_t const pid = CDataGatherer::extractPersonId(nulls);
std::size_t const cid = CDataGatherer::extractAttributeId(nulls);
TStrCRefStrCRefPr const key(TStrCRef(m_DataGatherer.personName(pid)),
TStrCRef(m_DataGatherer.attributeName(cid)));
personAttributeExplicitNulls.push_back(key);
}
std::sort(personAttributeExplicitNulls.begin(),
personAttributeExplicitNulls.end(), maths::common::COrderings::SLess());
result = maths::common::CChecksum::calculate(result, personAttributeExplicitNulls);
}
LOG_TRACE(<< "checksum = " << result);
return result;
}
void CBucketGatherer::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CBucketGatherer");
core::memory_debug::dynamicSize("m_PersonAttributeCounts", m_PersonAttributeCounts, mem);
core::memory_debug::dynamicSize("m_PersonAttributeExplicitNulls",
m_PersonAttributeExplicitNulls, mem);
core::memory_debug::dynamicSize("m_Influencers", m_InfluencerCounts, mem);
}
std::size_t CBucketGatherer::memoryUsage() const {
std::size_t mem = core::memory::dynamicSize(m_PersonAttributeCounts);
mem += core::memory::dynamicSize(m_PersonAttributeExplicitNulls);
mem += core::memory::dynamicSize(m_InfluencerCounts);
return mem;
}
void CBucketGatherer::clear() {
m_PersonAttributeCounts.clear(TSizeSizePrUInt64UMap(1));
m_PersonAttributeExplicitNulls.clear(TSizeSizePrUSet(1));
m_InfluencerCounts.clear(TSizeSizePrOptionalStrPrUInt64UMapVec(
this->endInfluencers() - this->beginInfluencers()));
}
bool CBucketGatherer::resetBucket(core_t::TTime bucketStart) {
if (!maths::common::CIntegerTools::aligned(bucketStart, this->bucketLength())) {
LOG_ERROR(<< "Bucket start time " << bucketStart << " is not bucket aligned");
return false;
}
if (!this->dataAvailable(bucketStart) ||
bucketStart >= this->currentBucketStartTime() + this->bucketLength()) {
LOG_WARN(<< "No data available at " << bucketStart
<< ", current bucket = " << this->printCurrentBucket());
return false;
}
LOG_TRACE(<< "Resetting bucket starting at " << bucketStart);
std::ptrdiff_t const numberInfluences{this->endInfluencers() - this->beginInfluencers()};
m_PersonAttributeCounts.get(bucketStart).clear();
m_PersonAttributeExplicitNulls.get(bucketStart).clear();
m_InfluencerCounts.get(bucketStart) = TSizeSizePrOptionalStrPrUInt64UMapVec(numberInfluences);
return true;
}
void CBucketGatherer::baseAcceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(BUCKET_START_TAG, m_BucketStart);
inserter.insertLevel(
BUCKET_COUNT_TAG,
std::bind<void>(TSizeSizePrUInt64UMapQueue::CSerializer<detail::SBucketCountsPersister>(),
std::cref(m_PersonAttributeCounts), std::placeholders::_1));
// Clear any empty collections before persist these are resized on restore.
TSizeSizePrOptionalStrPrUInt64UMapVecQueue const influencerCounts{m_InfluencerCounts};
inserter.insertLevel(
INFLUENCERS_COUNT_TAG,
std::bind<void>(TSizeSizePrOptionalStrPrUInt64UMapVecQueue::CSerializer<detail::SInfluencerCountsPersister>(),
std::cref(m_InfluencerCounts), std::placeholders::_1));
core::CPersistUtils::persist(BUCKET_EXPLICIT_NULLS_TAG,
m_PersonAttributeExplicitNulls, inserter);
}
bool CBucketGatherer::baseAcceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
this->clear();
do {
const std::string& name = traverser.name();
RESTORE_BUILT_IN(BUCKET_START_TAG, m_BucketStart)
RESTORE_SETUP_TEARDOWN(
BUCKET_COUNT_TAG,
m_PersonAttributeCounts = TSizeSizePrUInt64UMapQueue(
m_DataGatherer.params().s_LatencyBuckets, this->bucketLength(),
m_BucketStart, TSizeSizePrUInt64UMap(1)),
traverser.traverseSubLevel(std::bind<bool>(
TSizeSizePrUInt64UMapQueue::CSerializer<detail::SBucketCountsPersister>(
TSizeSizePrUInt64UMap(1)),
std::ref(m_PersonAttributeCounts), std::placeholders::_1)),
/**/)
RESTORE_SETUP_TEARDOWN(
INFLUENCERS_COUNT_TAG,
m_InfluencerCounts = TSizeSizePrOptionalStrPrUInt64UMapVecQueue(
m_DataGatherer.params().s_LatencyBuckets + 3, this->bucketLength(), m_BucketStart),
traverser.traverseSubLevel(std::bind<bool>(
TSizeSizePrOptionalStrPrUInt64UMapVecQueue::CSerializer<detail::SInfluencerCountsPersister>(),
std::ref(m_InfluencerCounts), std::placeholders::_1)),
/**/)
RESTORE_SETUP_TEARDOWN(
BUCKET_EXPLICIT_NULLS_TAG,
m_PersonAttributeExplicitNulls = TSizeSizePrUSetQueue(
m_DataGatherer.params().s_LatencyBuckets, this->bucketLength(),
m_BucketStart, TSizeSizePrUSet(1)),
core::CPersistUtils::restore(BUCKET_EXPLICIT_NULLS_TAG,
m_PersonAttributeExplicitNulls, traverser),
/**/)
} while (traverser.next());
return true;
}
}
}