in worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java [102:245]
/**
 * Creates the memory manager: derives every direct-memory threshold from {@code conf} and starts
 * the periodic background tasks.
 *
 * <p>All byte thresholds are computed as a configured ratio of the JVM's maximum direct memory,
 * which is looked up reflectively via {@code VM.maxDirectMemory()}. The constructor then
 * schedules:
 *
 * <ul>
 *   <li>a memory-pressure check (period: {@code workerDirectMemoryPressureCheckIntervalMs}) that
 *       recomputes the serving state and, on a state change, tells the registered pressure
 *       listeners to pause/resume push and replicate traffic; while the state stays paused it asks
 *       the listeners to trim;
 *   <li>a direct-memory usage report logged every {@code workerDirectMemoryReportIntervalSecond}
 *       seconds;
 *   <li>only when the read buffer threshold is positive: a task that splits the read buffer target
 *       evenly across the active map partitions and notifies the target-change listeners when the
 *       per-partition target moves by more than {@code readBufferTargetNotifyThreshold}.
 * </ul>
 *
 * @param conf worker configuration supplying the memory ratios and scheduling intervals
 * @throws IllegalArgumentException if the max direct memory is not positive, or if the ratios do
 *     not satisfy {@code pauseReplicate > pausePushData > resume > readBuffer + shuffleStorage}
 */
