lib/model/CCountingModel.cc (374 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <model/CCountingModel.h>
#include <core/CAllocationStrategy.h>
#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CPersistUtils.h>
#include <core/CoreTypes.h>
#include <maths/common/CBasicStatisticsPersist.h>
#include <maths/common/CChecksum.h>
#include <maths/common/COrderings.h>
#include <model/CAnnotatedProbability.h>
#include <model/CAnnotation.h>
#include <model/CDataGatherer.h>
#include <model/CInterimBucketCorrector.h>
#include <model/CModelDetailsView.h>
#include <model/CSearchKey.h>
#include <boost/unordered_set.hpp>
namespace ml {
namespace model {
namespace {
// Tags under which CCountingModel::acceptPersistInserter writes its state and
// which CCountingModel::acceptRestoreTraverser matches when reading it back.
const std::string WINDOW_BUCKET_COUNT_TAG("a");
const std::string PERSON_BUCKET_COUNT_TAG("b");
const std::string MEAN_COUNT_TAG("c");
const std::string APPLIED_DETECTION_RULE_CHECKSUMS_TAG("d");
// Extra data tag deprecated at model version 34
// TODO remove on next version bump
// NOTE(review): the retired EXTRA_DATA_TAG below used the same value "d" that
// APPLIED_DETECTION_RULE_CHECKSUMS_TAG uses now - confirm that state written
// with the old tag can no longer reach acceptRestoreTraverser.
//const std::string EXTRA_DATA_TAG("d");
//const std::string INTERIM_BUCKET_CORRECTOR_TAG("e");
// Shared empty list returned by reference from scheduledEventDescriptions
// when no descriptions are stored for the requested time.
const CCountingModel::TStr1Vec EMPTY_STRING_LIST;
}
CCountingModel::CCountingModel(const SModelParams& params,
const TDataGathererPtr& dataGatherer,
const TInterimBucketCorrectorPtr& interimBucketCorrector)
: CAnomalyDetectorModel(params, dataGatherer, {}),
m_StartTime(CAnomalyDetectorModel::TIME_UNSET),
m_InterimBucketCorrector(interimBucketCorrector) {
}
CCountingModel::CCountingModel(const SModelParams& params,
const TDataGathererPtr& dataGatherer,
const TInterimBucketCorrectorPtr& interimBucketCorrector,
core::CStateRestoreTraverser& traverser)
: CAnomalyDetectorModel(params, dataGatherer, {}),
m_StartTime(CAnomalyDetectorModel::TIME_UNSET),
m_InterimBucketCorrector(interimBucketCorrector) {
if (traverser.traverseSubLevel(std::bind(&CCountingModel::acceptRestoreTraverser,
this, std::placeholders::_1)) == false) {
traverser.setBadState();
}
}
CCountingModel::CCountingModel(bool isForPersistence, const CCountingModel& other)
: CAnomalyDetectorModel(isForPersistence, other), m_StartTime(0),
m_MeanCounts(other.m_MeanCounts) {
if (!isForPersistence) {
LOG_ABORT(<< "This constructor only creates clones for persistence");
}
}
// Write the model state to the inserter.
//
// Four items are written, in this order: the window bucket count (single
// precision), the person bucket counts, the per-person mean counts and the
// applied detection rule checksums.
void CCountingModel::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
    inserter.insertValue(WINDOW_BUCKET_COUNT_TAG, this->windowBucketCount(),
                         core::CIEEE754::E_SinglePrecision);

    const auto& personCounts = this->personBucketCounts();
    core::CPersistUtils::persist(PERSON_BUCKET_COUNT_TAG, personCounts, inserter);

    core::CPersistUtils::persist(MEAN_COUNT_TAG, m_MeanCounts, inserter);

    const auto& ruleChecksums = this->appliedRuleChecksums();
    core::CPersistUtils::persist(APPLIED_DETECTION_RULE_CHECKSUMS_TAG,
                                 ruleChecksums, inserter);
}
// Read the model state back from the traverser.
//
// Each element at the current level is matched against the known tags and
// restored into the corresponding state; elements with any other name are
// ignored. Returns false as soon as one element fails to parse, true once
// the traverser has no further elements.
bool CCountingModel::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
    do {
        const std::string& tag = traverser.name();

        // Note that "continue" in a do-while jumps to the loop condition,
        // i.e. it advances the traverser to the next element.
        if (tag == WINDOW_BUCKET_COUNT_TAG) {
            double windowCount;
            if (core::CStringUtils::stringToType(traverser.value(), windowCount) == false) {
                LOG_ERROR(<< "Invalid bucket count in " << traverser.value());
                return false;
            }
            this->windowBucketCount(windowCount);
            continue;
        }

        if (tag == PERSON_BUCKET_COUNT_TAG) {
            if (core::CPersistUtils::restore(tag, this->personBucketCounts(),
                                             traverser) == false) {
                LOG_ERROR(<< "Invalid bucket counts in " << traverser.value());
                return false;
            }
            continue;
        }

        if (tag == MEAN_COUNT_TAG) {
            if (core::CPersistUtils::restore(tag, m_MeanCounts, traverser) == false) {
                LOG_ERROR(<< "Invalid mean counts");
                return false;
            }
            continue;
        }

        if (tag == APPLIED_DETECTION_RULE_CHECKSUMS_TAG) {
            if (core::CPersistUtils::restore(tag, this->appliedRuleChecksums(),
                                             traverser) == false) {
                LOG_ERROR(<< "Invalid applied detection rule checksums");
                return false;
            }
            continue;
        }
    } while (traverser.next());

    return true;
}
// Create a copy of this model using the persistence-only clone constructor.
// The object is allocated with new and returned as a raw pointer.
// NOTE(review): presumably the caller takes ownership - confirm at call sites.
CAnomalyDetectorModel* CCountingModel::cloneForPersistence() const {
return new CCountingModel(true, *this);
}
// The model category: always model_t::E_Counting.
model_t::EModelType CCountingModel::category() const {
return model_t::E_Counting;
}
// Always false for the counting model.
bool CCountingModel::isPopulation() const {
return false;
}
// Always false for the counting model.
bool CCountingModel::isEventRate() const {
return false;
}
// Always false for the counting model.
bool CCountingModel::isMetric() const {
return false;
}
// Get the count stored for person pid in the current bucket.
//
// Returns an empty optional (and logs an error) if time does not fall in the
// bucket for which statistics are held. Otherwise returns the stored count
// for pid, or zero if there is no entry for pid. The lookup is a binary
// search on the person identifier, so it relies on m_Counts being ordered
// by its first element.
CCountingModel::TOptionalUInt64
CCountingModel::currentBucketCount(std::size_t pid, core_t::TTime time) const {
    if (this->bucketStatsAvailable(time) == false) {
        LOG_ERROR(<< "No statistics at " << time
                  << ", current bucket = " << this->printCurrentBucket());
        return {};
    }

    auto entry = std::lower_bound(m_Counts.begin(), m_Counts.end(), pid,
                                  maths::common::COrderings::SFirstLess());
    if (entry == m_Counts.end() || entry->first != pid) {
        return static_cast<std::uint64_t>(0);
    }
    return entry->second;
}
// Get the mean bucket count accumulated for person pid.
//
// Returns 0.0 for a person identifier outside the range of m_MeanCounts.
CCountingModel::TOptionalDouble CCountingModel::baselineBucketCount(std::size_t pid) const {
    if (pid >= m_MeanCounts.size()) {
        return 0.0;
    }
    return maths::common::CBasicStatistics::mean(m_MeanCounts[pid]);
}
// Get the current bucket count for person pid as a vector of doubles.
//
// The feature and attribute identifier are ignored. The result holds one
// element when currentBucketCount yields a value and is empty otherwise.
CCountingModel::TDouble1Vec CCountingModel::currentBucketValue(model_t::EFeature /*feature*/,
                                                               std::size_t pid,
                                                               std::size_t /*cid*/,
                                                               core_t::TTime time) const {
    TDouble1Vec value;
    const TOptionalUInt64 bucketCount = this->currentBucketCount(pid, time);
    if (bucketCount) {
        value.push_back(static_cast<double>(*bucketCount));
    }
    return value;
}
// Get the baseline (mean) bucket count for person pid as a vector of doubles.
//
// Every argument other than pid is ignored. The result holds one element
// when baselineBucketCount yields a value and is empty otherwise.
CCountingModel::TDouble1Vec
CCountingModel::baselineBucketMean(model_t::EFeature /*feature*/,
                                   std::size_t pid,
                                   std::size_t /*cid*/,
                                   model_t::CResultType /*type*/,
                                   const TSizeDoublePr1Vec& /*correlated*/,
                                   core_t::TTime /*time*/) const {
    TDouble1Vec mean;
    const TOptionalDouble baseline = this->baselineBucketCount(pid);
    if (baseline) {
        mean.push_back(*baseline);
    }
    return mean;
}
// Fill result with the distinct person identifiers present in the current
// bucket counts.
//
// result is always cleared first. If time does not fall in the bucket for
// which statistics are held an error is logged and result is left empty.
// The identifiers are collected through an unordered set, so their order in
// result is the iteration order of that set.
void CCountingModel::currentBucketPersonIds(core_t::TTime time, TSizeVec& result) const {
    using TSizeUSet = boost::unordered_set<std::size_t>;

    result.clear();

    if (this->bucketStatsAvailable(time) == false) {
        LOG_ERROR(<< "No statistics at " << time
                  << ", current bucket = " << this->printCurrentBucket());
        return;
    }

    TSizeUSet uniquePids;
    for (const auto& pidAndCount : m_Counts) {
        uniquePids.insert(pidAndCount.first);
    }

    result.reserve(uniquePids.size());
    result.assign(uniquePids.begin(), uniquePids.end());
}
// Refresh the current bucket statistics for each bucket in
// [startTime, endTime) without updating the mean counts.
//
// The stored scheduled event descriptions are always cleared. Nothing more
// happens if the gatherer has no data available at startTime. Otherwise, for
// every bucket, the gatherer's time is advanced, the total of the bucket
// counts is passed to the interim bucket corrector as the current bucket
// count, the current bucket statistics are updated and any matching
// scheduled events are recorded. The resource monitor is not used.
void CCountingModel::sampleBucketStatistics(core_t::TTime startTime,
                                            core_t::TTime endTime,
                                            CResourceMonitor& /*resourceMonitor*/) {
    CDataGatherer& gatherer = this->dataGatherer();
    m_ScheduledEventDescriptions.clear();

    if (gatherer.dataAvailable(startTime) == false) {
        return;
    }

    const core_t::TTime bucketLength = gatherer.bucketLength();
    for (core_t::TTime bucketStart = startTime; bucketStart < endTime;
         bucketStart += bucketLength) {
        gatherer.timeNow(bucketStart);

        const auto& bucketCounts = gatherer.bucketCounts(bucketStart);
        std::uint64_t bucketTotal{0u};
        for (const auto& entry : bucketCounts) {
            bucketTotal += CDataGatherer::extractData(entry);
        }
        m_InterimBucketCorrector->currentBucketCount(bucketStart, bucketTotal);

        this->updateCurrentBucketsStats(bucketStart);

        // Check for scheduled events
        const core_t::TTime eventSampleTime = model_t::sampleTime(
            model_t::E_IndividualCountByBucketAndPerson, bucketStart, bucketLength);
        this->setMatchedEventsDescriptions(eventSampleTime, bucketStart);
    }
}
// Sample every bucket in [startTime, endTime) and update the mean counts.
//
// The stored scheduled event descriptions are always cleared. Nothing more
// happens if the gatherer rejects the sample times. Otherwise mean count
// accumulators are created for any new people and then, for every bucket:
//   -# the gatherer and the base class are sampled,
//   -# the current bucket statistics are refreshed,
//   -# each person's count is added to that person's mean accumulator,
//   -# the total count is passed to the interim bucket corrector as the
//      final bucket count,
//   -# any matching scheduled events are recorded for the bucket.
//
// \param[in] startTime The start of the first bucket to sample.
// \param[in] endTime The end of the last bucket to sample.
// \param[in] resourceMonitor Forwarded to createUpdateNewModels and to the
// base class sample.
void CCountingModel::sample(core_t::TTime startTime,
                            core_t::TTime endTime,
                            CResourceMonitor& resourceMonitor) {
    CDataGatherer& gatherer = this->dataGatherer();
    m_ScheduledEventDescriptions.clear();

    if (!gatherer.validateSampleTimes(startTime, endTime)) {
        return;
    }

    this->createUpdateNewModels(startTime, resourceMonitor);

    core_t::TTime bucketLength = gatherer.bucketLength();
    for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
        gatherer.sampleNow(time);
        this->CAnomalyDetectorModel::sample(time, time + bucketLength, resourceMonitor);
        this->updateCurrentBucketsStats(time);

        std::uint64_t totalCount{0};
        for (const auto& count : m_Counts) {
            // updateCurrentBucketsStats inserts a placeholder entry for
            // person 0 when there are no counts, so the person identifier
            // is not guaranteed to have a mean accumulator. Guard the index
            // in the same way as baselineBucketCount and
            // updateRecycledModels rather than reading out of bounds.
            if (count.first < m_MeanCounts.size()) {
                m_MeanCounts[count.first].add(static_cast<double>(count.second));
            }
            totalCount += count.second;
        }
        m_InterimBucketCorrector->finalBucketCount(time, totalCount);

        // Check for scheduled events
        core_t::TTime sampleTime = model_t::sampleTime(
            model_t::E_IndividualCountByBucketAndPerson, time, bucketLength);
        setMatchedEventsDescriptions(sampleTime, time);
    }
}
// Record the descriptions of the scheduled events matching sampleTime
// against the bucket starting at bucketStartTime.
//
// Nothing is stored when no event matches. Otherwise any descriptions
// previously stored for bucketStartTime are replaced.
void CCountingModel::setMatchedEventsDescriptions(core_t::TTime sampleTime,
                                                  core_t::TTime bucketStartTime) {
    SModelParams::TStrDetectionRulePrVec matched = this->checkScheduledEvents(sampleTime);
    if (matched.empty()) {
        return;
    }

    TStr1Vec eventNames;
    for (const auto& scheduledEvent : matched) {
        eventNames.push_back(scheduledEvent.first);
    }
    m_ScheduledEventDescriptions[bucketStartTime] = eventNames;
}
// Get copies of the scheduled events from the model parameters whose
// detection rule applies, with the E_SkipModelUpdate action, at sampleTime.
SModelParams::TStrDetectionRulePrVec
CCountingModel::checkScheduledEvents(core_t::TTime sampleTime) const {
    const SModelParams::TStrDetectionRulePrVec& scheduledEvents =
        this->params().s_ScheduledEvents.get();

    SModelParams::TStrDetectionRulePrVec matched;
    for (const auto& scheduledEvent : scheduledEvents) {
        // Note that as the counting model is not aware of partitions
        // scheduled events cannot support partitions as the code stands.
        const bool applies = scheduledEvent.second.apply(
            CDetectionRule::E_SkipModelUpdate, std::cref(*this),
            model_t::E_IndividualCountByBucketAndPerson, model_t::CResultType(),
            model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID,
            model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID, sampleTime);
        if (applies == false) {
            continue;
        }
        matched.push_back(scheduledEvent);
    }
    return matched;
}
// Intentionally a no-op: the counting model does nothing when sampling is
// skipped and both time arguments are ignored.
void CCountingModel::doSkipSampling(core_t::TTime /*startTime*/, core_t::TTime /*endTime*/) {
}
// Intentionally a no-op: the counting model does not prune and the maximum
// age is ignored.
void CCountingModel::prune(std::size_t /*maximumAge*/) {
}
bool CCountingModel::computeProbability(std::size_t pid,
core_t::TTime startTime,
core_t::TTime endTime,
CPartitioningFields& /*partitioningFields*/,
std::size_t /*numberAttributeProbabilities*/,
SAnnotatedProbability& result) const {
result = SAnnotatedProbability(1.0);
result.s_CurrentBucketCount =
this->currentBucketCount(pid, (startTime + endTime + 1) / 2 - 1);
result.s_BaselineBucketCount = this->baselineBucketCount(pid);
return true;
}
// Set the total probability to 1.0 and empty the attribute probabilities.
//
// The person name and the number of attribute probabilities are ignored.
// Always returns true.
bool CCountingModel::computeTotalProbability(const std::string& /*person*/,
                                             std::size_t /*numberAttributeProbabilities*/,
                                             TOptionalDouble& probability,
                                             TAttributeProbability1Vec& attributeProbabilities) const {
    attributeProbabilities.clear();
    probability.emplace(1.0);
    return true;
}
// Compute a checksum of the model state.
//
// The base class checksum is combined with the mean counts. The current
// bucket start time and counts are only folded in, in that order, when
// includeCurrentBucketStats is true.
std::uint64_t CCountingModel::checksum(bool includeCurrentBucketStats) const {
    std::uint64_t seed = this->CAnomalyDetectorModel::checksum(includeCurrentBucketStats);
    seed = maths::common::CChecksum::calculate(seed, m_MeanCounts);

    if (includeCurrentBucketStats == false) {
        return seed;
    }

    seed = maths::common::CChecksum::calculate(seed, m_StartTime);
    seed = maths::common::CChecksum::calculate(seed, m_Counts);
    return seed;
}
// Report a breakdown of the memory used by this model into mem.
// The node is named "CCountingModel", the base class reports into a child
// node and the dynamic sizes of the current counts, the mean counts and the
// interim bucket corrector are added to this node.
void CCountingModel::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
mem->setName("CCountingModel");
this->CAnomalyDetectorModel::debugMemoryUsage(mem->addChild());
core::memory_debug::dynamicSize("m_Counts", m_Counts, mem);
core::memory_debug::dynamicSize("m_MeanCounts", m_MeanCounts, mem);
core::memory_debug::dynamicSize("m_InterimBucketCorrector",
m_InterimBucketCorrector, mem);
}
// Get the memory used by this model: the base class usage plus the dynamic
// sizes of the current counts, the mean counts and the interim bucket
// corrector.
std::size_t CCountingModel::memoryUsage() const {
    const std::size_t baseUsage = this->CAnomalyDetectorModel::memoryUsage();
    const std::size_t countsUsage = core::memory::dynamicSize(m_Counts);
    const std::size_t meanCountsUsage = core::memory::dynamicSize(m_MeanCounts);
    const std::size_t correctorUsage = core::memory::dynamicSize(m_InterimBucketCorrector);
    return baseUsage + countsUsage + meanCountsUsage + correctorUsage;
}
// Compute the memory used by this model; simply forwards to memoryUsage.
std::size_t CCountingModel::computeMemoryUsage() const {
return this->memoryUsage();
}
// Get the static size of this object, i.e. sizeof(CCountingModel).
std::size_t CCountingModel::staticSize() const {
return sizeof(*this);
}
// The counting model provides no details view: a default constructed
// TModelDetailsViewUPtr is returned.
CCountingModel::TModelDetailsViewUPtr CCountingModel::details() const {
return TModelDetailsViewUPtr();
}
// Get the start time of the bucket for which statistics are currently held.
core_t::TTime CCountingModel::currentBucketStartTime() const {
return m_StartTime;
}
// Set the start time of the current bucket. Only the stored start time is
// changed; the stored counts are not touched.
void CCountingModel::currentBucketStartTime(core_t::TTime time) {
m_StartTime = time;
}
// Get the scheduled event descriptions stored against time.
//
// Returns a reference to a shared empty list when nothing is stored for
// time.
const CCountingModel::TStr1Vec&
CCountingModel::scheduledEventDescriptions(core_t::TTime time) const {
    auto found = m_ScheduledEventDescriptions.find(time);
    return found != m_ScheduledEventDescriptions.end() ? found->second : EMPTY_STRING_LIST;
}
// Get the annotations this model has accumulated via addAnnotation.
const CCountingModel::TAnnotationVec& CCountingModel::annotations() const {
return m_Annotations;
}
// Apply a time shift to the model and record it as a model change
// annotation at time.
void CCountingModel::shiftTime(core_t::TTime time, core_t::TTime shift) {
    // Since counting model does not have a trend mode, only the interim
    // bucket corrector needs to be shifted.
    m_InterimBucketCorrector->shiftTime(time, shift);

    const std::string message = "Counting model shifted time by " +
                                std::to_string(shift) + " seconds";
    this->addAnnotation(time, CAnnotation::E_ModelChange, message);
}
// The attribute identifier is ignored and the frequency is always 1.0.
double CCountingModel::attributeFrequency(std::size_t /*cid*/) const {
return 1.0;
}
// Reset the state of recycled people and create mean count accumulators for
// any people the gatherer knows about which this model does not yet have.
//
// Neither the time nor the resource monitor is used.
void CCountingModel::createUpdateNewModels(core_t::TTime /*time*/,
                                           CResourceMonitor& /*resourceMonitor*/) {
    this->updateRecycledModels();

    CDataGatherer& gatherer = this->dataGatherer();
    const std::size_t gatheredPeople = gatherer.numberPeople();
    const std::size_t modelledPeople = m_MeanCounts.size();

    if (gatheredPeople > modelledPeople) {
        const std::size_t numberNewPeople = gatheredPeople - modelledPeople;
        LOG_TRACE(<< "Creating " << numberNewPeople << " new people");
        this->createNewModels(numberNewPeople, 0);
    }
}
// Extend the mean counts by n accumulators, when n is non-zero, and then
// forward both counts to the base class.
void CCountingModel::createNewModels(std::size_t n, std::size_t m) {
    if (n != 0) {
        const std::size_t newSize = m_MeanCounts.size() + n;
        core::CAllocationStrategy::resize(m_MeanCounts, newSize);
    }
    this->CAnomalyDetectorModel::createNewModels(n, m);
}
// Make the bucket starting at time the current bucket and load its non-zero
// person counts from the gatherer.
void CCountingModel::updateCurrentBucketsStats(core_t::TTime time) {
    CDataGatherer& gatherer = this->dataGatherer();

    // Currently, we only remember one bucket.
    m_StartTime = time;
    gatherer.personNonZeroCounts(time, m_Counts);

    if (m_Counts.empty() == false) {
        return;
    }

    // Results are only output if currentBucketPersonIds is
    // not empty. Therefore, we need to explicitly set the
    // count to 0 so that we output results.
    m_Counts.emplace_back(0, 0);
}
// Reset the mean count accumulator of every recycled person identifier
// which is in range, then let the base class do its own recycling.
void CCountingModel::updateRecycledModels() {
    for (auto recycledPid : this->dataGatherer().recycledPersonIds()) {
        if (recycledPid >= m_MeanCounts.size()) {
            continue;
        }
        m_MeanCounts[recycledPid] = TMeanAccumulator();
    }
    this->CAnomalyDetectorModel::updateRecycledModels();
}
// Intentionally a no-op: both the people and attributes lists are ignored.
void CCountingModel::clearPrunedResources(const TSizeVec& /*people*/,
const TSizeVec& /*attributes*/) {
}
// Get the interim bucket corrector this model holds.
// The pointer is dereferenced without a null check.
// NOTE(review): the persistence clone constructor does not set
// m_InterimBucketCorrector - confirm this is never called on such a clone.
const CInterimBucketCorrector& CCountingModel::interimValueCorrector() const {
return *m_InterimBucketCorrector;
}
// Check if time lies in the current bucket, i.e. in the half open interval
// [m_StartTime, m_StartTime + bucket length).
bool CCountingModel::bucketStatsAvailable(core_t::TTime time) const {
    if (time < m_StartTime) {
        return false;
    }
    return time < m_StartTime + this->bucketLength();
}
// Describe the current bucket as the half open interval "[start,end)".
std::string CCountingModel::printCurrentBucket() const {
    const auto bucketEnd = m_StartTime + this->bucketLength();
    std::ostringstream bucket;
    bucket << "[" << m_StartTime << ",";
    bucket << bucketEnd << ")";
    return bucket.str();
}
// The counting model has no memory usage estimator: always returns nullptr.
CMemoryUsageEstimator* CCountingModel::memoryUsageEstimator() const {
return nullptr;
}
// Append an annotation for this model's detector.
//
// The annotation is built from the time, event type and text supplied plus
// the detector index of the gatherer's search key; the six remaining string
// fields are all left empty.
void CCountingModel::addAnnotation(core_t::TTime time,
                                   CAnnotation::EEvent event,
                                   const std::string& annotation) {
    const auto detectorIndex = this->dataGatherer().searchKey().detectorIndex();
    m_Annotations.emplace_back(time, event, annotation, detectorIndex,
                               EMPTY_STRING, EMPTY_STRING, EMPTY_STRING,
                               EMPTY_STRING, EMPTY_STRING, EMPTY_STRING);
}
}
}