lib/model/CResourceMonitor.cc
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <model/CResourceMonitor.h>
#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CProgramCounters.h>
#include <core/Constants.h>
#include <maths/common/CMathsFuncs.h>
#include <maths/common/CTools.h>
#include <model/CMonitoredResource.h>
#include <algorithm>
#include <cmath>
#include <limits>
namespace {
const std::size_t BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE{20};
const double ESTABLISHED_MEMORY_CV_THRESHOLD{0.1};
}
namespace ml {
namespace model {
// Only prune once per hour
const core_t::TTime CResourceMonitor::MINIMUM_PRUNE_FREQUENCY{60 * 60};
const std::size_t CResourceMonitor::DEFAULT_MEMORY_LIMIT_MB{4096};
const double CResourceMonitor::DEFAULT_BYTE_LIMIT_MARGIN{0.7};
const core_t::TTime CResourceMonitor::MAXIMUM_BYTE_LIMIT_MARGIN_PERIOD{2 * core::constants::HOUR};
CResourceMonitor::CResourceMonitor(bool persistenceInForeground, double byteLimitMargin)
: m_ByteLimitMargin{byteLimitMargin}, m_PreviousTotal{this->totalMemory()},
m_PruneWindow{std::numeric_limits<std::size_t>::max()},
m_PruneWindowMaximum{std::numeric_limits<std::size_t>::max()},
m_PruneWindowMinimum{std::numeric_limits<std::size_t>::max()},
m_PersistenceInForeground{persistenceInForeground} {
this->updateMemoryLimitsAndPruneThreshold(DEFAULT_MEMORY_LIMIT_MB);
}
void CResourceMonitor::memoryUsageReporter(const TMemoryUsageReporterFunc& reporter) {
m_MemoryUsageReporter = reporter;
}
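// Start tracking a resource's memory usage. It contributes nothing to the
// total until it is first measured.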
void CResourceMonitor::registerComponent(CMonitoredResource& resource) {
LOG_TRACE(<< "Registering component: " << &resource);
m_Resources.emplace(&resource, 0);
}
void CResourceMonitor::unRegisterComponent(CMonitoredResource& resource) {
LOG_TRACE(<< "Unregistering component: " << &resource);
auto itr = m_Resources.find(&resource);
if (itr == m_Resources.end()) {
LOG_ERROR(<< "Inconsistency - component has not been registered: " << &resource);
return;
}
m_MonitoredResourceCurrentMemory -= itr->second;
m_Resources.erase(itr);
std::size_t total{this->totalMemory()};
core::CProgramCounters::counter(counter_t::E_TSADMemoryUsage) = total;
core::CProgramCounters::counter(counter_t::E_TSADPeakMemoryUsage) = std::max(
static_cast<std::size_t>(core::CProgramCounters::counter(counter_t::E_TSADPeakMemoryUsage)),
total);
}
void CResourceMonitor::memoryLimit(std::size_t limitMBs) {
this->updateMemoryLimitsAndPruneThreshold(limitMBs);
if (m_NoLimit) {
LOG_INFO(<< "Setting no model memory limit");
} else {
LOG_INFO(<< "Setting model memory limit to " << limitMBs << " MB");
}
}
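// The configured memory limit in bytes: the internal high limit scaled back
// up by the persistence memory increase factor.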
std::size_t CResourceMonitor::getBytesMemoryLimit() const {
return m_ByteLimitHigh * this->persistenceMemoryIncreaseFactor();
}
void CResourceMonitor::updateMemoryLimitsAndPruneThreshold(std::size_t limitMBs) {
// The threshold for no limit is set such that any negative limit cast to
// a size_t (which is unsigned) will be taken to mean no limit
if (limitMBs > std::numeric_limits<std::size_t>::max() / 2) {
m_NoLimit = true;
// The high limit is set to around half what it could potentially be.
// The reason is that other code will do "what if" calculations on this
// number, such as "what would total memory usage be if we allocated 10
// more models?", and it causes problems if these calculations overflow.
m_ByteLimitHigh = std::numeric_limits<std::size_t>::max() / 2 + 1;
} else {
m_ByteLimitHigh = (limitMBs * core::constants::BYTES_IN_MEGABYTES) /
this->persistenceMemoryIncreaseFactor();
}
m_ByteLimitLow = (m_ByteLimitHigh * 49) / 50;
m_PruneThreshold = (m_ByteLimitHigh * 3) / 5;
}
model_t::EMemoryStatus CResourceMonitor::memoryStatus() const {
return m_MemoryStatus;
}
std::size_t CResourceMonitor::categorizerAllocationFailures() const {
return m_CategorizerAllocationFailures;
}
void CResourceMonitor::categorizerAllocationFailures(std::size_t categorizerAllocationFailures) {
m_CategorizerAllocationFailures = categorizerAllocationFailures;
}
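// Update the memory usage of a resource and re-evaluate whether allocations
// are allowed. Does nothing when running with no memory limit; use
// forceRefresh() to update regardless.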
void CResourceMonitor::refresh(CMonitoredResource& resource) {
if (m_NoLimit) {
return;
}
this->forceRefresh(resource);
}
void CResourceMonitor::forceRefresh(CMonitoredResource& resource) {
this->memUsage(&resource);
this->updateAllowAllocations();
}
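// Re-measure every registered resource, then re-evaluate whether allocations
// are allowed.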
void CResourceMonitor::forceRefreshAll() {
for (auto& resource : m_Resources) {
this->memUsage(resource.first);
}
this->updateAllowAllocations();
}
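// Decide whether new allocations may proceed. Allocations are blocked once
// total usage exceeds the (margin-adjusted) high limit and only re-enabled
// when it falls below the low limit, giving hysteresis around the threshold.
// The current total is also recorded in the program counters.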
void CResourceMonitor::updateAllowAllocations() {
std::size_t total{this->totalMemory()};
core::CProgramCounters::counter(counter_t::E_TSADMemoryUsage) = total;
LOG_TRACE(<< "Checking allocations: currently at " << total);
if (m_AllowAllocations) {
if (total > this->highLimit()) {
LOG_INFO(<< "Over current allocation high limit. " << total
<< " bytes used, the limit is " << this->highLimit());
m_AllowAllocations = false;
std::size_t bytesExceeded{total - this->highLimit()};
m_CurrentBytesExceeded = this->adjustedUsage(bytesExceeded);
}
} else if (total < this->lowLimit()) {
LOG_INFO(<< "Below current allocation low limit. " << total
<< " bytes used, the limit is " << this->lowLimit());
m_AllowAllocations = true;
}
}
bool CResourceMonitor::pruneIfRequired(core_t::TTime endTime) {
// The basic idea here is that as the memory usage goes up, we
// prune models to bring it down again. If usage declines, we
// relax the pruning window to let it go back up again.
std::size_t total{this->totalMemory()};
bool aboveThreshold = total > m_PruneThreshold;
if (m_HasPruningStarted == false && !aboveThreshold) {
LOG_TRACE(<< "No pruning required. " << total << " / " << m_PruneThreshold);
return false;
}
if (endTime < m_LastPruneTime + MINIMUM_PRUNE_FREQUENCY) {
LOG_TRACE(<< "Too soon since last prune to prune again");
return false;
}
if (m_Resources.empty()) {
return false;
}
if (m_HasPruningStarted == false) {
for (const auto& resource : m_Resources) {
if (resource.first->supportsPruning() &&
resource.first->initPruneWindow(m_PruneWindowMaximum, m_PruneWindowMinimum)) {
m_PruneWindow = m_PruneWindowMaximum;
this->startPruning();
break;
}
}
if (m_HasPruningStarted == false) {
return false;
}
}
if (aboveThreshold) {
// Do a prune and see how much we got back
// These are the expensive operations
std::size_t usageAfter = 0;
for (auto& resource : m_Resources) {
if (resource.first->supportsPruning()) {
resource.first->prune(m_PruneWindow);
resource.second = core::memory::dynamicSize(resource.first);
}
usageAfter += resource.second;
}
m_MonitoredResourceCurrentMemory = usageAfter;
total = this->totalMemory();
this->updateAllowAllocations();
}
LOG_TRACE(<< "Pruning models. Usage: " << total
<< ". Current window: " << m_PruneWindow << " buckets");
if (total < m_PruneThreshold) {
// Expand the window
for (const auto& resource : m_Resources) {
if (resource.first->supportsPruning()) {
m_PruneWindow = m_PruneWindow +
std::size_t((endTime - m_LastPruneTime) /
resource.first->bucketLength());
if (m_PruneWindow > m_PruneWindowMaximum) {
// If we increase the prune window to a size that's bigger
// than what we started with then we should stop pruning to
// be consistent with what would happen if the job was
// closed and reopened
m_PruneWindow = m_PruneWindowMaximum;
this->endPruning();
} else {
LOG_TRACE(<< "Expanding window to " << m_PruneWindow);
}
break;
}
}
} else {
// Shrink the window
m_PruneWindow = std::max(m_PruneWindow * 99 / 100, m_PruneWindowMinimum);
LOG_TRACE(<< "Shrinking window, to " << m_PruneWindow);
}
m_LastPruneTime = endTime;
return aboveThreshold;
}
bool CResourceMonitor::areAllocationsAllowed() const {
return m_AllowAllocations;
}
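// The number of bytes that may still be allocated before the
// (margin-adjusted) high limit is reached; zero when that limit has already
// been breached.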
std::size_t CResourceMonitor::allocationLimit() const {
return this->highLimit() - std::min(this->highLimit(), this->totalMemory());
}
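// Re-measure a single resource's dynamic memory usage and apply the
// difference to the running total for monitored resources.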
void CResourceMonitor::memUsage(CMonitoredResource* resource) {
auto itr = m_Resources.find(resource);
if (itr == m_Resources.end()) {
LOG_ERROR(<< "Inconsistency - component has not been registered: " << resource);
return;
}
std::size_t modelPreviousUsage = itr->second;
std::size_t modelCurrentUsage = core::memory::dynamicSize(itr->first);
itr->second = modelCurrentUsage;
m_MonitoredResourceCurrentMemory += (modelCurrentUsage - modelPreviousUsage);
}
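// Send a memory usage report to the registered reporter, but only if
// needToSendReport() judges that one is warranted for this bucket.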
void CResourceMonitor::sendMemoryUsageReportIfSignificantlyChanged(core_t::TTime bucketStartTime,
core_t::TTime bucketLength) {
std::uint64_t assignmentMemoryBasis{
core::CProgramCounters::counter(counter_t::E_TSADAssignmentMemoryBasis)};
if (this->needToSendReport(static_cast<model_t::EAssignmentMemoryBasis>(assignmentMemoryBasis),
bucketStartTime, bucketLength)) {
this->sendMemoryUsageReport(bucketStartTime, bucketLength);
}
}
void CResourceMonitor::updateMoments(std::size_t totalMemory,
core_t::TTime bucketStartTime,
core_t::TTime bucketLength) {
if (m_FirstMomentsUpdateTime <= 0) {
m_FirstMomentsUpdateTime = bucketStartTime;
} else {
if (bucketLength > 0 && bucketStartTime >= m_LastMomentsUpdateTime + bucketLength) {
// The idea is to age this so that observations from more than 20
// buckets ago have little effect. This means the end results will
// be close to what the old Java calculation did - it literally
// searched the last 20 buckets. Aging at e^-0.1 seems reasonable
// to reduce the variance at the required rate.
double factor{std::exp(
-static_cast<double>((bucketStartTime - m_LastMomentsUpdateTime) / bucketLength) /
static_cast<double>(BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE / 2))};
m_ModelBytesMoments.age(factor);
}
}
m_ModelBytesMoments.add(static_cast<double>(totalMemory));
m_LastMomentsUpdateTime = bucketStartTime;
}
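// A report is due if usage has changed by more than 1% since the last
// report, if memory has become stable so the assignment memory basis can be
// upgraded, or if there have been new allocation failures.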
bool CResourceMonitor::needToSendReport(model_t::EAssignmentMemoryBasis currentAssignmentMemoryBasis,
core_t::TTime bucketStartTime,
core_t::TTime bucketLength) {
std::size_t total{this->totalMemory()};
// Update the moments that are used to determine whether memory is stable
this->updateMoments(total, bucketStartTime, bucketLength);
// Has the usage changed by more than 1%?
if ((std::max(total, m_PreviousTotal) - std::min(total, m_PreviousTotal)) >
m_PreviousTotal / 100) {
return true;
}
// Is the assignment memory basis changing?
if ((currentAssignmentMemoryBasis == model_t::E_AssignmentBasisUnknown ||
currentAssignmentMemoryBasis == model_t::E_AssignmentBasisModelMemoryLimit) &&
this->isMemoryStable(bucketLength)) {
return true;
}
// Have we had new allocation failures since the last report?
if (m_AllocationFailuresCount != 0) {
if (m_LastAllocationFailureTime > m_LastAllocationFailureReport) {
return true;
}
}
return false;
}
bool CResourceMonitor::isMemoryStable(core_t::TTime bucketLength) const {
// Sanity check
if (maths::common::CBasicStatistics::count(m_ModelBytesMoments) == 0.0) {
LOG_ERROR(<< "Programmatic error: checking memory stability before adding any measurements");
return false;
}
// Must have been monitoring for 20 buckets
std::size_t bucketCount{
static_cast<std::size_t>((m_LastMomentsUpdateTime - m_FirstMomentsUpdateTime) / bucketLength) +
1};
if (bucketCount < BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) {
return false;
}
// Coefficient of variation must be less than 0.1
double mean{maths::common::CBasicStatistics::mean(m_ModelBytesMoments)};
double variance{maths::common::CBasicStatistics::variance(m_ModelBytesMoments)};
LOG_TRACE(<< "Model memory stability at " << m_LastMomentsUpdateTime
<< ": bucket count = " << bucketCount << ", sample count = "
<< maths::common::CBasicStatistics::count(m_ModelBytesMoments)
<< ", mean = " << mean << ", variance = " << variance
<< ", coefficient of variation = " << (std::sqrt(variance) / mean));
// Instead of literally testing the coefficient of variation it's more
// robust against zeroes and NaNs to rearrange it as follows
return maths::common::CMathsFuncs::isNan(variance) == false &&
variance <= maths::common::CTools::pow2(ESTABLISHED_MEMORY_CV_THRESHOLD * mean);
}
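// Unconditionally build and send a report. If memory usage has become stable
// the assignment memory basis is upgraded to current model bytes; the peak
// usage counter is refreshed and the reported total is remembered for the
// next change check.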
void CResourceMonitor::sendMemoryUsageReport(core_t::TTime bucketStartTime,
core_t::TTime bucketLength) {
std::size_t total{this->totalMemory()};
if (this->isMemoryStable(bucketLength)) {
core::CProgramCounters::counter(counter_t::E_TSADAssignmentMemoryBasis) =
static_cast<std::uint64_t>(model_t::E_AssignmentBasisCurrentModelBytes);
}
core::CProgramCounters::counter(counter_t::E_TSADPeakMemoryUsage) = std::max(
static_cast<std::size_t>(core::CProgramCounters::counter(counter_t::E_TSADPeakMemoryUsage)),
total);
if (m_MemoryUsageReporter) {
m_MemoryUsageReporter(this->createMemoryUsageReport(bucketStartTime));
if (m_AllocationFailuresCount != 0) {
m_LastAllocationFailureReport = m_LastAllocationFailureTime;
}
}
m_PreviousTotal = total;
}
CResourceMonitor::SModelSizeStats
CResourceMonitor::createMemoryUsageReport(core_t::TTime bucketStartTime) {
SModelSizeStats res;
res.s_Usage = this->totalMemory();
res.s_AdjustedUsage = this->adjustedUsage(res.s_Usage);
res.s_PeakUsage = static_cast<std::size_t>(
core::CProgramCounters::counter(counter_t::E_TSADPeakMemoryUsage));
res.s_AdjustedPeakUsage = this->adjustedUsage(res.s_PeakUsage);
res.s_BytesMemoryLimit = this->persistenceMemoryIncreaseFactor() * m_ByteLimitHigh;
res.s_BytesExceeded = m_CurrentBytesExceeded;
res.s_MemoryStatus = m_MemoryStatus;
std::uint64_t assignmentMemoryBasis{
core::CProgramCounters::counter(counter_t::E_TSADAssignmentMemoryBasis)};
res.s_AssignmentMemoryBasis =
static_cast<model_t::EAssignmentMemoryBasis>(assignmentMemoryBasis);
res.s_BucketStartTime = bucketStartTime;
for (const auto& resource : m_Resources) {
resource.first->updateModelSizeStats(res);
}
res.s_AllocationFailures += m_AllocationFailuresCount;
res.s_OutputMemoryAllocatorUsage = static_cast<std::size_t>(
core::CProgramCounters::counter(counter_t::E_TSADOutputMemoryAllocatorUsage));
res.s_OverallCategorizerStats.s_MemoryCategorizationFailures += m_CategorizerAllocationFailures;
return res;
}
std::size_t CResourceMonitor::adjustedUsage(std::size_t usage) const {
// We scale the reported memory usage by the inverse of the byte limit margin.
// This gives the user a fairer indication of how close the job is to hitting
// the model memory limit in a concise manner (as the limit is scaled down by
// the margin during the beginning period of the job's existence).
std::size_t adjustedUsage{
static_cast<std::size_t>(static_cast<double>(usage) / m_ByteLimitMargin)};
adjustedUsage *= this->persistenceMemoryIncreaseFactor();
return adjustedUsage;
}
std::size_t CResourceMonitor::persistenceMemoryIncreaseFactor() const {
// Background persist causes the memory size to double due to copying
// the models. On top of that, after the persist is done we may not
// be able to retrieve that memory back. Thus, we report twice the
// memory usage in order to allow for that.
// See https://github.com/elastic/x-pack-elasticsearch/issues/1020.
// Issue https://github.com/elastic/x-pack-elasticsearch/issues/857
// discusses adding an option to perform only foreground persist.
// If that gets implemented, we should only double when background
// persist is configured.
return m_PersistenceInForeground ? 1 : 2;
}
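// Record an allocation failure. This puts the job into hard limit status;
// only failures with a strictly later timestamp than the last one recorded
// increment the failure count.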
void CResourceMonitor::acceptAllocationFailureResult(core_t::TTime time) {
m_MemoryStatus = model_t::E_MemoryStatusHardLimit;
if (time > m_LastAllocationFailureTime) {
m_AllocationFailuresCount += 1;
m_LastAllocationFailureTime = time;
}
}
void CResourceMonitor::startPruning() {
LOG_DEBUG(<< "Pruning started. Window (buckets): " << m_PruneWindow);
m_HasPruningStarted = true;
if (m_MemoryStatus == model_t::E_MemoryStatusOk) {
m_MemoryStatus = model_t::E_MemoryStatusSoftLimit;
}
}
void CResourceMonitor::endPruning() {
LOG_DEBUG(<< "Pruning no longer necessary.");
m_HasPruningStarted = false;
if (m_MemoryStatus == model_t::E_MemoryStatusSoftLimit) {
m_MemoryStatus = model_t::E_MemoryStatusOk;
}
}
bool CResourceMonitor::haveNoLimit() const {
return m_NoLimit;
}
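// Account for memory that is used but not attributed to any registered
// resource, and re-evaluate whether allocations are allowed.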
void CResourceMonitor::addExtraMemory(std::size_t mem) {
m_ExtraMemory += mem;
this->updateAllowAllocations();
}
void CResourceMonitor::clearExtraMemory() {
if (m_ExtraMemory != 0) {
m_ExtraMemory = 0;
this->updateAllowAllocations();
}
}
void CResourceMonitor::decreaseMargin(core_t::TTime elapsedTime) {
// We choose to increase the margin to close to 1 over the order of
// the time it takes to detect diurnal periodic components. These
// will be the overwhelmingly common source of additional memory,
// so the model memory should be accurate (on average) in this
// time frame.
double scale{1.0 - static_cast<double>(std::min(elapsedTime, MAXIMUM_BYTE_LIMIT_MARGIN_PERIOD)) /
static_cast<double>(core::constants::DAY)};
m_ByteLimitMargin = 1.0 - scale * (1.0 - m_ByteLimitMargin);
}
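// The high and low limits are the configured byte limits scaled down by the
// current margin; usage above the high limit stops allocations and falling
// below the low limit re-enables them.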
std::size_t CResourceMonitor::highLimit() const {
return static_cast<std::size_t>(m_ByteLimitMargin * static_cast<double>(m_ByteLimitHigh));
}
std::size_t CResourceMonitor::lowLimit() const {
return static_cast<std::size_t>(m_ByteLimitMargin * static_cast<double>(m_ByteLimitLow));
}
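// Total memory attributed to the job: the measured memory of all registered
// resources, plus any extra memory reported directly, plus the memory used
// by the output memory allocator.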
std::size_t CResourceMonitor::totalMemory() const {
return m_MonitoredResourceCurrentMemory + m_ExtraMemory +
static_cast<std::size_t>(core::CProgramCounters::counter(
counter_t::E_TSADOutputMemoryAllocatorUsage));
}
} // model
} // ml