include/model/CResourceMonitor.h
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#ifndef INCLUDED_ml_model_CResourceMonitor_h
#define INCLUDED_ml_model_CResourceMonitor_h
#include <core/CoreTypes.h>
#include <maths/common/CBasicStatistics.h>
#include <model/ImportExport.h>
#include <model/ModelTypes.h>
#include <model/SCategorizerStats.h>
#include <boost/unordered_map.hpp>
#include <functional>
#include <map>
namespace CResourceMonitorTest {
class CTestFixture;
struct testMonitor;
struct testPeakUsage;
struct testPruning;
struct testUpdateMoments;
}
namespace CResourceLimitTest {
class CTestFixture;
}
namespace CAnomalyJobLimitTest {
struct testAccuracy;
struct testLimit;
}
namespace ml {
namespace model {
class CMonitoredResource;
//! \brief Assess memory used by models and decide on further memory allocations.
//!
//! DESCRIPTION:\n
//! Tracks the memory used by registered CMonitoredResource objects, compares
//! the total against the configured memory limit and decides whether further
//! memory allocations should be allowed. Also reports memory usage, memory
//! status and allocation failures via a user-supplied callback.
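//!
//! A hedged usage sketch based only on the public interface below
//! (\p resource stands for any registered CMonitoredResource):
//! \code
//! CResourceMonitor monitor;          // background persistence, default margin
//! monitor.memoryLimit(1024);         // limit in MB, e.g. from a limits config file
//! monitor.registerComponent(resource);
//! if (monitor.areAllocationsAllowed()) {
//!     // ... create new models ...
//! }
//! monitor.refresh(resource);         // recalculate usage if a limit is in force
//! \endcode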
class MODEL_EXPORT CResourceMonitor {
public:
struct MODEL_EXPORT SModelSizeStats {
std::size_t s_Usage{0};
std::size_t s_AdjustedUsage{0};
std::size_t s_PeakUsage{0};
std::size_t s_AdjustedPeakUsage{0};
std::size_t s_ByFields{0};
std::size_t s_PartitionFields{0};
std::size_t s_OverFields{0};
std::size_t s_AllocationFailures{0};
model_t::EMemoryStatus s_MemoryStatus{model_t::E_MemoryStatusOk};
model_t::EAssignmentMemoryBasis s_AssignmentMemoryBasis{model_t::E_AssignmentBasisUnknown};
core_t::TTime s_BucketStartTime{0};
std::size_t s_BytesExceeded{0};
std::size_t s_BytesMemoryLimit{0};
std::size_t s_OutputMemoryAllocatorUsage{0};
SCategorizerStats s_OverallCategorizerStats;
};
using TMemoryUsageReporterFunc =
std::function<void(const CResourceMonitor::SModelSizeStats&)>;
public:
//! The minimum time between prunes
static const core_t::TTime MINIMUM_PRUNE_FREQUENCY;
//! Default memory limit for resource monitor
static const std::size_t DEFAULT_MEMORY_LIMIT_MB;
//! The initial byte limit margin to use if none is supplied
static const double DEFAULT_BYTE_LIMIT_MARGIN;
//! The maximum value of elapsed time used to scale the byte limit margin
static const core_t::TTime MAXIMUM_BYTE_LIMIT_MARGIN_PERIOD;
public:
//! Default constructor
explicit CResourceMonitor(bool persistenceInForeground = false,
double byteLimitMargin = DEFAULT_BYTE_LIMIT_MARGIN);
//! Query the resource monitor to find out if the models are
//! taking up too much memory and further allocations should be banned
bool areAllocationsAllowed() const;
//! Return the amount of remaining space for allocations
std::size_t allocationLimit() const;
//! Register a resource with the monitor - these classes
//! contain all the model memory and are used to query
//! the current overall usage
void registerComponent(CMonitoredResource& resource);
//! Inform this resource monitor instance that a monitored resource is
//! going to be deleted.
void unRegisterComponent(CMonitoredResource& resource);
//! Register a callback to be used when the memory usage grows
void memoryUsageReporter(const TMemoryUsageReporterFunc& reporter);
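//! For example (a sketch; the production callback is supplied by the owning
//! job rather than defined in this header):
//! \code
//! monitor.memoryUsageReporter([](const CResourceMonitor::SModelSizeStats& stats) {
//!     // e.g. forward stats.s_Usage and stats.s_MemoryStatus to the results writer
//! });
//! \endcode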
//! Recalculate the memory usage if there is a memory limit
void refresh(CMonitoredResource& resource);
//! Recalculate the memory usage regardless of whether there is a memory limit
void forceRefresh(CMonitoredResource& resource);
//! Recalculate the memory usage for all monitored resources
void forceRefreshAll();
//! Set the internal memory limit, as specified in a limits config file
void memoryLimit(std::size_t limitMBs);
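//! Get the memory limit in bytes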
std::size_t getBytesMemoryLimit() const;
//! Get the memory status
model_t::EMemoryStatus memoryStatus() const;
//! Get categorizer allocation failures
std::size_t categorizerAllocationFailures() const;
//! Set categorizer allocation failures
void categorizerAllocationFailures(std::size_t categorizerAllocationFailures);
//! Send a memory usage report if it's changed by more than a certain percentage
void sendMemoryUsageReportIfSignificantlyChanged(core_t::TTime bucketStartTime,
core_t::TTime bucketLength);
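//! For example, once per bucket (a sketch):
//! \code
//! monitor.sendMemoryUsageReportIfSignificantlyChanged(bucketStartTime, bucketLength);
//! \endcode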
//! Send a memory usage report
void sendMemoryUsageReport(core_t::TTime bucketStartTime, core_t::TTime bucketLength);
//! Create a memory usage report
SModelSizeStats createMemoryUsageReport(core_t::TTime bucketStartTime);
//! We are being told that a class has failed to allocate memory
//! based on the resource limits, and we will report this to the
//! user when we can
void acceptAllocationFailureResult(core_t::TTime time);
//! We are being told that aggressive pruning has taken place
//! to avoid hitting the resource limit, and we should report this
//! to the user when we can
void startPruning();
//! We are being told that aggressive pruning to avoid hitting the
//! resource limit is no longer necessary, and we should report this
//! to the user when we can
void endPruning();
//! Accessor for no limit flag
bool haveNoLimit() const;
//! Prune models where necessary
//! \return Was pruning required?
bool pruneIfRequired(core_t::TTime endTime);
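//! For example, driven once per bucket (a sketch; \p bucketEndTime is
//! supplied by the caller):
//! \code
//! if (monitor.pruneIfRequired(bucketEndTime)) {
//!     // models were pruned to keep memory within the limit
//! }
//! \endcode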
//! Accounts for extra memory over and above that reported by the
//! components. Used in conjunction with clearExtraMemory() to
//! ensure enough memory remains for parts of the models that have
//! not been fully allocated yet.
void addExtraMemory(std::size_t reserved);
//! Clears all extra memory
void clearExtraMemory();
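//! A sketch of the reserve-then-clear pattern described above
//! (\p reservedBytes is a hypothetical caller-side estimate):
//! \code
//! monitor.addExtraMemory(reservedBytes); // reserve before the allocation happens
//! // ... allocate the remaining parts of the model ...
//! monitor.clearExtraMemory();            // drop the reservation
//! // a subsequent refresh()/forceRefresh() picks up the real usage
//! \endcode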
//! Decrease the margin on the memory limit.
//!
//! We start off applying a 'safety' margin to the memory limit because
//! it is difficult to accurately estimate the long term memory usage at
//! this point. This safety margin is gradually decreased over time by
//! calling this once per bucket processed until the initially requested
//! memory limit is reached.
void decreaseMargin(core_t::TTime elapsedTime);
//! Returns the sum of used memory plus any extra memory
std::size_t totalMemory() const;
private:
using TMonitoredResourcePtrSizeUMap =
boost::unordered_map<CMonitoredResource*, std::size_t>;
using TMeanVarAccumulator =
maths::common::CBasicStatistics::SSampleMeanVar<double>::TAccumulator;
private:
//! Updates the memory limit fields and the prune threshold
//! to the given value.
void updateMemoryLimitsAndPruneThreshold(std::size_t limitMBs);
//! Update the given model and recalculate the total usage
void memUsage(CMonitoredResource* resource);
//! Update the moments that are used to determine whether memory is stable
void updateMoments(std::size_t totalMemory,
core_t::TTime bucketStartTime,
core_t::TTime bucketLength);
//! Determine if we need to send a usage report, based on
//! increased usage, or increased errors
bool needToSendReport(model_t::EAssignmentMemoryBasis currentAssignmentMemoryBasis,
core_t::TTime bucketStartTime,
core_t::TTime bucketLength);
//! Report whether memory usage has been sufficiently stable in
//! recent reports to justify switching from using the model
//! memory limit to actual memory usage when deciding which node
//! to assign the job to.
bool isMemoryStable(core_t::TTime bucketLength) const;
//! After a change in memory usage, check whether allocations
//! should be allowed or not
void updateAllowAllocations();
//! Get the high memory limit with margin applied.
std::size_t highLimit() const;
//! Get the low memory limit with margin applied.
std::size_t lowLimit() const;
//! Adjusts the amount of memory reported to take into
//! account the current value of the byte limit margin and the effects
//! of background persistence.
std::size_t adjustedUsage(std::size_t usage) const;
//! Returns the factor by which reported memory usage is scaled,
//! depending on the type of persistence in use
std::size_t persistenceMemoryIncreaseFactor() const;
private:
//! The registered collection of components
TMonitoredResourcePtrSizeUMap m_Resources;
//! Is there enough free memory to allow creating new components
bool m_AllowAllocations{true};
//! The relative margin to apply to the byte limits.
double m_ByteLimitMargin;
//! The upper limit for memory usage, checked on increasing values
std::size_t m_ByteLimitHigh{0};
//! The lower limit for memory usage, checked on decreasing values
std::size_t m_ByteLimitLow{0};
//! The memory usage of the monitored resources based on the most recent
//! calculation
std::size_t m_MonitoredResourceCurrentMemory{0};
//! Extra memory to enable accounting of soon to be allocated memory
std::size_t m_ExtraMemory{0};
//! The total memory usage on the previous usage report
std::size_t m_PreviousTotal;
//! Callback function to fire when memory usage increases by 1%
TMemoryUsageReporterFunc m_MemoryUsageReporter;
//! Keep track of the number of distinct allocation failures
std::size_t m_AllocationFailuresCount{0};
//! The time at which the last allocation failure occurred
core_t::TTime m_LastAllocationFailureTime{0};
//! The time at which the last allocation failure was reported
core_t::TTime m_LastAllocationFailureReport{0};
//! Keep track of the model memory status
model_t::EMemoryStatus m_MemoryStatus{model_t::E_MemoryStatusOk};
//! Keep track of whether pruning has started, for efficiency in most cases
bool m_HasPruningStarted{false};
//! The memory threshold at which pruning should kick in, heading
//! towards the sweet spot
std::size_t m_PruneThreshold{0};
//! The last time we did a full prune of all the models
core_t::TTime m_LastPruneTime{0};
//! Number of buckets to go back when pruning
std::size_t m_PruneWindow;
//! The largest that the prune window can grow to - determined from the models
std::size_t m_PruneWindowMaximum;
//! The smallest that the prune window can shrink to - determined from the models
std::size_t m_PruneWindowMinimum;
//! Don't do any sort of memory checking if this is set
bool m_NoLimit{false};
//! The number of bytes over the high limit for memory usage at the last allocation failure
std::size_t m_CurrentBytesExceeded{0};
//! Is persistence occurring in the foreground?
bool m_PersistenceInForeground;
//! Number of categorizer allocation failures to date
std::size_t m_CategorizerAllocationFailures{0};
//! Estimates of mean and variance for recent reports of model bytes
TMeanVarAccumulator m_ModelBytesMoments;
//! Time at which m_ModelBytesMoments was first updated
core_t::TTime m_FirstMomentsUpdateTime{0};
//! Time at which m_ModelBytesMoments was last updated
core_t::TTime m_LastMomentsUpdateTime{0};
//! Test friends
friend class CResourceLimitTest::CTestFixture;
friend class CResourceMonitorTest::CTestFixture;
friend struct CResourceMonitorTest::testMonitor;
friend struct CResourceMonitorTest::testPeakUsage;
friend struct CResourceMonitorTest::testPruning;
friend struct CResourceMonitorTest::testUpdateMoments;
friend struct CAnomalyJobLimitTest::testAccuracy;
friend struct CAnomalyJobLimitTest::testLimit;
};
}
}
#endif // INCLUDED_ml_model_CResourceMonitor_h