lib/model/CAnomalyDetector.cc

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <model/CAnomalyDetector.h>

#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CProgramCounters.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>

#include <maths/common/CIntegerTools.h>
#include <maths/common/CSampling.h>

#include <model/CAnnotation.h>
#include <model/CAnomalyDetectorModel.h>
#include <model/CDataGatherer.h>
#include <model/CForecastModelPersist.h>
#include <model/CLimits.h>
#include <model/CModelDetailsView.h>
#include <model/CModelFactory.h>
#include <model/CSearchKey.h>

#include <sstream>
#include <vector>

namespace ml {
namespace model {

// We use short field names to reduce the state size
namespace {

using TModelDetailsViewUPtr = CAnomalyDetectorModel::TModelDetailsViewUPtr;

// tag 'a' was previously used for persisting first time;
// DO NOT USE; unless it is decided to break model state BWC
const std::string MODEL_AND_GATHERER_TAG("b");
const std::string PARTITION_FIELD_VALUE_TAG("c");
const std::string KEY_TAG("d");
const std::string SIMPLE_COUNT_STATICS("f");

// classes containing static members needing persistence
//const std::string RANDOMIZED_PERIODIC_TAG("a"); // No longer used
const std::string PROGRAM_COUNTERS_TAG("b");
const std::string SAMPLING_TAG("c");

// tags for the parts that used to be in model ensemble.
// !!! NOTE: Tags 'c' & 'e' were previously used for removed
// state. If new state is added here, tags from `f` onwards
// should be used in order not to break model state BWC.
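//
// For orientation, the nesting these tags produce (see the
// accept*PersistInserter methods below) is roughly:
//
//   MODEL_AND_GATHERER_TAG ('b')
//     DATA_GATHERER_TAG ('a') -> data gatherer state
//     MODELS_TAG ('b')
//       MODEL_TAG ('d') -> model state
//
// and, for the simple count detector only, SIMPLE_COUNT_STATICS ('f')
// holding PROGRAM_COUNTERS_TAG ('b') and SAMPLING_TAG ('c'). The key
// ('d') and partition field value ('c') tags are written separately by
// the owning code via keyAcceptPersistInserter and
// partitionFieldAcceptPersistInserter.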
const std::string DATA_GATHERER_TAG("a");
const std::string MODELS_TAG("b");
const std::string MODEL_TAG("d");

CAnomalyDetector::TDataGathererPtr
makeDataGatherer(const CAnomalyDetector::TModelFactoryCPtr& factory,
                 core_t::TTime startTime,
                 const std::string& partitionFieldValue) {
    CModelFactory::SGathererInitializationData initData(startTime, partitionFieldValue);
    return CAnomalyDetector::TDataGathererPtr(factory->makeDataGatherer(initData));
}

CAnomalyDetector::TModelPtr
makeModel(const CAnomalyDetector::TModelFactoryCPtr& factory,
          const CAnomalyDetector::TDataGathererPtr& dataGatherer) {
    CModelFactory::SModelInitializationData initData(dataGatherer);
    return CAnomalyDetector::TModelPtr(factory->makeModel(initData));
}
}

// Increment this every time a change to the state is made that requires
// existing state to be discarded
const std::string CAnomalyDetector::STATE_VERSION("34");

const std::string CAnomalyDetector::COUNT_NAME("count");
const std::string CAnomalyDetector::TIME_NAME("time");
const std::string CAnomalyDetector::DISTINCT_COUNT_NAME("distinct_count");
const std::string CAnomalyDetector::RARE_NAME("rare");
const std::string CAnomalyDetector::INFO_CONTENT_NAME("info_content");
const std::string CAnomalyDetector::MEAN_NAME("mean");
const std::string CAnomalyDetector::MEDIAN_NAME("median");
const std::string CAnomalyDetector::MIN_NAME("min");
const std::string CAnomalyDetector::MAX_NAME("max");
const std::string CAnomalyDetector::VARIANCE_NAME("varp");
const std::string CAnomalyDetector::SUM_NAME("sum");
const std::string CAnomalyDetector::LAT_LONG_NAME("lat_long");
const std::string CAnomalyDetector::EMPTY_STRING;

CAnomalyDetector::CAnomalyDetector(CLimits& limits,
                                   const CAnomalyDetectorModelConfig& modelConfig,
                                   const std::string& partitionFieldValue,
                                   core_t::TTime firstTime,
                                   const TModelFactoryCPtr& modelFactory)
    : m_Limits(limits), m_ModelConfig(modelConfig),
      m_LastBucketEndTime(
          maths::common::CIntegerTools::ceil(firstTime, modelConfig.bucketLength())),
      m_DataGatherer(makeDataGatherer(modelFactory, m_LastBucketEndTime, partitionFieldValue)),
      m_ModelFactory(modelFactory),
      m_Model(makeModel(modelFactory, m_DataGatherer)), m_IsForPersistence(false) {
    if (m_DataGatherer == nullptr) {
        LOG_ABORT(<< "Failed to construct data gatherer for detector: "
                  << this->description());
    }
    if (m_Model == nullptr) {
        LOG_ABORT(<< "Failed to construct model for detector: " << this->description());
    }

    limits.resourceMonitor().registerComponent(*this);

    LOG_DEBUG(<< "CAnomalyDetector(): " << this->description() << " for '"
              << m_DataGatherer->partitionFieldValue() << "'"
              << ", first time = " << firstTime
              << ", bucketLength = " << modelConfig.bucketLength()
              << ", m_LastBucketEndTime = " << m_LastBucketEndTime);
}

CAnomalyDetector::CAnomalyDetector(bool isForPersistence, const CAnomalyDetector& other)
    : m_Limits(other.m_Limits), m_ModelConfig(other.m_ModelConfig),
      // Empty result function is fine in this case
      // Empty result count function is fine in this case
      m_LastBucketEndTime(other.m_LastBucketEndTime),
      m_DataGatherer(other.m_DataGatherer->cloneForPersistence()),
      m_ModelFactory(other.m_ModelFactory), // Shallow copy of model factory is OK
      m_Model(other.m_Model->cloneForPersistence()),
      // Empty message propagation function is fine in this case
      m_IsForPersistence(isForPersistence) {
    if (!isForPersistence) {
        LOG_ABORT(<< "This constructor only creates clones for persistence");
    }
}

CAnomalyDetector::~CAnomalyDetector() {
    if (!m_IsForPersistence) {
        m_Limits.resourceMonitor().unRegisterComponent(*this);
    }
}
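// Illustrative usage sketch only, not taken from the production driver
// (which lives outside this file). The set-up of `limits`, `modelConfig`,
// `modelFactory` and the loop bookkeeping below are hypothetical; the calls
// themselves are the public API implemented in this file:
//
//     CAnomalyDetector detector{limits, modelConfig, partitionFieldValue,
//                               firstTime, modelFactory};
//     for (/* each record in time order */) {
//         detector.addRecord(time, fieldValues);
//     }
//     // At each bucket boundary: sample the models and collect results.
//     detector.buildResults(bucketStart, bucketEnd, results);
//     // Periodically drop models for entities that are effectively dead.
//     detector.pruneModels();
//     // Persist from a lightweight clone so ingest can continue.
//     CAnomalyDetector persistClone{true, detector};
//     persistClone.acceptPersistInserter(inserter);
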
size_t CAnomalyDetector::numberActivePeople() const {
    return m_DataGatherer->numberActivePeople();
}

size_t CAnomalyDetector::numberActiveAttributes() const {
    return m_DataGatherer->numberActiveAttributes();
}

size_t CAnomalyDetector::maxDimension() const {
    return m_DataGatherer->maxDimension();
}

void CAnomalyDetector::zeroModelsToTime(core_t::TTime time) {
    // If there has been a big gap in the times, we might need to sample
    // many buckets; if there has been no gap, the loop may legitimately
    // have no iterations.
    core_t::TTime bucketLength = m_ModelConfig.bucketLength();
    while (time >= (m_LastBucketEndTime + bucketLength)) {
        core_t::TTime bucketStartTime = m_LastBucketEndTime;
        m_LastBucketEndTime += bucketLength;

        LOG_TRACE(<< "sample: m_DetectorKey = '" << this->description()
                  << "', bucketStartTime = " << bucketStartTime
                  << ", m_LastBucketEndTime = " << m_LastBucketEndTime);

        // Update the statistical models.
        m_Model->sample(bucketStartTime, m_LastBucketEndTime, m_Limits.resourceMonitor());
    }
}

bool CAnomalyDetector::acceptRestoreTraverser(const std::string& partitionFieldValue,
                                              core::CStateRestoreTraverser& traverser) {
    m_DataGatherer->clear();
    m_Model.reset();

    // We expect tags immediately below the root storing the first time the
    // models were created and the models IN THAT ORDER.
    do {
        const std::string& name = traverser.name();
        if (name == MODEL_AND_GATHERER_TAG) {
            if (traverser.traverseSubLevel(std::bind(
                    &CAnomalyDetector::legacyModelEnsembleAcceptRestoreTraverser, this,
                    std::cref(partitionFieldValue), std::placeholders::_1)) == false) {
                LOG_ERROR(<< "Invalid model ensemble section in " << traverser.value());
                return false;
            }
        } else if (name == SIMPLE_COUNT_STATICS) {
            if (traverser.traverseSubLevel(
                    std::bind(&CAnomalyDetector::staticsAcceptRestoreTraverser,
                              this, std::placeholders::_1)) == false) {
                LOG_ERROR(<< "Invalid simple count statics in " << traverser.value());
                return false;
            }
        }
    } while (traverser.next());

    return true;
}

bool CAnomalyDetector::legacyModelEnsembleAcceptRestoreTraverser(const std::string& partitionFieldValue,
                                                                 core::CStateRestoreTraverser& traverser) {
    do {
        const std::string& name = traverser.name();
        if (name == DATA_GATHERER_TAG) {
            m_DataGatherer = m_ModelFactory->makeDataGatherer(partitionFieldValue, traverser);
            if (m_DataGatherer == nullptr || m_DataGatherer->checkInvariants() == false) {
                LOG_ERROR(<< "Failed to restore the data gatherer from " << traverser.value());
                return false;
            }
        } else if (name == MODELS_TAG) {
            if (traverser.traverseSubLevel(
                    std::bind(&CAnomalyDetector::legacyModelsAcceptRestoreTraverser,
                              this, std::placeholders::_1)) == false) {
                LOG_ERROR(<< "Failed to restore live models from " << traverser.value());
                return false;
            }
        }
    } while (traverser.next());

    return true;
}

bool CAnomalyDetector::legacyModelsAcceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
    do {
        const std::string& name = traverser.name();
        if (name == MODEL_TAG) {
            CModelFactory::SModelInitializationData initData(m_DataGatherer);
            m_Model.reset(m_ModelFactory->makeModel(initData, traverser));
            if (!m_Model) {
                LOG_ERROR(<< "Failed to extract model from " << traverser.value());
                return false;
            }
        }
    } while (traverser.next());

    return true;
}

bool CAnomalyDetector::staticsAcceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
    do {
        const std::string& name = traverser.name();
        if (name == PROGRAM_COUNTERS_TAG) {
            if (traverser.traverseSubLevel(
                    &core::CProgramCounters::staticsAcceptRestoreTraverser) == false) {
                LOG_ERROR(<< "Failed to restore program counters");
                return false;
            }
        } else if (name == SAMPLING_TAG) {
            if (traverser.traverseSubLevel(
                    &maths::common::CSampling::staticsAcceptRestoreTraverser) == false) {
                LOG_ERROR(<< "Failed to restore sampling state");
                return false;
            }
        }
    } while (traverser.next());

    return true;
}

bool CAnomalyDetector::partitionFieldAcceptRestoreTraverser(core::CStateRestoreTraverser& traverser,
                                                            std::string& partitionFieldValue) {
    do {
        const std::string& name = traverser.name();
        if (name == PARTITION_FIELD_VALUE_TAG) {
            partitionFieldValue = traverser.value();
            return true;
        }
    } while (traverser.next());

    return false;
}

bool CAnomalyDetector::keyAcceptRestoreTraverser(core::CStateRestoreTraverser& traverser,
                                                 CSearchKey& key) {
    do {
        const std::string& name = traverser.name();
        if (name == KEY_TAG) {
            bool successful(true);
            key = CSearchKey(traverser, successful);
            if (successful == false) {
                LOG_ERROR(<< "Invalid key in " << traverser.value());
                return false;
            }
            return true;
        }
    } while (traverser.next());

    return false;
}

void CAnomalyDetector::keyAcceptPersistInserter(core::CStatePersistInserter& inserter) const {
    inserter.insertLevel(KEY_TAG, std::bind(&CSearchKey::acceptPersistInserter,
                                            &m_DataGatherer->searchKey(),
                                            std::placeholders::_1));
}

void CAnomalyDetector::partitionFieldAcceptPersistInserter(core::CStatePersistInserter& inserter) const {
    inserter.insertValue(PARTITION_FIELD_VALUE_TAG, m_DataGatherer->partitionFieldValue());
}

bool CAnomalyDetector::shouldPersistDetector() const {
    // Query the model to determine if it should be persisted.
    // This may return false if every constituent feature model is effectively
    // empty, i.e. all the models are stubs due to them being pruned.
    // If the model should not be persisted neither should the detector.
    if (m_Model->shouldPersist() == false) {
        LOG_TRACE(<< "NOT persisting detector \"" << this->description()
                  << "\" due to all feature models being pruned");
        return false;
    }
    return true;
}

void CAnomalyDetector::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
    // Persist static members only once within the simple count detector
    // and do this first so that other model components can use
    // static strings
    if (this->isSimpleCount()) {
        inserter.insertLevel(SIMPLE_COUNT_STATICS,
                             std::bind(&CAnomalyDetector::staticsAcceptPersistInserter,
                                       this, std::placeholders::_1));
    }

    // Persist what used to belong in model ensemble at a separate level to ensure BWC
    inserter.insertLevel(MODEL_AND_GATHERER_TAG,
                         std::bind(&CAnomalyDetector::legacyModelEnsembleAcceptPersistInserter,
                                   this, std::placeholders::_1));
}

void CAnomalyDetector::staticsAcceptPersistInserter(core::CStatePersistInserter& inserter) const {
    inserter.insertLevel(PROGRAM_COUNTERS_TAG,
                         &core::CProgramCounters::staticsAcceptPersistInserter);
    inserter.insertLevel(SAMPLING_TAG, &maths::common::CSampling::staticsAcceptPersistInserter);
}

void CAnomalyDetector::legacyModelEnsembleAcceptPersistInserter(core::CStatePersistInserter& inserter) const {
    inserter.insertLevel(DATA_GATHERER_TAG,
                         std::bind(&CDataGatherer::acceptPersistInserter,
                                   std::cref(*m_DataGatherer), std::placeholders::_1));

    // This level seems redundant but it is simulating state as it was when CModelEnsemble
    // was around.
    inserter.insertLevel(MODELS_TAG,
                         std::bind(&CAnomalyDetector::legacyModelsAcceptPersistInserter,
                                   this, std::placeholders::_1));
}

void CAnomalyDetector::persistModelsState(core::CStatePersistInserter& inserter) const {
    m_Model->persistModelsState(inserter);
}

void CAnomalyDetector::legacyModelsAcceptPersistInserter(core::CStatePersistInserter& inserter) const {
    inserter.insertLevel(MODEL_TAG, std::bind(&CAnomalyDetectorModel::acceptPersistInserter,
                                              m_Model.get(), std::placeholders::_1));
}

const CAnomalyDetector::TStrVec& CAnomalyDetector::fieldsOfInterest() const {
    return m_DataGatherer->fieldsOfInterest();
}

void CAnomalyDetector::addRecord(core_t::TTime time, const TStrCPtrVec& fieldValues) {
    const TStrCPtrVec& processedFieldValues = this->preprocessFieldValues(fieldValues);

    CEventData eventData;
    eventData.time(time);

    m_DataGatherer->addArrival(processedFieldValues, eventData, m_Limits.resourceMonitor());
}

const CAnomalyDetector::TStrCPtrVec&
CAnomalyDetector::preprocessFieldValues(const TStrCPtrVec& fieldValues) {
    return fieldValues;
}

void CAnomalyDetector::buildResults(core_t::TTime bucketStartTime,
                                    core_t::TTime bucketEndTime,
                                    CHierarchicalResults& results) {
    core_t::TTime bucketLength = m_ModelConfig.bucketLength();
    bucketStartTime = maths::common::CIntegerTools::floor(bucketStartTime, bucketLength);
    bucketEndTime = maths::common::CIntegerTools::floor(bucketEndTime, bucketLength);
    if (bucketEndTime <= m_LastBucketEndTime) {
        return;
    }

    m_Limits.resourceMonitor().clearExtraMemory();

    this->buildResultsHelper(
        bucketStartTime, bucketEndTime,
        std::bind(&CAnomalyDetector::sample, this, std::placeholders::_1,
                  std::placeholders::_2, std::ref(m_Limits.resourceMonitor())),
        std::bind(&CAnomalyDetector::updateLastSampledBucket, this, std::placeholders::_1),
        results);
}

void CAnomalyDetector::sample(core_t::TTime startTime,
                              core_t::TTime endTime,
                              CResourceMonitor& resourceMonitor) {
    if (endTime <= startTime) {
        // Nothing to sample.
        return;
    }

    core_t::TTime bucketLength = m_ModelConfig.bucketLength();
    for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
        m_Model->sample(time, time + bucketLength, resourceMonitor);
    }

    if ((endTime / bucketLength) % 10 == 0) {
        // Even if memory limiting is disabled, force a refresh every 10 buckets
        // so the user has some idea what's going on with memory. (Note: the
        // 10 bucket interval is inexact as sampling may not take place for
        // every bucket. However, it's probably good enough.)
        resourceMonitor.forceRefresh(*this);
    } else {
        resourceMonitor.refresh(*this);
    }
}

void CAnomalyDetector::sampleBucketStatistics(core_t::TTime startTime,
                                              core_t::TTime endTime,
                                              CResourceMonitor& resourceMonitor) {
    if (endTime <= startTime) {
        // Nothing to sample.
        return;
    }

    core_t::TTime bucketLength = m_ModelConfig.bucketLength();
    for (core_t::TTime time = startTime; time < endTime; time += bucketLength) {
        m_Model->sampleBucketStatistics(time, time + bucketLength, resourceMonitor);
    }
    resourceMonitor.refresh(*this);
}

void CAnomalyDetector::generateModelPlot(core_t::TTime bucketStartTime,
                                         core_t::TTime bucketEndTime,
                                         double boundsPercentile,
                                         const TStrSet& terms,
                                         TModelPlotDataVec& modelPlots) const {
    if (bucketEndTime <= bucketStartTime) {
        return;
    }
    if (terms.empty() || m_DataGatherer->partitionFieldValue().empty() ||
        terms.find(m_DataGatherer->partitionFieldValue()) != terms.end()) {
        const CSearchKey& key = m_DataGatherer->searchKey();
        TModelDetailsViewUPtr view = m_Model.get()->details();
        if (view.get()) {
            core_t::TTime bucketLength = m_ModelConfig.bucketLength();
            for (core_t::TTime time = bucketStartTime; time < bucketEndTime; time += bucketLength) {
                modelPlots.emplace_back(time, key.partitionFieldName(),
                                        m_DataGatherer->partitionFieldValue(),
                                        key.overFieldName(), key.byFieldName(),
                                        bucketLength, key.detectorIndex());
                view->modelPlot(time, boundsPercentile, terms, modelPlots.back());
            }
        }
    }
}

void CAnomalyDetector::generateAnnotations(core_t::TTime bucketStartTime,
                                           core_t::TTime bucketEndTime,
                                           TAnnotationVec& annotations) const {
    if (bucketEndTime <= bucketStartTime) {
        return;
    }
    for (const auto& annotation : m_Model->annotations()) {
        if (annotation.time() >= bucketStartTime && annotation.time() < bucketEndTime) {
            annotations.push_back(annotation);
        }
    }
}

CForecastDataSink::SForecastModelPrerequisites
CAnomalyDetector::getForecastPrerequisites() const {
    CForecastDataSink::SForecastModelPrerequisites prerequisites{0, 0, 0, true, false};

    TModelDetailsViewUPtr view = m_Model->details();

    // The view can be empty, e.g. for the counting model.
    if (view.get() == nullptr) {
        return prerequisites;
    }

    prerequisites.s_IsPopulation = m_DataGatherer->isPopulation();

    if (prerequisites.s_IsPopulation) {
        return prerequisites;
    }

    const CSearchKey& key = m_DataGatherer->searchKey();
    prerequisites.s_IsSupportedFunction = function_t::isForecastSupported(key.function());
    if (prerequisites.s_IsSupportedFunction == false) {
        return prerequisites;
    }

    for (std::size_t pid = 0u, maxPid = m_DataGatherer->numberPeople(); pid < maxPid; ++pid) {
        // todo: Add terms filtering here
        if (m_DataGatherer->isPersonActive(pid)) {
            for (auto feature : view->features()) {
                const maths::common::CModel* model = view->model(feature, pid);
                // The model might not exist, e.g. for categorical features.
                if (model != nullptr) {
                    ++prerequisites.s_NumberOfModels;
                    if (model->isForecastPossible()) {
                        ++prerequisites.s_NumberOfForecastableModels;
                    }
                    prerequisites.s_MemoryUsageForDetector += model->memoryUsage();
                }
            }
        }
    }

    return prerequisites;
}

CForecastDataSink::SForecastResultSeries
CAnomalyDetector::getForecastModels(bool persistOnDisk,
                                    const std::string& persistenceFolder) const {
    CForecastDataSink::SForecastResultSeries series(m_ModelFactory->modelParams());

    if (m_DataGatherer->isPopulation()) {
        return series;
    }

    TModelDetailsViewUPtr view{m_Model.get()->details()};

    // The view can be empty, e.g. for the counting model.
    if (view.get() == nullptr) {
        return series;
    }

    const CSearchKey& key{m_DataGatherer->searchKey()};
    series.s_ByFieldName = key.byFieldName();
    series.s_DetectorIndex = key.detectorIndex();
    series.s_PartitionFieldName = key.partitionFieldName();
    series.s_PartitionFieldValue = m_DataGatherer->partitionFieldValue();
    series.s_MinimumSeasonalVarianceScale = m_ModelFactory->minimumSeasonalVarianceScale();

    if (persistOnDisk) {
        CForecastModelPersist::CPersist persister(persistenceFolder);

        for (std::size_t pid = 0; pid < m_DataGatherer->numberPeople(); ++pid) {
            // todo: Add terms filtering here
            if (m_DataGatherer->isPersonActive(pid)) {
                for (auto feature : view->features()) {
                    const maths::common::CModel* model{view->model(feature, pid)};
                    if (model != nullptr && model->isForecastPossible()) {
                        core_t::TTime firstDataTime;
                        core_t::TTime lastDataTime;
                        std::tie(firstDataTime, lastDataTime) = view->dataTimeInterval(pid);
                        persister.addModel(model, firstDataTime, lastDataTime,
                                           feature, m_DataGatherer->personName(pid));
                    }
                }
            }
        }
        series.s_ToForecastPersisted = persister.finalizePersistAndGetFile();
    } else {
        for (std::size_t pid = 0; pid < m_DataGatherer->numberPeople(); ++pid) {
            // todo: Add terms filtering here
            if (m_DataGatherer->isPersonActive(pid)) {
                for (auto feature : view->features()) {
                    const maths::common::CModel* model{view->model(feature, pid)};
                    if (model != nullptr && model->isForecastPossible()) {
                        core_t::TTime firstDataTime;
                        core_t::TTime lastDataTime;
                        std::tie(firstDataTime, lastDataTime) = view->dataTimeInterval(pid);
                        series.s_ToForecast.emplace_back(
                            feature, m_DataGatherer->personName(pid),
                            CForecastDataSink::TMathsModelPtr(model->cloneForForecast()),
                            firstDataTime, lastDataTime);
                    }
                }
            }
        }
    }

    return series;
}

void CAnomalyDetector::buildInterimResults(core_t::TTime bucketStartTime,
                                           core_t::TTime bucketEndTime,
                                           CHierarchicalResults& results) {
    this->buildResultsHelper(
        bucketStartTime, bucketEndTime,
        std::bind(&CAnomalyDetector::sampleBucketStatistics, this, std::placeholders::_1,
                  std::placeholders::_2, std::ref(m_Limits.resourceMonitor())),
        std::bind(&CAnomalyDetector::noUpdateLastSampledBucket, this, std::placeholders::_1),
        results);
}

void CAnomalyDetector::pruneModels() {
    // Purge out any ancient models which are effectively dead.
    m_Model->prune(m_Model->defaultPruneWindow());
}

void CAnomalyDetector::pruneModels(std::size_t buckets) {
    // Purge out any models that haven't seen activity in the given number of buckets.
    function_t::EFunction function{m_DataGatherer->function()};
    if (function_t::isAggressivePruningSupported(function)) {
        m_Model->prune(buckets);
    }
}

void CAnomalyDetector::resetBucket(core_t::TTime bucketStart) {
    m_DataGatherer->resetBucket(bucketStart);
}

void CAnomalyDetector::releaseMemory(core_t::TTime samplingCutoffTime) {
    m_DataGatherer->releaseMemory(samplingCutoffTime);
}

void CAnomalyDetector::showMemoryUsage(std::ostream& stream) const {
    core::CMemoryUsage mem;
    this->debugMemoryUsage(mem.addChild());
    mem.compress();
    mem.print(stream);
    if (mem.usage() != this->memoryUsage()) {
        LOG_ERROR(<< "Discrepancy in memory report: " << mem.usage()
                  << " from debug, but " << this->memoryUsage() << " from normal");
    }
}

void CAnomalyDetector::debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
    mem->setName("Anomaly Detector Memory Usage");
    core::memory_debug::dynamicSize("m_DataGatherer", m_DataGatherer, mem);
    core::memory_debug::dynamicSize("m_Model", m_Model, mem);
}

std::size_t CAnomalyDetector::memoryUsage() const {
    return core::memory::dynamicSize(m_DataGatherer) + core::memory::dynamicSize(m_Model);
}

std::size_t CAnomalyDetector::staticSize() const {
    return sizeof(*this);
}

bool CAnomalyDetector::supportsPruning() const {
    return true;
}

bool CAnomalyDetector::initPruneWindow(std::size_t& defaultPruneWindow,
                                       std::size_t& minimumPruneWindow) const {
    // The longest we'll consider keeping priors for is 1M buckets.
    defaultPruneWindow = m_Model->defaultPruneWindow();
    minimumPruneWindow = m_Model->minimumPruneWindow();
    return true;
}

core_t::TTime CAnomalyDetector::bucketLength() const {
    return m_Model->bucketLength();
}

void CAnomalyDetector::prune(std::size_t maximumAge) {
    m_Model->prune(maximumAge);
}

void CAnomalyDetector::updateModelSizeStats(CResourceMonitor::SModelSizeStats& modelSizeStats) const {
    ++modelSizeStats.s_PartitionFields;
    const auto& dataGatherer = m_Model->dataGatherer();
    modelSizeStats.s_OverFields += dataGatherer.numberOverFieldValues();
    modelSizeStats.s_ByFields += dataGatherer.numberByFieldValues();
}

const core_t::TTime& CAnomalyDetector::lastBucketEndTime() const {
    return m_LastBucketEndTime;
}

core_t::TTime& CAnomalyDetector::lastBucketEndTime() {
    return m_LastBucketEndTime;
}

core_t::TTime CAnomalyDetector::modelBucketLength() const {
    return m_ModelConfig.bucketLength();
}

std::string CAnomalyDetector::description() const {
    auto beginInfluencers = m_DataGatherer->beginInfluencers();
    auto endInfluencers = m_DataGatherer->endInfluencers();
    return m_DataGatherer->description() +
           (m_DataGatherer->partitionFieldValue().empty() ? "" : "/") +
           m_DataGatherer->partitionFieldValue() +
           (beginInfluencers != endInfluencers
                ? (" " + core::CContainerPrinter::print(beginInfluencers, endInfluencers))
                : "");
}

void CAnomalyDetector::timeNow(core_t::TTime time) {
    m_DataGatherer->timeNow(time);
}

void CAnomalyDetector::skipSampling(core_t::TTime endTime) {
    m_Model->skipSampling(endTime);
    m_LastBucketEndTime = endTime;
}

template<typename SAMPLE_FUNC, typename LAST_SAMPLED_BUCKET_UPDATE_FUNC>
void CAnomalyDetector::buildResultsHelper(core_t::TTime bucketStartTime,
                                          core_t::TTime bucketEndTime,
                                          SAMPLE_FUNC sampleFunc,
                                          LAST_SAMPLED_BUCKET_UPDATE_FUNC lastSampledBucketUpdateFunc,
                                          CHierarchicalResults& results) {
    core_t::TTime bucketLength = m_ModelConfig.bucketLength();

    LOG_TRACE(<< "sample: m_DetectorKey = '" << this->description()
              << "', bucketStartTime = " << bucketStartTime
              << ", bucketEndTime = " << bucketEndTime);

    // Update the statistical models.
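    // sampleFunc and lastSampledBucketUpdateFunc are bound by the caller:
    // buildResults() passes sample() and updateLastSampledBucket() for final
    // results, whereas buildInterimResults() passes sampleBucketStatistics()
    // and noUpdateLastSampledBucket() for interim results.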
    sampleFunc(bucketStartTime, bucketEndTime);

    LOG_TRACE(<< "detect: m_DetectorKey = '" << this->description() << "'");

    CSearchKey key = m_DataGatherer->searchKey();
    LOG_TRACE(<< "OutputResults, for " << key.toCue());

    if (m_Model->addResults(bucketStartTime, bucketEndTime,
                            10, // TODO max number of attributes
                            results)) {
        if (bucketEndTime % bucketLength == 0) {
            lastSampledBucketUpdateFunc(bucketEndTime);
        }
    }
}

void CAnomalyDetector::updateLastSampledBucket(core_t::TTime bucketEndTime) {
    m_LastBucketEndTime = std::max(m_LastBucketEndTime, bucketEndTime);
}

void CAnomalyDetector::noUpdateLastSampledBucket(core_t::TTime /*bucketEndTime*/) const {
    // Do nothing
}

std::string CAnomalyDetector::toCue() const {
    return m_DataGatherer->searchKey().toCue() + m_DataGatherer->searchKey().CUE_DELIMITER +
           m_DataGatherer->partitionFieldValue();
}

std::string CAnomalyDetector::debug() const {
    return m_DataGatherer->searchKey().debug() + '/' + m_DataGatherer->partitionFieldValue();
}

bool CAnomalyDetector::isSimpleCount() const {
    return false;
}

void CAnomalyDetector::initSimpleCounting() {
    bool addedPerson = false;
    m_DataGatherer->addPerson(COUNT_NAME, m_Limits.resourceMonitor(), addedPerson);
}

const CAnomalyDetector::TModelPtr& CAnomalyDetector::model() const {
    return m_Model;
}

CAnomalyDetector::TModelPtr& CAnomalyDetector::model() {
    return m_Model;
}

std::ostream& operator<<(std::ostream& strm, const CAnomalyDetector& detector) {
    strm << detector.m_DataGatherer->searchKey() << '/'
         << detector.m_DataGatherer->partitionFieldValue();
    return strm;
}
}
}