private MemoryManager(CelebornConf conf) {
  double pausePushDataRatio = conf.workerDirectMemoryRatioToPauseReceive();
  double pauseReplicateRatio = conf.workerDirectMemoryRatioToPauseReplicate();
  double resumeRatio = conf.workerDirectMemoryRatioToResume();
  double maxSortMemRatio = conf.partitionSorterDirectMemoryRatioThreshold();
  double readBufferRatio = conf.workerDirectMemoryRatioForReadBuffer();
  double shuffleStorageRatio = conf.workerDirectMemoryRatioForShuffleStorage();
  long checkInterval = conf.workerDirectMemoryPressureCheckIntervalMs();
  long reportInterval = conf.workerDirectMemoryReportIntervalSecond();
  double readBufferTargetRatio = conf.readBufferTargetRatio();
  long readBufferTargetUpdateInterval = conf.readBufferTargetUpdateInterval();
  long readBufferTargetNotifyThreshold = conf.readBufferTargetNotifyThreshold();
  maxDirectorMemory =
      DynMethods.builder("maxDirectMemory")
          .impl("jdk.internal.misc.VM") // tried first: location on newer JDKs
          .impl("sun.misc.VM") // fallback: location on older JDKs (e.g. Java 8)
          .buildStatic()
          .<Long>invoke();
  // Fail fast with messages that name the offending values; without them a misconfigured worker
  // dies with an IllegalArgumentException that gives no hint which setting is wrong.
  Preconditions.checkArgument(
      maxDirectorMemory > 0, "Max direct memory must be positive, but was %s", maxDirectorMemory);
  Preconditions.checkArgument(
      pauseReplicateRatio > pausePushDataRatio,
      "Direct memory ratio to pause replicate (%s) must be greater than the ratio to pause "
          + "receive (%s)",
      pauseReplicateRatio,
      pausePushDataRatio);
  Preconditions.checkArgument(
      pausePushDataRatio > resumeRatio,
      "Direct memory ratio to pause receive (%s) must be greater than the ratio to resume (%s)",
      pausePushDataRatio,
      resumeRatio);
  Preconditions.checkArgument(
      resumeRatio > (readBufferRatio + shuffleStorageRatio),
      "Direct memory ratio to resume (%s) must be greater than the sum of the read buffer "
          + "ratio (%s) and the shuffle storage ratio (%s)",
      resumeRatio,
      readBufferRatio,
      shuffleStorageRatio);
  maxSortMemory = ((long) (maxDirectorMemory * maxSortMemRatio));
  pausePushDataThreshold = (long) (maxDirectorMemory * pausePushDataRatio);
  pauseReplicateThreshold = (long) (maxDirectorMemory * pauseReplicateRatio);
  resumeThreshold = (long) (maxDirectorMemory * resumeRatio);
  readBufferThreshold = (long) (maxDirectorMemory * readBufferRatio);
  readBufferTarget = (long) (readBufferThreshold * readBufferTargetRatio);
  memoryShuffleStorageThreshold = (long) (maxDirectorMemory * shuffleStorageRatio);
  checkService.scheduleWithFixedDelay(
      () -> {
        try {
          ServingState lastState = servingState;
          servingState = currentServingState();
          if (lastState != servingState) {
            logger.info("Serving state transformed from {} to {}", lastState, servingState);
            if (servingState == ServingState.PUSH_PAUSED) {
              // Only push is paused: replicate traffic is (re)allowed.
              pausePushDataCounter.increment();
              logger.info("Trigger action: PAUSE PUSH, RESUME REPLICATE");
              memoryPressureListeners.forEach(
                  memoryPressureListener ->
                      memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
              memoryPressureListeners.forEach(
                  memoryPressureListener ->
                      memoryPressureListener.onResume(TransportModuleConstants.REPLICATE_MODULE));
              trimAllListeners();
            } else if (servingState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
              pausePushDataAndReplicateCounter.increment();
              logger.info("Trigger action: PAUSE PUSH and REPLICATE");
              memoryPressureListeners.forEach(
                  memoryPressureListener ->
                      memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
              memoryPressureListeners.forEach(
                  memoryPressureListener ->
                      memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE));
              trimAllListeners();
            } else {
              logger.info("Trigger action: RESUME PUSH and REPLICATE");
              memoryPressureListeners.forEach(
                  memoryPressureListener -> memoryPressureListener.onResume("all"));
            }
          } else {
            // State unchanged: while still under pressure, keep asking listeners to release memory.
            if (servingState != ServingState.NONE_PAUSED) {
              logger.debug("Trigger action: TRIM");
              trimAllListeners();
            }
          }
        } catch (Exception e) {
          // Swallow and log so one failure does not cancel all future pressure checks.
          logger.error("Memory tracker check error", e);
        }
      },
      checkInterval,
      checkInterval,
      TimeUnit.MILLISECONDS);
  reportService.scheduleWithFixedDelay(
      () -> {
        // Guarded like the other periodic tasks: an exception escaping a scheduleWithFixedDelay
        // task silently suppresses every subsequent run, which would stop the usage report.
        try {
          logger.info(
              "Direct memory usage: {}/{}, disk buffer size: {}, sort memory size: {}, read buffer size: {}",
              Utils.bytesToString(getNettyUsedDirectMemory()),
              Utils.bytesToString(maxDirectorMemory),
              Utils.bytesToString(diskBufferCounter.get()),
              Utils.bytesToString(sortMemoryCounter.get()),
              Utils.bytesToString(readBufferCounter.get()));
        } catch (Exception e) {
          logger.warn("Failed to report direct memory usage", e);
        }
      },
      reportInterval,
      reportInterval,
      TimeUnit.SECONDS);
  // A read buffer threshold of zero means no map data partitions are expected, so the read buffer
  // dispatcher and the target updater are only set up when the threshold is positive.
  // NOTE(review): readBufferDispatcher and readBufferTargetChangeListeners are not assigned here
  // otherwise; confirm their users tolerate that case.
  if (readBufferThreshold > 0) {
    readBufferDispatcher = new ReadBufferDispatcher(this, conf);
    readBufferTargetChangeListeners = new ArrayList<>();
    readBufferTargetUpdateService.scheduleWithFixedDelay(
        () -> {
          try {
            if (creditStreamManager != null) {
              int mapDataPartitionCount = creditStreamManager.getActiveMapPartitionCount();
              if (mapDataPartitionCount > 0) {
                // Split the total read buffer target evenly (rounded up) across active partitions.
                long currentTarget =
                    (long) Math.ceil(readBufferTarget * 1.0 / mapDataPartitionCount);
                // Only notify on changes larger than the threshold to avoid listener churn.
                if (Math.abs(lastNotifiedTarget - currentTarget)
                    > readBufferTargetNotifyThreshold) {
                  synchronized (readBufferTargetChangeListeners) {
                    logger.debug(
                        "read buffer target changed {} -> {} active map partition count {}",
                        lastNotifiedTarget,
                        currentTarget,
                        mapDataPartitionCount);
                    for (ReadBufferTargetChangeListener changeListener :
                        readBufferTargetChangeListeners) {
                      changeListener.onChange(currentTarget);
                    }
                    lastNotifiedTarget = currentTarget;
                  }
                }
              }
            }
          } catch (Exception e) {
            logger.warn("Failed update buffer target", e);
          }
        },
        readBufferTargetUpdateInterval,
        readBufferTargetUpdateInterval,
        TimeUnit.MILLISECONDS);
  }
  logger.info(
      "Memory tracker initialized with: "
          + "max direct memory: {}, pause push memory: {}, "
          + "pause replication memory: {}, resume memory: {}, "
          + "read buffer memory limit: {} target: {}, "
          + "memory shuffle storage limit: {}",
      Utils.bytesToString(maxDirectorMemory),
      Utils.bytesToString(pausePushDataThreshold),
      Utils.bytesToString(pauseReplicateThreshold),
      Utils.bytesToString(resumeThreshold),
      Utils.bytesToString(readBufferThreshold),
      Utils.bytesToString(readBufferTarget),
      Utils.bytesToString(memoryShuffleStorageThreshold));
}