in worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java [127:296]
/**
 * Builds the memory manager: reads the memory related settings from {@code conf}, validates the
 * ordering of the configured ratios, derives the absolute byte thresholds from the JVM max direct
 * memory and schedules the periodic background tasks (pressure check, usage report, read buffer
 * target update and, when enabled, memory file eviction).
 *
 * @param conf configuration the ratios, intervals and feature switches are read from
 * @param storageManager storage manager whose memory writers are evicted under pressure; may be
 *     {@code null}, in which case the eviction task is not scheduled
 * @param source metrics source handed over to the {@link ReadBufferDispatcher}
 * @throws IllegalArgumentException if max direct memory is not positive or the configured ratios
 *     violate the expected ordering (raised through {@code Preconditions.checkArgument})
 */
private MemoryManager(CelebornConf conf, StorageManager storageManager, AbstractSource source) {
  double pausePushDataRatio = conf.workerDirectMemoryRatioToPauseReceive();
  double pauseReplicateRatio = conf.workerDirectMemoryRatioToPauseReplicate();
  this.directMemoryResumeRatio = conf.workerDirectMemoryRatioToResume();
  this.pinnedMemoryResumeRatio = conf.workerPinnedMemoryRatioToResume();
  double maxSortMemRatio = conf.workerPartitionSorterDirectMemoryRatioThreshold();
  double readBufferRatio = conf.workerDirectMemoryRatioForReadBuffer();
  double memoryFileStorageRatio = conf.workerDirectMemoryRatioForMemoryFilesStorage();
  long checkInterval = conf.workerDirectMemoryPressureCheckIntervalMs();
  this.pinnedMemoryCheckEnabled = conf.workerPinnedMemoryCheckEnabled();
  this.pinnedMemoryCheckInterval = conf.workerPinnedMemoryCheckIntervalMs();
  this.workerPinnedMemoryResumeKeepTime = conf.workerPinnedMemoryResumeKeepTime();
  long reportInterval = conf.workerDirectMemoryReportIntervalSecond();
  double readBufferTargetRatio = conf.readBufferTargetRatio();
  long readBufferTargetUpdateInterval = conf.readBufferTargetUpdateInterval();
  long readBufferTargetNotifyThreshold = conf.readBufferTargetNotifyThreshold();
  boolean aggressiveEvictModeEnabled = conf.workerMemoryFileStorageEictAggressiveModeEnabled();
  double evictRatio = conf.workerMemoryFileStorageEvictRatio();
  forceAppendPauseSpentTimeThreshold = conf.metricsWorkerForceAppendPauseSpentTimeThreshold();

  // The VM class exposing maxDirectMemory moved between JDK releases, so it is looked up
  // reflectively from both candidate locations.
  maxDirectMemory =
      DynMethods.builder("maxDirectMemory")
          .impl("jdk.internal.misc.VM") // for Java 10 and above
          .impl("sun.misc.VM") // for Java 9 and previous
          .buildStatic()
          .<Long>invoke();

  // Validate the configuration. Expected ordering of the ratios:
  // pauseReplicate > pausePushData > resume (> readBuffer + memoryFileStorage when enabled).
  Preconditions.checkArgument(
      maxDirectMemory > 0,
      String.format("Invalid max direct memory %s, it should be positive", maxDirectMemory));
  Preconditions.checkArgument(
      pauseReplicateRatio > pausePushDataRatio,
      String.format(
          "Invalid config, %s(%s) should be greater than %s(%s)",
          CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE().key(),
          pauseReplicateRatio,
          CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(),
          pausePushDataRatio));
  Preconditions.checkArgument(
      pausePushDataRatio > directMemoryResumeRatio,
      String.format(
          "Invalid config, %s(%s) should be greater than direct memory resume ratio(%s)",
          CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(),
          pausePushDataRatio,
          directMemoryResumeRatio));
  if (memoryFileStorageRatio > 0) {
    Preconditions.checkArgument(
        directMemoryResumeRatio > (readBufferRatio + memoryFileStorageRatio),
        String.format(
            "Invalid config, direct memory resume ratio(%s) should be greater than the sum of "
                + "read buffer ratio(%s) and memory file storage ratio(%s)",
            directMemoryResumeRatio, readBufferRatio, memoryFileStorageRatio));
  }

  // Turn the ratios into absolute byte thresholds.
  maxSortMemory = ((long) (maxDirectMemory * maxSortMemRatio));
  pausePushDataThreshold = (long) (maxDirectMemory * pausePushDataRatio);
  pauseReplicateThreshold = (long) (maxDirectMemory * pauseReplicateRatio);
  readBufferThreshold = (long) (maxDirectMemory * readBufferRatio);
  readBufferTarget = (long) (readBufferThreshold * readBufferTargetRatio);
  memoryFileStorageThreshold = (long) (maxDirectMemory * memoryFileStorageRatio);

  // Periodic memory pressure check. Exceptions are caught inside the task because an exception
  // escaping a scheduled task suppresses all of its subsequent executions.
  checkService.scheduleWithFixedDelay(
      () -> {
        try {
          switchServingState();
        } catch (Exception e) {
          logger.error("Memory tracker check error", e);
        }
      },
      checkInterval,
      checkInterval,
      TimeUnit.MILLISECONDS);

  // Periodic usage report. Guarded like the other tasks so that a single failure while
  // collecting the numbers does not silently stop the reporting for good.
  reportService.scheduleWithFixedDelay(
      () -> {
        try {
          logger.info(
              "Direct memory usage: {}/{}, "
                  + "disk buffer size: {}, "
                  + "sort memory size: {}, "
                  + "read buffer size: {}, "
                  + "memory file storage size : {}",
              Utils.bytesToString(getNettyUsedDirectMemory()),
              Utils.bytesToString(maxDirectMemory),
              Utils.bytesToString(diskBufferCounter.get()),
              Utils.bytesToString(sortMemoryCounter.get()),
              Utils.bytesToString(readBufferCounter.get()),
              Utils.bytesToString(memoryFileStorageCounter.sum()));
        } catch (Exception e) {
          logger.warn("Memory usage report error", e);
        }
      },
      reportInterval,
      reportInterval,
      TimeUnit.SECONDS);

  if (readBufferThreshold > 0) {
    // if read buffer threshold is zero means that there will be no map data partitions
    readBufferDispatcher = new ReadBufferDispatcher(this, conf, source);
    readBufferTargetUpdateService.scheduleWithFixedDelay(
        () -> {
          try {
            if (creditStreamManager != null) {
              int mapDataPartitionCount = creditStreamManager.getActiveMapPartitionCount();
              if (mapDataPartitionCount > 0) {
                // Split the overall read buffer target evenly across the active map partitions.
                long currentTarget =
                    (long) Math.ceil(readBufferTarget * 1.0 / mapDataPartitionCount);
                // Only notify when the target moved by more than the configured threshold.
                if (Math.abs(lastNotifiedTarget - currentTarget)
                    > readBufferTargetNotifyThreshold) {
                  synchronized (readBufferTargetChangeListeners) {
                    logger.debug(
                        "read buffer target changed {} -> {} active map partition count {}",
                        lastNotifiedTarget,
                        currentTarget,
                        mapDataPartitionCount);
                    for (ReadBufferTargetChangeListener changeListener :
                        readBufferTargetChangeListeners) {
                      changeListener.onChange(currentTarget);
                    }
                    lastNotifiedTarget = currentTarget;
                  }
                }
              }
            }
          } catch (Exception e) {
            logger.warn("Failed update buffer target", e);
          }
        },
        readBufferTargetUpdateInterval,
        readBufferTargetUpdateInterval,
        TimeUnit.MILLISECONDS);
  }

  this.storageManager = storageManager;
  // The eviction task is only scheduled when memory file storage is enabled and the storage
  // manager reports local or DFS storage as available.
  if (memoryFileStorageThreshold > 0
      && storageManager != null
      && storageManager.localOrDfsStorageAvailable()) {
    ScheduledExecutorService memoryFileStorageService =
        ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-file-storage-checker");
    memoryFileStorageService.scheduleWithFixedDelay(
        () -> {
          try {
            if (shouldEvict(aggressiveEvictModeEnabled, evictRatio)) {
              // Work on a snapshot of the currently registered memory writers.
              List<PartitionDataWriter> memoryWriters =
                  new ArrayList<>(storageManager.memoryWriters().values());
              if (memoryWriters.isEmpty()) {
                return;
              }
              logger.info("Start evicting {} memory file infos", memoryWriters.size());
              // always evict the largest memory file info first
              memoryWriters.sort(
                  Comparator.comparingLong(o -> o.getMemoryFileInfo().getFileLength()));
              Collections.reverse(memoryWriters);
              for (PartitionDataWriter writer : memoryWriters) {
                // this branch means that there is no memory pressure
                if (!shouldEvict(aggressiveEvictModeEnabled, evictRatio)) {
                  break;
                }
                logger.debug("Evict writer {}", writer);
                writer.evict(true);
              }
            }
          } catch (Exception e) {
            logger.error("Evict thread encounter error", e);
          }
        },
        checkInterval,
        checkInterval,
        TimeUnit.MILLISECONDS);
  }

  logger.info(
      "Memory tracker initialized with: "
          + "max direct memory: {}, pause push memory: {}, "
          + "pause replication memory: {}, "
          + "read buffer memory limit: {} target: {}, "
          + "memory shuffle storage limit: {}, "
          + "resume by direct memory ratio: {}, "
          + "resume by pinned memory ratio: {}",
      Utils.bytesToString(maxDirectMemory),
      Utils.bytesToString(pausePushDataThreshold),
      Utils.bytesToString(pauseReplicateThreshold),
      Utils.bytesToString(readBufferThreshold),
      Utils.bytesToString(readBufferTarget),
      Utils.bytesToString(memoryFileStorageThreshold),
      directMemoryResumeRatio,
      pinnedMemoryResumeRatio);
}