lib/api/CAnomalyJob.cc (1,336 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #include <api/CAnomalyJob.h> #include <core/CDataAdder.h> #include <core/CDataSearcher.h> #include <core/CJsonStatePersistInserter.h> #include <core/CJsonStateRestoreTraverser.h> #include <core/CLogger.h> #include <core/CPersistUtils.h> #include <core/CProgramCounters.h> #include <core/CScopedBoostJsonPoolAllocator.h> #include <core/CStateCompressor.h> #include <core/CStateDecompressor.h> #include <core/CStopWatch.h> #include <core/CStringUtils.h> #include <core/CTimeUtils.h> #include <core/UnwrapRef.h> #include <maths/common/CIntegerTools.h> #include <maths/common/COrderings.h> #include <model/CHierarchicalResultsAggregator.h> #include <model/CHierarchicalResultsPopulator.h> #include <model/CHierarchicalResultsProbabilityFinalizer.h> #include <model/CLimits.h> #include <model/CModelFactory.h> #include <model/CSearchKey.h> #include <model/CSimpleCountDetector.h> #include <api/CAnnotationJsonWriter.h> #include <api/CAnomalyJobConfig.h> #include <api/CConfigUpdater.h> #include <api/CHierarchicalResultsWriter.h> #include <api/CJsonOutputWriter.h> #include <api/CModelPlotDataJsonWriter.h> #include <api/CPersistenceManager.h> #include <algorithm> #include <fstream> #include <memory> #include <string> #include <utility> namespace ml { namespace api { // We use short field names to reduce the state size namespace { using TStrCRef = std::reference_wrapper<const std::string>; //! Convert a (string, key) pair to something readable. template<typename T> inline std::string pairDebug(const T& t) { return core::unwrap_ref(t.second).debug() + '/' + core::unwrap_ref(t.first); } const std::string TOP_LEVEL_DETECTOR_TAG("detector"); // do not shorten this const std::string RESULTS_AGGREGATOR_TAG("aggregator"); const std::string TIME_TAG("a"); const std::string VERSION_TAG("b"); const std::string KEY_TAG("c"); const std::string PARTITION_FIELD_TAG("d"); const std::string DETECTOR_TAG("e"); // This is no longer used - removed in 6.6 // const std::string HIERARCHICAL_RESULTS_TAG("f"); const std::string LATEST_RECORD_TIME_TAG("h"); // This is no longer used - removed in 6.6 // const std::string MODEL_PLOT_TAG("i"); const std::string LAST_RESULTS_TIME_TAG("j"); const std::string INTERIM_BUCKET_CORRECTOR_TAG("k"); const std::string INITIAL_LAST_FINALISED_BUCKET_END_TIME("l"); //! The minimum version required to read the state corresponding to a model snapshot. //! This should be updated every time there is a breaking change to the model state. //! Newer versions are able to read the model state of older versions, but older //! versions cannot read the model state of newer versions following a breaking //! change. This constant tells the node assignment code not to load new model states //! on old nodes in a mixed version cluster. (Most recently this has been updated to //! 9.0.0 so that we have a clean break of state compatibility on the major version //! boundary. Model snapshots generated in 9.x will not be loadable by 8.x, and //! when 8.x is end-of-life we'll be able to remove all the 8.x state backwards //! compatibility code.) const std::string MODEL_SNAPSHOT_MIN_VERSION("9.0.0"); //! Persist state as JSON with meaningful tag names. class CReadableJsonStatePersistInserter : public core::CJsonStatePersistInserter { public: explicit CReadableJsonStatePersistInserter(std::ostream& outputStream) : core::CJsonStatePersistInserter(outputStream) {} bool readableTags() const override { return true; } }; } // Statics const std::string CAnomalyJob::STATE_TYPE("model_state"); const std::string CAnomalyJob::DEFAULT_TIME_FIELD_NAME("time"); const std::string CAnomalyJob::EMPTY_STRING; const CAnomalyJob::TAnomalyDetectorPtr CAnomalyJob::NULL_DETECTOR; CAnomalyJob::CAnomalyJob(std::string jobId, model::CLimits& limits, CAnomalyJobConfig& jobConfig, model::CAnomalyDetectorModelConfig& modelConfig, core::CJsonOutputStreamWrapper& outputStream, TPersistCompleteFunc persistCompleteFunc, CPersistenceManager* persistenceManager, core_t::TTime maxQuantileInterval, const std::string& timeFieldName, const std::string& timeFieldFormat, size_t maxAnomalyRecords) : CDataProcessor{timeFieldName, timeFieldFormat}, m_JobId{std::move(jobId)}, m_Limits{limits}, m_OutputStream{outputStream}, m_ForecastRunner{m_JobId, m_OutputStream, limits.resourceMonitor()}, m_JsonOutputWriter{m_JobId, m_OutputStream}, m_JobConfig{jobConfig}, m_ModelConfig{modelConfig}, m_PersistCompleteFunc{std::move(persistCompleteFunc)}, m_MaxDetectors{std::numeric_limits<size_t>::max()}, m_PersistenceManager{persistenceManager}, m_MaxQuantileInterval{maxQuantileInterval}, m_LastNormalizerPersistTime{core::CTimeUtils::now()}, m_Aggregator{modelConfig}, m_Normalizer{limits, modelConfig} { m_JsonOutputWriter.limitNumberRecords(maxAnomalyRecords); m_Limits.resourceMonitor().memoryUsageReporter( [ObjectPtr = &m_JsonOutputWriter]<typename T>(T && PH1) { ObjectPtr->reportMemoryUsage(std::forward<T>(PH1)); }); } CAnomalyJob::~CAnomalyJob() { m_ForecastRunner.finishForecasts(); } bool CAnomalyJob::handleRecord(const TStrStrUMap& dataRowFields, TOptionalTime time) { // Non-empty control fields take precedence over everything else if (TStrStrUMapCItr const iter = dataRowFields.find(CONTROL_FIELD_NAME); iter != dataRowFields.end() && !iter->second.empty()) { return this->handleControlMessage(iter->second); } // Time may have been parsed already further back along the chain if (time == std::nullopt) { time = this->parseTime(dataRowFields); if (time == std::nullopt) { // Time is compulsory for anomaly detection - the base class will // have logged the parse error return true; } } // This record must be within the specified latency. If latency // is zero, then it should be after the current bucket end. If // latency is non-zero, then it should be after the current bucket // end minus the latency. if (*time < m_LastFinalisedBucketEndTime) { ++core::CProgramCounters::counter(counter_t::E_TSADNumberTimeOrderErrors); std::ostringstream ss; ss << "Records must be in ascending time order. " << "Record '" << ml::api::CAnomalyJob::debugPrintRecord(dataRowFields) << "' time " << *time << " is before bucket time " << m_LastFinalisedBucketEndTime; LOG_ERROR(<< ss.str()); return true; } LOG_TRACE(<< "Handling record " << this->debugPrintRecord(dataRowFields)); this->outputBucketResultsUntil(*time); if (m_DetectorKeys.empty()) { ml::api::CAnomalyJob::populateDetectorKeys(m_JobConfig, m_DetectorKeys); } for (const auto& m_DetectorKey : m_DetectorKeys) { const std::string& partitionFieldName(m_DetectorKey.partitionFieldName()); // An empty partitionFieldName means no partitioning TStrStrUMapCItr const itr = partitionFieldName.empty() ? dataRowFields.end() : dataRowFields.find(partitionFieldName); const std::string& partitionFieldValue( itr == dataRowFields.end() ? EMPTY_STRING : itr->second); // TODO(valeriy): - should usenull apply to the partition field too? const TAnomalyDetectorPtr& detector = this->detectorForKey( false, // not restoring *time, m_DetectorKey, partitionFieldValue, m_Limits.resourceMonitor()); if (detector == nullptr) { // There wasn't enough memory to create the detector continue; } ml::api::CAnomalyJob::addRecord(detector, *time, dataRowFields); } ++core::CProgramCounters::counter(counter_t::E_TSADNumberApiRecordsHandled); ++m_NumRecordsHandled; m_LatestRecordTime = std::max(m_LatestRecordTime, *time); return true; } void CAnomalyJob::finalise() { // Persist final state of normalizer iff an input record has been handled or time has been advanced. if (this->isPersistenceNeeded("quantiles state and model size stats")) { m_JsonOutputWriter.persistNormalizer(m_Normalizer, m_LastNormalizerPersistTime); // Prune the models so that the final persisted state is as neat as possible this->pruneAllModels(); this->refreshMemoryAndReport(); } // Wait for any ongoing periodic persist to complete, so that the data adder // is not used by both a periodic background persist and foreground persist at the // same time if (m_PersistenceManager != nullptr) { m_PersistenceManager->waitForIdle(); } m_JsonOutputWriter.finalise(); } bool CAnomalyJob::initNormalizer(const std::string& quantilesStateFile) { std::ifstream inputStream(quantilesStateFile.c_str()); return m_Normalizer.fromJsonStream(inputStream) == model::CHierarchicalResultsNormalizer::E_Ok; } std::uint64_t CAnomalyJob::numRecordsHandled() const { return m_NumRecordsHandled; } void CAnomalyJob::description() const { if (m_Detectors.empty()) { return; } TKeyCRefAnomalyDetectorPtrPrVec detectors; this->sortedDetectors(detectors); LOG_INFO(<< "Anomaly detectors:"); TStrCRef partition = detectors[0].first.first; LOG_INFO(<< "\tpartition " << partition.get()); LOG_INFO(<< "\t\tkey " << detectors[0].first.second.get()); LOG_INFO(<< "\t\t\t" << detectors[0].second->description()); for (std::size_t i = 1; i < detectors.size(); ++i) { if (detectors[i].first.first.get() != partition.get()) { partition = detectors[i].first.first; LOG_INFO(<< "\tpartition " << partition.get()); } LOG_INFO(<< "\t\tkey " << detectors[i].first.second.get()); LOG_INFO(<< "\t\t\t" << detectors[i].second->description()); } } void CAnomalyJob::descriptionAndDebugMemoryUsage() const { if (m_Detectors.empty()) { LOG_INFO(<< "No detectors"); return; } TKeyCRefAnomalyDetectorPtrPrVec detectors; this->sortedDetectors(detectors); std::ostringstream ss; ss << "Anomaly detectors:" << '\n'; TStrCRef partition = detectors[0].first.first; ss << "\tpartition " << partition.get() << '\n'; ss << "\t\tkey " << detectors[0].first.second.get() << '\n'; ss << "\t\t\t" << detectors[0].second->description() << '\n'; detectors[0].second->showMemoryUsage(ss); for (std::size_t i = 1; i < detectors.size(); ++i) { ss << '\n'; if (detectors[i].first.first.get() != partition.get()) { partition = detectors[i].first.first; ss << "\tpartition " << partition.get() << '\n'; } ss << "\t\tkey " << detectors[i].first.second.get() << '\n'; ss << "\t\t\t" << detectors[i].second->description() << '\n'; detectors[i].second->showMemoryUsage(ss); } LOG_INFO(<< ss.str()); } const CAnomalyJob::SRestoredStateDetail& CAnomalyJob::restoreStateStatus() const { return m_RestoredStateDetail; } bool CAnomalyJob::handleControlMessage(const std::string& controlMessage) { if (controlMessage.empty()) { LOG_ERROR(<< "Programmatic error - handleControlMessage should only be " "called with non-empty control messages"); return false; } bool refreshRequired{true}; switch (controlMessage[0]) { case ' ': // Spaces are just used to fill the buffers and force prior messages // through the system - we don't need to do anything else LOG_TRACE(<< "Received space control message of length " << controlMessage.length()); break; case CONTROL_FIELD_NAME_CHAR: // Silent no-op. This is a simple way to ignore repeated header // rows in input. break; case 'f': // Flush ID comes after the initial f this->acknowledgeFlush(controlMessage.substr(1)); break; case 'i': this->generateInterimResults(controlMessage); break; case 'r': this->resetBuckets(controlMessage); break; case 's': this->skipTime(controlMessage.substr(1)); break; case 't': this->advanceTime(controlMessage.substr(1)); break; case 'u': this->updateConfig(controlMessage.substr(1)); break; case 'p': this->doForecast(controlMessage); break; case 'w': this->processPersistControlMessage(controlMessage.substr(1)); break; case 'z': LOG_TRACE(<< "Received control message '" << controlMessage << "'"); // "refreshRequired" parameter comes after the initial z. if (core::CStringUtils::stringToType(controlMessage.substr(1), refreshRequired) == false) { LOG_ERROR(<< "Received request to flush with invalid control message '" << controlMessage << "'"); } else { m_RefreshRequired = refreshRequired; } break; default: LOG_WARN(<< "Ignoring unknown control message of length " << controlMessage.length() << " beginning with '" << controlMessage[0] << '\''); // Don't return false here (for the time being at least), as it // seems excessive to cause the entire job to fail break; } return true; } bool CAnomalyJob::parsePersistControlMessageArgs(const std::string& controlMessageArgs, core_t::TTime& snapshotTimestamp, std::string& snapshotId, std::string& snapshotDescription) { // Expect at least 3 space separated strings - timestamp snapshotId snapshotDescription, where: // timestamp = string representation of seconds since epoch // snapshotId = short string identifier for snapshot - containing no spaces // snapshotDescription = description of snapshot. May contain spaces. std::size_t const pos{controlMessageArgs.find(' ')}; if (pos == std::string::npos) { LOG_ERROR(<< "Invalid control message format: \"" << controlMessageArgs << "\""); return false; } std::string const timestampStr{controlMessageArgs.substr(0, pos)}; if (timestampStr.empty()) { LOG_ERROR(<< "Received empty snapshot timestamp."); return false; } if (core::CStringUtils::stringToType(timestampStr, snapshotTimestamp) == false) { LOG_ERROR(<< "Received invalid snapshotTimestamp " << timestampStr); return false; } std::size_t const pos2{controlMessageArgs.find(' ', pos + 1)}; if (pos2 == std::string::npos) { LOG_ERROR(<< "Invalid control message format: \"" << controlMessageArgs << "\""); return false; } snapshotId = controlMessageArgs.substr(pos + 1, pos2 - pos - 1); snapshotDescription = controlMessageArgs.substr(pos2 + 1); if (snapshotId.empty()) { LOG_ERROR(<< "Received empty snapshotId."); return false; } return true; } void CAnomalyJob::processPersistControlMessage(const std::string& controlMessageArgs) { if (m_PersistenceManager != nullptr) { // There is a subtle difference between these two cases. When there // are no control message arguments this triggers persistence of all // chained processors, i.e. maybe the categorizer as well as the anomaly // detector if there is one. But when control message arguments are // passed, ONLY the persistence of the anomaly detector is triggered. if (controlMessageArgs.empty()) { if (this->isPersistenceNeeded("state")) { m_PersistenceManager->startPersist(core::CTimeUtils::now()); } } else { core_t::TTime snapshotTimestamp{0}; std::string snapshotId; std::string snapshotDescription; if (parsePersistControlMessageArgs(controlMessageArgs, snapshotTimestamp, snapshotId, snapshotDescription)) { // Since this is not going through the full persistence call // chain, make sure model size stats are up to date before // persisting m_Limits.resourceMonitor().forceRefreshAll(); if (m_PersistenceManager->doForegroundPersist( [this, &snapshotDescription, &snapshotId, &snapshotTimestamp](core::CDataAdder& persister) { return this->doPersistStateInForeground( persister, snapshotDescription, snapshotId, snapshotTimestamp); }) == false) { LOG_ERROR(<< "Failed to persist state with parameters \"" << controlMessageArgs << "\""); } } } } } void CAnomalyJob::acknowledgeFlush(const std::string& flushId) { if (flushId.empty()) { LOG_ERROR(<< "Received flush control message with no ID"); } else { LOG_TRACE(<< "Received flush control message with ID " << flushId); } m_JsonOutputWriter.acknowledgeFlush(flushId, m_LastFinalisedBucketEndTime, m_RefreshRequired); } void CAnomalyJob::updateConfig(const std::string& config) { LOG_DEBUG(<< "Received update config request: " << config); CConfigUpdater configUpdater(m_JobConfig, m_ModelConfig); if (configUpdater.update(config) == false) { LOG_ERROR(<< "Failed to update configuration"); } } void CAnomalyJob::advanceTime(const std::string& time_) { if (time_.empty()) { LOG_ERROR(<< "Received request to advance time with no time"); return; } core_t::TTime time(0); if (core::CStringUtils::stringToType(time_, time) == false) { LOG_ERROR(<< "Received request to advance time to invalid time " << time_); return; } if (m_LastFinalisedBucketEndTime == 0) { LOG_DEBUG(<< "Manually advancing time to " << time << " before any valid data has been seen"); } else { LOG_TRACE(<< "Received request to advance time to " << time); } m_TimeAdvanced = true; this->outputBucketResultsUntil(time); this->timeNow(time); } bool CAnomalyJob::isPersistenceNeeded(const std::string& description) const { if ((m_NumRecordsHandled == 0) && (m_TimeAdvanced == false)) { LOG_DEBUG(<< "Will not attempt to persist " << description << ". Zero records were handled and time has not been advanced."); return false; } return true; } void CAnomalyJob::outputBucketResultsUntil(core_t::TTime time) { // If the bucket time has increased, output results for all field names core_t::TTime const bucketLength = m_ModelConfig.bucketLength(); core_t::TTime const latency = m_ModelConfig.latency(); if (m_LastFinalisedBucketEndTime == 0) { m_LastFinalisedBucketEndTime = std::max( m_LastFinalisedBucketEndTime, maths::common::CIntegerTools::floor(time, bucketLength) - latency); m_InitialLastFinalisedBucketEndTime = m_LastFinalisedBucketEndTime; } m_Normalizer.resetBigChange(); for (core_t::TTime lastBucketEndTime = m_LastFinalisedBucketEndTime; lastBucketEndTime + bucketLength + latency <= time; lastBucketEndTime += bucketLength) { if (lastBucketEndTime == m_InitialLastFinalisedBucketEndTime && m_RestoredStateDetail.s_RestoredStateStatus == E_Success) { LOG_DEBUG(<< "Skipping incomplete first bucket with lastBucketEndTime = " << lastBucketEndTime << ", detected after state restoration"); continue; } this->outputResults(lastBucketEndTime); m_Limits.resourceMonitor().decreaseMargin(bucketLength); m_Limits.resourceMonitor().sendMemoryUsageReportIfSignificantlyChanged( lastBucketEndTime, bucketLength); m_LastFinalisedBucketEndTime = lastBucketEndTime + bucketLength; // Check for periodic persistence immediately after calculating results // for the last bucket but before adding the first piece of data for the // next bucket if (m_PersistenceManager != nullptr) { m_PersistenceManager->startPersistIfAppropriate(); } } if (m_Normalizer.hasLastUpdateCausedBigChange() || (m_MaxQuantileInterval > 0 && core::CTimeUtils::now() > m_LastNormalizerPersistTime + m_MaxQuantileInterval)) { m_JsonOutputWriter.persistNormalizer(m_Normalizer, m_LastNormalizerPersistTime); } } void CAnomalyJob::skipTime(const std::string& time_) { if (time_.empty()) { LOG_ERROR(<< "Received request to skip time with no time"); return; } core_t::TTime time(0); if (core::CStringUtils::stringToType(time_, time) == false) { LOG_ERROR(<< "Received request to skip time to invalid time " << time_); return; } this->skipSampling( maths::common::CIntegerTools::ceil(time, m_ModelConfig.bucketLength())); } void CAnomalyJob::skipSampling(core_t::TTime endTime) { LOG_INFO(<< "Skipping time to: " << endTime); for (const auto& detector_ : m_Detectors) { model::CAnomalyDetector* detector(detector_.second.get()); if (detector == nullptr) { LOG_ERROR(<< "Unexpected NULL pointer for key '" << pairDebug(detector_.first) << '\''); continue; } detector->skipSampling(endTime); } m_LastFinalisedBucketEndTime = endTime; } void CAnomalyJob::timeNow(core_t::TTime time) { for (const auto& detector_ : m_Detectors) { model::CAnomalyDetector* detector(detector_.second.get()); if (detector == nullptr) { LOG_ERROR(<< "Unexpected NULL pointer for key '" << pairDebug(detector_.first) << '\''); continue; } detector->timeNow(time); } } void CAnomalyJob::generateInterimResults(const std::string& controlMessage) { LOG_TRACE(<< "Generating interim results"); if (m_LastFinalisedBucketEndTime == 0) { LOG_TRACE(<< "Cannot create interim results having seen data for less than one bucket ever"); return; } core_t::TTime start = m_LastFinalisedBucketEndTime; core_t::TTime end = m_LastFinalisedBucketEndTime + ((m_ModelConfig.latencyBuckets() + 1) * m_ModelConfig.bucketLength()); if (ml::api::CAnomalyJob::parseTimeRangeInControlMessage(controlMessage, start, end)) { LOG_TRACE(<< "Time range for results: " << start << " : " << end); this->outputResultsWithinRange(true, start, end); } } bool CAnomalyJob::parseTimeRangeInControlMessage(const std::string& controlMessage, core_t::TTime& start, core_t::TTime& end) { using TStrVec = core::CStringUtils::TStrVec; TStrVec tokens; std::string remainder; core::CStringUtils::tokenise(" ", controlMessage.substr(1, std::string::npos), tokens, remainder); if (!remainder.empty()) { tokens.push_back(remainder); } std::size_t const tokensSize = tokens.size(); if (tokensSize == 0) { // Default range return true; } if (tokensSize != 2) { LOG_ERROR(<< "Control message " << controlMessage << " has " << tokensSize << " parameters when only zero or two are allowed."); return false; } if (core::CStringUtils::stringToType(tokens[0], start) && core::CStringUtils::stringToType(tokens[1], end)) { return true; } LOG_ERROR(<< "Cannot parse control message: " << controlMessage); return false; } void CAnomalyJob::doForecast(const std::string& controlMessage) { // make a copy of the detectors vector, note: this is a shallow, not a deep copy TAnomalyDetectorPtrVec detectorVector; this->detectors(detectorVector); // push request into forecast queue, validates if (!m_ForecastRunner.pushForecastJob(controlMessage, detectorVector, m_LastResultsTime)) { // ForecastRunner already logged about it and send a status, so no need to log at info here LOG_DEBUG(<< "Forecast request failed"); } } void CAnomalyJob::outputResults(core_t::TTime bucketStartTime) { core::CStopWatch timer(true); core_t::TTime const bucketLength = m_ModelConfig.bucketLength(); model::CHierarchicalResults results; TModelPlotDataVec modelPlotData; TAnnotationVec annotations; TKeyCRefAnomalyDetectorPtrPrVec detectors; this->sortedDetectors(detectors); for (const auto& detector_ : detectors) { model::CAnomalyDetector* detector(detector_.second.get()); if (detector == nullptr) { LOG_ERROR(<< "Unexpected NULL pointer for key '" << pairDebug(detector_.first) << '\''); continue; } detector->buildResults(bucketStartTime, bucketStartTime + bucketLength, results); detector->releaseMemory(bucketStartTime - m_ModelConfig.samplingAgeCutoff()); this->generateModelPlot(bucketStartTime, bucketStartTime + bucketLength, *detector, modelPlotData); detector->generateAnnotations(bucketStartTime, bucketStartTime + bucketLength, annotations); } if (!results.empty()) { results.buildHierarchy(); this->updateAggregatorAndAggregate(false, results); model::CHierarchicalResultsProbabilityFinalizer finalizer; results.bottomUpBreadthFirst(finalizer); results.pivotsBottomUpBreadthFirst(finalizer); model::CHierarchicalResultsPopulator populator(m_Limits); results.bottomUpBreadthFirst(populator); results.pivotsBottomUpBreadthFirst(populator); this->updateNormalizerAndNormalizeResults(false, results); } std::uint64_t const processingTime = timer.stop(); // Model plots must be written first so the Java persists them // once the bucket result is processed this->writeOutModelPlot(modelPlotData); this->writeOutAnnotations(annotations); this->writeOutResults(false, results, bucketStartTime, processingTime); if (m_ModelConfig.modelPruneWindow() > 0) { // Ensure that bucketPruneWindow is always rounded _up_ // to the next whole number of buckets (this doesn't really matter if we enforce // that the model prune window always be an exact multiple of bucket span in the // corresponding Java code) core_t::TTime const bucketPruneWindow{ (m_ModelConfig.modelPruneWindow() + m_ModelConfig.bucketLength() - 1) / m_ModelConfig.bucketLength()}; this->pruneAllModels(bucketPruneWindow); } // Prune models based on memory resource limits m_Limits.resourceMonitor().pruneIfRequired(bucketStartTime); } void CAnomalyJob::outputInterimResults(core_t::TTime bucketStartTime) { core::CStopWatch timer(true); core_t::TTime const bucketLength = m_ModelConfig.bucketLength(); model::CHierarchicalResults results; results.setInterim(); TKeyCRefAnomalyDetectorPtrPrVec detectors; this->sortedDetectors(detectors); for (const auto& detector_ : detectors) { model::CAnomalyDetector* detector(detector_.second.get()); if (detector == nullptr) { LOG_ERROR(<< "Unexpected NULL pointer for key '" << pairDebug(detector_.first) << '\''); continue; } detector->buildInterimResults(bucketStartTime, bucketStartTime + bucketLength, results); } if (!results.empty()) { results.buildHierarchy(); this->updateAggregatorAndAggregate(true, results); model::CHierarchicalResultsProbabilityFinalizer finalizer; results.bottomUpBreadthFirst(finalizer); results.pivotsBottomUpBreadthFirst(finalizer); model::CHierarchicalResultsPopulator populator(m_Limits); results.bottomUpBreadthFirst(populator); results.pivotsBottomUpBreadthFirst(populator); this->updateNormalizerAndNormalizeResults(true, results); } std::uint64_t const processingTime = timer.stop(); this->writeOutResults(true, results, bucketStartTime, processingTime); } void CAnomalyJob::writeOutResults(bool interim, model::CHierarchicalResults& results, core_t::TTime bucketTime, std::uint64_t processingTime) { if (!results.empty()) { LOG_TRACE(<< "Got results object here: " << results.root()->s_RawAnomalyScore << " / " << results.root()->s_NormalizedAnomalyScore << ", count " << results.resultCount() << " at " << bucketTime); using TScopedAllocator = core::CScopedBoostJsonPoolAllocator<CJsonOutputWriter>; static const std::string ALLOCATOR_ID("CAnomalyJob::writeOutResults"); TScopedAllocator const scopedAllocator(ALLOCATOR_ID, m_JsonOutputWriter); api::CHierarchicalResultsWriter writer( m_Limits, [ObjectPtr = &m_JsonOutputWriter]<typename T>(T && PH1) { return ObjectPtr->acceptResult(std::forward<T>(PH1)); }, [ObjectPtr = &m_JsonOutputWriter]<typename T, typename U, typename V>( T && PH1, U && PH2, V && PH3) { return ObjectPtr->acceptInfluencer( std::forward<T>(PH1), std::forward<U>(PH2), std::forward<V>(PH3)); }); results.bottomUpBreadthFirst(writer); results.pivotsBottomUpBreadthFirst(writer); // Add the bucketTime bucket influencer. // Note that the influencer will only be accepted if there are records. m_JsonOutputWriter.acceptBucketTimeInfluencer( bucketTime, results.root()->s_AnnotatedProbability.s_Probability, results.root()->s_RawAnomalyScore, results.root()->s_NormalizedAnomalyScore); core::CProgramCounters::counter(counter_t::E_TSADOutputMemoryAllocatorUsage) = m_JsonOutputWriter.getOutputMemoryAllocatorUsage(); if (m_JsonOutputWriter.endOutputBatch(interim, processingTime) == false) { LOG_ERROR(<< "Problem writing anomaly output"); } m_LastResultsTime = bucketTime; } } void CAnomalyJob::resetBuckets(const std::string& controlMessage) { if (controlMessage.length() == 1) { LOG_ERROR(<< "Received reset buckets control message without time range"); return; } core_t::TTime start = 0; core_t::TTime end = 0; if (ml::api::CAnomalyJob::parseTimeRangeInControlMessage(controlMessage, start, end) == false) { return; } core_t::TTime const bucketLength = m_ModelConfig.bucketLength(); core_t::TTime time = maths::common::CIntegerTools::floor(start, bucketLength); core_t::TTime const bucketEnd = maths::common::CIntegerTools::ceil(end, bucketLength); while (time < bucketEnd) { for (const auto& detector_ : m_Detectors) { model::CAnomalyDetector* detector = detector_.second.get(); if (detector == nullptr) { LOG_ERROR(<< "Unexpected NULL pointer for key '" << pairDebug(detector_.first) << '\''); continue; } LOG_TRACE(<< "Resetting bucket = " << time); detector->resetBucket(time); } time += bucketLength; } } void CAnomalyJob::setDetectorsLastBucketEndTime(core_t::TTime lastBucketEndTime) { for (const auto& detector_ : m_Detectors) { model::CAnomalyDetector* detector(detector_.second.get()); if (detector == nullptr) { LOG_ERROR(<< "Unexpected NULL pointer for key '" << pairDebug(detector_.first) << '\''); continue; } const std::string& description = detector->description(); LOG_DEBUG(<< "Setting lastBucketEndTime to " << lastBucketEndTime << " in detector for '" << description << '\''); detector->lastBucketEndTime() = lastBucketEndTime; } } bool CAnomalyJob::restoreState(core::CDataSearcher& restoreSearcher, core_t::TTime& completeToTime) { size_t numDetectors(0); try { // Restore from Elasticsearch compressed data. // (To restore from uncompressed data for testing, comment the next line // and substitute decompressor with restoreSearcher two lines below.) core::CStateDecompressor decompressor(restoreSearcher); core::CDataSearcher::TIStreamP const strm(decompressor.search(1, 1)); if (strm == nullptr) { LOG_ERROR(<< "Unable to connect to data store"); return false; } if (strm->bad()) { LOG_ERROR(<< "State restoration search returned bad stream"); return false; } if (strm->fail()) { // This is fatal. If the stream exists and has failed then state is missing LOG_ERROR(<< "State restoration search returned failed stream"); return false; } // We're dealing with streaming JSON state core::CJsonStateRestoreTraverser traverser(*strm); if (this->restoreState(traverser, completeToTime, numDetectors) == false || traverser.haveBadState()) { LOG_ERROR(<< "Failed to restore detectors"); return false; } LOG_DEBUG(<< "Finished restoration, with " << numDetectors << " detectors"); if (numDetectors == 1 && m_Detectors.empty()) { // non fatal error m_RestoredStateDetail.s_RestoredStateStatus = E_NoDetectorsRecovered; return true; } if (completeToTime > 0) { core_t::TTime const lastBucketEndTime(maths::common::CIntegerTools::ceil( completeToTime, m_ModelConfig.bucketLength())); this->setDetectorsLastBucketEndTime(lastBucketEndTime); } else { if (!m_Detectors.empty()) { LOG_ERROR(<< "Inconsistency - " << m_Detectors.size() << " detectors have been restored but completeToTime is " << completeToTime); } } } catch (std::exception& e) { LOG_ERROR(<< "Failed to restore state! " << e.what()); return false; } return true; } bool CAnomalyJob::restoreState(core::CStateRestoreTraverser& traverser, core_t::TTime& completeToTime, std::size_t& numDetectors) { m_RestoredStateDetail.s_RestoredStateStatus = E_Failure; m_RestoredStateDetail.s_Extra = std::nullopt; // Call name() to prime the traverser if it hasn't started traverser.name(); if (traverser.isEof()) { m_RestoredStateDetail.s_RestoredStateStatus = E_NoDetectorsRecovered; LOG_ERROR(<< "Expected persisted state but no state exists"); return false; } core_t::TTime lastBucketEndTime(0); if (traverser.name() != TIME_TAG || core::CStringUtils::stringToType(traverser.value(), lastBucketEndTime) == false) { m_RestoredStateDetail.s_RestoredStateStatus = E_UnexpectedTag; LOG_ERROR(<< "Cannot restore anomaly detector - '" << TIME_TAG << "' element expected but found " << traverser.name() << '=' << traverser.value()); return false; } m_LastFinalisedBucketEndTime = lastBucketEndTime; if (lastBucketEndTime > completeToTime) { LOG_INFO(<< "Processing is already complete to time " << lastBucketEndTime); completeToTime = lastBucketEndTime; } if ((traverser.next() == false) || (traverser.name() != VERSION_TAG)) { m_RestoredStateDetail.s_RestoredStateStatus = E_UnexpectedTag; LOG_ERROR(<< "Cannot restore anomaly detector " << VERSION_TAG << " was expected"); return false; } const std::string& stateVersion = traverser.value(); if (stateVersion != model::CAnomalyDetector::STATE_VERSION) { m_RestoredStateDetail.s_RestoredStateStatus = E_IncorrectVersion; LOG_ERROR(<< "Restored anomaly detector state version is " << stateVersion << " - ignoring it as current state version is " << model::CAnomalyDetector::STATE_VERSION); // This counts as successful restoration return true; } while (traverser.next()) { const std::string& name = traverser.name(); if (name == INTERIM_BUCKET_CORRECTOR_TAG) { // Note that this has to be persisted and restored before any detectors. auto interimBucketCorrector = std::make_shared<model::CInterimBucketCorrector>( m_ModelConfig.bucketLength()); if (traverser.traverseSubLevel( [capture0 = interimBucketCorrector.get()]<typename T>(T && PH1) { return capture0->acceptRestoreTraverser(std::forward<T>(PH1)); }) == false) { LOG_ERROR(<< "Cannot restore interim bucket corrector"); return false; } m_ModelConfig.interimBucketCorrector(interimBucketCorrector); } else if (name == TOP_LEVEL_DETECTOR_TAG) { if (traverser.traverseSubLevel([this]<typename T>(T && PH1) { return restoreSingleDetector(std::forward<T>(PH1)); }) == false) { LOG_ERROR(<< "Cannot restore anomaly detector"); return false; } ++numDetectors; } else if (name == RESULTS_AGGREGATOR_TAG) { if (traverser.traverseSubLevel([ObjectPtr = &m_Aggregator]<typename T>(T && PH1) { return ObjectPtr->acceptRestoreTraverser(std::forward<T>(PH1)); }) == false) { LOG_ERROR(<< "Cannot restore results aggregator"); return false; } } else if (name == LATEST_RECORD_TIME_TAG) { core::CPersistUtils::restore(LATEST_RECORD_TIME_TAG, m_LatestRecordTime, traverser); } else if (name == LAST_RESULTS_TIME_TAG) { core::CPersistUtils::restore(LAST_RESULTS_TIME_TAG, m_LastResultsTime, traverser); } else if (name == INITIAL_LAST_FINALISED_BUCKET_END_TIME) { core::CPersistUtils::restore(INITIAL_LAST_FINALISED_BUCKET_END_TIME, m_InitialLastFinalisedBucketEndTime, traverser); } } m_RestoredStateDetail.s_RestoredStateStatus = E_Success; return true; } bool CAnomalyJob::restoreSingleDetector(core::CStateRestoreTraverser& traverser) { if (traverser.name() != KEY_TAG) { LOG_ERROR(<< "Cannot restore anomaly detector - " << KEY_TAG << " element expected but found " << traverser.name() << '=' << traverser.value()); m_RestoredStateDetail.s_RestoredStateStatus = E_UnexpectedTag; return false; } model::CSearchKey key; if (traverser.traverseSubLevel([&key]<typename T>(T && PH1) { return model::CAnomalyDetector::keyAcceptRestoreTraverser( std::forward<T>(PH1), key); }) == false) { LOG_ERROR(<< "Cannot restore anomaly detector - no key found in " << KEY_TAG); m_RestoredStateDetail.s_RestoredStateStatus = E_UnexpectedTag; return false; } if (traverser.next() == false) { LOG_ERROR(<< "Cannot restore anomaly detector - end of object reached when " << PARTITION_FIELD_TAG << " was expected"); m_RestoredStateDetail.s_RestoredStateStatus = E_UnexpectedTag; return false; } if (traverser.name() != PARTITION_FIELD_TAG) { LOG_ERROR(<< "Cannot restore anomaly detector - " << PARTITION_FIELD_TAG << " element expected but found " << traverser.name() << '=' << traverser.value()); m_RestoredStateDetail.s_RestoredStateStatus = E_UnexpectedTag; return false; } std::string partitionFieldValue; if (traverser.traverseSubLevel([&partitionFieldValue]<typename T>(T && PH1) { return model::CAnomalyDetector::partitionFieldAcceptRestoreTraverser( std::forward<T>(PH1), partitionFieldValue); }) == false) { LOG_ERROR(<< "Cannot restore anomaly detector - " "no partition field value found in " << PARTITION_FIELD_TAG); m_RestoredStateDetail.s_RestoredStateStatus = E_UnexpectedTag; return false; } if (traverser.next() == false) { LOG_ERROR(<< "Cannot restore anomaly detector - end of object reached when " << DETECTOR_TAG << " was expected"); m_RestoredStateDetail.s_RestoredStateStatus = E_UnexpectedTag; return false; } if (traverser.name() != DETECTOR_TAG) { LOG_ERROR(<< "Cannot restore anomaly detector - " << DETECTOR_TAG << " element expected but found " << traverser.name() << '=' << traverser.value()); m_RestoredStateDetail.s_RestoredStateStatus = E_UnexpectedTag; return false; } if (this->restoreDetectorState(key, partitionFieldValue, traverser) == false) { LOG_ERROR(<< "Delegated portion of anomaly detector restore failed"); m_RestoredStateDetail.s_RestoredStateStatus = E_Failure; return false; } LOG_TRACE(<< "Restored state for " << key.toCue() << "/" << partitionFieldValue); return true; } bool CAnomalyJob::restoreDetectorState(const model::CSearchKey& key, const std::string& partitionFieldValue, core::CStateRestoreTraverser& traverser) { const TAnomalyDetectorPtr& detector = this->detectorForKey(true, // for restoring 0, // time reset later key, partitionFieldValue, m_Limits.resourceMonitor()); if (!detector) { LOG_ERROR(<< "Detector with key '" << key.debug() << '/' << partitionFieldValue << "' was not recreated on restore - " "memory limit is too low to continue this job"); m_RestoredStateDetail.s_RestoredStateStatus = E_MemoryLimitReached; return false; } LOG_DEBUG(<< "Restoring state for detector with key '" << key.debug() << '/' << partitionFieldValue << '\''); if (traverser.traverseSubLevel([ capture0 = detector.get(), capture1 = std::cref(partitionFieldValue) ]<typename T>(T && PH1) { return capture0->acceptRestoreTraverser(capture1, std::forward<T>(PH1)); }) == false) { LOG_ERROR(<< "Error restoring anomaly detector for key '" << key.debug() << '/' << partitionFieldValue << '\''); return false; } return true; } bool CAnomalyJob::persistModelsState(core::CDataAdder& persister, core_t::TTime timestamp) { TKeyCRefAnomalyDetectorPtrPrVec detectors; this->sortedDetectors(detectors); // Persistence operates on a cached collection of counters rather than on the live counters directly. // This is in order that background persistence operates on a consistent set of counters however we // also must ensure that foreground persistence has access to an up-to-date cache of counters as well. core::CProgramCounters::cacheCounters(); return this->persistModelsState(detectors, persister, timestamp); } bool CAnomalyJob::persistStateInForeground(core::CDataAdder& persister, const std::string& descriptionPrefix) { if (m_LastFinalisedBucketEndTime == 0) { LOG_INFO(<< "Will not persist detectors as no results have been output"); return true; } core_t::TTime const snapshotTimestamp{core::CTimeUtils::now()}; const std::string snapshotId{core::CStringUtils::typeToString(snapshotTimestamp)}; const std::string description{descriptionPrefix + core::CTimeUtils::toIso8601(snapshotTimestamp)}; return this->doPersistStateInForeground(persister, description, snapshotId, snapshotTimestamp); } bool CAnomalyJob::doPersistStateInForeground(core::CDataAdder& persister, const std::string& description, const std::string& snapshotId, core_t::TTime snapshotTimestamp) { if (m_PersistenceManager != nullptr) { // This will not happen if finalise() was called before persisting state if (m_PersistenceManager->isBusy()) { LOG_ERROR(<< "Cannot perform foreground persistence of state - periodic " "background persistence is still in progress"); return false; } } TKeyCRefAnomalyDetectorPtrPrVec detectors; this->sortedDetectors(detectors); std::string normaliserState; m_Normalizer.toJson(m_LastResultsTime, "api", normaliserState, true); // Persistence of static counters is expected to operate on a cached collection of counters rather // than on the live counters directly. This is in order that the more frequently used background persistence // operates on a consistent set of counters. Hence, to avoid an error regarding the cache not existing, we // also must ensure that foreground persistence has access to an up-to-date cache of counters. core::CProgramCounters::cacheCounters(); return this->persistCopiedState( description, snapshotId, snapshotTimestamp, m_LastFinalisedBucketEndTime, detectors, m_Limits.resourceMonitor().createMemoryUsageReport( m_LastFinalisedBucketEndTime - m_ModelConfig.bucketLength()), m_ModelConfig.interimBucketCorrector(), m_Aggregator, normaliserState, m_LatestRecordTime, m_LastResultsTime, m_InitialLastFinalisedBucketEndTime, persister); } bool CAnomalyJob::backgroundPersistState() { LOG_INFO(<< "Background persist starting data copy"); if (m_PersistenceManager == nullptr) { return false; } // Pass arguments by value: this is what we want for // passing to a new thread. // Do NOT add std::ref wrappers around these arguments - they // MUST be copied for thread safety auto const args = std::make_shared<SBackgroundPersistArgs>( m_LastFinalisedBucketEndTime, m_Limits.resourceMonitor().createMemoryUsageReport( m_LastFinalisedBucketEndTime - m_ModelConfig.bucketLength()), m_ModelConfig.interimBucketCorrector(), m_Aggregator, m_LatestRecordTime, m_LastResultsTime, m_InitialLastFinalisedBucketEndTime); // The normaliser is non-copyable, so we have to make do with JSONifying it now; // it should be relatively fast though m_Normalizer.toJson(m_LastResultsTime, "api", args->s_NormalizerState, true); TKeyCRefAnomalyDetectorPtrPrVec& copiedDetectors = args->s_Detectors; copiedDetectors.reserve(m_Detectors.size()); for (const auto& detector_ : m_Detectors) { model::CAnomalyDetector* detector(detector_.second.get()); if (detector == nullptr) { LOG_ERROR(<< "Unexpected NULL pointer for key '" << pairDebug(detector_.first) << '\''); continue; } model::CSearchKey::TStrCRefKeyCRefPr const key( std::cref(detector_.first.first), std::cref(detector_.first.second)); if (detector->isSimpleCount()) { copiedDetectors.emplace_back( key, TAnomalyDetectorPtr(std::make_shared<model::CSimpleCountDetector>( true, *detector))); } else { copiedDetectors.emplace_back( key, std::make_shared<model::CAnomalyDetector>(true, *detector)); } } std::sort(copiedDetectors.begin(), copiedDetectors.end(), maths::common::COrderings::SFirstLess()); if (m_PersistenceManager->addPersistFunc([ this, args ]<typename T>(T && PH1) { return runBackgroundPersist(args, std::forward<T>(PH1)); }) == false) { LOG_ERROR(<< "Failed to add anomaly detector background persistence function"); return false; } m_PersistenceManager->useBackgroundPersistence(); return true; } bool CAnomalyJob::runForegroundPersist(core::CDataAdder& persister) { LOG_INFO(<< "Foreground persist commencing..."); // Prune the models so that the persisted state is as neat as possible this->pruneAllModels(); return this->persistStateInForeground(persister, "Periodic foreground persist at "); } bool CAnomalyJob::runBackgroundPersist(const TBackgroundPersistArgsPtr& args, core::CDataAdder& persister) { if (!args) { LOG_ERROR(<< "Unexpected NULL pointer passed to background persist"); return false; } core_t::TTime const snapshotTimestamp(core::CTimeUtils::now()); const std::string snapshotId(core::CStringUtils::typeToString(snapshotTimestamp)); const std::string description{"Periodic background persist at " + core::CTimeUtils::toIso8601(snapshotTimestamp)}; return this->persistCopiedState( description, snapshotId, snapshotTimestamp, args->s_Time, args->s_Detectors, args->s_ModelSizeStats, args->s_InterimBucketCorrector, args->s_Aggregator, args->s_NormalizerState, args->s_LatestRecordTime, args->s_LastResultsTime, args->s_InitialLastFinalizedBucketEndTime, persister); } bool CAnomalyJob::persistModelsState(const TKeyCRefAnomalyDetectorPtrPrVec& detectors, core::CDataAdder& persister, core_t::TTime timestamp) { try { const std::string snapShotId{core::CStringUtils::typeToString(timestamp)}; core::CDataAdder::TOStreamP strm = persister.addStreamed(m_JobId + '_' + STATE_TYPE + '_' + snapShotId); if (strm != nullptr) { { // The JSON inserter must be destroyed before the stream is complete using TStatePersistInserterUPtr = std::unique_ptr<core::CStatePersistInserter>; TStatePersistInserterUPtr inserter{[&strm]() -> TStatePersistInserterUPtr { return std::make_unique<CReadableJsonStatePersistInserter>(*strm); }()}; for (const auto& detector_ : detectors) { const model::CAnomalyDetector* detector(detector_.second.get()); if (detector == nullptr) { LOG_ERROR(<< "Unexpected NULL pointer for key '" << pairDebug(detector_.first) << '\''); continue; } detector->persistModelsState(*inserter); const std::string& description = detector->description(); LOG_DEBUG(<< "Persisted state for '" << description << "', at time " << timestamp << "detector->lastBucketEndTime() = " << detector->lastBucketEndTime()); } } if (persister.streamComplete(strm, true) == false || strm->bad()) { LOG_ERROR(<< "Failed to complete last persistence stream"); return false; } } } catch (std::exception& e) { LOG_ERROR(<< "Failed to persist state! " << e.what()); return false; } return true; } bool CAnomalyJob::persistCopiedState(const std::string& description, const std::string& snapshotId, core_t::TTime snapshotTimestamp, core_t::TTime time, const TKeyCRefAnomalyDetectorPtrPrVec& detectors, const model::CResourceMonitor::SModelSizeStats& modelSizeStats, const model::CInterimBucketCorrector& interimBucketCorrector, const model::CHierarchicalResultsAggregator& aggregator, const std::string& normalizerState, core_t::TTime latestRecordTime, core_t::TTime lastResultsTime, core_t::TTime initialLastFinalisedBucketEndTime, core::CDataAdder& persister) { // Ensure that the cache of program counters is cleared upon exiting the current scope. // As the cache is cleared when the simple count detector is persisted this may seem // unnecessary at first, but there are occasions when the simple count detector does not exist, // e.g. when no data is seen but time is advanced. core::CProgramCounters::CCacheManager const cacheMgr; // Persist state for each detector separately by streaming try { core::CStateCompressor compressor(persister); core::CDataAdder::TOStreamP strm = compressor.addStreamed(m_JobId + '_' + STATE_TYPE + '_' + snapshotId); if (strm != nullptr) { // IMPORTANT - this method can run in a background thread while the // analytics carries on processing new buckets in the main thread. // Therefore, this method must NOT access any member variables whose // values can change. There should be no use of m_ variables in the // following code block. { // The JSON inserter must be destructed before the stream is complete core::CJsonStatePersistInserter inserter(*strm); inserter.insertValue(TIME_TAG, time); inserter.insertValue(VERSION_TAG, model::CAnomalyDetector::STATE_VERSION); inserter.insertLevel( INTERIM_BUCKET_CORRECTOR_TAG, [ObjectPtr = &interimBucketCorrector]<typename T>(T && PH1) { ObjectPtr->acceptPersistInserter(std::forward<T>(PH1)); }); for (const auto& detector_ : detectors) { const model::CAnomalyDetector* detector(detector_.second.get()); if (detector == nullptr) { LOG_ERROR(<< "Unexpected NULL pointer for key '" << pairDebug(detector_.first) << '\''); continue; } if (detector->shouldPersistDetector() == false) { LOG_TRACE(<< "Not persisting state for '" << detector->description() << "'"); continue; } inserter.insertLevel( TOP_LEVEL_DETECTOR_TAG, [capture0 = std::cref(*detector)]<typename T>(T && PH1) { CAnomalyJob::persistIndividualDetector( capture0, std::forward<T>(PH1)); }); LOG_DEBUG(<< "Persisted state for '" << detector->description() << "'"); } inserter.insertLevel( RESULTS_AGGREGATOR_TAG, [ObjectPtr = &aggregator]<typename T>(T && PH1) { ObjectPtr->acceptPersistInserter(std::forward<T>(PH1)); }); core::CPersistUtils::persist(LATEST_RECORD_TIME_TAG, latestRecordTime, inserter); core::CPersistUtils::persist(LAST_RESULTS_TIME_TAG, lastResultsTime, inserter); core::CPersistUtils::persist(INITIAL_LAST_FINALISED_BUCKET_END_TIME, initialLastFinalisedBucketEndTime, inserter); } if (compressor.streamComplete(strm, true) == false || strm->bad()) { LOG_ERROR(<< "Failed to complete last persistence stream"); return false; } if (m_PersistCompleteFunc) { CModelSnapshotJsonWriter::SModelSnapshotReport const modelSnapshotReport{ MODEL_SNAPSHOT_MIN_VERSION, snapshotTimestamp, description, snapshotId, compressor.numCompressedDocs(), modelSizeStats, normalizerState, latestRecordTime, // This needs to be the last final result time as it serves // as the time after which all results are deleted when a // model snapshot is reverted time - m_ModelConfig.bucketLength()}; m_PersistCompleteFunc(modelSnapshotReport); } } } catch (std::exception& e) { LOG_ERROR(<< "Failed to persist state! " << e.what()); return false; } return true; } bool CAnomalyJob::periodicPersistStateInBackground() { // Prune the models so that the persisted state is as neat as possible this->pruneAllModels(); // Make sure model size stats are up to date for (const auto& detector_ : m_Detectors) { model::CAnomalyDetector* detector = detector_.second.get(); if (detector == nullptr) { LOG_ERROR(<< "Unexpected NULL pointer for key '" << pairDebug(detector_.first) << '\''); continue; } m_Limits.resourceMonitor().forceRefresh(*detector); } return this->backgroundPersistState(); } bool CAnomalyJob::periodicPersistStateInForeground() { // Do NOT pass this request on to the output chainer. // That logic is already present in persistStateInForeground. if (m_PersistenceManager == nullptr) { return false; } if (m_PersistenceManager->addPersistFunc([this]<typename T>(T && PH1) { return runForegroundPersist(std::forward<T>(PH1)); }) == false) { LOG_ERROR(<< "Failed to add anomaly detector foreground persistence function"); return false; } m_PersistenceManager->useForegroundPersistence(); return true; } void CAnomalyJob::updateAggregatorAndAggregate(bool isInterim, model::CHierarchicalResults& results) { m_Aggregator.refresh(m_ModelConfig); m_Aggregator.setJob(model::CHierarchicalResultsAggregator::E_Correct); // The equalizers are NOT updated with interim results. if (isInterim == false) { m_Aggregator.setJob(model::CHierarchicalResultsAggregator::E_UpdateAndCorrect); m_Aggregator.propagateForwardByTime(1.0); } results.bottomUpBreadthFirst(m_Aggregator); results.createPivots(); results.pivotsBottomUpBreadthFirst(m_Aggregator); } void CAnomalyJob::updateNormalizerAndNormalizeResults(bool isInterim, model::CHierarchicalResults& results) { m_Normalizer.setJob(model::CHierarchicalResultsNormalizer::E_RefreshSettings); results.bottomUpBreadthFirst(m_Normalizer); results.pivotsBottomUpBreadthFirst(m_Normalizer); // The normalizers are NOT updated with interim results, in other // words interim results are normalized with respect to previous // final results. if (isInterim == false) { m_Normalizer.propagateForwardByTime(1.0); m_Normalizer.setJob(model::CHierarchicalResultsNormalizer::E_UpdateQuantiles); results.bottomUpBreadthFirst(m_Normalizer); results.pivotsBottomUpBreadthFirst(m_Normalizer); } m_Normalizer.setJob(model::CHierarchicalResultsNormalizer::E_NormalizeScores); results.bottomUpBreadthFirst(m_Normalizer); results.pivotsBottomUpBreadthFirst(m_Normalizer); } void CAnomalyJob::outputResultsWithinRange(bool isInterim, core_t::TTime start, core_t::TTime end) { if (m_LastFinalisedBucketEndTime <= 0) { return; } if (start < m_LastFinalisedBucketEndTime) { LOG_WARN(<< "Cannot output results for range (" << start << ", " << m_LastFinalisedBucketEndTime << "): Start time is before last finalized bucket end time " << m_LastFinalisedBucketEndTime << '.'); start = m_LastFinalisedBucketEndTime; } if (start > end) { LOG_ERROR(<< "Cannot output results for range (" << start << ", " << end << "): Start time is later than end time."); return; } core_t::TTime const bucketLength = m_ModelConfig.bucketLength(); core_t::TTime time = maths::common::CIntegerTools::floor(start, bucketLength); core_t::TTime const bucketEnd = maths::common::CIntegerTools::ceil(end, bucketLength); while (time < bucketEnd) { if (isInterim) { this->outputInterimResults(time); } else { this->outputResults(time); } m_Limits.resourceMonitor().sendMemoryUsageReportIfSignificantlyChanged(time, bucketLength); time += bucketLength; } } void CAnomalyJob::generateModelPlot(core_t::TTime startTime, core_t::TTime endTime, const model::CAnomalyDetector& detector, TModelPlotDataVec& modelPlotData) { double const modelPlotBoundsPercentile(m_ModelConfig.modelPlotBoundsPercentile()); if (modelPlotBoundsPercentile > 0.0) { LOG_TRACE(<< "Generating model debug data at " << startTime); detector.generateModelPlot(startTime, endTime, m_ModelConfig.modelPlotBoundsPercentile(), m_ModelConfig.modelPlotTerms(), modelPlotData); } } void CAnomalyJob::writeOutModelPlot(const TModelPlotDataVec& modelPlotData) { CModelPlotDataJsonWriter modelPlotWriter(m_OutputStream); for (const auto& plot : modelPlotData) { modelPlotWriter.writeFlat(m_JobId, plot); } } void CAnomalyJob::writeOutAnnotations(const TAnnotationVec& annotations) { CAnnotationJsonWriter annotationWriter(m_OutputStream); for (const auto& annotation : annotations) { annotationWriter.writeResult(m_JobId, annotation); } } void CAnomalyJob::refreshMemoryAndReport() { core_t::TTime const bucketLength{m_ModelConfig.bucketLength()}; if (m_LastFinalisedBucketEndTime < bucketLength) { LOG_ERROR(<< "Cannot report memory usage because last finalized bucket end time (" << m_LastFinalisedBucketEndTime << ") is smaller than bucket span (" << bucketLength << ')'); return; } // Make sure model size stats are up to date and then send a final memory // usage report for (const auto& detector_ : m_Detectors) { model::CAnomalyDetector* detector = detector_.second.get(); if (detector == nullptr) { LOG_ERROR(<< "Unexpected NULL pointer for key '" << pairDebug(detector_.first) << '\''); continue; } m_Limits.resourceMonitor().forceRefresh(*detector); } m_Limits.resourceMonitor().sendMemoryUsageReport( m_LastFinalisedBucketEndTime - bucketLength, bucketLength); } void CAnomalyJob::persistIndividualDetector(const model::CAnomalyDetector& detector, core::CStatePersistInserter& inserter) { inserter.insertLevel(KEY_TAG, [ObjectPtr = &detector]<typename T>(T && PH1) { ObjectPtr->keyAcceptPersistInserter(std::forward<T>(PH1)); }); inserter.insertLevel(PARTITION_FIELD_TAG, [ObjectPtr = &detector]<typename T>(T && PH1) { ObjectPtr->partitionFieldAcceptPersistInserter(std::forward<T>(PH1)); }); inserter.insertLevel(DETECTOR_TAG, [ObjectPtr = &detector]<typename T>(T && PH1) { ObjectPtr->acceptPersistInserter(std::forward<T>(PH1)); }); } void CAnomalyJob::detectors(TAnomalyDetectorPtrVec& detectors) const { detectors.clear(); detectors.reserve(m_Detectors.size()); for (const auto& detector : m_Detectors) { detectors.push_back(detector.second); } } void CAnomalyJob::sortedDetectors(TKeyCRefAnomalyDetectorPtrPrVec& detectors) const { detectors.reserve(m_Detectors.size()); for (const auto& detector : m_Detectors) { detectors.emplace_back( model::CSearchKey::TStrCRefKeyCRefPr(std::cref(detector.first.first), std::cref(detector.first.second)), detector.second); } std::sort(detectors.begin(), detectors.end(), maths::common::COrderings::SFirstLess()); } const CAnomalyJob::TKeyAnomalyDetectorPtrUMap& CAnomalyJob::detectorPartitionMap() const { return m_Detectors; } const CAnomalyJob::TAnomalyDetectorPtr& CAnomalyJob::detectorForKey(bool isRestoring, core_t::TTime time, const model::CSearchKey& key, const std::string& partitionFieldValue, const model::CResourceMonitor& resourceMonitor) { // The simple count detector always lives in a special null partition. const std::string& partition = key.isSimpleCount() ? EMPTY_STRING : partitionFieldValue; // Try and get the detector. auto itr = m_Detectors.find( model::CSearchKey::TStrCRefKeyCRefPr(std::cref(partition), std::cref(key)), model::CStrKeyPrHash(), model::CStrKeyPrEqual()); // Check if we need to and are allowed to create a new detector. if (itr == m_Detectors.end() && resourceMonitor.areAllocationsAllowed()) { // Create an placeholder for the anomaly detector. TAnomalyDetectorPtr& detector = m_Detectors .emplace(model::CSearchKey::TStrKeyPr(partition, key), TAnomalyDetectorPtr()) .first->second; LOG_TRACE(<< "Creating new detector for key '" << key.debug() << '/' << partition << '\'' << ", time " << time); LOG_TRACE(<< "Detector count " << m_Detectors.size()); detector = ml::api::CAnomalyJob::makeDetector( m_ModelConfig, m_Limits, partition, time, m_ModelConfig.factory(key)); if (detector == nullptr) { // This should never happen as CAnomalyDetectorUtils::makeDetector() // contracts to never return NULL LOG_ABORT(<< "Failed to create anomaly detector for key '" << key.debug() << '\''); } detector->zeroModelsToTime(time - m_ModelConfig.latency()); if (isRestoring == false) { m_Limits.resourceMonitor().forceRefresh(*detector); } return detector; } if (itr == m_Detectors.end()) { LOG_TRACE(<< "No memory to create new detector for key '" << key.debug() << '/' << partition << '\''); return NULL_DETECTOR; } return itr->second; } void CAnomalyJob::pruneAllModels(std::size_t buckets) const { if (buckets == 0) { LOG_INFO(<< "Pruning obsolete models"); } else { LOG_DEBUG(<< "Pruning all models older than " << buckets << " buckets"); } for (const auto& detector_ : m_Detectors) { model::CAnomalyDetector* detector = detector_.second.get(); if (detector == nullptr) { LOG_ERROR(<< "Unexpected NULL pointer for key '" << pairDebug(detector_.first) << '\''); continue; } (buckets == 0) ? detector->pruneModels() : detector->pruneModels(buckets); } } const model::CHierarchicalResultsNormalizer& CAnomalyJob::normalizer() const { return m_Normalizer; } CAnomalyJob::TAnomalyDetectorPtr CAnomalyJob::makeDetector(const model::CAnomalyDetectorModelConfig& modelConfig, model::CLimits& limits, const std::string& partitionFieldValue, core_t::TTime firstTime, const model::CAnomalyDetector::TModelFactoryCPtr& modelFactory) { return modelFactory->isSimpleCount() ? std::make_shared<model::CSimpleCountDetector>( modelFactory->summaryMode(), modelConfig, std::ref(limits), partitionFieldValue, firstTime, modelFactory) : std::make_shared<model::CAnomalyDetector>(std::ref(limits), modelConfig, partitionFieldValue, firstTime, modelFactory); } void CAnomalyJob::populateDetectorKeys(const CAnomalyJobConfig& jobConfig, TKeyVec& keys) { keys.clear(); // Always add a key for the simple count detector. keys.push_back(model::CSearchKey::simpleCountKey()); for (const auto& fieldOptions : jobConfig.analysisConfig().detectorsConfig()) { keys.emplace_back(fieldOptions.detectorIndex(), fieldOptions.function(), fieldOptions.useNull(), fieldOptions.excludeFrequent(), fieldOptions.fieldName(), fieldOptions.byFieldName(), fieldOptions.overFieldName(), fieldOptions.partitionFieldName(), jobConfig.analysisConfig().influencers()); } } const std::string* CAnomalyJob::fieldValue(const std::string& fieldName, const TStrStrUMap& dataRowFields) { TStrStrUMapCItr const itr = fieldName.empty() ? dataRowFields.end() : dataRowFields.find(fieldName); const std::string& fieldValue(itr == dataRowFields.end() ? EMPTY_STRING : itr->second); return !fieldName.empty() && fieldValue.empty() ? nullptr : &fieldValue; } void CAnomalyJob::addRecord(const TAnomalyDetectorPtr& detector, core_t::TTime time, const TStrStrUMap& dataRowFields) { model::CAnomalyDetector::TStrCPtrVec fieldValues; const TStrVec& fieldNames = detector->fieldsOfInterest(); fieldValues.reserve(fieldNames.size()); for (const auto& fieldName : fieldNames) { fieldValues.push_back(fieldValue(fieldName, dataRowFields)); } detector->addRecord(time, fieldValues); } CAnomalyJob::SBackgroundPersistArgs::SBackgroundPersistArgs( core_t::TTime time, const model::CResourceMonitor::SModelSizeStats& modelSizeStats, model::CInterimBucketCorrector interimBucketCorrector, const model::CHierarchicalResultsAggregator& aggregator, core_t::TTime latestRecordTime, core_t::TTime lastResultsTime, core_t::TTime initialLastFinalisedBucketEndTime) : s_Time(time), s_ModelSizeStats(modelSizeStats), s_InterimBucketCorrector(std::move(interimBucketCorrector)), s_Aggregator(aggregator), s_LatestRecordTime(latestRecordTime), s_LastResultsTime(lastResultsTime), s_InitialLastFinalizedBucketEndTime(initialLastFinalisedBucketEndTime) { } } }