in storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java [677:771]
public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
    // Decides whether the memory limit should be considered violated for this container.
    // Returns true as soon as the superclass check reports a violation. Otherwise, when the
    // resource isolation manager reports that resources are managed, measured usage is compared
    // against (1) a hard limit, and (2) the plain limit combined with how much free memory the
    // system has and how long the limit has been exceeded. Returns false in every other case.
    if (super.isMemoryLimitViolated(withUpdatedLimits)) {
        return true;
    }
    if (resourceIsolationManager.isResourceManaged()) {
        // In the short term the goal is to not shoot anyone unless we really need to.
        // The on heap should limit the memory usage in most cases to a reasonable amount.
        // If someone is using way more than they requested this is a bug and we should
        // not allow it.
        long usageMb;
        long memoryLimitMb;
        long hardMemoryLimitOver;
        String typeOfCheck;
        if (withUpdatedLimits.is_set_total_node_shared()) {
            // We need to do enforcement on a topology level, not a single worker level,
            // because for cgroups each page in shared memory goes to the worker that touched it
            // first. We may need to make this more pluggable in the future and let the resource
            // isolation manager tell us what to do.
            usageMb = getTotalTopologyMemoryUsed();
            memoryLimitMb = getTotalTopologyMemoryReserved(withUpdatedLimits);
            // The per-worker allowed overage is scaled by the number of workers being summed.
            hardMemoryLimitOver = this.hardMemoryLimitOver * getTotalWorkersForThisTopology();
            typeOfCheck = "TOPOLOGY " + topologyId;
        } else {
            usageMb = getMemoryUsageMb();
            memoryLimitMb = this.memoryLimitMb;
            hardMemoryLimitOver = this.hardMemoryLimitOver;
            typeOfCheck = "WORKER " + workerId;
        }
        LOG.debug(
            "Enforcing memory usage for {} with usage of {} out of {} total and a hard limit of {}",
            typeOfCheck,
            usageMb,
            memoryLimitMb,
            hardMemoryLimitOver);
        if (usageMb <= 0) {
            // Looks like usage might not be supported
            return false;
        }
        // The hard limit is the more generous of "limit scaled by the multiplier" and
        // "limit plus the fixed allowed overage".
        long hardLimitMb = Math.max((long) (memoryLimitMb * hardMemoryLimitMultiplier), memoryLimitMb + hardMemoryLimitOver);
        if (usageMb > hardLimitMb) {
            LOG.warn(
                "{} is using {} MB > adjusted hard limit {} MB", typeOfCheck, usageMb, hardLimitMb);
            return true;
        }
        if (usageMb > memoryLimitMb) {
            // For others using too much it is really a question of how much memory is free in the
            // system to be used. If we cannot calculate it assume that it is bad: the value stays
            // at 0, which the low-memory comparison below evaluates like any other reading.
            long systemFreeMemoryMb = 0;
            try {
                systemFreeMemoryMb = resourceIsolationManager.getSystemFreeMemoryMb();
            } catch (IOException e) {
                // The exception is passed as the trailing argument so the logger records it with
                // its stack trace; no "{}" placeholder is used because it would be left unfilled.
                LOG.warn("Error trying to calculate free memory on the system", e);
            }
            LOG.debug("SYSTEM MEMORY FREE {} MB", systemFreeMemoryMb);
            // If the system is low on memory we cannot be kind and need to shoot something
            if (systemFreeMemoryMb <= lowMemoryThresholdMb) {
                LOG.warn(
                    "{} is using {} MB > memory limit {} MB and system is low on memory {} free",
                    typeOfCheck,
                    usageMb,
                    memoryLimitMb,
                    systemFreeMemoryMb);
                return true;
            }
            // If the system still has some free memory give them a grace period to
            // drop back down.
            if (systemFreeMemoryMb < mediumMemoryThresholdMb) {
                if (memoryLimitExceededStart < 0) {
                    // First time over the limit in this band: start the timer, report nothing yet.
                    memoryLimitExceededStart = Time.currentTimeMillis();
                } else {
                    long timeInViolation = Time.currentTimeMillis() - memoryLimitExceededStart;
                    if (timeInViolation > mediumMemoryGracePeriodMs) {
                        LOG.warn(
                            "{} is using {} MB > memory limit {} MB for {} seconds",
                            typeOfCheck,
                            usageMb,
                            memoryLimitMb,
                            timeInViolation / 1000);
                        return true;
                    }
                }
            } else {
                // Otherwise don't bother them
                LOG.debug("{} is using {} MB > memory limit {} MB", typeOfCheck, usageMb, memoryLimitMb);
                memoryLimitExceededStart = -1;
            }
        } else {
            // Back under the limit: clear any running grace-period timer.
            memoryLimitExceededStart = -1;
        }
    }
    return false;
}